An Alternative Multi-Producer Approach
Recently on InfoQ, Aliaksei Papou posted an article describing some of his experiments with high-performance message interchange between threads. The article contained a number of examples, but I am going to focus on the multi-producer case. One of the optimisations it showed was that if you know the number of producers at initialisation time, you can build a structure that significantly reduces contention. The existing MultiProducerSequencer does not have this constraint, and that flexibility is essential for a large number of use cases. However, I wanted to see what could be achieved by applying this approach to the Disruptor.
The design I'm going to use is, instead of a single Disruptor with a MultiProducerSequencer, a Disruptor per producer, each with its own SingleProducerSequencer. To channel all of the events into a single EventHandler I need to implement a custom EventProcessor that can poll multiple SequenceBarriers and DataProviders. This custom MultiBufferBatchEventProcessor can be found in the com.lmax.disruptor.support package inside the performance tests. The key component of this class is its main loop, which can be seen here:
while (true)
{
    try
    {
        for (int i = 0; i < barrierLength; i++)
        {
            long available = barriers[i].waitFor(-1);
            Sequence sequence = sequences[i];
            long previous = sequence.get();

            for (long l = previous + 1; l <= available; l++)
            {
                // The end-of-batch flag fires on the last available sequence.
                handler.onEvent(providers[i].get(l), l, l == available);
            }

            sequence.set(available);
            count += (available - previous);
        }

        Thread.yield();
    }
    // Lines omitted...
}
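To give a feel for how this is wired up, here is a minimal sketch of the setup for three producers. The MultiBufferBatchEventProcessor constructor shape and the getSequences() accessor follow the support class described above, but the class name MultiBufferSetup, the buffer size, the wait strategy and the handler body are illustrative assumptions rather than the actual test configuration:

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.support.MultiBufferBatchEventProcessor;

public class MultiBufferSetup
{
    @SuppressWarnings("unchecked")
    public static void main(String[] args)
    {
        final int numProducers = 3;

        // One single-producer ring buffer (and hence one SingleProducerSequencer)
        // per publishing thread.
        RingBuffer<long[]>[] buffers = new RingBuffer[numProducers];
        SequenceBarrier[] barriers = new SequenceBarrier[numProducers];
        for (int i = 0; i < numProducers; i++)
        {
            buffers[i] = RingBuffer.createSingleProducer(
                () -> new long[3], 1 << 16, new YieldingWaitStrategy());
            barriers[i] = buffers[i].newBarrier();
        }

        EventHandler<long[]> handler = (event, sequence, endOfBatch) ->
        {
            // Consume the three-long payload here.
        };

        // The custom processor polls every barrier and funnels all events
        // into the single handler.
        MultiBufferBatchEventProcessor<long[]> processor =
            new MultiBufferBatchEventProcessor<>(buffers, barriers, handler);

        // Gate each ring buffer on the processor's sequence for that buffer,
        // so a producer cannot wrap slots the consumer has not yet read.
        for (int i = 0; i < numProducers; i++)
        {
            buffers[i].addGatingSequences(processor.getSequences()[i]);
        }

        new Thread(processor).start();
    }
}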
Central to this approach is that the event processor takes two arrays in its constructor:
- DataProvider[]: the array of ring buffers to read data from.
- SequenceBarrier[]: the barriers supplied by each of the ring buffers.
From these, the event processor constructs an array of Sequences that track the events processed from each ring buffer. The event loop iterates through the sequence barriers to determine whether any of the ring buffers have data available to be read; any available data is passed on to the user-supplied event handler.
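To make that concrete, here is a skeleton of how the constructor and the per-buffer sequence array might look. The real support class implements EventProcessor; this sketch simplifies it to a Runnable, and the field names are my own shorthand:

import com.lmax.disruptor.DataProvider;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;

public final class MultiBufferBatchEventProcessor<T> implements Runnable
{
    private final DataProvider<T>[] providers;
    private final SequenceBarrier[] barriers;
    private final EventHandler<T> handler;
    private final Sequence[] sequences;

    public MultiBufferBatchEventProcessor(
        DataProvider<T>[] providers, SequenceBarrier[] barriers, EventHandler<T> handler)
    {
        if (providers.length != barriers.length)
        {
            throw new IllegalArgumentException("one barrier is required per data provider");
        }

        this.providers = providers;
        this.barriers = barriers;
        this.handler = handler;

        // One consumer-side sequence per ring buffer, tracking how far this
        // processor has read into each of them. These are the sequences the
        // producers gate on.
        this.sequences = new Sequence[providers.length];
        for (int i = 0; i < sequences.length; i++)
        {
            sequences[i] = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
        }
    }

    public Sequence[] getSequences()
    {
        return sequences;
    }

    @Override
    public void run()
    {
        // Main polling loop as shown above.
    }
}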
To test this I created the ThreeToThreeSequencedThroughputTest, which starts three producers and one consumer. One aspect of the code supplied in the InfoQ article is that each "train" has a capacity of three longs, and one "op" is measured as the movement of one long. To make the tests comparable I used an array of three longs as the entry within the ring buffer and multiplied the total number of events moved between threads by three to calculate the total "ops".
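As a rough illustration of the publishing side and that accounting, assuming the same three-long entries (the class name TrainPublisher and the variable names are illustrative, not taken from the actual test):

import com.lmax.disruptor.RingBuffer;

public final class TrainPublisher
{
    // Claim the next slot, fill the three-long payload, then publish.
    static void publish(RingBuffer<long[]> buffer, long value)
    {
        long next = buffer.next();
        try
        {
            long[] entry = buffer.get(next);
            entry[0] = value;
            entry[1] = value;
            entry[2] = value;
        }
        finally
        {
            buffer.publish(next);
        }
    }

    // One "op" is the movement of a single long, so the number of events
    // is multiplied by 3 before dividing by the elapsed time.
    static long opsPerSecond(long events, long elapsedMillis)
    {
        return (events * 3L * 1000L) / elapsedMillis;
    }
}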
Test Results (Intel(R) Core(TM) i7-3770 CPU @ 3.40GHz)
Disruptor:
Run 0, Disruptor=390,738,060 ops/sec
Run 1, Disruptor=387,931,034 ops/sec
Run 2, Disruptor=397,058,823 ops/sec
Run 3, Disruptor=394,160,583 ops/sec
Run 4, Disruptor=396,767,083 ops/sec
Run 5, Disruptor=394,736,842 ops/sec
Run 6, Disruptor=396,767,083 ops/sec
Railway/Train:
ops/sec = 243,141,801
ops/sec = 302,695,445
ops/sec = 283,096,862
ops/sec = 273,670,298
ops/sec = 268,340,387
ops/sec = 264,802,500
ops/sec = 262,258,028
Clearly this approach has some merit, and I am considering adding it to the main Disruptor distribution. However, there are still a couple of design issues I need to work through first:
- The sequence value passed to the handler is the value from the source ring buffer, so it may go up and down as events from the different buffers flow through. I'm not sure whether this will be an issue for users; it feels fairly minor, so hopefully I won't need to worry about it.
- The event processor only supports yielding at the moment. I need to figure out how to correctly incorporate the wait strategies, as there will be some niggles with the blocking wait strategy and consumer wake-up.
This is an idea that was mentioned on the mailing list some time ago. I initially wasn't particularly interested, but given the appreciable benefit of this approach I will now take the idea more seriously.