Heartbeat logging while consuming Twitter streams using Phirehose
Phirehose is a very useful Twitter Streaming API client library, written in PHP by Fenn Bailey.
I originally added heartbeat logging for Rainmaker, and I have finally contributed those modifications back to Phirehose; they are available on GitHub.
Why log heartbeats in Phirehose?
- To gain assurance that Phirehose is still alive, and actually functioning. In our case, missing tweets means lost money and unhappy clients. We needed to monitor this very closely.
- To automatically detect connection drops and recover the tweets missed during the outage, either by setting the count parameter to rewind the stream or by backfilling them through the Twitter Search API.
- To collect usage data for reporting purposes.
Usage
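To use this, declare a heartbeat(array $data) method in your Phirehose child class. It receives an array of consumption statistics (elapsed, statusRate, statusCount, enqueueSpent, enqueueSpentAvg, filterCheckCount, filterCheckSpent, idlePeriod, maxIdlePeriod), which the example below writes to a stream_log table.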
Here is an example:
<?php
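// USAGE:
//
// require_once 'phirehose.php';
// define('TWITTER_API_USERNAME', 'your-username');
// define('TWITTER_API_PASSWORD', 'your-password');
// $dbh = mysql_connect($host, $username, $password);
// PhirehoseConsumer::Initialize($dbh);
// PhirehoseConsumer::start();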
/**
 * Phirehose consumer that logs heartbeats to a MySQL `stream_log` table and,
 * on start-up, uses the logged history to estimate and recover tweets missed
 * while the consumer was down.
 *
 * Written against the legacy ext/mysql API (mysql_query & co.).
 */
class PhirehoseConsumer extends Phirehose {
    /** @var PhirehoseConsumer|null singleton created by Initialize() */
    private static $instance;
    /** @var resource|null MySQL link used for all stream_log reads/writes */
    private static $dbh;
    /** @var int Unix time of the most recent connect(); 0 until the first connect */
    protected $consumer_start_time = 0;
    /** @var int seconds since $consumer_start_time, refreshed by checkFilterPredicates() */
    protected $consumer_uptime = 0;

    /**
     * Create the singleton consumer and remember the MySQL link.
     *
     * Only the first call has any effect; later calls do not replace $dbh.
     *
     * @param resource $dbh link returned by mysql_connect()
     */
    public static function Initialize($dbh) {
        if (! self::$instance instanceof self) {
            self::$dbh = $dbh;
            self::$instance = new self(TWITTER_API_USERNAME, TWITTER_API_PASSWORD, Phirehose::METHOD_FILTER);
        }
    }

    /**
     * Run a SELECT and return column $field of its first row.
     *
     * @param string $sql   query to run
     * @param string $field column name to read from row 0
     * @return string|null|false the value (NULL for SQL NULL), or FALSE when the
     *                           query failed or returned no rows
     */
    private static function query_scalar($sql, $field) {
        // ext/mysql takes the query first and the link second.
        $result = mysql_query($sql, self::$dbh);
        if ($result === FALSE || mysql_num_rows($result) < 1) {
            // Avoid mysql_result() warnings on failed/empty result sets.
            return FALSE;
        }
        $value = mysql_result($result, 0, $field);
        mysql_free_result($result);
        return $value;
    }

    /**
     * Timestamp of the most recent heartbeat recorded in stream_log.
     *
     * @return string 'Y-m-d H:i:s'; the current time when the table is empty or
     *                the stored value cannot be parsed
     */
    protected static function get_last_heartbeat() {
        $last_heartbeat_ts = self::query_scalar(
            'SELECT time_stamp FROM stream_log ORDER BY time_stamp DESC LIMIT 1',
            'time_stamp'
        );
        if (! is_string($last_heartbeat_ts) || strtotime($last_heartbeat_ts) === FALSE) {
            // NOTE(review): date() uses PHP's default timezone while rows are
            // stamped with MySQL's CURRENT_TIMESTAMP -- confirm both agree.
            $last_heartbeat_ts = date('Y-m-d H:i:s'); // default to now
        }
        return $last_heartbeat_ts;
    }

    /**
     * Estimate how many tweets were missed since the last heartbeat.
     *
     * Averages statusRate over the heartbeats logged within $downtime seconds
     * before the last heartbeat, multiplies by $downtime and a 1.5x fudge
     * factor, and rounds up.
     *
     * @param string|null $last_heartbeat_ts timestamp from get_last_heartbeat();
     *                                       looked up when NULL (the default)
     * @return int estimated number of missed tweets; 0 when there is no usable data
     */
    protected static function get_missed_count($last_heartbeat_ts = NULL) {
        if ($last_heartbeat_ts === NULL) {
            $last_heartbeat_ts = self::get_last_heartbeat();
        }
        // how long since we were last running?
        $downtime = (int) (time() - strtotime($last_heartbeat_ts));
        if ($downtime <= 0) {
            // No downtime (or clock skew): nothing to rewind, and never a negative count.
            return 0;
        }
        $ts = mysql_real_escape_string($last_heartbeat_ts, self::$dbh);
        // Both bounds apply to the same difference: heartbeats strictly before
        // the last one, but no older than $downtime seconds.
        $sql = <<<SQL
SELECT CEIL(1.5 * AVG(statusRate) * $downtime) AS count
FROM stream_log WHERE
(UNIX_TIMESTAMP('$ts') - UNIX_TIMESTAMP(time_stamp) > 0) AND
(UNIX_TIMESTAMP('$ts') - UNIX_TIMESTAMP(time_stamp) <= $downtime)
SQL;
        $count = self::query_scalar($sql, 'count');
        // FALSE: query failed/no row; NULL: AVG over zero rows.
        if ($count === FALSE || $count === NULL) {
            return 0;
        }
        return max(0, (int) $count);
    }

    /**
     * Rewind/backfill missed tweets, then start consuming the stream.
     *
     * @throws RuntimeException when Initialize() has not been called
     */
    public static function start() {
        if (! self::$instance instanceof self) {
            throw new RuntimeException('PhirehoseConsumer::Initialize() must be called before start()');
        }
        // get timestamp of the most recent heartbeat
        $last_heartbeat_ts = self::get_last_heartbeat();
        // Estimate how many tweets were missed, and rewind.
        //
        // Note that if you're using the filter streaming method, this will fail with the following error:
        // "The Streaming API count parameter is not allowed in role statusDefaultFiltered."
        // NOTE(review): Initialize() constructs the consumer with METHOD_FILTER,
        // so this rewind only works if that method is changed.
        $count = self::get_missed_count($last_heartbeat_ts);
        if ($count > 0) {
            self::$instance->log("Using count: $count");
            self::$instance->setCount($count);
        }
        // run backfill script in background - uses Twitter Search API
        self::$instance->log("Backfilling from $last_heartbeat_ts");
        // The timestamp comes from the database; escape it rather than splicing it into the shell command.
        passthru('backfill.php ' . escapeshellarg($last_heartbeat_ts) . ' >/dev/null &');
        self::$instance->checkFilterPredicates();
        self::$instance->consume();
    }

    /**
     * Handle one incoming status.
     *
     * @param string $status raw status payload from the stream
     * @return bool TRUE
     */
    public function enqueueStatus($status) {
        // do something with your tweet
        return TRUE;
    }

    /**
     * Format a value as a SQL numeric literal.
     *
     * @param mixed $value
     * @return string integer literal, fixed-point float literal (locale
     *                independent), or 'NULL' for missing/non-numeric/non-finite input
     */
    private static function sql_number($value) {
        if (! is_numeric($value)) {
            return 'NULL';
        }
        $number = $value + 0;
        if (is_int($number)) {
            return (string) $number;
        }
        return is_finite($number) ? sprintf('%.6F', $number) : 'NULL';
    }

    /**
     * Record one heartbeat row in stream_log.
     *
     * Missing or non-numeric statistics are stored as NULL instead of producing
     * malformed (or injectable) SQL. A failed insert is logged, not thrown, so a
     * logging problem never stops stream consumption.
     *
     * @param array $data statistics array (elapsed, statusRate, statusCount, ...)
     */
    public function heartbeat(array $data) {
        $columns = array(
            'elapsed',
            'statusRate',
            'statusCount',
            'enqueueSpent',
            'enqueueSpentAvg',
            'filterCheckCount',
            'filterCheckSpent',
            'idlePeriod',
            'maxIdlePeriod',
        );
        $assignments = array('time_stamp = CURRENT_TIMESTAMP');
        foreach ($columns as $column) {
            $value = isset($data[$column]) ? $data[$column] : NULL;
            $assignments[] = $column . ' = ' . self::sql_number($value);
        }
        $sql = 'INSERT INTO stream_log SET ' . implode(', ', $assignments);
        if (mysql_query($sql, self::$dbh) === FALSE) {
            $this->log('Heartbeat insert failed: ' . mysql_error(self::$dbh));
        }
    }

    /**
     * Log uptime and memory usage, then refresh filter predicates.
     */
    protected function checkFilterPredicates() {
        // useful for debugging; before the first connect() there is no start time,
        // so report 0 rather than time() - 0.
        $this->consumer_uptime = $this->consumer_start_time > 0
            ? time() - $this->consumer_start_time
            : 0;
        $this->log(sprintf('Uptime: %s, memory usage: %skb', $this->consumer_uptime, number_format(memory_get_usage() / 1024, 0)));
        // update filter predicates...
    }

    /**
     * Record the connection start time, then connect.
     */
    protected function connect() {
        $this->consumer_start_time = time();
        parent::connect();
    }
}
Written on June 11, 2011