Microservices in the Chronicle world – Part 2
In this part we look at turning a component into a service.
In Part 1, we looked at how we can easily create and test components which expect asynchronous messages in and produce asynchronous messages out. However, how do we turn this into a service?
Turning our components into a service
The key thing which is missing from our components is a transport. A lack of a transport simplifies testing, profiling and debugging, but we need to distribute our components, and for this we need a transport.
There is a wide range of possible transports available:
- Chronicle Queue
- Raw TCP messages or UDP packets with a library like Netty
- JMS Messaging
- REST API[4]
- Database tables via JDBC
- Files dropped into a directory and picked up with the directory WatchService
- A thread-safe Queue, like BlockingQueue
- Shared memory
- Apache Aries for pluggable transports
- No transport at all (method calls are fine for a given use case)
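To make the idea of adding a transport concrete, here is a minimal sketch of one of the simpler options in the list: a BlockingQueue between a producer and a consumer thread. The PriceListener interface, the Price message and every other name here are hypothetical and exist only for illustration; they are not part of Chronicle or of the components from Part 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingQueueTransportSketch {
    /** Hypothetical component interface: asynchronous messages in. */
    interface PriceListener {
        void onPrice(String symbol, double price);
    }

    /** Hypothetical message carried by the transport. */
    static final class Price {
        final String symbol;
        final double price;

        Price(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    /** Producer side: looks like the component, but only enqueues the call. */
    static PriceListener writerFor(BlockingQueue<Price> queue) {
        return (symbol, price) -> queue.add(new Price(symbol, price));
    }

    /** Consumer side: takes a fixed number of messages and calls the component. */
    static void drain(BlockingQueue<Price> queue, PriceListener target, int count)
            throws InterruptedException {
        for (int i = 0; i < count; i++) {
            Price p = queue.take(); // blocks until a message arrives
            target.onPrice(p.symbol, p.price);
        }
    }

    /** Sends two prices through the queue to a component on another thread. */
    static List<String> run() {
        BlockingQueue<Price> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();
        PriceListener component = (symbol, price) -> received.add(symbol + "@" + price);

        Thread consumer = new Thread(() -> {
            try {
                drain(queue, component, 2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        PriceListener writer = writerFor(queue);
        writer.onPrice("EURUSD", 1.1172);
        writer.onPrice("EURUSD", 1.1167);

        try {
            consumer.join(); // after join(), the consumer's additions are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for consumer", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The component itself never sees the queue. Only the writer and the drain loop know about the transport, which is why the same component can be tested with plain method calls.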
Chronicle Queue is the transport we will be looking at in this post.
Using queue in a unit test
Chronicle Queue is persisted; however, in unit tests you usually want to start fresh and remove the queue afterwards. The idiom you can use is as follows:
Creating a temporary Queue
    File queuePath = new File(OS.TARGET, "testName-" + System.nanoTime());
    try {
        try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
            // use the queue
        }
    } finally {
        IOTools.shallowDeleteDirWithFiles(queuePath);
    }
This creates a queue which is stored in a single file. The file is rolled daily by default and includes the current date in the path.
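As a rough illustration of what daily rolling means, the sketch below maps a timestamp to a roll cycle and a date-based name. It uses the roll settings that appear in the queue dump in this post (length 86400000 ms, format yyyyMMdd, epoch 0). The arithmetic and the use of UTC are assumptions made for illustration, not a description of how Chronicle Queue computes this internally.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class DailyRollSketch {
    static final long ROLL_LENGTH_MS = 86_400_000L; // one day, as in the dump header
    static final long EPOCH_MS = 0L;                // roll epoch, as in the dump header
    // Assumption: cycle names are formatted in UTC.
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Number of whole roll periods between the epoch and the timestamp. */
    static long cycle(long timestampMs) {
        return Math.floorDiv(timestampMs - EPOCH_MS, ROLL_LENGTH_MS);
    }

    /** Date-based name of the cycle which contains the timestamp. */
    static String cycleName(long timestampMs) {
        long cycleStartMs = EPOCH_MS + cycle(timestampMs) * ROLL_LENGTH_MS;
        return FORMAT.format(Instant.ofEpochMilli(cycleStartMs));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("cycle " + cycle(now) + " -> " + cycleName(now));
    }
}
```

Two timestamps in the same UTC day map to the same cycle, so under this assumption they would land in the same file; the first message after midnight would start a new one.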
Writing events
If we repeat our test as before, instead of using a mock listener, we can use a listener which writes each method called to a queue:
Write methods called to a queue for either interface
    OrderIdeaListener orderManager = queue.createAppender()
            .methodWriter(OrderIdeaListener.class, MarketDataListener.class);
Our combiner writes to this queue, as does our test:
The SidedPrice combiner
SidedMarketDataCombiner combiner = new SidedMarketDataCombiner((MarketDataListener) orderManager);
We can also repeat the inbound events. Putting all this together, we get:
    try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
        OrderIdeaListener orderManager = queue.createAppender()
                .methodWriter(OrderIdeaListener.class, MarketDataListener.class);
        SidedMarketDataCombiner combiner = new SidedMarketDataCombiner((MarketDataListener) orderManager);

        // events in
        orderManager.onOrderIdea(new OrderIdea("EURUSD", Side.Buy, 1.1180, 2e6)); // not expected to trigger

        combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789000L, Side.Sell, 1.1172, 2e6));
        combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789100L, Side.Buy, 1.1160, 2e6));
        combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789100L, Side.Buy, 1.1167, 2e6));

        orderManager.onOrderIdea(new OrderIdea("EURUSD", Side.Buy, 1.1165, 1e6)); // expected to trigger
    }
Once we have written all the events to our queue, we can process the queue in our test. A more realistic example would be to run these two components in different threads, in different processes or on different machines. However, this just complicates the tests, and the result should be the same provided the transport does its job.
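To illustrate what "a listener which writes each method called to a queue" means, here is a minimal JDK-only sketch built on java.lang.reflect.Proxy. It records each call as a line of text in an in-memory queue. This shows the general idea only and is not how Chronicle Queue implements methodWriter; the IdeaListener interface and the text encoding are hypothetical.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class MethodWriterSketch {
    /** Hypothetical listener interface standing in for OrderIdeaListener. */
    interface IdeaListener {
        void onIdea(String symbol, double limitPrice);
    }

    /** Returns a proxy which records each interface call instead of executing it. */
    @SuppressWarnings("unchecked")
    static <T> T methodWriter(Queue<String> queue, Class<T> type) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) -> {
                    // This sketch only supports the listener's own methods.
                    if (method.getDeclaringClass() == Object.class)
                        throw new UnsupportedOperationException(method.getName());
                    // Hypothetical encoding: method name followed by its arguments.
                    queue.add(method.getName() + Arrays.toString(args));
                    return null; // listener methods return void
                });
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        IdeaListener writer = methodWriter(queue, IdeaListener.class);
        writer.onIdea("EURUSD", 1.1180);
        System.out.println(queue);
    }
}
```

The caller only sees the listener interface, so the code producing events does not change when the mock is swapped for a writer.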
Reading events
When we read the events we need a component which implements the methods called above and a mock listener to ensure it triggers the right events.
Read all the events and check the right output
    // what we expect to happen
    OrderListener listener = createMock(OrderListener.class);
    listener.onOrder(new Order("EURUSD", Side.Buy, 1.1167, 1_000_000));
    replay(listener);

    try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
        // build our scenario
        OrderManager orderManager = new OrderManager(listener); // (1)
        MethodReader reader = queue.createTailer().methodReader(orderManager); // (2)
        for (int i = 0; i < 5; i++)
            assertTrue(reader.readOne()); // (3)

        assertFalse(reader.readOne()); // (4)
        System.out.println(queue.dump()); // (5)
    }

    verify(listener);

(1) our component to test
(2) our queue reader which will call the methods
(3) loop to read/process one method at a time
(4) we have no more messages
(5) dump the queue contents so we can see what the input was
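The reading side can be sketched in the same spirit. Below is a minimal JDK-only reader which takes recorded calls from an in-memory queue and invokes the matching method on a component via reflection, returning false from readOne() once nothing is left, as in the test loop above. The Call and Component classes are hypothetical, and this is not Chronicle's MethodReader implementation.

```java
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class MethodReaderSketch {
    /** Hypothetical recorded call: a method name and its single argument. */
    static final class Call {
        final String method;
        final Object arg;

        Call(String method, Object arg) {
            this.method = method;
            this.arg = arg;
        }
    }

    /** Hypothetical component whose public methods handle the messages. */
    public static class Component {
        final List<String> seen = new ArrayList<>();

        public void onIdea(String idea) { seen.add("idea:" + idea); }

        public void onPrice(Double price) { seen.add("price:" + price); }
    }

    private final Queue<Call> queue;
    private final Object target;

    MethodReaderSketch(Queue<Call> queue, Object target) {
        this.queue = queue;
        this.target = target;
    }

    /** Dispatches the next recorded call, or returns false if none is left. */
    boolean readOne() {
        Call call = queue.poll();
        if (call == null)
            return false;
        try {
            // Look up the public method by name and by the argument's runtime type.
            Method m = target.getClass().getMethod(call.method, call.arg.getClass());
            m.invoke(target, call.arg);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot dispatch " + call.method, e);
        }
        return true;
    }

    public static void main(String[] args) {
        Queue<Call> queue = new ArrayDeque<>();
        queue.add(new Call("onIdea", "buy EURUSD"));
        queue.add(new Call("onPrice", 1.1167));

        Component component = new Component();
        MethodReaderSketch reader = new MethodReaderSketch(queue, component);
        while (reader.readOne()) {
            // each iteration processes exactly one message
        }
        System.out.println(component.seen);
    }
}
```

Processing one message per call keeps the test deterministic: the test decides exactly how many events the component sees before checking the result.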
Finally, the test dumps the raw contents of the queue by reading the data stored in the file that the queue uses. This dump is only useful for smaller queues with a few MB of data; a queue with a few GB cannot be held in a String. You can use DumpQueueMain instead.
The output of dump()
    --- !!meta-data #binary
    header: !SCQStore {
      wireType: !WireType BINARY,
      writePosition: 777,
      roll: !SCQSRoll {
        length: 86400000,
        format: yyyyMMdd,
        epoch: 0
      },
      indexing: !SCQSIndexing {
        indexCount: !int 8192,
        indexSpacing: 64,
        index2Index: 0,
        lastIndex: 0
      }
    }
    # position: 227
    --- !!data #binary
    onOrderIdea: {
      symbol: EURUSD,
      side: Buy,
      limitPrice: 1.118,
      quantity: 2000000.0
    }
    # position: 306
    --- !!data #binary
    onTopOfBookPrice: {
      symbol: EURUSD,
      timestamp: 123456789000,
      buyPrice: NaN,
      buyQuantity: 0,
      sellPrice: 1.1172,
      sellQuantity: 2000000.0
    }
    # position: 434
    --- !!data #binary
    onTopOfBookPrice: {
      symbol: EURUSD,
      timestamp: 123456789100,
      buyPrice: 1.116,
      buyQuantity: 2000000.0,
      sellPrice: 1.1172,
      sellQuantity: 2000000.0
    }
    # position: 566
    --- !!data #binary
    onTopOfBookPrice: {
      symbol: EURUSD,
      timestamp: 123456789100,
      buyPrice: 1.1167,
      buyQuantity: 2000000.0,
      sellPrice: 1.1172,
      sellQuantity: 2000000.0
    }
    # position: 698
    --- !!data #binary
    onOrderIdea: {
      symbol: EURUSD,
      side: Buy,
      limitPrice: 1.1165,
      quantity: 1000000.0
    }
    ...
    # 83885299 bytes remaining
Running the test and dumping the queue took 233 ms in my IDE.
Conclusion
We can test components standalone with a queue, or in a chain by using more queues. More importantly, we can test our components without the infrastructure complicating the debugging process. When our components work without a transport, we can show they do the same thing with a transport.
In our next part
In part 3, we will look at benchmarking and profiling with Queue. While Queue is designed to be simple and transparent, it is also designed to be faster than other persisted transports, even with no tuning.
Reference: Microservices in the Chronicle world – Part 2 from our JCG partner Peter Lawrey at the Vanilla Java blog.