Heartbeat logging while consuming Twitter streams using Phirehose

Phirehose is an awesomely useful Twitter Streaming API client library, written in PHP by Fenn Bailey.

I originally added heartbeat logging for Rainmaker, and I have finally gotten around to contributing those modifications back to Phirehose; they are available on GitHub.

Why log heartbeats in Phirehose?

  • To be confident that Phirehose is still alive and actually functioning. In our case, missed tweets mean lost money and unhappy clients, so we needed to monitor this very closely.
  • To automatically detect connection drops and recover the missed tweets, either by rewinding with the count parameter or by backfilling them via the Twitter Search API.
  • To collect usage data for reporting purposes.

Usage

To use this, declare a heartbeat(array $data) method in your Phirehose subclass. The example below writes the statistics it receives to a MySQL table named stream_log.

Here is an example:

<?php

// USAGE:
//
//   require_once 'phirehose.php';
//   $dbh = mysql_connect($host, $username, $password);
//   PhirehoseConsumer::Initialize($dbh);
//   PhirehoseConsumer::start();

/**
 * Phirehose consumer that logs heartbeat statistics to a MySQL `stream_log`
 * table and, on startup, tries to recover tweets missed while it was down
 * (via the Streaming API count parameter and a background backfill script).
 *
 * NOTE(review): this example uses the legacy ext/mysql API (mysql_*), which was
 * removed in PHP 7; port to mysqli or PDO before running on a modern PHP.
 */
class PhirehoseConsumer extends Phirehose {

	/** @var PhirehoseConsumer|null Singleton created by Initialize(). */
	private static $instance;
	/** @var resource|null MySQL link used for every stream_log query. */
	private static $dbh;

	/** @var int Unix time of the most recent connect(); 0 until the first connect. */
	protected $consumer_start_time = 0;
	/** @var int Seconds since the most recent connect(), refreshed by checkFilterPredicates(). */
	protected $consumer_uptime = 0;

	/**
	 * Stores the database link and creates the singleton consumer using the
	 * filter streaming method. Later calls are no-ops (the new $dbh is ignored).
	 *
	 * @param resource $dbh MySQL link returned by mysql_connect().
	 */
	public static function Initialize($dbh) {
		if (! self::$instance instanceof self) {
			self::$dbh = $dbh;
			self::$instance = new self ( TWITTER_API_USERNAME, TWITTER_API_PASSWORD, Phirehose::METHOD_FILTER );
		}
	}

	/**
	 * Runs a SELECT on self::$dbh and returns one field of its first row.
	 *
	 * @param string $sql   Query to run.
	 * @param string $field Column name or alias to read from row 0.
	 * @return mixed The field value, or FALSE if the query failed or returned no rows.
	 */
	private static function fetch_scalar($sql, $field) {
		// ext/mysql takes the query first and the link second.
		$result = mysql_query ( $sql, self::$dbh );
		if ($result === FALSE)
			return FALSE;
		// mysql_result() emits a warning when asked for a row that does not exist.
		$value = (mysql_num_rows ( $result ) > 0) ? mysql_result ( $result, 0, $field ) : FALSE;
		mysql_free_result ( $result );
		return $value;
	}

	/**
	 * Returns the timestamp of the most recent logged heartbeat.
	 *
	 * NOTE(review): the value is later parsed with strtotime() in PHP's default
	 * time zone while MySQL wrote it in its session time zone; this assumes the
	 * two agree — TODO confirm.
	 *
	 * @return string 'Y-m-d H:i:s' timestamp; the current time if no valid heartbeat exists.
	 */
	protected static function get_last_heartbeat() {
		$last_heartbeat_ts = self::fetch_scalar ( 'SELECT time_stamp FROM stream_log ORDER BY time_stamp DESC LIMIT 1', 'time_stamp' );
		if (! is_string ( $last_heartbeat_ts ) || strtotime ( $last_heartbeat_ts ) === FALSE)
			$last_heartbeat_ts = date ( 'Y-m-d H:i:s' ); // default to now
		return $last_heartbeat_ts;
	}

