January 20, 2010
This article was contributed by Martin Sustrik & Martin Lucina
BSD sockets have been used in thousands of applications over the years, but
they suffer from some limitations. The low-level nature of the socket API leads
developers to reimplement the same functionality on top of sockets over and
over again. Alternatives exist in the form of various "I/O frameworks" and
"enterprise messaging systems", but both of these approaches have their own
drawbacks. The former are generally bound to particular programming languages
or paradigms, while the latter tend to be bloated, proprietary solutions with
resident daemons that hog system resources.
0MQ ("Zero-Em-Queue") is a messaging
system that tackles these issues by taking a different approach. Instead of
inventing new APIs and complex wire protocols, 0MQ extends the socket API,
minimizing the learning curve; a network programmer can master it in a couple
of hours. The wire protocols are simple, even trivial. Performance matches and
often exceeds that of raw sockets.
A client/server example
Let's have a look at a trivial example of 0MQ usage. Say we want to
implement an SQL server. The networking part of the code base is fairly
complex: we have to manage multiple connections in a non-blocking fashion,
maintain a pool of worker threads to send large result sets in the background
while the SQL engine processes new requests, and so on.
Here's how we can accomplish this using the 0MQ C language bindings:
#include <zmq.h>

int main ()
{
    void *ctx, *s;
    zmq_msg_t query, resultset;

    /* Initialise 0MQ context, requesting a single application thread
       and a single I/O thread */
    ctx = zmq_init (1, 1, 0);

    /* Create a ZMQ_REP socket to receive requests and send replies */
    s = zmq_socket (ctx, ZMQ_REP);

    /* Bind to the TCP transport and port 5555 on the loopback interface */
    zmq_bind (s, "tcp://lo:5555");

    while (1) {
        /* Allocate an empty message to receive a query into */
        zmq_msg_init (&query);

        /* Receive a message; blocks until one is available */
        zmq_recv (s, &query, 0);

        /* Allocate a message for sending the resultset */
        zmq_msg_init (&resultset);

        /* TODO: Process the query here and fill in the resultset */

        /* Deallocate the query */
        zmq_msg_close (&query);

        /* Send back our canned response */
        zmq_send (s, &resultset, 0);
        zmq_msg_close (&resultset);
    }
}
This example shows us several basic principles of 0MQ:
0MQ transports data as messages, represented in this example by
zmq_msg_t. 0MQ considers a message to be a discrete unit of transport,
and message data to be an opaque blob. This is a deliberate simplification
compared to systems like CORBA, with their 1000+ pages of core specification.
Developers can always use a third-party library such as Google protocol buffers
if they do not wish to write custom serialization code.
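Since 0MQ treats message data as an opaque blob, any serialization scheme can
be layered on top. As a minimal sketch of the idea, here the query from the
example is encoded with Python's standard json module (standing in for
protocol buffers; the field name is purely illustrative):

```python
import json

def encode_query(sql):
    # Serialize a query to bytes; 0MQ would carry this as an opaque blob.
    return json.dumps({"query": sql}).encode("utf-8")

def decode_query(blob):
    # The receiving side reverses the process.
    return json.loads(blob.decode("utf-8"))["query"]
```

The messaging layer never inspects the bytes, so the two sides are free to
agree on any encoding they like.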
0MQ supports different socket types which are specified at
socket creation time and implement different messaging patterns. In
this example we use ZMQ_REP which stands for the replier
pattern, meaning that we wish to receive requests from many sources and send
replies back to the original requester.
zmq_init() has two arguments related to threads. The first
argument is the maximum number of application threads that will access the 0MQ
API. The second argument specifies the size of the I/O thread pool 0MQ will
create and use to retrieve and send messages in the background. For instance,
when sending a large result set the send() function will return
immediately and the actual work of pushing the data to the network will be
offloaded to an I/O thread in the background.
For the sake of brevity, error handling has been omitted in
this example. The 0MQ C binding uses standard POSIX conventions, so most
functions return 0 on success and -1 on failure, with the
actual error code being stored in errno. 0MQ also provides a
zmq_strerror() function to translate its specific error codes into
human-readable strings.
The client code is equally simple (C++):
#include <string.h>
#include <stdio.h>
#include <zmq.hpp>
int main ()
{
    try {
        // Initialise 0MQ context with one application and one I/O thread
        zmq::context_t ctx (1, 1);

        // Create a ZMQ_REQ socket to send requests and receive replies
        zmq::socket_t s (ctx, ZMQ_REQ);

        // Connect it to port 5555 on localhost using the TCP transport
        s.connect ("tcp://localhost:5555");

        // Construct an example message containing our query
        const char *query_string = "SELECT * FROM mytable";
        zmq::message_t query (strlen (query_string) + 1);
        memcpy (query.data (), query_string, strlen (query_string) + 1);

        // Send the query
        s.send (query);

        // Receive the result
        zmq::message_t resultset;
        s.recv (&resultset);

        // TODO: Process the resultset here
    }
    catch (std::exception &e) {
        // 0MQ throws standard exceptions just like any other C++ API
        printf ("An error occurred: %s\n", e.what ());
        return 1;
    }
    return 0;
}
This example uses the ZMQ_REQ socket type, which specifies the
requester messaging pattern: we wish to send requests, which may be
load-balanced among all peers listening for such requests, and to receive
replies to them. The example also nicely shows how the 0MQ C++ language
bindings map to native language features and allow us to use exceptions
for error handling.
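The load-balancing of requests among connected peers can be pictured as a
simple round-robin rotation. A conceptual sketch (this is an illustration of
the idea, not the actual 0MQ scheduling code):

```python
import itertools

def distribute(requests, peers):
    # Assign each outgoing request to the next connected peer in turn,
    # mimicking the round-robin distribution among listening peers.
    rotation = itertools.cycle(peers)
    return [(request, next(rotation)) for request in requests]
```

With two peers, three successive requests would go to the first, the second,
and then the first peer again.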
One socket, many endpoints: A publish/subscribe example
Now, let's have a look at another common pattern used in messaging, where
one application (the publisher) publishes a stream of messages while
other interested applications (subscribers) receive and process the
messages.
The publisher application (Java):
import org.zmq.*;
class publisherApp
{
    public static void main (String [] args)
    {
        // Initialise 0MQ with a single application and I/O thread
        org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);

        // Create a PUB socket for port 5555 on the eth0 interface
        org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB);
        s.bind ("tcp://eth0:5555");

        for (;;) {
            // Create a new, empty, 10 byte message
            byte msg [] = new byte [10];

            // TODO: Fill in the message here

            // Publish our message
            s.send (msg, 0);
        }
    }
}
The subscriber application (Python):
import libpyzmq

def main ():
    # Initialise 0MQ with a single application and I/O thread
    ctx = libpyzmq.Context (1, 1)

    # Create a SUB socket ...
    s = libpyzmq.Socket (ctx, libpyzmq.SUB)

    # ... ask to subscribe to all messages ...
    s.setsockopt (libpyzmq.SUBSCRIBE, "")

    # ... request the tcp transport with the endpoint myserver.lan:5555
    s.connect ("tcp://myserver.lan:5555")

    while True:
        # Receive one message
        msg = s.recv ()
        # TODO: Process the message here

if __name__ == "__main__":
    main ()
As with our previous examples, we have deliberately omitted error handling
and processing code for the sake of brevity. Error handling in both the Java
and Python bindings is implemented using native exceptions.
These examples demonstrate the following:
Message subscriptions using the setsockopt() call.
Using this call, the subscriber indicates that it is only interested in the
subset of messages sent by the publisher that start with the specified string.
For example, to subscribe to only those messages beginning with ABC, we
would use the call:
s.setsockopt (libpyzmq.SUBSCRIBE, "ABC")
For performance reasons, 0MQ subscriptions are plain strings rather than
regular expressions; however, you can still use them for simple prefix-style
matching, where a subscription to animals. matches messages such
as animals.dogs and animals.cats.
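The matching rule is ordinary string-prefix comparison; in Python terms it
behaves like this small sketch:

```python
def subscription_matches(subscription, message):
    # An empty subscription matches every message; a non-empty one
    # matches any message that starts with the subscribed prefix.
    return message.startswith(subscription)
```

This is why a prefix check is so cheap compared to regular-expression
matching on a high-volume feed.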
Now, let's look at a more complex scenario, such as one that is often
encountered in the stock trading business. We want to send a high-volume
message feed of stock prices to multiple applications, some of which are
located on our local LAN and others at branch offices connected via slow and
expensive WAN links. The message load is so high that sending the feed to each
receiver on our LAN individually, using TCP, would exhaust our LAN bandwidth.
For the subscribers located on our local LAN, the ideal solution is to
use a multicast transport. 0MQ supports a reliable multicast protocol known as
Pragmatic General Multicast (PGM) which suits this purpose. Many LWN readers
may not have heard of PGM; without going into too much detail here, we can say
that it is an industry-standard protocol specified in RFC 3208 and implemented
mostly by proprietary messaging and operating-system vendors such as Tibco,
IBM, and Microsoft. Luckily, the excellent open-source OpenPGM implementation
exists and is used by 0MQ.
Back to our stock trading example: while using a PGM transport is fine on
our local LAN, multicast won't work too well for our overseas offices, so we
really want to be able to talk to those using plain old TCP. In terms of
code, we want something like this to bind our sending socket to both a TCP
port and a multicast group:
s.bind ("tcp://eth0:5555");
s.bind ("pgm://eth0;224.0.0.1:5555");
This example shows off two major features of 0MQ. First, the ability to bind
or connect a socket to multiple endpoints by simply calling bind()
and/or connect() more than once. Second, the use of different
underlying transports for a socket. 0MQ supports several transports,
of which the most important are tcp, pgm, and inproc
(optimized for sending messages between threads within the same application).
Using the same API for in-process and remote communication allows us to easily
move computations from local worker threads to remote hosts and vice versa.
While the example above works, it results in each overseas receiver creating a
separate TCP connection to the publisher in our main office, which may result
in messages being transferred multiple times over the WAN link. What we would
like instead is a proxy application running at each branch office (ideally
directly on its edge router, to minimize the number of network hops and thus
keep latency to a minimum) that connects to the publisher and re-distributes
the messages on the branch office LAN via reliable multicast.
For the scenario described above we can use zmq_forwarder which is
part of the 0MQ distribution. Running zmq_forwarder with a simple XML
configuration file that describes the inbound and outbound connections
is all that is needed.
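Conceptually, such a forwarder device is just a receive-and-resend loop
between two sockets. A sketch with hypothetical socket objects (the real
zmq_forwarder is driven by its XML configuration rather than written by hand;
the limit parameter exists only to make the sketch terminate):

```python
def forward(frontend, backend, limit=None):
    # Pump messages from the socket connected to the central publisher
    # (frontend) to the socket re-publishing on the branch-office LAN
    # (backend). With limit=None this loops forever, like the real device.
    forwarded = 0
    while limit is None or forwarded < limit:
        backend.send(frontend.recv())
        forwarded += 1
    return forwarded
```

Because the proxy is the only TCP subscriber crossing the WAN link, each
message traverses that link exactly once.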
Performance
Of course, none of this is any good if your code runs slowly. Assuming
recent and well-tuned hardware, the end-to-end latency for transferring a small
message (64 bytes or less) on a 1GbE LAN using 0MQ is approximately 30-40
microseconds. As for throughput, when transferring a single stream of one-byte
messages, 0MQ achieves rates of up to 3-4 million messages per second.
Observant readers will note that achieving these throughput figures with raw
BSD sockets is impossible. The 0MQ approach is to use "message batching" (i.e.
sending messages in batches rather than one by one) thus avoiding frantic
up-and-down traversal of the networking stack. Thanks to smart batching
algorithms, message batching has almost no impact on latency.
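The batching idea can be sketched as packing many small messages into one
buffer with simple length prefixes, so the whole batch crosses the
kernel boundary in a single send. This is an illustration of the concept
only, not the actual 0MQ wire format:

```python
import struct

def batch(messages):
    # Prefix each message with a 4-byte big-endian length and
    # concatenate, so one send() call can carry the entire batch.
    return b"".join(struct.pack(">I", len(m)) + m for m in messages)

def unbatch(buf):
    # Split a batched buffer back into the individual messages.
    msgs, pos = [], 0
    while pos < len(buf):
        (n,) = struct.unpack_from(">I", buf, pos)
        pos += 4
        msgs.append(buf[pos:pos + n])
        pos += n
    return msgs
```

One system call per batch, rather than one per message, is where the
throughput win comes from.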
The memory footprint of 0MQ will be particularly interesting to embedded
developers, as it is much smaller than that of conventional messaging systems.
For instance, on Linux/x86 the core code occupies only a couple of pages in
resident memory.
Conclusion
While 0MQ is still a young project which is evolving rapidly,
it is an interesting and powerful alternative for those who prefer a messaging
system that emphasizes simplicity, efficiency, and low footprint over the
complex bells and whistles of most current enterprise messaging systems.
Particularly noteworthy are 0MQ's extension of the well-known socket API and
its ambition to be "just a transport" which gets out of the way, rather than
yet another bloated messaging or RPC framework.
0MQ is licensed under the LGPL and you can
download it from the project
website. API documentation is provided in the form of traditional UNIX
man pages, and help can be found
on the community mailing
list or IRC channel. The
project also has a
Git repository and is accepting
contributions
from the wider community.
As of this writing, 0MQ runs on Linux, Windows, Solaris, and many other POSIX
platforms. Language bindings available include C, C++, Common Lisp, Java,
Python, Ruby, and more. Unfortunately, packaging is lagging behind, so community
contributions in this area would be very helpful. Due to the closed nature of
stock trading systems it's hard to get a handle on actual adoption of 0MQ
in the wild, but there is one case
study available.