At the beginning of the project to build a large-scale targeted multi-recipient messaging server, I opted to use the simplest, most mainstream architecture possible: put everything in an RDBMS and let each web handler run queries.
It is a bit like push email, but with groups. You can edit and recall messages even after they are sent, you can schedule them, and you can even write complex expressions to determine who gets each message. And it had to be an online system rather than a batch system. All in all, quite a complex bit of software.
I had just finished a project making a transcoding and multi-casting video server that involved getting deep and dirty with edge-triggered asynchronous IO in C++ (the server got open-sourced as hellepoll afterwards), so I really was keen to avoid exactly that type of high-complexity solution. I didn’t think this new project warranted it.
So I opted for Python (it aligned with other teams in the company and to be honest I was willingly forced into using it) with Twisted (as Tornado was at the time very evolutionary and didn’t have all the utility DB stuff and so on). I opted for MySQL too (as that was already in use in the company and that simplified the deploy story).
You already know I’m going to tell you this was a mistake:
I had a tool choice and an architecture choice. It wasn’t a web server technically because it was talking HTTP envelopes over a sort of reverse proxy thing, but client IO aside it was very web-server-like.
Each client had an ‘inbox’ in the DB and each time a client connected those messages marked as undelivered in that inbox would be delivered. Easy!
The code was very easy to write. I like Python syntax. The prototype quickly became the production system. In small-scale tests it worked beautifully.
One fascinating part of the project is that the to-fields in messages are expressions. So I needed to write an expression tree parser, which is always a pleasing sub-project in itself. I then made it generate a horrendous yet correct SQL WHERE clause that represented the to-field exactly, so even determining the recipients of a message could run in the DB!
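To make the idea concrete, here is a minimal sketch of turning a to-field expression tree into a parameterised WHERE clause. It is not the production code: the node types (InGroup, And, Or, Not), the clients and memberships tables and their columns are all hypothetical, and sqlite3 stands in for MySQL so the snippet is self-contained.

```python
import sqlite3
from dataclasses import dataclass
from typing import List, Tuple, Union


@dataclass
class InGroup:
    name: str  # hypothetical leaf: "recipient is a member of this group"


@dataclass
class And:
    left: "Expr"
    right: "Expr"


@dataclass
class Or:
    left: "Expr"
    right: "Expr"


@dataclass
class Not:
    operand: "Expr"


Expr = Union[InGroup, And, Or, Not]


def to_where(expr: Expr) -> Tuple[str, List[str]]:
    """Return (sql_fragment, parameters) for an expression tree."""
    if isinstance(expr, InGroup):
        # Assumed schema: memberships(client_id, group_name), clients aliased as c.
        sql = ("EXISTS (SELECT 1 FROM memberships m "
               "WHERE m.client_id = c.id AND m.group_name = ?)")
        return sql, [expr.name]
    if isinstance(expr, Not):
        sql, params = to_where(expr.operand)
        return f"(NOT {sql})", params
    if isinstance(expr, (And, Or)):
        op = "AND" if isinstance(expr, And) else "OR"
        left_sql, left_params = to_where(expr.left)
        right_sql, right_params = to_where(expr.right)
        return f"({left_sql} {op} {right_sql})", left_params + right_params
    raise TypeError(f"unknown expression node: {expr!r}")


def recipients(conn: sqlite3.Connection, expr: Expr) -> List[int]:
    """Let the database work out who matches the to-field."""
    where, params = to_where(expr)
    rows = conn.execute(
        f"SELECT c.id FROM clients c WHERE {where} ORDER BY c.id", params)
    return [row[0] for row in rows]
```

Group names travel as bound parameters rather than being spliced into the SQL text, so only the structure of the clause comes from the tree.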
But small-scale tests really didn’t represent meaningful workloads in production. The truth was it was shockingly slow and it didn’t scale at all!
Python is shockingly slow, and the routes for making it fast, e.g. shedskin or pypy, were not mature at the time (nor are they even now). Even if we tricked it out, a Python server is lucky to handle a few thousand web requests per second, which meant sending a message to a million recipients could take some considerable time.
And the DB was slow. More correctly, spinning disks are slow, and you’ll prise ACID out of my dead hands.
So the system needed a rewrite. Rewrite #1.
At this stage, the single biggest problem was the DB, so what I did was move the DB into RAM and write through to the database for persistence. Python’s excellent dictionaries, sets and the GIL (!) made it an actual pleasure to work with relational data in memory!
I care a lot about ACID so I still only wrote as quickly as the DB could keep up, but at least I wasn’t now reading from the DB. This meant that writing to the DB needed to be optimised. Luckily I hit gold there. It turns out that the speed of a DB is better measured in requests/sec rather than rows/sec. Inserting 1000 rows in one combined statement is much faster than inserting 1000 rows in 1000 individual statements. Obvious in hindsight.
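A minimal sketch of the combined-statement idea follows. The inbox table and its columns are hypothetical, and sqlite3 stands in for MySQL so the example runs anywhere. Rows are chunked (an assumed 400 per statement) because databases cap the number of bound parameters or the statement size.

```python
import sqlite3
from typing import List, Tuple


def insert_batched(conn: sqlite3.Connection,
                   rows: List[Tuple[int, int]],
                   chunk_size: int = 400) -> int:
    """Insert (client_id, message_id) rows using multi-row INSERT statements.

    Returns the number of statements sent, which is the cost that matters.
    """
    statements = 0
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        # One "(?, ?)" group per row, all in a single statement.
        placeholders = ",".join(["(?, ?)"] * len(chunk))
        flat = [value for row in chunk for value in row]
        conn.execute(
            f"INSERT INTO inbox (client_id, message_id) VALUES {placeholders}",
            flat)
        statements += 1
    conn.commit()  # one commit for the whole batch
    return statements
```

With these assumed numbers, 1000 rows go out as 3 statements instead of 1000.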
That a Python server can only handle a few thousand web requests per second was less of a problem because we weren’t actually a web server. Clients would connect to a bunch of reverse HTTP servers in the cloud, and my messaging server would in turn send the messages to these servers.
The high overhead is connections themselves, and less so the number of bytes you send over them (within reason). By introducing a streaming, multiplexing protocol to each of these reverse servers we could handle orders of magnitude more clients per second.
So rewrite #1 was quite successful and still has some deployments. But this too doesn’t scale very well. The problem now becomes the whole inbox concept. If you track, ahead of time, who should get which message then you have an insidious problem:
As people install your app on their phones, they are registered with you as recipients. But when they uninstall or reset/reinstall your application, nobody tells you! The problem with working out who gets what is that the workload increases linearly with the number of subscribers ever rather than the number of active subscribers right now…
When you start having lots of dictionaries with millions of elements in a Python server things start to get a bit dicey stability- and performance-wise!
We also ran into an extremely nasty bug in the Python kafka client, and kudos to my colleague Jianhua for spotting it from code inspection.
It was time for rewrite #2 to deliver a system that effortlessly maintains millions and millions of active users.
I began by re-evaluating platform choices. The NoSQL movement was gaining momentum and I was well read and followed the programming news closely and did some prototyping. I also had plenty of exposure to message queues and so on under my belt.
But to counterbalance this, I had become allergic to cogs. Cogs bad! When your system is a constellation of software components and servers and you stress it, all those message queues and so on start failing and you’re left with an inconsistent mess to clean up.
Another thing that was up for grabs was how clients connected. In the previous versions we had an external constraint on using these reverse servers and so on, but by this time that was no longer a requirement.
So how is rewrite #2 doing now in production? It’s blazingly fast! It scales. Here’s what it is:
It’s written in Java. This was a popular choice as we have lots of other servers written in Java and it’s a platform we know well.
A central server (shardable) tracks all the messages and talks to the push systems for various platforms like iPhone and Android.
Devices poll a second type of server that is stateless. These servers have a full set of messages and group membership but work out what messages any particular client should have each and every time that client connects. So the expensive work scales with the number of active users, not total users ever, and is trivially horizontally scalable.
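The compute-on-connect idea can be sketched in a few lines. This is an illustration only (the real servers are Java): the Message and PollServer names, the matches predicate and the last_seen_seq argument are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Callable, Dict, FrozenSet, List


@dataclass(frozen=True)
class Message:
    seq: int                                # assumed global sequence number
    body: str
    matches: Callable[[FrozenSet[str]], bool]  # to-field as a predicate on groups


class PollServer:
    """Holds all messages and memberships but no per-client inbox."""

    def __init__(self, messages: List[Message],
                 memberships: Dict[int, FrozenSet[str]]):
        self.messages = sorted(messages, key=lambda m: m.seq)
        self.memberships = memberships

    def on_connect(self, client_id: int, last_seen_seq: int) -> List[Message]:
        # Work is done only for the client that is connecting right now;
        # clients that never come back cost nothing.
        groups = self.memberships.get(client_id, frozenset())
        return [m for m in self.messages
                if m.seq > last_seen_seq and m.matches(groups)]
```

Because nothing is written per recipient ahead of time, any number of these servers can be run side by side with the same read-only data.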
All live data is kept in RAM and persisted in MySQL (still, sadly, a good deployment simplification and we now know most of its warts) and history is kept in hadoop via kafka. We are considering TokuDB there too though, and so should you for write-heavy loads now that it’s open-sourced.
We make extensive use of Java’s standard concurrent data structures to ensure we can be updating the world-view whilst serving clients and no client sees an inconsistent world.
The setup is obviously eventually-consistent from a client perspective but with secure cookies and sequence numbers everywhere the client can know if any server it talks to is older than the previous server it talked to and so on so we can overcome that.
We don’t just have sequence numbers in the database, we version number the database itself too. So we can roll back to a backup which will lose some transactions and clients can cope with that too even though sequence numbers get reused in such a scenario.
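One way a client could use these two numbers is sketched below. The exact rules are not spelled out here, so this is an assumption: every response is taken to carry a (db_version, seq) stamp, and db_version is assumed to be bumped whenever a backup is restored. The names Stamp, Verdict and check_server are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum


class Verdict(Enum):
    OK = "ok"                      # server is at least as new as what we saw
    STALE_SERVER = "stale_server"  # server is behind; try another or wait
    RESYNC = "resync"              # database was rolled back; drop local state


@dataclass(frozen=True)
class Stamp:
    db_version: int  # assumed to increase each time a backup is restored
    seq: int         # sequence number within that database version


def check_server(last_seen: Stamp, server: Stamp) -> Verdict:
    """Compare the stamp a server reports with the newest one the client saw."""
    if server.db_version > last_seen.db_version:
        # Sequence numbers may have been reused, so seq cannot be compared.
        return Verdict.RESYNC
    if server.db_version < last_seen.db_version:
        return Verdict.STALE_SERVER
    if server.seq < last_seen.seq:
        return Verdict.STALE_SERVER
    return Verdict.OK
```

The point of the database version is that a lower sequence number after a restore is not mistaken for a merely lagging server.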
The big scaling problem we discovered in stress tests was obvious only in hindsight: if you want to send a million messages (in the message queue sense) between two components, you can’t just write them to a socket! Your framework - netty in our case - will nicely buffer all those bytes for you, and when the recipient disconnects because of timeouts trying to process all those incoming bytes, you run out of memory really quickly when the reconnect again sends those same bytes to a buffer and so on in a death spiral. Garbage collectors cannot save you when everything is retrying everything at full speed and writing a few billion bytes (to a buffer) takes no time at all.
To make a robust message-sending system between two components you really need an ACK system after every so many bytes. We introduced a simple throttle on the sendMessage() function that, when you’ve written more than N bytes (e.g. 1 or 10 MB), will instead send a single last message to the other side saying ‘there are more messages awaiting’. The consumer then sends a message saying ‘send me all messages newer than xyz’ and the throttle resets. Really obvious in hindsight.
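The throttle can be sketched roughly as follows. This is a simplified illustration rather than the actual netty code: ThrottledSender, the MORE_AWAITING marker, the in-memory log and the write callback are all hypothetical, and here the throttle trips once at least N bytes are unacknowledged.

```python
from typing import Callable, List, Tuple

MORE_AWAITING = b"MORE"  # hypothetical 'there are more messages awaiting' marker


class ThrottledSender:
    def __init__(self, write: Callable[[int, bytes], None],
                 max_unacked_bytes: int):
        self._write = write              # stands in for the socket/framework
        self._limit = max_unacked_bytes  # the N in the text
        self._log: List[Tuple[int, bytes]] = []  # messages not yet confirmed
        self._next_seq = 1
        self._unacked = 0
        self._paused = False

    def send_message(self, payload: bytes) -> int:
        seq = self._next_seq
        self._next_seq += 1
        self._log.append((seq, payload))
        self._try_write(seq, payload)
        return seq

    def _try_write(self, seq: int, payload: bytes) -> bool:
        if self._paused:
            return False  # held back until the consumer asks for more
        if self._unacked >= self._limit:
            # Stop feeding the buffer; tell the other side once, then wait.
            self._paused = True
            self._write(0, MORE_AWAITING)
            return False
        self._write(seq, payload)
        self._unacked += len(payload)
        return True

    def on_request_newer_than(self, last_seq: int) -> None:
        """Consumer says: 'send me all messages newer than last_seq'."""
        self._log = [(s, p) for s, p in self._log if s > last_seq]
        self._unacked = 0
        self._paused = False
        for seq, payload in self._log:
            if not self._try_write(seq, payload):
                break
```

The same request also covers reconnects: a consumer that lost data simply asks for everything newer than the last message it actually processed, and at most N bytes are in flight again.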
Early on we used msgpack but recently we went to using DataInput/OutputStreams directly for a nice performance improvement. We also moved from the shockingly slow Apache Simple JSON to the much faster minidev one, and finally we now author most JSON responses using our own JSONWriter class for a really massive speedup. It’s sad when the server is taking more time (and creating buckets of garbage whose collection hurts longer-term) doing JSON de/serialisation than anything else. Keeping things around as pre-serialised bytes that can be blindly written is a very major performance improvement for us.
But I still keep an eye out for scaling problems. Right now we have two scaling limits:
The first is very simple: there’s a real limit on how many TLS connections you can terminate even when you have dedicated hardware as we do in data-centres.
The second is something more controllable: at what point do we have more active messages or device group information than we have RAM?
After so long in C/C++, the size of Java data-types and the overhead of being an object is a staggering, sobering realisation.
We ran into classic perm-gen out-of-memory problems when using String.intern() and had to use our own WeakHashMap<T,WeakReference<T>> to imitate it.
Each Java String takes 50 bytes + the size of the character data itself. If Strings are created by StringTokenizer or String.substring() they point to the original character data, instead of creating a copy too. This is both super cool and super worrying. In our case we leverage that to reduce RAM.
I’d be happier if I could use bytes that I knew were UTF8 encoded at all times instead of having to expand them into strings though. It’s basically doubling the RAM needed right there, although with the de-dupe of strings this is not the top memory problem.
Now you shouldn’t send 64 bit IDs as numbers in JSON, and our client IDs are 64 bit random numbers. We send them as hexadecimal strings and this is all right and proper. But we’ve also been using the strings internally, and we’d be saving over 60 bytes per device in the lookup when we move to using longs internally, and more so when this enables us to adopt GNU Trove collections, because the top problem right now is just how wasteful hash tables are in Java. There are various 3rd party hash tables that are designed to address precisely this problem, and we’ll be evaluating these compact hash tables shortly.
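Why numbers are a bad idea on the wire: many JSON consumers (JavaScript in particular) parse every number as a 64-bit double, which only represents integers exactly up to 2**53. The small sketch below shows the loss and the hex-string round trip; the helper names and the fixed 16-digit width are assumptions for illustration.

```python
import json


def id_to_json(client_id: int) -> str:
    """Encode an unsigned 64-bit ID as a fixed-width hex string."""
    if not 0 <= client_id < 2 ** 64:
        raise ValueError("client_id must fit in 64 bits")
    return format(client_id, "016x")


def id_from_json(text: str) -> int:
    """Decode the hex string back into an integer for internal use."""
    return int(text, 16)


# What a double-based JSON parser effectively does to a large ID:
original = 2 ** 53 + 1
through_double = int(float(original))  # no longer equal to original

# The hex form survives a JSON round trip untouched.
big_id = 0xFEDCBA9876543210
wire = json.dumps({"client": id_to_json(big_id)})
decoded = id_from_json(json.loads(wire)["client"])
```

Internally the decoded integer (a long in Java) is the compact form; the string only needs to exist at the JSON boundary.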
All in all we can now run a system that can effortlessly push to millions upon millions of devices in a reasonable time on a shockingly small amount of hardware, and it’s easy to distribute between data centres, reason about, and maintain! Big win for rewrite #2.
There’s no rewrite #3 on the horizon other than perhaps for the tradition of it. I’ll keep you posted ;)