Tuesday, September 20, 2011

Netflix Splitting is a silly move

Here I am in Italy on my honeymoon, and while taking a brief pass through my email looking for something important, I see an email from Netflix about the separation of their businesses into a physical DVD division (Qwikster) and a streaming video division (Netflix).


From a business perspective, I understand this completely. They want to isolate that business from the streaming business so that they can optimize the two separately. It will help ensure profitability on both halves (incidentally, that's also part of the reason they split up billing a month or two ago; it was a telegraphing of their moves). It may also help them limit and isolate each business from licensing terms so that those terms can only affect one and not both.

But from the perspective of the consumer? Well, as a consumer who didn't much care that I was paying an extra $5/month for the service (I definitely get my money's worth of 2 dvds out with unlimited streaming even at $20/month), not having the sites integrated actually pisses me off. When browsing through videos, I would usually check whether the streaming version has subtitles (my wife speaks English as a 2nd language, and finds subtitles easier to follow than spoken dialog in movies, especially British films), and if not, request the DVD. Or, when streaming wasn't available, directly request the DVD.

My interaction with Netflix is defined by video availability and features in the two formats in which I consume content. To separate them into two different sites makes it *more difficult* for me to choose media in a reasonable way. Three years ago, I wished for a web site that knew about my memberships to all the different sites I wanted to consume from, knew what I watched, what I wanted to watch, and told me where I could get it. Netflix is going in the opposite direction.

Remember back when they tried to remove profiles (that little feature where different people in a family could have different viewing preferences, recommendations, and number of DVDs they could have out)? You know, a practically useful feature? They ultimately kept it because so many people spoke out about it. I don't know if enough people are going to recognize how awful this recent move is and complain, but it would sure be nice to be able to know whether a movie is available in DVD and streaming without visiting two sites and jumping between them.


I've heard the stories about Netflix, about how awesome their benefits, pay, etc. are. Heck, I've even received recruiting emails from them. But you know what they need? They need to hire people who tell the upper management "no". One of the greatest boosts to engineering and business productivity that we've cultivated at Adly is the ability for people to say no. We have management with great vision and ideas, a founder who can get us new designs overnight, and an engineering team that can build them almost as fast. But when you move too fast, when you are only concerned about the *right now*, you lose sight of where you are going. By saying no to daily updates, we focused on longer-term goals and company progress. Netflix needs people to say no, not because they are moving too fast (as was our problem), but because they are making decisions that make their customers angry. They need an actual customer advocate working for them, someone who talks with real customers.

I know, customers don't like change. People don't like change. We saw it continually at YouTube. But there are some people, whom you have to find and cultivate, who understand change is necessary, and who will tell you that your changes are good when they are good, and suck when they suck.

Netflix, I'm willing to be that guy. I'll do it for free: this multi-site thing sucks. Stop it.

Thursday, September 15, 2011

Improving Performance by 1000x

Whenever possible at Adly, we like to use existing software to solve as many problems as we can. From PostgreSQL for our database, to syslog-ng and Flask for our logging, to Redis for search, caching, and other fast-data operations, ActiveMQ for our message broker, etc. But there are times when existing software just isn't fast enough, takes too much memory, or just isn't suited for the task. This is one of those times.


In our Adly Analytics product, we calculate a number for pairs of Twitter users called "Also Follows". For two twitter users, it is the number of followers that follow both of those users. We had tried using Redis for this directly, but we were looking at roughly 700 megs of memory to store one set of 7.5 million followers. Ugh! Straight Python did a bit better at 500 megs, but that was still far more than we could reasonably use to store the data. Combine that with Redis intersections taking multiple seconds to run, and the fact that we needed to intersect 1,000 sets against 10,000 sets every week (these were due to the nature of the kinds of updates we were performing), and we'd need to have 33 processors plus roughly 300 gigs of memory to just perform this operation fast enough. That's just too expensive (over $6k/month just for machines in EC2 for the memory).

When faced with seemingly insurmountable costs, we did what any tech-directed company would do; we wrote custom software to solve our problem. What did we do?


First, we got rid of hash tables. They are great for random lookups, but for fixed-size data (we had 32-bit integers for Twitter user ids), a sorted array is roughly 1/20th the size of the equivalent Redis set, while still offering a method for random lookup via binary search (if necessary).

When we fetch the follower lists, we get them in chunks of 5,000 at a time, unsorted. We write them to a temporary file on disk as we read them; when done, we read the list into memory and sort it using the C standard library quicksort. After sorting, we scan the sequence for any duplicate values and discard them. Once we have a sorted sequence of unique integers, we write them to disk.
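As a rough Python sketch of that preprocessing step (purely illustrative; the file paths and function name are made up, and the production code was C, not Python):

from array import array

def build_sorted_unique(raw_path, out_path):
    # Read the unsorted 32-bit follower ids we dumped to disk, sort them,
    # drop duplicates, and write the result back as a packed array.
    ids = array('I')
    with open(raw_path, 'rb') as f:
        ids.fromstring(f.read())        # frombytes() in Python 3
    ids = array('I', sorted(set(ids)))  # sort + de-duplicate
    with open(out_path, 'wb') as f:
        ids.tofile(f)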

Once they are on disk, we signal our intersection daemon to memory map the file*, letting the operating system cache the file as necessary. When we need to perform the "also follows" calculation, we run a custom intersection algorithm that is tuned to rely on memory bandwidth, which allows us to intersect a set of 5 million and 7.5 million integers with roughly 2 million shared integers in roughly 6 milliseconds on a 3GHz Core 2 Duo (using one processor). That is roughly 2 billion integers examined every second. Compare that to Redis taking roughly 7 seconds to perform that same operation (due to the random hash table lookups, allocating and copying data, etc.), and we've improved our performance by 1000x.
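The intersection itself boils down to a single linear pass over the two sorted arrays. As a pure-Python stand-in for the idea (the real version was a C extension tuned for memory bandwidth, and this sketch is far slower):

def also_follows(a, b):
    # Count the ids present in both sorted arrays with a merge-style scan.
    i = j = shared = 0
    while i < len(a) and j < len(b):
        if a[i] < b[j]:
            i += 1
        elif a[i] > b[j]:
            j += 1
        else:
            shared += 1
            i += 1
            j += 1
    return shared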

Our results:
For the data we need to store in memory, we went from over 300 gigs to under 15 gigs. We also went from taking around 45 minutes to intersect 1 item against 10,000, to taking 3 seconds**.

While we still believe in using existing software as much as possible, sometimes it is just worth it to write what you need to in C/C++ to get the speed you need.

Along the way, we tried to use scipy.weave.inline to inline C into Python. Sadly, due to some threading issues, we got some nasty behavior and segfaults. I didn't dig too far into it and just wrote what we needed as a plain Python C extension. It's pretty easy.

* Memory mapped files are an interesting and crazy thing, available in Posix (Linux, BSD, OS X, etc.), Windows, and just about any other mature system out there. They allow you to open a file in such a way that you can access the contents of the file as if it were a C array. You can map the entire file or parts of the file. The operating system caches the data as necessary, and if you have multiple processes memory mapping the same file, the memory is shared between them.
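As a minimal Python sketch of the idea, assuming a file of packed 32-bit integers in native byte order (the function names are made up):

import mmap
import os
import struct

def open_id_file(path):
    # Map the whole file read-only; the OS pages it in and caches it as
    # needed, and processes mapping the same file share the physical pages.
    fd = os.open(path, os.O_RDONLY)
    size = os.fstat(fd).st_size
    return mmap.mmap(fd, size, prot=mmap.PROT_READ), size // 4

def id_at(mm, index):
    # Read the index-th uint32, as if the file were a C array.
    return struct.unpack_from('=I', mm, index * 4)[0]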

** Theoretically it was dropped to 3 seconds, but due to processor caching behavior, Python overhead, etc., this is actually around 45-50 seconds. In an hour, we sustain roughly 2.4 trillion integers examined, or roughly 670 million integers intersected against each other every second on average.

If you want to see more posts like this, you can buy my book, Redis in Action from Manning Publications today!

Update: You can read why we didn't use a bloom filter in a followup post.

Friday, September 2, 2011

Building for Uptime, Expecting Failure talk now available

Way back in June, I posted an article on building Adly's real-time ad network. Well, on August 1, I presented at the Los Angeles Dev Ops meetup, and I just now got the recording uploaded to YouTube. You can view Building for Uptime, Expecting Failure now.

The full slides are available via Google Docs.

Thursday, August 25, 2011

Followup with Time Warner

Just before lunch yesterday, I received a call from John, a representative of Time Warner Security. They had apparently found the Reddit post and my blog entry, rolled a truck to my building, and were ... less than pleased.

We had a brief but pleasant conversation in which we both agreed that if my cable were to be disconnected again, I should call him directly so that he could have the matter resolved in a more reasonable manner. He said that they had found my note, verified that in fact I was a paying customer, replaced the red tag with a green one, and that my line shouldn't be disconnected again.


Overall, while I was dissatisfied with the offered repair time when the outage originally occurred, I was pleasantly surprised by their response to my less-than-elegant solution. That said, I would not recommend doing what I did.

Monday, August 22, 2011

How to solve your cable/internet outage with a drill

This Sunday evening, after spending the day out and about picking up some wedding stuff, getting dinner, etc., my fiancée and I came home to find that our cable and internet were out. A quick reboot of the cable modem told me that, just like the two times before, when hooking up the cable for the newly arrived neighbors, the cable guy had disconnected our cable.

For some reason, the local Time Warner Cable installation guys have some bad paperwork and think that there is no paying customer in my apartment, so when they come to connect someone new, because there are only 5 jacks in a cable box for 8 apartments, they disconnect me.

In the past, this was a simple problem to remedy. I would go down to where the external box is, open it up (because no cable dude bothers to lock it), add a splitter and a splice, and I'm back up. But not this time.

This time, there was an obstacle. Seems the Time Warner Cable dude locked the cable box, forcing me to call up Time Warner Cable. The conversation went something like this...

Me: Hi, my cable and internet is out.
Customer Service: Oh, I'm sorry to hear that. Is it both your cable and internet?
Me: Yes. A new resident moved in today, and the local installer plugged them in. As has happened the last two times someone has moved in, the cable dude has disconnected my cable.
(we go back and forth about my account information)
CS: Well sir, I am very sorry to hear that. I'll go ahead and give you a refund because you've had so many outages.
Me: Thank you. In the past, I've just gone down there, opened up the cable box, and reconnected the cable myself. This time, the cable guy has locked the box, putting me six inches away from where I need to be to fix the problem that he caused.
CS: We can send someone out there to get this problem resolved as soon as possible.
Me: Great, when can he be here?
CS: Well, it looks like I can get someone out there on Tuesday, how does that sound?
Me: Not sufficient. Your service guy disconnected my cable, like they have done twice before; this makes it number three.
CS: I'm sorry sir, but that's the soonest I can get him there.
Me: It will take a cable guy five minutes to show up, unlock the box, reconnect my cable, and leave. He doesn't even need to knock on my door to tell me it's done. Or, he can just unlock the box, and I'll take care of it when I get home tomorrow.
CS: Sir, everyone just needs five minutes of his time, and I can't take any of our personnel off their current calls.
Me: Well, I guess you leave me with no choice. I'll just have to drill the lock and reconnect my cable.
(a short interaction that I can't quite remember, in which the customer service dude may have suggested I not do that)
Me: You know what, you don't need to bother to send someone, I'll be taking care of this. If you can send someone tomorrow morning, I recommend you give me a call at the phone number listed on my account soon, because I'm going downstairs right now to resolve this. Thank you, and goodbye.

That was probably not the most prudent thing to do, but I was tired and annoyed that we lost our internet access for the 3rd time because their technicians have bad paperwork. In the past, I've even caught a tech disconnecting me (I was surfing the net at the time), and had them plug me back in.

So, what did I do? Exactly what I said I'd do. I pulled out my drill and drilled a lock for the first time (read farther down for my method). It went pretty well. The low-quality lock body was no match for my $5 bit set and $30 Ryobi.


Upon opening the cable box, I discovered that the cable guy was being particularly malicious, and had cut the splice that my cable had been using to connect (my cable line is about 8 inches too short to connect to any of the jacks directly, so I had been using a cable splice that some other cable guy had left in there). Thankfully, there was another piece that was long enough, but there were no more available jacks, and I was fresh out of splitters.

A trip to Home Depot (2 miles from my apartment) got me two splitters (a spare for next time :P) and a bunch of stuff we need for the wedding. A quick disconnect, some screwing in, and we were up. Going back inside, cable and internet were back on. Woo.

The updated box with note:


The note reads:
Cable guy/gal,
Apartment 3 is a paying
customer until at least
June 2012. Please
stop disconnecting me.
  Thank you :)

I'm hoping that the note and drilled out lock is sufficient to keep me connected. Time will tell.

My method for drilling a lock:
I used a 1/4" steel twist bit and drilled in the center of where the key goes (not the center of the mechanism, see the picture farther up). When I would get far enough in for the drill to vibrate a pin into the bit, the drill would sieze up and kick (I have a keyless chuck drill, so my bit would slip). I would reverse the drill, use a bit of wire to scrape the pin out of it's hole, and continue. This lock had what looked to be 8 or 9 pins, though I didn't count as I removed them.

After I had all the pins out, and could see straight through into the box, I swapped to a 5/16" wood spade bit, jammed it in the 1/4" hole, and pulled the trigger. Anything that was left in the locking mechanism (springs, etc.) was sheared off, and the mechanism spun freely.

Once my internet was up, I had a chance to check videos of the "proper" method: drill out the pins themselves, use a screwdriver to turn the mechanism while you tap the lock to get the pins to adjust enough so the lock can turn. If you fail there, you just drill out the entire lock. On the one hand, I'm a bit sad that I didn't discover the "right" way on my own. On the other hand, putting a 1/4" hole into a lock does feel really good.

Update: an update to this saga is available here.

Thursday, July 14, 2011

A Neat Python Hack? No, Broken Code

A few months ago I saw an anti-pattern that was offered by a much-loved member of the Python community. He offered it as a neat little hack, and if it worked, it would be awesome. Sadly, it does not work. The anti-pattern is available as a recipe on the Python Cookbook here, which I alter below to make the breakage obvious, then switch back to the original form to show that it also doesn't work, and explain why it can't work in the general case.

# known incorrect lightweight lock / critical section
ci = sys.getcheckinterval()
sys.setcheckinterval(0)
try:
    # do stuff
finally:
    sys.setcheckinterval(ci)

A cursory reading of the interactive Python help below would suggest that inside the try block, no other Python code should be able to execute, thus offering a lightweight way of making race conditions impossible.
Help on built-in function setcheckinterval in module sys:

setcheckinterval(...)
    setcheckinterval(n)
    
    Tell the Python interpreter to check for asynchronous events every
    n instructions.  This also affects how often thread switches occur.
According to the basic docs, calling sys.setcheckinterval(0) might prevent the GIL from being released, and offer us a lightweight critical section. It doesn't. Running the following code...

import sys
import threading

counter = 0

def foo():
    global counter
    ci = sys.getcheckinterval()
    sys.setcheckinterval(0)
    try:
        for i in xrange(10000):
            counter += 1
    finally:
        sys.setcheckinterval(ci)

threads = [threading.Thread(target=foo) for i in xrange(10)]
for i in threads:
    i.setDaemon(1)
    i.start()

while threads:
    threads.pop().join()

assert counter == 100000, counter
I get AssertionErrors raised with values typically in the range of 30k-60k. Why? In the case of ints and other immutable objects, counter += 1 is really a shorthand form for counter = counter + 1. When threads switch, there is enough time to read the same counter value from multiple threads (40-70% of the time here). But why are there thread swaps in the first place? The full sys.setcheckinterval documentation tells the story:

sys.setcheckinterval(interval)
Set the interpreter’s "check interval". This integer value determines how often the interpreter checks for periodic things such as thread switches and signal handlers. The default is 100, meaning the check is performed every 100 Python virtual instructions. Setting it to a larger value may increase performance for programs using threads. Setting it to a value <= 0 checks every virtual instruction, maximizing responsiveness as well as overhead.

Okay. That doesn't work because our assumptions about sys.setcheckinterval() on boundary conditions were wrong. But maybe we can set the check interval to be high enough so the threads never swap?

import os
import sys
import threading

counter = 0
each = 10000
data = os.urandom(64).encode('zlib')
oci = sys.getcheckinterval()

def foo():
    global counter
    ci = sys.getcheckinterval()
    # using sys.maxint fails on some 64 bit versions
    sys.setcheckinterval(2**31-1)
    try:
        for i in xrange(each):
            counter += (1, data.decode('zlib'))[0]
    finally:
        sys.setcheckinterval(ci)

threads = [threading.Thread(target=foo) for i in xrange(10)]
for i in threads:
    i.setDaemon(1)
    i.start()

while threads:
    threads.pop().join()

assert counter == 10 * each and oci == sys.getcheckinterval(), \
    (counter, 10*each, sys.getcheckinterval())

When running this, I get assertion errors showing the counter typically in the range 10050 to 10060, and showing that the check interval is still 2**31-1. That means that our new counter += (1, data.decode('zlib'))[0] line, which can be expanded to counter = counter + (1, data.decode('zlib'))[0] reads the same counter value almost 90% of the time. There are more thread swaps with the huge check interval than with a check interval that says it will check and swap at every opcode!

Obviously, the magic line is counter += (1, data.decode('zlib'))[0]. The built-in zlib library has an interesting feature: whenever it compresses or decompresses data, it releases the GIL. This is useful in the case of threaded programming, as it allows other threads to execute without possibly blocking for a very long time - even if the check interval has not passed (which we were trying to prevent with our sys.setcheckinterval() calls). If you are using zlib (via gzip, etc.), this can allow you to use multiple processors for compression/decompression, but it can also result in race conditions, as I've shown.

In the standard library, there are a handful of C extension modules that do similar "release the GIL to let other threads run", which include (but are not limited to) some of my favorites: bz2, select, socket, and time. Not all of the operations in those libraries will force a thread switch like zlib compression, but you also can't really rely on any module to not swap threads on you.

In Python 3.x, sys.getcheckinterval()/sys.setcheckinterval() have been deprecated in favor of sys.getswitchinterval()/sys.setswitchinterval(), the latter of which takes the desired number of seconds between switch times. Don't let this fool you, it has the exact same issues as the just discussed Python 2.3+ sys.getcheckinterval()/sys.setcheckinterval() calls.


What are some lessons we can learn here?
  1. Always check before using anyone's quick hacks.
  2. If you want to make your system more or less responsive, use sys.setcheckinterval().
  3. If you want to execute code atomically, use a real lock or semaphore available in the thread or threading modules.
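To make lesson 3 concrete, here is the first example from above rewritten with a real lock; with this change the assert passes every time, regardless of the check interval. A minimal sketch in the same Python 2 style as the rest of the post:

import threading

counter = 0
counter_lock = threading.Lock()

def foo():
    global counter
    for i in xrange(10000):
        # The lock, not the check interval, makes the read-modify-write
        # on counter atomic.
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=foo) for i in xrange(10)]
for i in threads:
    i.setDaemon(1)
    i.start()

while threads:
    threads.pop().join()

assert counter == 100000, counter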


If you want to see more posts like this, you can buy my book, Redis in Action from Manning Publications today!

Friday, July 8, 2011

Building Adly's Twitter Analytics


If you are a regular reader of my blog, you will know that since joining Adly, I've been a little busy... Together, we have designed and built a real-time search platform, a content and location targeted ad network, and this morning we announced the public release of our Twitter Audience Analytics Platform, Adly Analytics.

This post will discuss some of the tools and methods that we used to pull and process the data to turn it into what it is today. As discussed in our press release and on TechCrunch, we have 8 tabs (12 views) worth of information that represent some of the ways that Adly influencers can view their audience data: Top Mentions, Also Follows, Top Followers, Follower Growth, Gender, City, State, and Country. In each section below, I will describe some of the techniques we use to gather and process this information.

Basic Overview

Before I dig into each of the specific tabs, I wanted to give a brief overview of the technology we use to make everything happen. There will be more detail to come on this front, and I am in the process of writing some technical whitepapers, but for now here is the big picture.

The main technical tool at our disposal is our custom Twitter Spider. Similar in a sense to Googlebot, which crawls much of the web, our spider crawls different parts of Twitter. On the more technical side of things, our spider communicates with Twitter's servers using their API.

Each different kind of data that we fetch requires a different type of spider, and each type of data is stored in one or more different formats. The underlying technology is actually very straightforward; we use ActiveMQ as a coarse-grained task queue (one of our senior engineers, Eric Van Dewoestine, who is behind Eclim, wrote our custom Python bindings about a year ago for our Ad Network), Redis as our general high-speed data store, and a custom task processor written in Python to spider, process, and store the data back into Redis.

Let's look at a few of the tabs (you can see them all on Adly.com/Analytics):

Top Mentions

The first tab, Top Mentions, is intended to help you discover the most influential people who are @mentioning or retweeting you. We pull this information directly from Twitter and filter it down to the most influential people who are already interacting with you.

Follower Growth

The data that is behind Follower Growth is used as part of many other parts of our system. Generally, any time we receive user information from Twitter (sometimes we get it as part of the call, like in the case for Top Mentions), we check to see if the information is for a user that we have determined to be influential (in the Adly network, is from a set of the most influential Twitter users, etc.). If it is, we update the current count for their number of followers, and place that user in a list of users whose followers we want to pull. Over time, we will fetch the full listing of followers for everyone we have determined to be influential, and combine all of these lists to find new users whose information we do not yet have. Some of these users will then be influential, thus helping us to further develop our listing of influential people, find more followers, etc.

Also Follows

Once we have the full listing of followers for any two influencers, we can calculate how many followers they share. For example, 24% of @JetBlue's followers also follow @50Cent, but only 8% of @50Cent's followers follow @JetBlue. Then again, @50Cent has over 12 times the number of followers, so his followers do tend to follow @JetBlue far more than is typical. This gives both @JetBlue and @50Cent the opportunity to discover brands and influencers that have something in common with themselves.


Top Followers

Like Also Follows, since we have the full listing of followers on hand, and because we also have the public user information for all of those followers, we can easily determine things like the fact that Justin Bieber (@justinbieber) and Ashton Kutcher (@aplusk) are @charliesheen's two biggest followers. This is useful to help discover the biggest influencers that are interested in what you say, and who you may want to start interacting with.

Gender, City, State, and Country

Like Top Followers, again we have the full listing of followers on hand, but we've pre-processed the user information to determine the gender and location of Twitter users around the world. We use this information to give both absolute figures like "62% of Kim Kardashian's followers are women" or "6.8% of Fox News' followers are in Texas", as well as relative figures like "Kim Kardashian has 1.3 times the number of women following her than expected" or "Charlie Sheen has 3.2 times the number of followers in Ireland than expected".
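Roughly speaking, those "times expected" numbers are just an account's share divided by a baseline share across the followers we've profiled. A tiny sketch, where the 48% baseline is made up purely for illustration:

def over_index(account_share, baseline_share):
    # e.g. 62% of an account's followers are women against a hypothetical
    # 48% baseline: 0.62 / 0.48 ~= 1.3x "the number expected".
    return account_share / baseline_share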

For instance, @SnoopDogg has one of the most diverse and globally representative followings we've ever seen on Twitter. He has fans all around the world, and in many cities and states one might not expect to find a lot of Twitter users.

Beyond being interesting, this is useful business intelligence for Snoop and his managers. Understanding who his mega fans are, and where they are most concentrated, can be helpful in planning tours, personal appearances, PR, and more to really bring his Twitter following to life in a meaningful way.



Anyway, I am actually headed out for some much needed R&R, so I will come back to this topic later with more technical discussion and some graphics to illustrate how we created the new service.

If you want to check it out for yourself, leave a comment below with your Twitter @name, and I will be sure to get you in the queue.

Tuesday, July 5, 2011

Port Forwarding: Just use SSH

This post is going to deviate a bit from what I normally post about. Normally, I talk about building software and systems. I include code snippets, rationale for why we made a particular design decision, etc. This post is going to be different.


A few nights ago, I decided it was time for me to have my own personal server in the cloud. A remotely accessible server for personal projects. For the last couple years, I'd been building personal projects with the AppEngine SDK (using Python, obviously), but constantly running into limitations in terms of data store queries, transactions, types of data to cache, etc. Having a server lets me put whatever software I need in the cloud. Things like Redis (of which I am fond) or a standard SQL server (PostgreSQL is my preferred DB, but MySQL works very well too); or, if I feel like dog-fooding and the kinds of queries a non-relational store offers are enough, YogaTable is a great little tool.

This leads me to a 2 hour battle I had the other night, culminating in a face-palm moment. As some of you may or may not know, I use Windows as my desktop operating system. It's a personal choice that is the result of a lot of frustration with both OS X (beachballs of death, multi-minute system hangs, bad external monitor behavior, visually pretty but poor quality software, ...) and with Linux as a desktop (I shouldn't have to spend time troubleshooting a kernel update that disables my wireless card weeks after the kernel update). On the other hand, I love Linux as a server, especially when someone else has already configured system images that work perfectly in the cloud. This leaves me in a bit of a bind.

For my professional work at Ad.ly, I've actually got a physical machine sitting under my desk at our office running Linux (same version as we run in production). To access it, I have a local VM (which was originally my development environment) that I use as my ssh tunneler; either directly to its IP when I'm at work, or to its VPN IP when I'm at home (using the convenient aliases "work" and "home", respectively). It forwards an ssh port (for FreeNX typically), a development http port, and a Samba port. A few nights ago when trying to set up a secondary set of tunnels to the new machine, I decided it was time to reboot my local VM (there were a few security updates). After the reboot, it would seem that whatever incantation had made the Samba port forward work (which is on port 139) was no longer working.

So instead of needing one more set of port forwards, I now needed the original set again. After mucking about for 2 hours with netcat, inetd (which doesn't allow different servers on different IPs), iptables, and authbind, I realized that my issue (I needed to forward port 139 on a few different IP addresses to different destinations) could be easily handled with a root local forward:
$ sudo ssh -L 192.168.a.b:139:192.168.a.b:1139 \
           -L 192.168.a.c:139:192.168.a.c:1139 \
           ... localhost
Then I just need to forward the proper ports 1139 to the destination boxes, which is where the facepalm comes in. I'd struggled with three of those four methods when I originally set up the box (the new one is authbind, which didn't let me bind port 139 as non-root), and I keep forgetting that for "forward port X on host A to port Y on host B", even with A and B being the same host, SSH is generally the simplest path to success. Not netcat, not iptables, and not inetd. Lesson re-learned.

Thursday, June 9, 2011

Building an Ad Network Ready for Failure

A little over a year ago in May of 2010, I was tasked with the job of creating a self-serve location-targeted ad network for distributing ads to web, desktop, and mobile applications that typically displayed Twitter and Facebook status messages. This post will describe the architecture and systems that our team put together for an ad network that ran 24/7 for over 6 months, serving up to 200 ad calls per second, targeting under 200ms response time, despite network hiccups and VPS failures, and how you can do the same with your architecture.

Our Rambo Architecture

Before we had started on our adventure to build an ad network, we had suffered hiccups with Amazon's EC2 systems. While most of the time the VPSes that we had created months prior had suffered no issues, occasionally we would have a VPS disappear from the network for some time, EBS volumes would hang on us, EBS volumes would be unable to be re-assigned to another VPS, VPS instances would be stuck in a state of being restarted, all of the things that people have come to expect from EC2. Given these intermittent failures, we understood that our entire system needed to be able to recover from a hang/failure of just about any set of machines. Some months after running the system, we read about Netflix's experience, their Rambo Architecture, and their use of Chaos Monkey. We never bothered to implement a Chaos Monkey.

Load Balancing

Initially, we had used Apache as a load balancer behind an Amazon elastic IP. Before switching live, we decided to go with HAProxy due to its far better performance characteristics, its ability to detect slow/failing hosts, and its ability to send traffic to different hosts based on weights (useful for testing a new configuration, pushing more to beefier hosts, etc.). The *only* unfortunate thing about HAProxy was that when you reloaded the configuration (to change host weights, etc.), all counters were reset. Being able to pass SIGHUP to refresh the configuration would have been nice. Other than that minor nit, HAProxy was phenomenal.

Worker Machines

After running through our expected memory use on each of our worker machines (I will describe their configurations in a bit), we determined that we could get by with a base of 750 megs + 300 megs/processor for main memory to max out the throughput on any EC2 instance. We were later able to reduce this to 450 + 300/processor, but either way, our ad serving scaled (effectively) linearly as a function of processor speed and number. In EC2, this meant that we could stick with Amazon's Hi-CPU Medium VPS instances, which offered the best bang for the $ in terms of performance. We had considered the beefier processors in Hi-Memory instances, but we had literally zero use for the extra memory. If our service took off, we had planned on moving to fewer High CPU Extra Large instances, but for the short term, we stuck with the flexibility and experimentation opportunities that Medium instances offered (thanks to HAProxy's ability to send different traffic volumes to different hosts).

The actual software stack running on each box was relatively slim: Redis (ad targeting and db cache), ActiveMQ (task queues), Apache + mod_wsgi + Python (to serve the ads and interact with everything), and a custom Geocode server written in Python (for handling ip -> location, lat,lon -> location, city/state/country -> location).

Software Stack

The geocoding server was the biggest memory user of them all, starting out at 600 megs when we first pushed everything out. Subsequent refinements on how data was stored dropped that by 300 megs. This process was, in my opinion, the most interesting of the technology that we developed for the ad targeting system, as it was able to take a latitude,longitude pair and determine what country, what state, and what zipcode that location was in (as applicable) in under 1 ms, yet do so in 300 megs of memory using pure Python. It had a simple threaded HTTP server interface. Each VPS ran an instance to minimize round-trip time and to minimize dependencies on other machines.

We had originally run with RabbitMQ, but after running for a week or two, ran into the same crashing bug that Reddit ran into. Our switch to ActiveMQ took a little work (ActiveMQ wasn't disconnecting some of its sockets, forcing us to write a Twisted server to act as a broker to minimize connections, but one of our engineers sent a patch upstream to fix ActiveMQ), but we've been pretty happy with it. In addition to the queue, we also had a task processor written in Python that processed impressions and clicks, updating the database and the ad index as necessary if/when an ad ran out of money.

Our Apache + mod_wsgi + Python server mostly acted as a broker between encoding/decoding requests/responses, ad calls to Redis, cache pulls from Redis, geocoding requests, and logging results. This is where we scaled per processor, and the memory use was primarily the result of running so many threads to maximize IO between our servers and Redis. For a brief time, we were also performing content analysis for further targeting (simple bayesian categorization across 25 categories, matched against pre-categorized ads, calculated in Python). This was consistently the slowest part of our calls, accounting for ~40ms, which we later dropped to 5ms with a little bit of numpy. Sadly, we found that content analysis was less effective for the bottom line than just paying attention to a calculated CPM on individual ads (calculated via CPC * CTR), so we tossed it for the sake of simplicity.

The real workhorse of our ad targeting platform was Redis. Each box slaved from a master Redis, and on failure of the master (which happened once), a couple "slaveof" calls got us back on track after the creation of a new master. A combination of set unions/intersections with algorithmically updated targeting parameters (this is where experimentation in our setup was useful) gave us a 1 round-trip ad targeting call for arbitrary targeting parameters. The 1 round-trip thing may not seem important, but our internal latency was dominated by network round-trips in EC2. The targeting was similar in concept to the search engine example I described last year, but had quite a bit more thought regarding ad targeting. It relied on the fact that you can write to Redis slaves without affecting the master or other slaves. Cute and effective. On the Python side of things, I optimized the redis-py client we were using for a 2-3x speedup in network IO for the ad targeting results.
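As a rough sketch of what a single-round-trip targeting call can look like with redis-py, run against a slave (the key names and the eCPM sort key here are made up, and our real targeting logic was considerably more involved):

import random

def target_ads(slave, targeting_keys, limit=10):
    # Intersect the targeting sets (location, placement type, etc.) into a
    # temporary key on the slave, pull the top ads by a precalculated eCPM,
    # then clean up; all in one network round trip via a pipeline.
    tmp = 'tmp:target:%d' % random.getrandbits(32)
    pipe = slave.pipeline(False)
    pipe.sinterstore(tmp, targeting_keys)
    pipe.sort(tmp, start=0, num=limit, by='ad:*:ecpm', desc=True)
    pipe.delete(tmp)
    return pipe.execute()[1]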

The master Redis was merely a Redis instance that was populated from the database by a simple script, and kept up-to-date by the various distributed task processes, which syndicated off to the slaves automatically.

After Responding to a Request

After an ad call was completed, an impression tracking pixel was requested, or a click occurred, we would throw a message into our task queue to report on what happened. No database connection was ever established from the web server, and the web server only ever made requests to local resources. The task server would connect to the database, update rows there, handle logging (initially to the database, then briefly to mongodb, and finally to syslog-ng, which we use today for everything), and update the Redis master as necessary. In the case of database or Redis master failure, the task queue would merely stop processing the tasks, batching them up for later.

The Weak Points

Looking at this setup, any individual VPS could go down except for the load balancer and all other parts would continue to work. Early on we had experimented with Amazon's load balancer, but found out that it wouldn't point to an elastic IP (I can't remember why this was important at the time), so we used a VPS with HAProxy. Thankfully, the load balancer VPS never went down, and we had a hot spare ready to go with an elastic IP update.

Any worker box could go down, and it wouldn't affect serving except for a small set of requests being dropped. Our Redis master or master DB could even go down, and some ads may/may not be served when they should/shouldn't. We did lose the Redis master once, due to a network hiccup (which caused replication to one of the slaves to hang, blow up memory use on the master, and subsequently get the master killed by the Linux OOM killer). But this caused zero downtime, zero loss of $, and was useful in testing our emergency preparedness. We had some API workers go down on occasion due to EBS latency issues, but we always had 2-3x the number necessary to serve the load.

Our true weakness was our use of PostgreSQL 8.4 in a single-server setup. Our write load (even with API calls coming in at a high rate) was low enough to not place much of a load on our database, so we never felt pressured to switch to something with more built-in options (PostgreSQL 9 came out about 3-4 months after we had started running the system). But really, for this particular data, using Amazon's RDS with multi-AZ would have been the right thing to do.

Where is it now?

After 6 months of development, getting application developers to start using our API, getting reasonable traffic, getting repeat ad spends, etc., we determined that the particular market segment we were going for was not large enough to validate continuing to run the network at the level of hardware and personnel support necessary for it to be on 24/7, so we decided to end-of-life the product.

I'll not lie, it was a little painful at first (coincidentally, a project in which I was the only developer, YouTube Groups, had been end-of-lifed a couple weeks before), but we all learned quite a bit about Amazon as a platform, and my team managed to build a great product.

If you want to see more posts like this, you can buy my book, Redis in Action from Manning Publications today!

Friday, May 20, 2011

The Essentials Behind Building a Streaming API

The other day, the Redis mailing list was posed with a question: how would you build Twitter's streaming API using Redis? It doesn't make sense to stream everything to remote clients to filter there. And while you could do processing on your web server, that would likely severely affect the performance of every web request as every status message would be processed multiple times on every web process. Also, while you could build parts of it trivially with publish/subscribe, if any particular filter wasn't able to run fast enough to keep up, growing outgoing buffers can take down older versions of Redis. So what do we do? We don't use publish/subscribe.

Update: One comment from Reddit pointed out "I have no idea what is going on here without some context."

If you are a consumer of the Twitter Firehose, you are receiving a lot of data. You've probably got a lot of ideas of what to do with that data, and you're probably building your own APIs for filtering, processing, etc. If you are already used to using Twitter's API for streaming, filtering, etc., this gives you a way to build on top of what you already know.

One of the purposes of the Twitter Firehose is to allow 3rd parties to offer that data via other APIs. For example, Gnip offers a streaming API to re-package the data that they receive. Like just about every other provider, they use their own streaming API implementation, so anyone who wants to consume from Gnip must rewrite their streaming code. Well, with the code below, they could actually offer exactly the Twitter Streaming API (they'd obviously have to build the proper web serving frontend), and paying/trial users could consume it without having to re-implement their technology.

Or, say that you want to offer a Twitter-like Streaming API in your Status.net installation (or clone), or use this to perform syndication between nodes in such a network. This code would get you most of the way there.

end update

We instead use zsets for timelines, lists for message queues, and simple values for the data to filter/syndicate out.

First things first, let's post a status message:
def got_status(conn, status, id=None):
    '''
    This will work until there are 2**53 ids generated, then we may get
    duplicate messages sent to the workers. There are some work-arounds, but
    they confuse the clean flow of the existing code.

    This function takes a Redis connection object, a status message, and an
    optional id. If the id is not None, the status message is assumed to be
    pre-dumped to json. If the id is None, the status will have a new id
    assigned to it, along with the current timestamp in seconds since the
    standard unix epoch.
    '''
    dumped = status
    if id is None:
        id = conn.incr(ID_KEY)
        status['id'] = id
        status['created_at'] = time.time()
        dumped = json.dumps(status)

    pipeline = conn.pipeline(True) # a pipeline returns itself
    pipeline.zadd(QUEUE, id, id)
    pipeline.set(STATUS_MESSAGE%(id,), dumped)
    pipeline.execute()
    return id

This is all pretty straightforward; we're going to generate an id if it doesn't exist, set some metadata on the status message, dump it to json, then add the id itself as both a member and a score to a zset, while also adding the status message data.

Next, assuming that we've got some sort of task queue implementation available, and we've got a method of creating a new Redis connection:
def spawn_worker_and_subscribe(which, content=None, backlog=0):
    '''
    This would be called by a web server to connect to some Redis server that
    is holding all of the status messages and data, yielding results as they
    become available.
    
    This function requires two utility functions be present:
    get_new_redis_connection(which):
        This will create or reuse a connection to some Redis server that is
        hosting the status messages.
    spawn_worker(...):
        This will spawn the worker() function above on some worker box
        somewhere, pushing matched status messages to the client via a list
        named sub:...
    '''
    conn = get_new_redis_connection(which)
    channel = 'sub:' + os.urandom(16).encode('hex')
    spawn_worker(conn.hostinfo, backlog, which, content, channel)
    
    while True:
        result = conn.blpop(channel, timeout=60)
        # blpop() returns a (key, value) pair, or None on timeout
        if result is None or result[1] == '<close>':
            break
        yield result[1]

Notice how we're not using publish/subscribe here? Using lists will allow us to pick some backlog on potentially a per-client basis in order to stop ourselves from using too much memory. In this case, the worker can tell us explicitly that it has closed, or if we don't receive anything for 60 seconds, we're going to assume that the worker has exited and this function should exit too.

Finally, let's get to the code that does all of the work (and nasty bits of handling slow outgoing clients). This code would be started up for every call of the spawn_worker_and_subscribe() function above on some task queue worker box:
def worker(hostinfo, backlog, which, content, subscriber):
    '''
    This worker handles the scanning of status message content against the
    user-requested filters.
    '''
    criteria = None
    if which == 'track':
        # should be a comma separated list of word strings
        # Given: 'streamapi,streaming api'
        # The first will match any status with 'streamapi' as an individual
        # word. The second will match any status with 'streaming' and 'api'
        # both in the status as individual words.
        criteria = TrackCriteria(content)
    elif which == 'follow':
        # should be a list of @names without the @
        criteria = FollowCriteria(content)
    elif which == 'location':
        # should be a list of boxes: [{'minlat':..., 'maxlat':..., ...}, ...]
        criteria = LocationCriteria(content)
    elif which == 'firehose':
        criteria = lambda status: True
    elif which == 'gardenhose':
        criteria = lambda status: not random.randrange(10)
    elif which == 'spritzer':
        criteria = lambda status: not random.randrange(100)
    elif which == 'links':
        criteria = lambda status: status['has_link']

    conn = get_new_redis_connection(hostinfo)
    if criteria is None:
        conn.rpush(subscriber, '<close>')
        return

    # set up the backlog stuff
    end = 'inf'
    now = int(conn.get(ID_KEY) or 0)
    if backlog < 0:
        end = now
    now -= abs(backlog)

    pipeline = conn.pipeline(False)
    sent = 1
    tossed = tossed_notified = 0
    keepalive = time.time() + KEEPALIVE_TIMEOUT
    last_sent_keepalive = False
    # In Python 2.x, all ints/longs compare smaller than strings.
    while sent and now < end:
        found_match = False
        # get the next set of messages to check
        ids = conn.zrangebyscore(QUEUE, now, end, start=0, num=CHUNKSIZE)
        if ids:
            # actually pull the data
            for id in ids:
                pipeline.get(STATUS_MESSAGE%(id,))
            pipeline.llen(subscriber)
            statuses = pipeline.execute()
            outgoing_backlog = statuses.pop()

            for data in statuses:
                if not data:
                    # We weren't fast enough, someone implemented delete and
                    # the message is gone, etc.
                    continue
                result = json.loads(data)
                # check the criteria
                if criteria(result):
                    if outgoing_backlog >= MAX_OUTGOING_BACKLOG:
                        tossed += 1
                        continue
                    # send the result to the subscriber
                    last_sent_keepalive = False
                    if tossed_notified != tossed:
                        pipeline.rpush(subscriber, json.dumps({"limit":{which:tossed}}))
                        tossed_notified = tossed
                    outgoing_backlog += 1
                    found_match = True
                    pipeline.rpush(subscriber, data)

            if found_match:
                keepalive = time.time() + KEEPALIVE_TIMEOUT
                sent = any(pipeline.execute())
            # update the current position in the zset
            now = int(ids[-1])

        elif end == 'inf':
            time.sleep(NO_MESSAGES_WAIT)

        else:
            # we have exhausted the backlog stream
            break

        curtime = time.time()
        if not found_match and curtime > keepalive:
            keepalive = curtime + KEEPALIVE_TIMEOUT
            should_quit = last_sent_keepalive and conn.llen(subscriber)
            should_quit = should_quit or conn.rpush(subscriber, '{}') >= MAX_OUTGOING_BACKLOG
            if should_quit:
                # Can't keep up even though it's been 30 seconds since we saw
                # a match. We'll kill the queue here, and the client will time
                # out on a blpop() call if it retries.
                conn.delete(subscriber)
                break
            last_sent_keepalive = True

This function is a bit more involved. First we set up the proper filter criteria functions/classes. Then we set our start/end conditions given the user-specified backlog of status messages to check. We then pull status messages chunk by chunk, checking for matches, and putting matches into the outgoing queues. If the outgoing queue grows too large, we stop sending messages. If we don't have any matches for 30 seconds, we send a keepalive. If we were going to send a second keepalive, and the client hasn't seen the first keepalive, or if the queue is full, we consider that a client disconnect, and bail.

There are two other utility functions, some globals, and the criteria filters that are omitted here. You can see them at this Github Gist.

One incidental benefit of using the zset to store the timeline instead of using publish/subscribe to send all status messages to all workers, is that you get "backlog" processing for free.

If you want to see more posts like this, you can buy my book, Redis in Action from Manning Publications today!

Tuesday, February 15, 2011

Some Redis Use-cases

About 6 months ago, the organizer of the LA NoSQL meetup was looking for presenters. Since my coworkers and I had been using Redis fairly heavily for a few months, I offered to do a presentation on Redis. Sadly, that presentation never happened, as the event was delayed and then cancelled for one reason or another. Because I've had the slides, and because I think the information is still useful, I thought I would reformat and rewrite it as a blog post.

What is Redis? From redis.io:
Redis is an open source, advanced key-value store. It is often referred to as a data structure server since keys can contain strings, hashes, lists, sets and sorted sets.
For those of you who are familiar with memcached, it's sort of like that, but more. That is, while you can only store strings/integers with memcached, you can store a few pre-defined data structures with Redis, which are optionally persisted to disk.

Let's start with the basic strings/integers. You can use strings as a serialized cached database row, maybe session storage, a resettable counter (incr with getset), or using setnx for distributed locks. Really, anything that you might have used memcached for, you can use Redis for.
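A resettable counter and a (very) basic lock, sketched in the same style as the examples below; the key names are made up, and a production lock would also need timeout handling:

def incr_counter(name):
    return rconn.incr('counter:' + name)

def reset_counter(name):
    # getset atomically returns the old value while zeroing the counter
    return int(rconn.getset('counter:' + name, 0) or 0)

def acquire_lock(name):
    # setnx sets the key only if it does not already exist
    return bool(rconn.setnx('lock:' + name, 1))

def release_lock(name):
    rconn.delete('lock:' + name)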

Many people have found Redis lists to be useful as a simple fifo work queue (with the ability to insert/pop from either end, move items from one list to another atomically, limit list length, etc.). Lists can also be the source (and are always the result of when using the STORE option) of a sort call, which by itself can simply be the input keys, or even automatically pull results from string keys or hashes.

Simple 0/1 queue:
def add_work(item):
    rconn.lpush('work-queue', item)

def get_work():
    return rconn.rpop('work-queue')

def retry(item):
    rconn.rpush('work-queue', item)

There is also the set datatype, which has all of the common union, intersection, and difference operations available across set keys. Common use-cases include de-duplicating items for work queues, keeping 'classes' of items (rather than keeping a user:id:sex -> m, you can use user:sex:m -> {id1, id2, ...}), or even as a set of documents for a Redis-backed search engine.

More complex 1+ queue:
def add_work(item):
    rconn.lpush('work-queue', item)

def get_work():
    return rconn.rpoplpush('work-queue', 'in-progress')

def done(item):
    rconn.sadd('done', item)

def retry_failed():
    while rconn.llen('in-progress'):
        check = rconn.lindex('in-progress', -1)
        if not rconn.sismember('done', check):
            rconn.rpoplpush('in-progress', 'work-queue')
        else:
            rconn.srem('done', rconn.rpop('in-progress'))

Another very useful datatype in Redis is the hash. In Redis, hashes map string keys to string or integer values. Useful as a way of gathering similar kinds of data together, a hash can store a row from a database table with each column an entry, which allows for sorting and retrieval via sort and the various hash access methods. Add in the ability to increment/decrement columns in hashes, pull full hashes, etc., the existence of an object/model mapper, and Redis can easily replace many uses of a traditional SQL database. Throw in Jak Sprat's Alchemy Database, which adds a SQL layer with Lua scripting inside Redis, and for small data sets, a Redis solution may be all you need.

Ad-hoc data sorts:
def insert(data):
    rconn.sadd('known-ids', data['id'])
    rconn.hmset('data:%s'%(data['id'],), data)

def sort_fetch(column, desc=True, num=10):
    results = rconn.sort('known-ids', start=0, num=num, desc=desc, by='data:*->%s'%(column,))
    p = rconn.pipeline(False)
    map(p.hgetall, ['data:'+id for id in results])
    return p.execute()

For those use-cases where having a sortable score over unique items is useful, Redis has the zset or sorted set data type, where each member in the set also has an associated float/double score, which produces an ordering over all keys in the sorted set, and which you can query by member, score, or rank. Some common use cases include priority queues, tag clouds, timeouts, rate limiting, and Redis-backed scored search engine.

Rate limiting:
def can_use(key, count, limit, timeout):
    # returns True if this key has used no more than limit units in the
    # current window; schedules a counter reset timeout seconds out
    pipe = rconn.pipeline(True)
    if rconn.zrank('reset', key) is None:
        pipe.zadd('reset', key, time.time() + timeout)
    pipe.hincrby('counts', key, count)
    return pipe.execute()[-1] <= limit

def reset_counters():
    # walk the 'reset' zset in time order, clearing any counters whose
    # reset time has passed
    default = ((None, None),)
    pipe = rconn.pipeline(True)
    key, ts = (rconn.zrange('reset', 0, 0, withscores=True) or default)[0]
    while ts is not None and ts <= time.time():
        pipe.zrem('reset', key)
        pipe.hdel('counts', key)
        pipe.zrange('reset', 0, 0, withscores=True)
        key, ts = (pipe.execute()[-1] or default)[0]
Redis does support key expiration, but in pre-Redis 2.2 versions, key expiration can have confusing behavior. Use the following to manually expire keys...

Manual key expiration:
def set_expire(key, timeout):
    rconn.zadd('expire', key, time.time()+timeout)

def expire_keys():
    p = rconn.pipeline(True)
    for key in rconn.zrangebyscore('expire', 0, time.time()-1):
        p.delete(key)
        p.zrem('expire', key)
    p.execute()
With these simple ideas and structures, even more complex behavior can be defined. Things like per-user prioritized queues, counting semaphores (for limiting worker counts in this case), per-page/site recent viewer lists, finding jobs that a user has the skills to perform (aka the pizza topping problem), navigation trees, and many more.

If you want to see more posts like this, you can buy my book, Redis in Action from Manning Publications today!