Update: One comment from Reddit pointed out "I have no idea what is going on here without some context."
If you are a consumer of the Twitter Firehose, you are receiving a lot of data. You've probably got a lot of ideas of what to do with that data, and you're probably building your own APIs for filtering, processing, etc. If you are already used to using Twitter's API for streaming, filtering, etc., this gives you a way to build on top of what you already know.
One of the purposes of the Twitter Firehose is to allow 3rd parties to offer that data via other APIs. For example, Gnip offers a streaming API to re-package the data that they receive. Like just about every other provider, though, they use their own alternative streaming API implementation, so anyone who wants to consume data from Gnip must rewrite their streaming code. Well, with the code below, they could actually offer exactly the Twitter Streaming API (they'd obviously have to build the proper web serving frontend), and paying/trial users could consume it without having to re-implement their technology.
Or, say that you want to offer a Twitter-like Streaming API in your Status.net installation (or clone), or use this to perform syndication between nodes in such a network. This code would get you most of the way there.
end update
Instead of relying on publish/subscribe, we use zsets for timelines, lists for message queues, and simple string values for the status data that we filter and syndicate out.
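To keep the examples below self-contained, here is a sketch of the module-level setup they assume. The constant names match what the functions reference, but the key names and tuning values shown are placeholders of my choosing (the real definitions are in the gist linked at the end); the 30-second keepalive is the one value called out explicitly later in the post.

# Sketch of the module-level setup assumed by the code below. The constant
# names match what the functions reference; the values are placeholders.
import json
import os
import random
import time

ID_KEY = 'stream:id'                  # counter INCRed to mint new status ids
QUEUE = 'stream:timeline'             # zset of statuses: member = id, score = id
STATUS_MESSAGE = 'stream:status:%s'   # one json-dumped status message per key
CHUNKSIZE = 100                       # statuses examined per zrangebyscore() call
MAX_OUTGOING_BACKLOG = 100            # max per-client list length before tossing
KEEPALIVE_TIMEOUT = 30                # seconds without a match before sending '{}'
NO_MESSAGES_WAIT = .1                 # sleep when the timeline has nothing new
# Each connected client also gets its own list, named 'sub:<random hex>', that
# the worker RPUSHes matched statuses into and the client BLPOPs from.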
First things first, let's post a status message:
def got_status(conn, status, id=None):
    '''
    This will work until there are 2**53 ids generated, then we may get
    duplicate messages sent to the workers. There are some work-arounds, but
    they confuse the clean flow of the existing code.

    This function takes a Redis connection object, a status message, and an
    optional id. If the id is not None, the status message is assumed to be
    pre-dumped to json. If the id is None, the status will have a new id
    assigned to it, along with the current timestamp in seconds since the
    standard unix epoch.
    '''
    dumped = status
    if id is None:
        id = conn.incr(ID_KEY)
        status['id'] = id
        status['created_at'] = time.time()
        dumped = json.dumps(status)

    pipeline = conn.pipeline(True)  # a pipeline returns itself
    pipeline.zadd(QUEUE, id, id)
    pipeline.set(STATUS_MESSAGE % (id,), dumped)
    pipeline.execute()
    return id
This is all pretty straightforward: we generate an id if one wasn't provided, set some metadata on the status message, dump it to json, then add the id as both member and score to a zset, while also storing the status message data under its own key.
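As a quick usage sketch (the direct redis.Redis() connection and the status fields here are just for illustration; the post's own code goes through get_new_redis_connection() instead):

import redis

conn = redis.Redis()   # assumes a Redis server on localhost for this example
status_id = got_status(conn, {
    'screen_name': 'some_user',
    'text': 'pushing status messages through Redis',
    'has_link': False,   # the 'links' predicate in worker() reads this field
})
# The json-dumped message now lives at STATUS_MESSAGE % (status_id,), and the
# id sits in the QUEUE zset waiting for worker() to scan past it.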
Next, assuming that we've got some sort of task queue implementation available, and we've got a method of creating a new Redis connection:
def spawn_worker_and_subscribe(which, content=None, backlog=0):
    '''
    This would be called by a web server to connect to some Redis server
    that is holding all of the status messages and data, yielding results
    as they become available.

    This function requires two utility functions be present:
    get_new_redis_connection(which):
        This will create or reuse a connection to some Redis server that is
        hosting the status messages.
    spawn_worker(...):
        This will spawn the worker() function below on some worker box
        somewhere, pushing matched status messages to the client via a list
        named sub:...
    '''
    conn = get_new_redis_connection(which)
    channel = 'sub:' + os.urandom(16).encode('hex')
    spawn_worker(conn.hostinfo, backlog, which, content, channel)

    while True:
        # blpop() returns a (key, value) pair, or None on timeout
        result = conn.blpop(channel, timeout=60)
        if result is None or result[1] == '<close>':
            break
        yield result[1]
Notice how we're not using publish/subscribe here? Using lists allows us to pick a backlog limit, potentially on a per-client basis, to keep ourselves from using too much memory. In this case, the worker can tell us explicitly that it has closed, and if we don't receive anything for 60 seconds, we assume that the worker has exited and that this function should exit too.
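The web-serving frontend is out of scope for this post, but to show how the generator is meant to be consumed, a handler could look something like this (the WSGI framing and the hard-coded parameters are illustrative only):

def stream_track(environ, start_response):
    # In a real frontend, 'which' and 'content' would come from the
    # authenticated, parsed request rather than being hard-coded.
    start_response('200 OK', [('Content-Type', 'application/json')])
    for data in spawn_worker_and_subscribe('track', 'redis,streaming api'):
        # Each item is either a json status, a {"limit": ...} notice, or a
        # '{}' keepalive; the streaming API delimits them with newlines.
        yield data + '\r\n'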
Finally, let's get to the code that does all of the work (including the nasty bits of handling slow outgoing clients). This code would be started up for every call to the spawn_worker_and_subscribe() function above, on some task queue worker box:
def worker(hostinfo, backlog, which, content, subscriber):
    '''
    This worker handles the scanning of status message content against the
    user-requested filters.
    '''
    criteria = None
    if which == 'track':
        # should be a comma separated list of word strings
        # Given: 'streamapi,streaming api'
        # The first will match any status with 'streamapi' as an individual
        # word. The second will match any status with 'streaming' and 'api'
        # both in the status as individual words.
        criteria = TrackCriteria(content)
    elif which == 'follow':
        # should be a list of @names without the @
        criteria = FollowCriteria(content)
    elif which == 'location':
        # should be a list of boxes: [{'minlat':..., 'maxlat':..., ...}, ...]
        criteria = LocationCriteria(content)
    elif which == 'firehose':
        criteria = lambda status: True
    elif which == 'gardenhose':
        criteria = lambda status: not random.randrange(10)
    elif which == 'spritzer':
        criteria = lambda status: not random.randrange(100)
    elif which == 'links':
        criteria = lambda status: status['has_link']

    conn = get_new_redis_connection(hostinfo)
    if criteria is None:
        conn.rpush(subscriber, '<close>')
        return

    # set up the backlog stuff
    end = 'inf'
    now = int(conn.get(ID_KEY) or 0)
    if backlog < 0:
        end = now
    now -= abs(backlog)

    pipeline = conn.pipeline(False)
    sent = 1
    tossed = tossed_notified = 0
    keepalive = time.time() + KEEPALIVE_TIMEOUT
    last_sent_keepalive = False
    # In Python 2.x, all ints/longs compare smaller than strings.
    while sent and now < end:
        found_match = False
        # get the next set of messages to check
        ids = conn.zrangebyscore(QUEUE, now, end, start=0, num=CHUNKSIZE)
        if ids:
            # actually pull the data
            for id in ids:
                pipeline.get(STATUS_MESSAGE % (id,))
            pipeline.llen(subscriber)
            statuses = pipeline.execute()
            outgoing_backlog = statuses.pop()

            for data in statuses:
                if not data:
                    # We weren't fast enough, someone implemented delete and
                    # the message is gone, etc.
                    continue
                result = json.loads(data)
                # check the criteria
                if criteria(result):
                    if outgoing_backlog >= MAX_OUTGOING_BACKLOG:
                        tossed += 1
                        continue
                    # send the result to the subscriber
                    last_sent_keepalive = False
                    if tossed_notified != tossed:
                        pipeline.rpush(subscriber, json.dumps({"limit":{which:tossed}}))
                        tossed_notified = tossed
                    outgoing_backlog += 1
                    found_match = True
                    pipeline.rpush(subscriber, data)

            if found_match:
                keepalive = time.time() + KEEPALIVE_TIMEOUT
                sent = any(pipeline.execute())
            # update the current position in the zset
            now = int(ids[-1])

        elif end == 'inf':
            time.sleep(NO_MESSAGES_WAIT)

        else:
            # we have exhausted the backlog stream
            break

        curtime = time.time()
        if not found_match and curtime > keepalive:
            keepalive = curtime + KEEPALIVE_TIMEOUT
            should_quit = last_sent_keepalive and conn.llen(subscriber)
            should_quit = should_quit or conn.rpush(subscriber, '{}') >= MAX_OUTGOING_BACKLOG
            if should_quit:
                # Can't keep up even though it's been 30 seconds since we saw
                # a match. We'll kill the queue here, and the client will time
                # out on a blpop() call if it retries.
                conn.delete(subscriber)
                break
            last_sent_keepalive = True
This function is a bit more involved. First, we set up the proper filter criteria functions/classes. Then we set our start/end conditions based on the user-specified backlog of status messages to check. We then pull status messages chunk by chunk, check them for matches, and push matches into the outgoing queue. If the outgoing queue grows too large, we stop sending messages and keep a count of how many we tossed. If we don't see any matches for 30 seconds, we send a keepalive. If we were about to send a second keepalive and the client hasn't consumed the first one, or if the outgoing queue is full, we treat that as a client disconnect and bail.
Two other utility functions, some globals, and the criteria filters are omitted here; you can see them at this GitHub Gist.
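For a sense of what those criteria filters do, here is a minimal sketch of a word-tracking class with the comma/space semantics described in worker() above; the actual TrackCriteria, FollowCriteria, and LocationCriteria in the gist may be implemented differently.

class TrackCriteria(object):
    '''Sketch only: matches the 'track' semantics described above.'''
    def __init__(self, content):
        # 'streamapi,streaming api' -> [set(['streamapi']),
        #                               set(['streaming', 'api'])]
        self.groups = [set(phrase.lower().split())
                       for phrase in content.split(',') if phrase.strip()]

    def __call__(self, status):
        words = set(status.get('text', '').lower().split())
        # a status matches if all of the words of any one phrase appear in it
        return any(group <= words for group in self.groups)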
One incidental benefit of using the zset to store the timeline, instead of using publish/subscribe to send all status messages to all workers, is that you get "backlog" processing for free.
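For example (the numbers here are arbitrary), a negative backlog replays old matches and stops, while a positive backlog catches up and then keeps streaming:

# Replay roughly the last 1000 ids' worth of matching statuses, then stop:
for data in spawn_worker_and_subscribe('track', 'redis', backlog=-1000):
    handle(data)   # handle() is a stand-in for whatever the client does

# Start 1000 ids back, catch up, and continue streaming live matches:
for data in spawn_worker_and_subscribe('gardenhose', backlog=1000):
    handle(data)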
If you want to see more posts like this, you can buy my book, Redis in Action from Manning Publications today!
Very nice choice of font. I particularly liked the white background color.
In got_status(), is status['id'] = status what's intended? Or should that be status['id'] = id?
suppressingfire: You are right. Thank you for the correction :)