Friday, May 20, 2011

The Essentials Behind Building a Streaming API

The other day, the Redis mailing list was posed with a question: how would you build Twitter's streaming API using Redis? It doesn't make sense to stream everything to remote clients to filter there. And while you could do processing on your web server, that would likely severely affect the performance of every web request as every status message would be processed multiple times on every web process. Also, while you could build parts of it trivially with publish/subscribe, if any particular filter wasn't able to run fast enough to keep up, growing outgoing buffers can take down older versions of Redis. So what do we do? We don't use publish/subscribe.

Update: One comment from Reddit pointed out "I have no idea what is going on here without some context."

If you are a consumer of the Twitter Firehose, you are receiving a lot of data. You've probably got a lot of ideas of what to do with that data, and you're probably building your own APIs for filtering, processing, etc. If you are already used to using Twitter's API for streaming, filtering, etc., this gives you a way to build on top of what you already know.

One of the purposes of the Twitter Firehose is to allow 3rd parties to offer that data via other APIs. For example, Gnip offers a streaming API to re-package the data that they receive. Like any other provider ever, they use an alternative streaming API implementation so that anyone must rewrite their streaming code to use Gnip. Well, with the code below, they could actually offer exactly the Twitter Streaming API (they'd obviously have to build the proper web serving frontend), and paying/trial users could consume it without having to re-implement their technology.

Or, say that you want to offer a Twitter-like Streaming API in your installation (or clone), or use this to perform syndication between nodes in such a network. This code would get you most of the way there.

end update

We instead use zsets for timelines, lists for message queues, and simple values for the data to filter/syndicate out.

First thing's first, let's post a status message:
def got_status(conn, status, id=None):
    This will work until there are 2**53 ids generated, then we may get
    duplicate messages sent to the workers. There are some work-arounds, but
    they confuse the clean flow of the existing code.

    This function takes a Redis connection object, a status message, and an
    optional id. If the id is not None, the status message is assumed to be
    pre-dumped to json. If the id is None, the status will have a new id
    assigned to it, along with the current timestamp in seconds since the
    standard unix epoch.
    dumped = status
    if id is None:
        id = conn.incr(ID_KEY)
        status['id'] = id
        status['created_at'] = time.time()
        dumped = json.dumps(status)

    pipeline = conn.pipeline(True) # a pipeline returns itself
    pipeline.zadd(QUEUE, id, id)
    pipeline.set(STATUS_MESSAGE%(id,), dumped)
    return id

This is all pretty straightforward; we're going to generate an id if it doesn't exist, set some metadata on the status message, dump it to json, then add the id itself as both a member and a score to a zset, while also adding the status message data.

Next, assuming that we've got some sort of task queue implementation available, and we've got a method of creating a new Redis connection:
def spawn_worker_and_subscribe(which, content=None, backlog=0):
    This would be called by a web server to connect to some Redis server that
    is holding all of the status messages and data, yielding results as they
    become available.
    This function requires two utility functions be present:
        This will create or reuse a connection to some Redis server that is
        hosting the status messages.
        This will spawn the worker() function above on some worker box
        somewhere, pushing matched status messages to the client via a list
        named sub:...
    conn = get_new_redis_connection(which)
    channel = 'sub:' + os.urandom(16).encode('hex')
    spawn_worker(conn.hostinfo, backlog, which, content, channel)
    while True:
        result = conn.blpop(channel, timeout=60)
        if result in (None, '<close>'):
        yield result

Notice how we're not using publish/subscribe here? Using lists will allow us to pick some backlog on potentially a per-client basis in order to stop ourselves from using too much memory. In this case, the worker can tell us explicitly that it has closed, or if we don't receive anything for 60 seconds, we're going to assume that the worker has exited and this function should exit too.

Finally, let's get to the code that does all of the work (and nasty bits of handling slow outgoing clients). This code would be started up for every call of the spawn_worker_and_subscribe() function above on some task queue worker box:
def worker(hostinfo, backlog, which, content, subscriber):
    This worker handles the scanning of status message content against the
    user-requested filters.
    criteria = None
    if which == 'track':
        # should be a comma separated list of word strings
        # Given: 'streamapi,streaming api'
        # The first will match any status with 'streamapi' as an individual
        # word. The second will match any status with 'streaming' and 'api'
        # both in the status as individual words.
        criteria = TrackCriteria(content)
    elif which == 'follow':
        # should be a list of @names without the @
        criteria = FollowCriteria(content)
    elif which == 'location':
        # should be a list of boxes: [{'minlat':..., 'maxlat':..., ...}, ...]
        criteria = LocationCriteria(content)
    elif which == 'firehose':
        criteria = lambda status: True
    elif which == 'gardenhose':
        criteria = lambda status: not random.randrange(10)
    elif which == 'spritzer':
        criteria = lambda status: not random.randrange(100)
    elif which == 'links':
        criteria = lambda status: status['has_link']

    conn = get_new_redis_connection(hostinfo)
    if criteria is None:
        conn.rpush(subscriber, '&ltclose>')

    # set up the backlog stuff
    end = 'inf'
    now = int(conn.get(ID_KEY) or 0)
    if backlog < 0:
        end = now
    now -= abs(backlog)

    pipeline = conn.pipeline(False)
    sent = 1
    tossed = tossed_notified = 0
    keepalive = time.time() + KEEPALIVE_TIMEOUT
    last_sent_keepalive = False
    # In Python 2.x, all ints/longs compare smaller than strings.
    while sent and now < end:
        found_match = False
        # get the next set of messages to check
        ids = conn.zrangebyscore(QUEUE, now, end, start=0, num=CHUNKSIZE)
        if ids:
            # actually pull the data
            for id in ids:
            statuses = pipeline.execute()
            outgoing_backlog = statuses.pop()

            for data in statuses:
                if not data:
                    # We weren't fast enough, someone implemented delete and
                    # the message is gone, etc.
                result = json.loads(data)
                # check the criteria
                if criteria(result):
                    if outgoing_backlog >= MAX_OUTGOING_BACKLOG:
                        tossed += 1
                    # send the result to the subscriber
                    last_sent_keepalive = False
                    if tossed_notified != tossed:
                        pipeline.rpush(subscriber, json.dumps({"limit":{which:tossed}}))
                        tossed_notified = tossed
                    outgoing_backlog += 1
                    found_match = True
                    pipeline.rpush(subscriber, data)

            if found_match:
                keepalive = time.time() + KEEPALIVE_TIMEOUT
                sent = any(pipeline.execute())
            # update the current position in the zset
            now = int(ids[-1])

        elif end == 'inf':

            # we have exhausted the backlog stream

        curtime = time.time()
        if not found_match and curtime > keepalive:
            keepalive = curtime + KEEPALIVE_TIMEOUT
            should_quit = last_sent_keepalive and conn.llen(subscriber)
            should_quit = should_quit or conn.rpush(subscriber, '{}') >= MAX_OUTGOING_BACKLOG
            if should_quit:
                # Can't keep up even though it's been 30 seconds since we saw
                # a match. We'll kill the queue here, and the client will time
                # out on a blpop() call if it retries.
            last_sent_keepalive = True

This function is a bit more involved. First we set up the proper filter criteria functions/classes. Then we set our start/end conditions given the user-specified backlog of status messages to check. We then pull status messages chunk by chunk, checking for matches, and putting matches into the outgoing queues. If the outgoing queue grows too large, we stop sending messages. If we don't have any matches for 30 seconds, we send a keepalive. If we were going to send a second keepalive, and the client hasn't seen the first keepalive, or if the queue is full, we consider that a client disconnect, and bail.

There are two other utility functions, some globals, and the criteria filters that are omitted here. You can see them at this Github Gist.

One incidental benefit of using the zset to store the timeline instead of using publish/subscribe to send all status messages to all workers, is that you get "backlog" processing for free.

If you want to see more posts like this, you can buy my book, Redis in Action from Manning Publications today!


  1. Very nice choice of font. I particularly liked the white background color.

  2. In got_status(), is status['id'] = status what's intended? Or should that be status['id'] = id?

  3. suppressingfire: You are right. Thank you for the correction :)