	/**
	 * Estimates how many tweets were missed since the last heartbeat.
	 *
	 * The estimate is the average statusRate of heartbeats logged within
	 * $downtime seconds before the last heartbeat, times $downtime, times a
	 * 1.5x fudge factor, rounded up.
	 *
	 * @param string|null $last_heartbeat_ts Last heartbeat timestamp; looked up when NULL.
	 * @return int Estimated missed tweet count; 0 when there is no downtime or no data.
	 */
	protected static function get_missed_count($last_heartbeat_ts = NULL) {

		// get timestamp of the most recent intake script heartbeat
		if ($last_heartbeat_ts === NULL)
			$last_heartbeat_ts = self::get_last_heartbeat ();

		// how long since we were last running?
		$downtime = ( int ) (time () - strtotime ( $last_heartbeat_ts ));
		if ($downtime <= 0)
			return 0; // nothing missed (or clock skew)

		$ts = mysql_real_escape_string ( $last_heartbeat_ts, self::$dbh );

		// Both bounds apply to the age of each row relative to the last
		// heartbeat, so the whole difference is parenthesised before comparing.
		$sql = <<<SQL
SELECT CEIL(1.5 * AVG(statusRate) * $downtime) AS count
FROM stream_log WHERE
    (UNIX_TIMESTAMP('$ts') - UNIX_TIMESTAMP(time_stamp)) > 0 AND
    (UNIX_TIMESTAMP('$ts') - UNIX_TIMESTAMP(time_stamp)) <= $downtime
SQL;
		$count = self::fetch_scalar ( $sql, 'count' );

		// AVG() over an empty window yields NULL.
		return ($count === FALSE || $count === NULL) ? 0 : ( int ) $count;
	}

	/**
	 * Rewinds by the estimated missed-tweet count, launches the backfill
	 * script in the background, then starts consuming the stream.
	 *
	 * @throws RuntimeException If Initialize() has not been called.
	 */
	public static function start() {
		if (! self::$instance instanceof self)
			throw new RuntimeException ( 'PhirehoseConsumer::Initialize() must be called before start()' );

		// get timestamp of the most recent heartbeat
		$last_heartbeat_ts = self::get_last_heartbeat ();

		// Estimate how many tweets were missed, and rewind.
		//
		// Note that if you're using the filter streaming method, this will fail with the following error:
		//   "The Streaming API count parameter is not allowed in role statusDefaultFiltered."
		$count = self::get_missed_count ( $last_heartbeat_ts );
		if ($count) {
			self::$instance->log ( "Using count: $count" );
			self::$instance->setCount ( $count );
		}

		// run backfill script in background - uses Twitter Search API
		self::$instance->log ( "Backfilling from $last_heartbeat_ts" );
		passthru ( 'backfill.php ' . escapeshellarg ( $last_heartbeat_ts ) . ' >/dev/null &' );

		self::$instance->checkFilterPredicates ();
		self::$instance->consume ();
	}

	/**
	 * Handles one raw status from the stream.
	 *
	 * @param mixed $status Status payload as delivered by Phirehose.
	 * @return bool Always TRUE in this example.
	 */
	public function enqueueStatus($status) {
		// do something with your tweet
		return TRUE;
	}

	/**
	 * Writes one heartbeat row to stream_log.
	 *
	 * Missing or non-numeric statistics are stored as 0 so they can never
	 * break (or inject into) the generated SQL. A failed insert is logged.
	 *
	 * @param array $data Heartbeat statistics keyed by the stream_log column names below.
	 */
	public function heartbeat(array $data) {
		$columns = array ( 'elapsed', 'statusRate', 'statusCount', 'enqueueSpent', 'enqueueSpentAvg', 'filterCheckCount', 'filterCheckSpent', 'idlePeriod', 'maxIdlePeriod' );

		$assignments = array ( 'time_stamp = CURRENT_TIMESTAMP' );
		foreach ( $columns as $column ) {
			$value = (isset ( $data [$column] ) && is_numeric ( $data [$column] )) ? $data [$column] : 0;
			$assignments [] = "$column = $value";
		}
		$sql = "INSERT INTO stream_log SET\n    " . implode ( ",\n    ", $assignments );

		// Use the shared link (the old code referenced an undefined local $dbh).
		if (mysql_query ( $sql, self::$dbh ) === FALSE)
			$this->log ( 'Failed to log heartbeat: ' . mysql_error ( self::$dbh ) );
	}

	/**
	 * Logs uptime and memory usage, then (in a real consumer) refreshes the
	 * filter predicates.
	 */
	protected function checkFilterPredicates() {
		// useful for debugging; uptime is 0 until connect() has recorded a start time
		$this->consumer_uptime = ($this->consumer_start_time > 0) ? time () - $this->consumer_start_time : 0;
		$this->log ( sprintf ( 'Uptime: %s, memory usage: %skb', $this->consumer_uptime, number_format ( memory_get_usage () / 1024, 0 ) ) );

		// update filter predicates...
	}

	/**
	 * Records the connection start time, then delegates to Phirehose::connect().
	 */
	protected function connect() {
		$this->consumer_start_time = time ();
		parent::connect ();
	}

}
Written on June 11, 2011