Author: Pieter Hintjens, iMatix Corporation
Version: 1.2, revised 24 February, 2011
This is a background paper for a new project to build an open source data plant based on ØMQ. That is, a data distribution system capable of handling high volumes across a concurrent network. In this document we explain the need for such a product, and we propose an initial architecture.
The Whaleshark Diet
The Internet produces an estimated 3.75 exabytes of data per day (in February 2011), and the volume doubles every 12 months. This data presents great opportunities to those who can work with it. The challenge is to process these huge volumes rapidly and cheaply. How to make money from this data? You need to build what we'll term "whalesharks", applications capable of sifting through massive volumes of data to find profitable patterns. The real-life whaleshark sifts massive volumes of water to extract plankton, and grows to an immense size.
A typical example of a whaleshark application is one that processes data from financial markets and analyses trends. For example, movements on the earlier Asian or European markets may be used to predict movements on the US market. Such applications must get and process financial data within milliseconds, and in some cases (such as algorithmic trading done in real time), microseconds. Such systems are known as "ticker plants".
However, the need to process massive amounts of data rapidly and cheaply is not exclusive to the financial markets. Tweets, weather, traffic data: the digital world produces rivers of data for those able to use them. Whalesharks can be profitable in many areas, if the cost of making them is kept low enough.
A whaleshark is at first sight not so complex. Get data, process and collect, then act on any patterns detected. The difficulty lies in keeping the cost low and performance and scalability high. The volumes of data can reach millions of events per second, and it is often necessary to process data in milliseconds or less. It thus becomes a hard problem to solve.
We believe there are two principal rules for building a competitive whaleshark:
- It must work with commodity hardware, networks, operating systems, and programming languages. If it has any dependencies on exotic technology, that will become its Achilles heel. Every technology becomes a competitive liability at some point, and must be replaceable by newer, cheaper technologies with minimal impact.
- It must be economical at all stages of its lifecycle: from experimental baby whaleshark on one core to successful massive whaleshark on thousands of cores. If it can't scale linearly, it will die prematurely, overwhelmed by the exponential rise in data volumes, beaten by more efficient competitors.
Whalesharks are a common class of application but few obey these two rules. The main error that most architects make is to build the whaleshark as a single thing, connectivity and business intelligence together. For a small new whaleshark, connectivity is easy to trivialize. There are few pieces, and they don't carry much data. As the whaleshark grows, connectivity becomes a more visible challenge. There are more pieces, they come and go randomly, and volumes grow. New classes of problem, such as addressing and queueing, appear.
If the whaleshark architect has seen this before, he will understand that connectivity is in fact the key to scaling, and he will search for a ready-made solution. There are commercial solutions, and they are expensive. That won't be viable unless the whaleshark is already profitable, which creates a catch-22 situation. There are free products that can solve many of the connectivity problems, but none really ideal for whalesharks.
So most whalesharks grow on an uneasy mixture of exotic technology and custom solutions. It is a diet that works, but creates long-term dependencies with high costs. These will ultimately kill any business in an open market with real competitors.
We can list the high-level requirements of an ideal data delivery architecture - a "data plant" - that provides what a whaleshark needs, no matter what its size:
- It must carry data from a set of producers (typically a dozen to a hundred) to a set of consumers (typically several hundred to tens of thousands), where "data" means flows of small messages (typically under 1000 bytes) at high rates (typically a million or more per second). This excludes peer-to-peer protocols like BitTorrent or SIP.
- It must solve the unavoidable problems of addressing, data routing, queueing, dynamic networks, security, performance, transport, etc. This excludes simple network protocols like TCP and HTTP. The data plant must be based on a proper message passing product, or it must implement one.
- It must be cheap enough for baby whalesharks. If it's costly to license, learn, or work with, baby whalesharks won't be able to use it. Only open source can be made this cheaply. This excludes all commercial proprietary solutions and all exotic network technologies.
- It must scale cheaply to any size, to tens of thousands of cores and processes, and in the future to millions of cores. Moore's Law never sleeps. Only a fully distributed architecture can scale like this. This excludes all solutions based on "messaging brokers", and excludes most if not all custom code.
- It must work with any programming language, any data, any network protocol, and any operating system. This excludes all solutions with their own data encoding, dedicated network protocol, or designed for specific programming languages or operating systems.
Looking at this list, only one option remains. That is, a free and open source peer-to-peer data plant designed from the start to be easy to use, scalable to a million cores, and technology neutral. As far as we know, no such thing exists today, which is why we propose to build one.
The Messaging Layer
The basis for a data plant is an appropriate messaging layer. Messaging - also known as "message-oriented middleware" - solves a set of connectivity problems faced by most if not all non-trivial distributed applications, of which the whalesharks are one category:
- How do we handle I/O? Does our application block, or do we handle I/O in the background? This is a key design decision. Blocking I/O creates architectures that do not scale well. But background I/O can be very hard to do right.
- How do we handle dynamic components, i.e. pieces that go away temporarily? Do we formally split components into "clients" and "servers" and mandate that servers cannot disappear? What then if we want to connect servers to servers? Do we try to reconnect every few seconds?
- How do we handle messages that we can't deliver immediately? Particularly, if we're waiting for a component to come back online. Do we discard messages, put them into a database, or into a memory queue?
- Where do we store message queues? What happens if the component reading from a queue is very slow, and causes queues to build up? What's the strategy then?
- How do we handle lost messages? Do we wait for fresh data, request a resend, or do we build some kind of reliability layer that ensures messages cannot be lost? What if that layer itself crashes?
- What if we need to use a different network transport, say, multicast instead of TCP unicast? Do we need to rewrite the applications, or is the transport abstracted in some layer?
- How do we route messages from producers to consumers? How do we distribute data, distribute work, send replies, etc.? Do we use concepts like topics and queues or are there other models that work better?
- How do we represent a message on the wire? Should the messaging layer try to represent data or do we want to make this an external decision?
- How do we write APIs for multiple programming languages? How do we represent data so that it can be read between different CPU architectures? How do we handle network errors? How do we work on IPv6? How do we handle security?
And so on. It turns out that building reusable messaging systems is really difficult, which is why so few FOSS projects ever tried, and why commercial messaging products are complex, expensive, and fragile. Most projects that try to solve this long list of problems in a reusable way do so by inventing a new concept, the "broker", that handles addressing, routing, and queueing in a single box. Then we get a client-server protocol or a set of APIs that let applications speak to this broker.
But adding broker-centric messaging often makes things worse, not better. It means adding and managing more hardware, and a new single point of failure. A broker becomes a bottleneck, and a fragile one. If the software supports it, you can add a second, third, fourth broker and make some failover scheme. People do this. It creates more moving pieces, more complexity, more things to break.
Furthermore, broker-based messaging does not scale to the size we need for even a modest whaleshark. What we need is something that solves the well-known problems of messaging but does it in such a simple and cheap way that it can work in any application, with close to zero cost. It should be a library that we just link with, without any other dependencies. No additional moving pieces. It should run on any OS and work with any programming language. And it should leave higher-level problems to be solved by higher-level layers.
And this is ØMQ (ZeroMQ). A small, embeddable library that solves most of the problems an application needs to become nicely elastic across a network. And which does this with as close to zero cost as is possible today. Specifically:
- Components can come and go dynamically and ØMQ will automatically reconnect. This means we can start components in any order. We can create service-oriented architectures (SOAs) where services can join and leave the network at any time.
- It handles I/O asynchronously, in background threads. These communicate with application threads using lockfree data structures, so ØMQ applications need no locks, semaphores, or other wait states.
- It queues messages automatically when needed. It does this intelligently, pushing messages to as close as possible to the receiver before queuing them.
- It has ways of dealing with over-full queues, based on a per-queue limit called the "high water mark". When a queue is full, ØMQ automatically blocks senders, or throws away messages, depending on the kind of messaging we are doing (the so-called "pattern").
- It lets applications talk to each other over arbitrary transports: TCP, multicast, inproc, IPC. We do not need to change the code to use a different transport.
- It handles slow/blocked readers safely, using different strategies that depend on the messaging pattern.
- It lets us route messages using a variety of patterns such as request-reply and publish-subscribe. These patterns are how we create the topology, the structure of the network.
- It does not impose any format on messages. They are blobs of any size, from zero bytes to gigabytes. When we want to represent data we choose some other product on top, such as Google's protocol buffers.
- It handles network errors intelligently depending on the messaging pattern. Sometimes it retries, sometimes it tells us an operation failed.
What ØMQ's thin asynchronous messaging ultimately gives us is a "scalable elastic architecture" that lets us stretch applications across a hundred, a thousand, a million cores. Which gives us what we need as the basis for our data plant.
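To make the high water mark behaviour concrete, here is a minimal sketch in plain Python. It is not ØMQ code and does not use the ØMQ API: the class `BoundedQueue`, its `policy` argument, and the two policy names are hypothetical, chosen only to contrast "throw away" with "block the sender" once a queue is full. To stay single-threaded, the "block" policy reports that the sender would have to wait by returning False, rather than actually blocking.

```python
from collections import deque


class BoundedQueue:
    """Toy queue with a high water mark (HWM); an illustration, not ØMQ."""

    def __init__(self, hwm, policy):
        if policy not in ("drop", "block"):
            raise ValueError("policy must be 'drop' or 'block'")
        self.hwm = hwm
        self.policy = policy
        self.items = deque()
        self.dropped = 0

    def send(self, message):
        """Return True if the message was queued.

        At the HWM, 'drop' discards the message and counts it, while
        'block' returns False so the caller knows it must wait and retry.
        """
        if len(self.items) < self.hwm:
            self.items.append(message)
            return True
        if self.policy == "drop":
            self.dropped += 1
        return False

    def recv(self):
        """Remove and return the oldest queued message."""
        return self.items.popleft()


# Data distribution: a slow reader loses the newest messages.
pubsub_like = BoundedQueue(hwm=2, policy="drop")
for tick in ("a", "b", "c", "d"):
    pubsub_like.send(tick)

# Work distribution: the sender is held back instead.
pipeline_like = BoundedQueue(hwm=2, policy="block")
accepted = [pipeline_like.send(job) for job in ("j1", "j2", "j3")]
```

In this sketch the first queue ends up holding two messages and counting two drops, while the second refuses the third job until a reader makes room.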
Defining the Models
To design the data plant we define a layered stack of models, keeping in mind our stated requirements. Each model defines a set of rules for one aspect of the design. Once learned, these rules make it easier for users to understand the whole. Regular models help to make the overall architecture simpler. The models form a knowledge stack, and we can list them in order from lowest level of abstraction to highest level of abstraction:
- Our network model. How do components connect to each other?
- Our discovery model. How do components learn each other's addresses?
- Our framing model. How do we delimit and format data into messages?
- Our routing model. How do we route messages from producers to consumers?
- Our semantics model. How do we expose the message and routing semantics to developers?
- Our security model. How do we authenticate components and control access to data?
- Our reliability model. How do we detect failures in components, and recover from them?
- Our extensibility model. How do we extend the network with custom functionality?
- Our management model. How do we manage the overall network of components?
- Our interoperability model. How do we ensure long term interoperability?
- Our organizational model. How do we design the community around the product?
The Network Model
A whaleshark is by definition a LAN application with connections to and from the outside world. External connections are bridged, that is, managed by specialized applications that connect to both networks and then exchange data between them, filtering and converting as needed. The whaleshark itself runs over any suitable mix of LAN protocols (TCP, PGM, and so on) using the ØMQ network model.
The Discovery Model
ØMQ has no addressing service except DNS, which is not suitable for end-user application topologies since it is a network level service. The simplest working model for a whaleshark is a central addressing service that nodes can connect to, and query for mappings from abstract addresses.
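As a rough illustration of such a service, the sketch below keeps the mapping in a Python dictionary. The class `NameService`, its `register` and `resolve` methods, and the endpoint string are all hypothetical; a real service would be a network process that nodes query, and its protocol is not defined in this paper.

```python
class NameService:
    """In-memory stand-in for the central addressing service."""

    def __init__(self):
        self._table = {}

    def register(self, name, endpoint):
        """Map an abstract address to a concrete endpoint string."""
        self._table[name] = endpoint

    def resolve(self, name):
        """Return the endpoint for a name, or raise LookupError."""
        if name not in self._table:
            raise LookupError(f"no endpoint registered for {name!r}")
        return self._table[name]


names = NameService()
names.register("NYSE", "tcp://192.168.0.10:5555")  # hypothetical endpoint
endpoint = names.resolve("NYSE")
```

The point of the indirection is that a node only ever holds the abstract name; the endpoint behind it can be changed in one place.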
The Framing Model
We transfer messages as opaque binary objects of any size that can be buffered in memory. Using the ØMQ semantics, messages can be multipart, typically with a routing envelope followed by data. A typical routing key would be a stock symbol or topic.
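A minimal sketch of this framing, assuming a message is simply a list of byte strings with the routing key in the first frame and the opaque data in the last. The helper names `make_message` and `routing_key_of` are hypothetical, and the sketch only models the frames in memory; it does not send anything.

```python
def make_message(routing_key, payload):
    """Build a two-frame message: routing envelope first, then data."""
    if not isinstance(payload, bytes):
        raise TypeError("payload must be opaque bytes")
    return [routing_key.encode("utf-8"), payload]


def routing_key_of(frames):
    """Read the routing key without looking inside the payload."""
    return frames[0].decode("utf-8")


# The payload is never interpreted by the messaging layer.
msg = make_message("IBM", b"\x00\xfftick")
```

Keeping the key in its own frame means a node can route or filter a message without understanding, or even decoding, the data frame.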
The Routing Model
The core routing concept for data is a publish-subscribe model based on "streams". While there may be tens of thousands of distinct routing keys, there may be only a few dozen or a few hundred streams. Streams are data fan-out objects with one publisher and multiple subscribers. Rather than subscribe to specific routing keys, consumers subscribe to specific streams. Streams are thus connected in a pure directed graph from a set of sources to a set of sinks.
The Semantics Model
Nodes can do a set of operations on streams: publish a stream (announcing it, as its creator), subscribe to a stream, read a message from a stream (or any input stream), and write a message to a stream. A node may subscribe to multiple input streams, and publish zero or more output streams, doing any desired filtering, conversion, or other processing.
The Security Model
We assume a broadcast model of data distribution, i.e. there are no connected sessions and no connect-level security. Security is applied to streams on an as-needed basis. We foresee graduated security levels, since encryption and decryption have a performance cost. One model uses a central security service. Nodes then authenticate individually with this service, and receive keys via a secure connection. They then use these keys to encrypt or decrypt secure streams.
The Reliability Model
Messages are transient, and when things break, we may lose data. We do not try to recover old data, since it rapidly becomes outdated and may even be dangerous. The general strategy for reliability is to try to avoid failure by keeping things simple, and when components do break, to detect this as rapidly as we can, and then to route around the failure. In very specific cases applications may request old messages, but it is an explicit action (e.g. "replay all data since market open"). There is no single point of failure in the network, and any failover happens automatically.
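This paper does not specify how failures are detected. One common approach, shown here purely as an assumption, is to have each node send periodic heartbeats and to treat a node as failed once it has been silent for longer than a timeout. The class `FailureDetector` and its methods are hypothetical, and time is passed in explicitly so the sketch stays deterministic.

```python
class FailureDetector:
    """Marks a node as failed when it has been silent for too long."""

    def __init__(self, timeout):
        self.timeout = timeout
        self._last_seen = {}

    def heartbeat(self, node, now):
        """Record that a node was heard from at time `now`."""
        self._last_seen[node] = now

    def alive(self, now):
        """Nodes heard from within the timeout, in name order."""
        return sorted(
            n for n, t in self._last_seen.items() if now - t <= self.timeout
        )

    def failed(self, now):
        """Nodes silent for longer than the timeout, in name order."""
        return sorted(
            n for n, t in self._last_seen.items() if now - t > self.timeout
        )


detector = FailureDetector(timeout=1.0)
detector.heartbeat("filter-a", now=0.0)
detector.heartbeat("filter-b", now=0.0)
detector.heartbeat("filter-a", now=0.9)   # filter-b stays silent
```

A shorter timeout detects failure sooner but risks false alarms on a busy network, so the value would need tuning per deployment.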
The Extensibility Model
The architecture we present here is essentially a framework for plugins. Every node is a plugin. We design a simple API for the four main operations that lets us write nodes in any programming language, rapidly and safely. A basic library of nodes (split, merge, log, delay, filter, …) can be reused in every whaleshark application.
The Management Model
In small whalesharks, nodes can be deployed manually. In larger architectures, we foresee a deployment service where each computer system runs a management layer that talks to a central management service across ØMQ sockets. This management network would download, configure, start, monitor, stop nodes as needed.
The Interoperability Model
To ensure that the architecture is open to all teams to participate, all wire level protocols and APIs are fully documented as free and open standards. This includes protocols for the name service, security service, management, and streams.
The Organizational Model
Where possible, layers of the data plant architecture are stand-alone projects that can attract their own communities of contributors and users. We have at least these component projects: name service, security service, reliable publish-subscribe, and deployment and management. Each should be hosted as a git project with clear and attractive guidelines for contributors.
Understanding Streams
A stream is a logical name chosen by the application architect. Nodes query the name service to translate a stream name into ØMQ endpoints. They then bind or connect publisher or subscriber sockets to these endpoints. In future data plant architectures, streams may use other ØMQ patterns such as push-pull.
The use of this logical (abstracted) addressing means that streams can move across nodes without the underlying topology changing. This may be used to create strategies for reliability, scaling, etc. For example, streams might resolve to multiple redundant publishers so that subscribers are distributed between these. Or streams might resolve to a primary and backup publisher to provide a failover model.
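Both strategies can be sketched as small selection functions over the list of endpoints that a stream name resolves to. Everything here is an assumption made for illustration: the function names `spread` and `failover`, the use of a CRC32 hash to spread subscribers, the `is_alive` callback, and the endpoint strings.

```python
import zlib


def spread(subscriber_id, endpoints):
    """Pick one of several redundant publishers, stable per subscriber."""
    index = zlib.crc32(subscriber_id.encode("utf-8")) % len(endpoints)
    return endpoints[index]


def failover(endpoints, is_alive):
    """Return the first live endpoint, in priority order (primary first)."""
    for endpoint in endpoints:
        if is_alive(endpoint):
            return endpoint
    raise ConnectionError("no live publisher for this stream")


# Hypothetical resolution result for one stream name.
publishers = ["tcp://10.0.0.1:5555", "tcp://10.0.0.2:5555"]

# The primary is down, so subscribers are sent to the backup.
down = {"tcp://10.0.0.1:5555"}
chosen = failover(publishers, lambda ep: ep not in down)
```

In both cases the subscriber's code stays the same; only the answer from the name service, or the rule applied to it, changes.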
Streams are also the basis for security. That is, when the data plant carries data that must be encrypted or otherwise protected from casual recording, it does this by encrypting specific streams. Readers who have the right decryption key can access the data, those that do not cannot.
Here is a simple example of how streams might work:
- We have a source node (aka a 'feed handler' in ticker plant terminology) that receives stock ticks from the New York Stock Exchange, reformats and filters these, applies a lightweight encryption, and publishes them as "NYSE".
- A filter node reads this stream, delays it by 15 minutes, and publishes it as an unencrypted stream "NYSE-DELAYED".
- Sink nodes (real applications) subscribe to these streams, receive data, and process it.
So we have two streams registered in the name service. They resolve to ØMQ endpoints on the LAN, i.e. DNS names or IP addresses and ports. The writers bind publisher sockets to these endpoints, and the readers create ØMQ subscriber sockets and connect their sockets to these endpoints.
The result is that subscribers connect directly to publishers as normal for a ØMQ network. However, they are ignorant of the actual network topology, which can change at any time (depending only on the name service).
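The delaying filter node in this example can be sketched as follows. This is a single-process illustration with a simulated clock, not a network node: the class `DelayFilter`, its methods, and the sample ticks are hypothetical, and decryption and publishing are left out.

```python
from collections import deque


class DelayFilter:
    """Holds each message for `delay` seconds before releasing it."""

    def __init__(self, delay):
        self.delay = delay
        self._held = deque()   # (release_time, message), in arrival order

    def push(self, message, now):
        """Accept a message read from the input stream at time `now`."""
        self._held.append((now + self.delay, message))

    def pop_ready(self, now):
        """Return every message whose delay has elapsed by `now`."""
        ready = []
        while self._held and self._held[0][0] <= now:
            ready.append(self._held.popleft()[1])
        return ready


FIFTEEN_MINUTES = 15 * 60
delayed = DelayFilter(FIFTEEN_MINUTES)
delayed.push(("IBM", 161.25), now=0)     # tick read from the live stream
delayed.push(("IBM", 161.30), now=60)
early = delayed.pop_ready(now=600)       # nothing is 15 minutes old yet
on_time = delayed.pop_ready(now=900)     # first tick goes to the delayed stream
```

Because messages arrive in time order, the filter only ever needs to inspect the head of its queue.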
Technical Designs
Technical Requirements
These are the main technical requirements for the data plant:
- Portable to at least Linux, Windows (for clients), and AIX.
- Works with at least C, C++, Java, Python, and C# (and should be easy to wrap in further languages).
- Runs over at least IPC, TCP, PGM and possibly raw UDP.
- Uses commodity network hardware (no dependencies on exotic hardware).
- Performance (capacity and latency) should be close to raw ØMQ, before message processing.
API Architecture
The data plant API follows ØMQ's approach of using a BSD-like socket API. A data plant socket wraps one or more ØMQ sockets but applications cannot access the underlying ØMQ socket directly. Applications can create their own ØMQ sockets for out-of-band work. The main API methods are:
- Bind a socket (to a stream), which registers the stream and socket endpoint with the registration server. A socket can be bound to many streams, which creates a fan-out pattern.
- Connect a socket (to a stream), which resolves the stream name and connects to the endpoint. A socket can be connected to many streams, which creates a fan-in pattern.
- Send a message to a socket, and thus to all streams that socket is bound to.
- Receive a message from a socket, and thus from all streams that socket is connected to.
Like ØMQ, the data plant API is written as an embeddable portable C/C++ library that is easy to wrap in other languages such as Java, Python, C#, etc. The data plant library depends on the ØMQ library.
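The shape of these four methods can be sketched in Python. This is an in-memory model, not the proposed C/C++ library: the classes `Bus` and `PlantSocket` are hypothetical, `Bus` stands in for both the registration server and the network, and `recv` returns None when idle instead of blocking.

```python
from collections import defaultdict, deque


class Bus:
    """In-memory stand-in for the registration server plus the network."""

    def __init__(self):
        self.subscribers = defaultdict(list)   # stream name -> sockets


class PlantSocket:
    """Toy data plant socket exposing bind, connect, send, and recv."""

    def __init__(self, bus):
        self._bus = bus
        self._bound = []          # streams this socket publishes
        self._inbox = deque()     # messages from connected streams

    def bind(self, stream):
        """Publish a stream (fan-out if called for several streams)."""
        self._bound.append(stream)

    def connect(self, stream):
        """Subscribe to a stream (fan-in if called for several streams)."""
        self._bus.subscribers[stream].append(self)

    def send(self, message):
        """Deliver the message to every stream this socket is bound to."""
        for stream in self._bound:
            for sock in self._bus.subscribers[stream]:
                sock._inbox.append((stream, message))

    def recv(self):
        """Return the next (stream, message) pair, or None if idle."""
        return self._inbox.popleft() if self._inbox else None


bus = Bus()
writer = PlantSocket(bus)
writer.bind("A")
writer.bind("B")                 # one socket, two streams: fan-out

reader_a = PlantSocket(bus)
reader_a.connect("A")
reader_ab = PlantSocket(bus)
reader_ab.connect("A")
reader_ab.connect("B")           # one socket, two streams: fan-in

writer.send(b"tick")
```

A reader that connects after a message was sent receives nothing from the past, which matches the transient nature of messages in the reliability model.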
Standard Filters
We foresee standard filters such as these:
- Clone one stream into two or more identical streams.
- Merge two or more streams into a single stream.
- Filter a stream based on topic pattern or other criteria.
- Log a stream, locally or remotely (over ØMQ).
- Sample a stream, taking a certain percentage of messages.
- Delay a stream for some specified time.
- Record a stream to disk for later replay.
- Conflate a stream, based on routing key or other criteria.
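Several of these filters are small enough to sketch as Python functions over sequences of (routing key, value) pairs. The function names and the sample data are hypothetical, sampling is shown as "one message in every N" rather than a random percentage, and conflation is shown over a finite batch, keeping only the latest value per routing key.

```python
import itertools


def clone(stream, copies=2):
    """Clone one stream into several identical streams."""
    return itertools.tee(stream, copies)


def filter_by_prefix(stream, prefix):
    """Keep messages whose routing key starts with the given prefix."""
    return (m for m in stream if m[0].startswith(prefix))


def sample(stream, every):
    """Keep one message in every `every` messages."""
    return (m for i, m in enumerate(stream) if i % every == 0)


def conflate(batch):
    """Keep only the latest value per routing key within a batch."""
    latest = {}
    for key, value in batch:
        latest[key] = value
    return list(latest.items())


ticks = [("IBM", 1), ("MSFT", 5), ("IBM", 2), ("INTC", 7)]
left, right = clone(iter(ticks))
starts_with_i = list(filter_by_prefix(ticks, "I"))
every_second = list(sample(ticks, 2))
latest_only = conflate(ticks)
```

Each filter takes a stream and yields a stream, so they compose freely, for example `conflate(filter_by_prefix(ticks, "I"))`.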
Authentication and access control
On a multicast bus, anyone can read data, so data security depends on encryption and out-of-band key distribution. We provide variable security levels from no security (data in the clear), to soft security (breakable with effort but deters casual snooping), and hard security (encrypted data with secure key distribution mechanisms).
We use a connectionless broadcast security model. This means encrypting messages as they are produced, and decrypting them as they are consumed. The fastest full encryption method is symmetric key, i.e. producers and consumers use the same key. We would use less secure but faster encryption for high speed streams.
Authentication is per node, and the granularity of access control is per stream. In the typical scenario, a node authenticates and receives a set of keys, one per secured stream. Keys expire after some period, before which the node must re-authenticate. Any node can read a stream but only nodes with the right key can decrypt its data. This approach scales to any size and works over a multicast bus. Over a unicast bus, it ensures that a sender does not need to encrypt a message more than once, even if sending it to many receivers.
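The key-distribution part of this scenario can be sketched as below. It is a toy model under stated assumptions: the class `KeyService`, the password-based login, the one-hour lifetime, and the node and stream names are all hypothetical, and no encryption is performed. The sketch only shows that each secured stream has one shared key, that a node receives keys for the streams it is granted, and that keys carry an expiry time.

```python
import secrets


class KeyService:
    """Toy central security service issuing per-stream symmetric keys."""

    def __init__(self, lifetime, credentials, grants):
        self.lifetime = lifetime
        self._credentials = credentials   # node -> password bytes (toy)
        self._grants = grants             # node -> set of stream names
        self._stream_keys = {}            # stream -> shared symmetric key

    def _key_for(self, stream):
        if stream not in self._stream_keys:
            self._stream_keys[stream] = secrets.token_bytes(32)
        return self._stream_keys[stream]

    def authenticate(self, node, password, now):
        """Return {stream: (key, expires_at)} for the node's streams."""
        expected = self._credentials.get(node)
        if expected is None or not secrets.compare_digest(expected, password):
            raise PermissionError("authentication failed")
        return {
            s: (self._key_for(s), now + self.lifetime)
            for s in self._grants.get(node, ())
        }


def key_is_valid(entry, now):
    """A key may be used until its expiry time is reached."""
    _key, expires_at = entry
    return now < expires_at


service = KeyService(
    lifetime=3600,
    credentials={"sink-1": b"s3cret", "sink-2": b"hunter2"},
    grants={"sink-1": {"NYSE"}, "sink-2": {"NYSE"}},
)
keys_1 = service.authenticate("sink-1", b"s3cret", now=0)
keys_2 = service.authenticate("sink-2", b"hunter2", now=10)
```

Both nodes end up with the same key for the same stream, which is what lets a sender encrypt each message only once.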
Detecting and recovering from failures
The data plant must run without interruption and be serviceable without downtime. This means that the data plant must be able to route around software or hardware components that break, and we must be able to add and remove components dynamically, without stopping other components.
These requirements are difficult to fulfill unless we make some strong simplifying assumptions. Note that the data plant will consist of edges (sources and sinks), and a middle (nodes that process streams on the way). We assume that:
- Nodes in the middle have no state or persistence that we could not afford to lose if the node stopped working. I.e. if a box breaks, whatever it was carrying or working on will be lost, and we accept this.
- Nodes have zero knowledge of each other, only of the name service. All addressing is done indirectly via the name service. Zero topological knowledge makes a node easy to plug in and pull out.
- We create reliability through active-active redundancy. Redundancy means that if one component fails, another takes over. Active-active means that no nodes are "special".
- Failure is acceptable, if it happens sufficiently rarely, and we can recover sufficiently rapidly.
Adding custom functionality
Any realistic use case will need to plug custom business logic into the data plant. Custom business logic reads a stream, does some work, and writes output, i.e. it is a filter in the Unix sense. We can connect multiple filters together, logically:
filter | filter…
And connect this chain to input streams and output streams:
input [input…] < [ filter | filter… ] > output [output…]
In practice we'd have a mix of standard filters, and custom filters. The simplest way to create filter chains would be using an API and a programming language. The input streams correspond to connect() methods, the output streams correspond to bind() methods, and the pipes correspond to inproc sockets that connect filters. Filters would run in parallel, each as a thread, each potentially using a processor core.
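Such a chain can be sketched with Python threads, using `queue.Queue` objects as stand-ins for the inproc sockets between filters. The names `run_filter`, `run_chain`, and `STOP` are hypothetical, as are the two example filters. Note that in standard CPython, threads share one interpreter lock, so this sketch shows the wiring rather than a true one-core-per-filter speed-up.

```python
import queue
import threading

STOP = object()   # sentinel that tells each stage to shut down


def run_filter(func, inbox, outbox):
    """Apply func to each message; func returns None to drop a message."""
    while True:
        message = inbox.get()
        if message is STOP:
            outbox.put(STOP)
            return
        result = func(message)
        if result is not None:
            outbox.put(result)


def run_chain(filters, messages):
    """Wire filters together with queues and run each in its own thread."""
    pipes = [queue.Queue() for _ in range(len(filters) + 1)]
    threads = [
        threading.Thread(target=run_filter, args=(f, pipes[i], pipes[i + 1]))
        for i, f in enumerate(filters)
    ]
    for t in threads:
        t.start()
    for m in messages:
        pipes[0].put(m)
    pipes[0].put(STOP)
    for t in threads:
        t.join()
    output = []
    while True:
        item = pipes[-1].get()
        if item is STOP:
            return output
        output.append(item)


def only_ibm(message):
    """Custom filter: pass IBM ticks, drop everything else."""
    return message if message[0] == "IBM" else None


def to_cents(message):
    """Custom filter: convert a price to whole cents."""
    return (message[0], round(message[1] * 100))


result = run_chain(
    [only_ibm, to_cents],
    [("IBM", 1.5), ("MSFT", 2.0), ("IBM", 1.75)],
)
```

Each stage knows only its input and output pipe, so filters can be added, removed, or reordered without touching the others.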