UDP Messaging with Aeron
UDP (User Datagram Protocol) is a connectionless protocol used for transmitting data across networks. Unlike TCP, which ensures reliability and order, UDP provides faster, lightweight communication without such guarantees. Aeron is a high-performance messaging library designed to efficiently handle messaging over UDP and other protocols, particularly useful for real-time applications that need low latency and high throughput.
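To make the "connectionless" point concrete, here is a minimal sketch of raw UDP using only the JDK, with no Aeron involved. The class name RawUdpLoopback and the two-second timeout are illustrative choices, not part of the original article. It sends one datagram over the loopback interface and reads it back; nothing in the protocol itself would report the datagram if it were lost.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

final class RawUdpLoopback {

    /** Sends one datagram to a receiver bound on the loopback interface and returns what arrived. */
    static String sendAndReceive(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(new InetSocketAddress(loopback, 0));
             DatagramSocket sender = new DatagramSocket()) {
            // UDP never signals a lost datagram, so the receiver needs its own timeout.
            receiver.setSoTimeout(2000);

            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            // No handshake or connection setup: the datagram is addressed and sent.
            sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort()));

            byte[] buffer = new byte[1024];
            DatagramPacket received = new DatagramPacket(buffer, buffer.length);
            receiver.receive(received); // throws SocketTimeoutException if nothing arrives in time
            return new String(received.getData(), received.getOffset(), received.getLength(),
                StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Received: " + sendAndReceive("hello over UDP"));
    }
}
```

Loss detection, retransmission, and flow control are all left to the application here, which is the gap a library such as Aeron fills.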
In this article, we will explore how to use Aeron for UDP messaging. We will cover setting up Aeron, understanding its components, and creating a simple UDP messaging system.
1. Overview of Aeron
Aeron is a high-performance messaging library that supports:
- Reliable Messaging: Recovers lost data through retransmission, adding reliable delivery on top of UDP.
- Low Latency: Optimized for real-time performance.
- High Throughput: Efficient handling of large amounts of data.
Aeron uses the concept of publication and subscription for message exchanges, making it suitable for a variety of messaging scenarios.
1.1 Key Concepts
Concept | Description |
---|---|
Publication | Channel used for sending messages. |
Subscription | Channel used for receiving messages. |
MediaDriver | Manages the transport layer and buffers. |
FragmentHandler | Callback used to process received messages. |
1.2 Architecture
The diagram below provides a high-level overview of how Aeron facilitates UDP messaging between publishers and subscribers through its Media Driver, channels, and buffering mechanisms.
1.3 Explanation of Components
- Aeron Publisher:
  - Sends Messages: The publisher creates and sends messages.
  - Uses Publication API: Utilizes Aeron’s Publication API to send messages to the Media Driver.
- Media Driver:
  - Manages Transport: Handles the low-level transport of messages.
  - Buffers & Routes Data: Buffers incoming data from the publisher and routes it to the appropriate subscriber.
- Aeron Subscriber:
  - Receives Messages: The subscriber receives messages from the Media Driver.
  - Uses Subscription API: Utilizes Aeron’s Subscription API to listen for and receive messages.
- Channel:
  - Buffer & Stream: Represents the communication channel where messages are buffered and streamed between the publisher and the subscriber.
2. Media Driver
The Media Driver is an integral part of the Aeron framework. It manages the low-level aspects of message transport: it owns the buffers that clients write to and read from, and it sends and receives the UDP datagrams, including the retransmissions that make message exchange reliable. Encoding and decoding of message payloads is left to the application.
2.1 Running the Media Driver
We can run the Media Driver in two ways: as a standalone process or embedded within our application.
Standalone:
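To run the Media Driver as a standalone process, use the following command. The io.aeron.driver.MediaDriver class is not part of the client JAR, so the classpath must contain the driver, for example through the aeron-all JAR:

    java -cp aeron-all-1.44.1.jar io.aeron.driver.MediaDriver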
Embedded:
We can also launch the Media Driver embedded within our application. This is useful for simpler setups or when testing locally. The code to launch the Media Driver embedded is:

    import io.aeron.driver.MediaDriver;

    public class EmbeddedMediaDriverExample {
        public static void main(String[] args) {
            MediaDriver mediaDriver = MediaDriver.launchEmbedded();
            // Your Aeron client code here
        }
    }

3. Aeron API Client
The Aeron API Client interacts with the Media Driver to publish and subscribe to messages. It provides a high-level interface for creating publications and subscriptions, and for sending and receiving messages.
3.1 Creating an Aeron Client
First, add the Aeron dependencies to the pom.xml if using Maven:

    <dependency>
        <groupId>io.aeron</groupId>
        <artifactId>aeron-all</artifactId>
        <version>1.44.1</version>
    </dependency>

To add Aeron to a Gradle project, include the aeron-all dependency in your build.gradle file. It bundles both the client and the Media Driver, which the examples below need:

    dependencies {
        // Aeron dependency; use the latest version available
        implementation 'io.aeron:aeron-all:1.44.1'
    }

To create an Aeron client, we need to connect to the Media Driver and set up the channels and streams for communication.

    import io.aeron.Aeron;
    import io.aeron.driver.MediaDriver;

    public class AeronClientExample {
        public static void main(String[] args) {
            // Launch the Media Driver embedded, then connect a client to its directory.
            // try-with-resources closes the client first and the driver second.
            try (MediaDriver mediaDriver = MediaDriver.launchEmbedded();
                 Aeron aeron = Aeron.connect(
                     new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()))) {
                // Your Aeron client code here
            }
        }
    }

In this example, we start by launching the Media Driver embedded within the application. This is achieved using MediaDriver.launchEmbedded(), which starts the Media Driver in the same process. We then create an Aeron context using new Aeron.Context(), which holds configuration details for the Aeron instance, and point it at the driver with aeronDirectoryName(mediaDriver.aeronDirectoryName()). An embedded driver uses a generated directory rather than the default one, so a client left on the default directory would not find it. With this context, we connect to the Media Driver by calling Aeron.connect(). Both objects are closed automatically when the try block exits.
At this point, the Aeron client is ready to create publications and subscriptions for sending and receiving messages.
4. Sending and Receiving Messages
Sending and receiving messages with Aeron involves setting up publications and subscriptions. Publications are used to send messages, while subscriptions are used to receive them.
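Publications and subscriptions are addressed by a channel URI plus a stream ID; the examples in this section use aeron:udp?endpoint=localhost:40123. The sketch below only illustrates the shape of such a URI (a media type followed by key=value parameters separated by |). ChannelUriSketch is a hypothetical helper written for this article, and it ignores most validation; Aeron ships its own io.aeron.ChannelUri class, which is the one to prefer in real code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ChannelUriSketch {

    /** Splits "aeron:<media>?key=value|key=value" into a map; the media is stored under "media". */
    static Map<String, String> parse(String channel) {
        String prefix = "aeron:";
        if (!channel.startsWith(prefix)) {
            throw new IllegalArgumentException("Channel must start with 'aeron:': " + channel);
        }
        String rest = channel.substring(prefix.length());
        int queryStart = rest.indexOf('?');

        Map<String, String> parts = new LinkedHashMap<>();
        parts.put("media", queryStart < 0 ? rest : rest.substring(0, queryStart));

        if (queryStart >= 0) {
            // Parameters are separated by '|', each one a key=value pair.
            for (String pair : rest.substring(queryStart + 1).split("\\|")) {
                int equals = pair.indexOf('=');
                if (equals <= 0) {
                    throw new IllegalArgumentException("Malformed parameter: " + pair);
                }
                parts.put(pair.substring(0, equals), pair.substring(equals + 1));
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(parse("aeron:udp?endpoint=localhost:40123"));
    }
}
```

For the channel used in this article, the media is udp and the only parameter is endpoint, the address the subscriber listens on and the publisher sends to.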
4.1 Sending Messages
To send messages, we need to create a Publication and offer data to it. Here is an example of sending a message:

    import io.aeron.Aeron;
    import io.aeron.Publication;
    import io.aeron.driver.MediaDriver;
    import org.agrona.BufferUtil;
    import org.agrona.concurrent.UnsafeBuffer;

    public class Publisher {
        private static final String CHANNEL = "aeron:udp?endpoint=localhost:40123";
        private static final int STREAM_ID = 10;

        public static void main(String[] args) throws InterruptedException {
            MediaDriver mediaDriver = MediaDriver.launchEmbedded();
            Aeron aeron = Aeron.connect(
                new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()));
            Publication publication = aeron.addPublication(CHANNEL, STREAM_ID);

            String message = "Hello from Java Code Geeks!";
            UnsafeBuffer messageBytes = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
            // Encode the message and keep its length so only those bytes are sent,
            // not the whole 256-byte buffer
            int length = messageBytes.putStringWithoutLengthUtf8(0, message);

            while (true) {
                // offer() returns the new stream position, or a negative value on failure
                if (publication.offer(messageBytes, 0, length) < 0) {
                    System.out.println("Message not sent. Retrying...");
                } else {
                    System.out.println("Message sent: " + message);
                }
                Thread.sleep(1000);
            }
        }
    }

In this example, we first launch the Media Driver embedded within the application using MediaDriver.launchEmbedded(). Then, we create an Aeron instance by connecting to the Media Driver with Aeron.connect(). We set up a Publication on a specific channel and stream ID using aeron.addPublication(CHANNEL, STREAM_ID). The Publication represents the endpoint where messages will be sent.
We then prepare a message, encode it into a buffer, and attempt to send exactly the encoded bytes using publication.offer(messageBytes, 0, length). Offering the whole buffer instead would also transmit the unused padding after the message. The offer method returns the new stream position on success, or a negative value when the message was not accepted, for example because no subscriber is connected yet or the publication is back-pressured. In that case the message can simply be offered again.
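The negative results of offer are worth telling apart, because some are transient and one is final. The sketch below is a hedged illustration rather than Aeron code: OfferRetrySketch and offerWithRetry are hypothetical names, and the numeric values are copied from the constants on io.aeron.Publication (NOT_CONNECTED, BACK_PRESSURED, ADMIN_ACTION, CLOSED, MAX_POSITION_EXCEEDED) so that the example runs without the library. In real code, compare against the library constants directly.

```java
import java.util.function.LongSupplier;

final class OfferRetrySketch {
    // Assumed to mirror the constants on io.aeron.Publication; verify against your Aeron version.
    static final long NOT_CONNECTED = -1;
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;
    static final long CLOSED = -4;
    static final long MAX_POSITION_EXCEEDED = -5;

    /** Transient failures are worth retrying; CLOSED and MAX_POSITION_EXCEEDED are not. */
    static boolean isRetryable(long result) {
        return result == NOT_CONNECTED || result == BACK_PRESSURED || result == ADMIN_ACTION;
    }

    /**
     * Calls the offer up to maxAttempts times and returns the first non-negative result
     * (the new stream position), or the last negative code if it never succeeded.
     */
    static long offerWithRetry(LongSupplier offer, int maxAttempts) {
        long result = offer.getAsLong();
        for (int attempt = 1; attempt < maxAttempts && result < 0 && isRetryable(result); attempt++) {
            Thread.yield(); // give the driver a chance to make progress before retrying
            result = offer.getAsLong();
        }
        return result;
    }
}
```

With a real publication the call might look like offerWithRetry(() -> publication.offer(messageBytes, 0, length), 10). Bounding the number of attempts keeps a slow or absent subscriber from stalling the sender indefinitely.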
4.2 Receiving Messages
To receive messages, we need to create a Subscription and handle incoming data using a FragmentHandler. Here is an example of receiving messages:

    import io.aeron.Aeron;
    import io.aeron.Subscription;
    import io.aeron.driver.MediaDriver;
    import io.aeron.logbuffer.FragmentHandler;

    public class Subscriber {
        private static final String CHANNEL = "aeron:udp?endpoint=localhost:40123";
        private static final int STREAM_ID = 10;

        public static void main(String[] args) throws InterruptedException {
            MediaDriver mediaDriver = MediaDriver.launchEmbedded();
            Aeron aeron = Aeron.connect(
                new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()));
            Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);

            FragmentHandler fragmentHandler = (buffer, offset, length, header) -> {
                // Decode exactly `length` bytes starting at `offset`
                String message = buffer.getStringWithoutLengthUtf8(offset, length);
                System.out.println("Received message: " + message);
            };

            while (true) {
                // Process at most one fragment per poll
                subscription.poll(fragmentHandler, 1);
                Thread.sleep(1000);
            }
        }
    }

In this example, we first launch the Media Driver embedded within the application using MediaDriver.launchEmbedded(). Then, we create an Aeron instance by connecting to the Media Driver with Aeron.connect(). We set up a Subscription on the same channel and stream ID used by the publisher with aeron.addSubscription(CHANNEL, STREAM_ID). The Subscription represents the endpoint where messages will be received.
We then define a FragmentHandler that processes incoming messages. The handler extracts the message bytes from the buffer and prints the received message. The subscription.poll(fragmentHandler, 1) method is called in a loop to continuously check for new messages and process them using the fragment handler.
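The second argument to poll is a fragment limit: each call hands at most that many fragments to the handler and returns how many it delivered. The following toy model, which uses no Aeron classes, shows that contract in isolation. PollLoopSketch, its Handler interface, and the in-memory queue are hypothetical stand-ins invented for illustration; the real FragmentHandler also receives a Header, and the real subscription reads from the Media Driver's buffers rather than a queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class PollLoopSketch {

    /** Simplified stand-in for a fragment handler: a region of a buffer given by offset and length. */
    interface Handler {
        void onFragment(byte[] buffer, int offset, int length);
    }

    /** Toy poll: delivers at most fragmentLimit pending messages and returns how many were delivered. */
    static int poll(Queue<byte[]> pending, Handler handler, int fragmentLimit) {
        int delivered = 0;
        while (delivered < fragmentLimit && !pending.isEmpty()) {
            byte[] fragment = pending.remove();
            handler.onFragment(fragment, 0, fragment.length);
            delivered++;
        }
        return delivered;
    }

    /** Polls repeatedly until a call delivers nothing; returns the decoded messages in arrival order. */
    static List<String> drain(Queue<byte[]> pending, int fragmentLimit) {
        List<String> received = new ArrayList<>();
        Handler handler = (buffer, offset, length) ->
            received.add(new String(buffer, offset, length, StandardCharsets.UTF_8));
        while (poll(pending, handler, fragmentLimit) > 0) {
            // keep polling while the previous call made progress
        }
        return received;
    }

    public static void main(String[] args) {
        Queue<byte[]> pending = new ArrayDeque<>();
        pending.add("first".getBytes(StandardCharsets.UTF_8));
        pending.add("second".getBytes(StandardCharsets.UTF_8));
        System.out.println(drain(pending, 1));
    }
}
```

A small limit keeps each poll call short, which matters when the same thread has other work to do between polls. A return value of zero is a natural signal to idle briefly instead of spinning.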
Output of Running the Application:
Publisher output: The publisher continuously sends the message “Hello from Java Code Geeks!” every second.
Subscriber output: The subscriber continuously polls for new messages and prints them as they are received.

    Received message: Hello from Java Code Geeks!
    Received message: Hello from Java Code Geeks!
    Received message: Hello from Java Code Geeks!
    Received message: Hello from Java Code Geeks!
    Received message: Hello from Java Code Geeks!
    Received message: Hello from Java Code Geeks!

This setup demonstrates basic UDP messaging using Aeron, showcasing the simplicity of the Aeron messaging framework.
5. Conclusion
In this article, we explored the capabilities of Aeron for high-performance UDP messaging. We delved into setting up Aeron, understanding its key components such as the Media Driver and Aeron API Client, and demonstrated how to send and receive messages efficiently.
For further details, you can refer to the Aeron documentation and explore additional features and configurations.