Enterprise Java
ElasticMQ message replication with JGroups
ElasticMQ is a messaging server, with a Scala, Java, and an Amazon SQS-compatible interface. It supports guaranteed messaging by replicating the messages across a cluster of servers, and message persistence with journalling.
Message replication is one of the core features of ElasticMQ. However if you look at the code, it’s only a handful of classes, the longest having 76 lines (remember that this is Scala, though ;) ). That is because ElasticMQ uses JGroups as the underlying communication library. JGroups is quite old, especially for a Java library – the first release was in 1999 (!). But it’s quite far from being outdated and obsolete – it has a nice API, works without problems, has a good community; and as any Java library cooperates well with Scala.
JGroups has a lot of useful features:
- reliable multicast
- cluster management
- failure detection
- node discovery
- many years of performance improvements
which are extensively used for implementing replication in ElasticMQ. Below is a summary of how it’s done.
How does an ElasticMQ cluster work?
In a single ElasticMQ cluster one node is always the master. You can only execute operations against this node. The results of each operation are then replicated to the other members. There are a couple of options related to blocking; the replication can be fully asynchronous, or it can wait until at least one or all nodes ack the operation. To make sure that in case of cluster partitions the same messages aren’t received from different partitions, only the partition which has at least half+1 of the nodes operations is active.
The central concept in ElasticMQ is the message storage. A storage executes commands (such as a send message command, delete message command etc.). The replication layer is just a wrapper around any other storage. Note though, that we can only replicate the resulting storage mutations (so after the command is executed), not the original command itself. For example, if the command is “receive a message”, the results of executing it may be different on each machine. Hence we replicate only the change of the message visibility (in ElasticMQ, similarly to Amazon SQS, if a message is received it is blocked from a subsequent receive for a specified period of time), if receiving a message succeedes. You can see this basic logic in JGroupsReplicatedStorage.
Initializing the cluster
Before we get to the replication itself though, the first thing to do is to initialize the cluster; that is done in ReplicatedStorageConfigurator. As a parameter there, we need a JGroups configuration file, which is a stack of protocols. You don’t really need to know what each of these protocols do and what all those configuration parameters mean. The two most useful are udp.xml and tcp.xml. The first one should be used if you have multicast in your network, the second – if all communication should go through TCP (e.g. on EC2). In the latter case you will also need to provide the list of initial IPs. The list doesn’t have to be exhaustive, just a list of seeds.
Having a protocol stack, ElasticMQ creates a JChannel, and connects it, which simply means connecting to the cluster. And that’s in fact all you need to do to create a cluster with JGroups – pretty simple, right? As you can see at the end of ReplicatedStorageConfigurator, the first thing after connecting is a call to channel.getState(null, 0). This will go to the current master node (more on master election later), fetch the state (current queues and messages) and apply it on the current node (see the very simple JGroupsStateTransferMessageListener – handles both sending and receiving). There are two important things to note here. First is that this transfer doesn’t block the whole cluster from normal operation. Second is that if an operation is executed during the state transfer, it will also be replicated. So it may happen that one command is executed twice on the new node. That doesn’t matter though, as each replicated command is idempotent, so may be applied many times. In other scenarios some application-side mechanism would have to be implemented to prevent such situations.
Replicating data
Finally we get to the core: replicating the commands. On the sender side, this is handled by JGroupsReplicationMessageSender. Again, not a very complicated class. It uses the MessageDispatcher “building block” from JGroups which, apart from multicasing messages across the cluster, enables you to wait until a specified number of nodes receive it. On the receiving side, we have JGroupsRequestHandler. Again, pretty simple. When a message is received it is simply sent to the storage.
Cluster management
You may have also noticed the SetMaster special message. This is needed for the user to be able to read the current master’s node address. Master election (deciding which node is the master) is handled entirely with JGroups. There isn’t a specific algorithm in JGroups to elect a master, however we can use the fact that each node has the same cluster view, which is represented by the JGroups View class. All we need to do is simply get the first (or last, or 3rd, etc. – anything as long as it’s the same on all nodes) element from that list, and set it as the master.
Cluster view is handled by the last “core” replication class, JGroupsMembershipListener. Two things happen there. The viewAccepted callback is called whenever a new node joins or leaves the cluster; each node with the same (well, equal :) ) instance of the View class. The master broadcasts its address (which is the ElasticMQ server address, not the internal JGroups cluster-communication address) in a separate thread. It’s a very easy mistake to perform a blocking operation in one of the JGroups callback methods. You should never do that, as the whole stack can then lock. We also need the FLUSH protocol (which is always added during the cluster setup); this protocol makes sure that no new messages are sent when before the new view has been installed by all nodes, and hence we make sure that a new node always receives the master information.
The membership listener also handles cluster merges. Again, JGroups gives us a view of the partitions that were merged and the new combined view. In ElasticMQ, all partitions except the primary parition (which is the largest one) request a state transfer, just as after connecting to the cluster. That way the data is kept in a consistent state.
Summing up
It’s also worth noting that ElasticMQ’s replication is fully tested using ScalaTest. Each test creates a cluster of in-memory storages, creating new nodes or simulating node crashes. See the JGroupsReplicatedStorageTest class.
Having the mechanisms from JGroups it is pretty straightforward to implement cluster communication. As always, though, you need to remember about some traps when it comes to concurrency (e.g. that there may be cluster activity when a new node joins; that partitions and merges can happen anytime; that there’s no ordering between normal messages and cluster view changes; that messages may be sent during state transfer; etc.). However both the JGroups tutorial and manual is pretty comprehensive and given the additional help from the forums (thanks Bela!), you should be good to go.
You can try how the replication works in practice by downloading a standalone distribution of ElasticMQ or running it embedded.
Reference: Implementing message replication in ElasticMQ with JGroups from our JCG partner Adam Warski at the Blog of Adam Warski blog.