Enterprise Java

UDP Messaging with Aeron

UDP (User Datagram Protocol) is a connectionless protocol used for transmitting data across networks. Unlike TCP, which ensures reliability and order, UDP provides faster, lightweight communication without such guarantees. Aeron is a high-performance messaging library designed to efficiently handle messaging over UDP and other protocols, particularly useful for real-time applications that need low latency and high throughput.

In this article, we will explore how to use Aeron for UDP messaging. We will cover setting up Aeron, understanding its components, and creating a simple UDP messaging system.

1. Overview of Aeron

Aeron is a high-performance messaging library that supports:

  • Reliable Messaging: Through mechanisms that avoid data loss.
  • Low Latency: Optimized for real-time performance.
  • High Throughput: Efficient handling of large amounts of data.

Aeron uses the concept of publication and subscription for message exchanges, making it suitable for a variety of messaging scenarios.

1.1 Key Concepts

ConceptDescription
PublicationChannel used for sending messages.
SubscriptionChannel used for receiving messages.
MediaDriverManages the transport layer and buffers.
FragmentHandlerCallback used to process received messages.

1.2 Architecture

The diagram below provides a high-level overview of how Aeron facilitates UDP messaging between publishers and subscribers through its Media Driver, channels, and buffering mechanisms.

1.3 Explanation of Components

  • Aeron Publisher:
    • Sends Messages: The publisher creates and sends messages.
    • Uses Publication API: Utilizes Aeron’s Publication API to send messages to the Media Driver.
  • Media Driver:
    • Manages Transport: Handles the low-level transport of messages.
    • Buffers & Routes Data: Buffers incoming data from the publisher and routes it to the appropriate subscriber.
  • Aeron Subscriber:
    • Receives Messages: The subscriber receives messages from the Media Driver.
    • Uses Subscription API: Utilizes Aeron’s Subscription API to listen for and receive messages.
  • Channel:
    • Buffer & Stream: Represents the communication channel where messages are buffered and streamed between the publisher and the subscriber.

2. Media Driver

The Media Driver is an integral part of the Aeron framework. It manages the low-level aspects of message transport, including data buffering, serialization, and deserialization. It also handles the transport layer, including UDP communication, ensuring reliable message exchanges.

2.1 Running the Media Driver

We can run the Media Driver in two ways: as a standalone process or embedded within our application.

Standalone:

To run the Media Driver as a standalone process, use the following command:

java -cp aeron-client-1.44.1.jar io.aeron.driver.MediaDriver

Embedded:

We can also launch the Media Driver embedded within our application. This is useful for simpler setups or when testing locally. The code to launch the Media Driver embedded is:

import io.aeron.driver.MediaDriver;

public class EmbeddedMediaDriverExample {
    public static void main(String[] args) {
        MediaDriver mediaDriver = MediaDriver.launchEmbedded();
        // Your Aeron client code here
    }
}

3. Aeron API Client

The Aeron API Client interacts with the Media Driver to publish and subscribe to messages. It provides a high-level interface for creating publications and subscriptions, and for sending and receiving messages.

3.1 Creating an Aeron Client

First, add the Aeron dependencies to the pom.xml if using Maven:

<dependency>
    <groupId>io.aeron</groupId>
    <artifactId>aeron-all</artifactId>
    <version>1.44.1</version>
</dependency>

To add Aeron dependencies in a Gradle project, you will need to include the aeron-all dependencies in your build.gradle file.

dependencies {
    // Aeron Dependency
    implementation 'io.aeron:aeron-client:1.44.1' // Use the latest version available
}

To create an Aeron client, we need to connect to the Media Driver and set up the channels and streams for communication.

import io.aeron.Aeron;
import io.aeron.Aeron.Context;
import io.aeron.driver.MediaDriver;

public class AeronClientExample {

    public static void main(String[] args) {
        // Launch the Media Driver embedded
        MediaDriver mediaDriver = MediaDriver.launchEmbedded();

        // Create Aeron context and connect
        Context aeronContext = new Context();
        Aeron aeron = Aeron.connect(aeronContext);

        // Your Aeron client code here
        
        // Close resources when done
        aeron.close();
        mediaDriver.close();
    }
}

In this example, we start by launching the Media Driver embedded within the application. This is achieved using MediaDriver.launchEmbedded(), which starts the Media Driver in the same process. We then create an Aeron context using new Context(), which holds configuration details for the Aeron instance. With this context, we connect to the Media Driver by calling Aeron.connect(aeronContext).

At this point, the Aeron client is ready to create publications and subscriptions for sending and receiving messages.

4. Sending and Receiving Messages

Sending and receiving messages with Aeron involves setting up publications and subscriptions. Publications are used to send messages, while subscriptions are used to receive them.

4.1 Sending Messages

To send messages, we need to create a Publication and offer data to it. Here is an example of sending a message:

import io.aeron.Aeron;
import io.aeron.Publication;
import io.aeron.driver.MediaDriver;
import org.agrona.BufferUtil;
import org.agrona.concurrent.UnsafeBuffer;


public class Publisher {

    private static final String CHANNEL = "aeron:udp?endpoint=localhost:40123";
    private static final int STREAM_ID = 10;

    public static void main(String[] args) {
        
        MediaDriver mediaDriver = MediaDriver.launchEmbedded();
        Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()));
        Publication publication = aeron.addPublication(CHANNEL, STREAM_ID);

        String message = "Hello from Java Code Geeks!";

        UnsafeBuffer messageBytes = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));

        messageBytes.putStringWithoutLengthUtf8(0, message);

        while (true) {
            if (publication.offer(messageBytes) < 0) {
                System.out.println("Message not sent. Retrying...");
            } else {
                System.out.println("Message sent: " + message);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }
}

In this example, we first launch the Media Driver embedded within the application using MediaDriver.launchEmbedded(). Then, we create an Aeron instance by connecting to the Media Driver with Aeron.connect(). We set up a Publication on a specific channel and stream ID using aeron.addPublication(CHANNEL, STREAM_ID). The Publication represents the endpoint where messages will be sent.

We then prepare a message, convert it to bytes, and attempt to send it using publication.offer(messageBytes). The offer method returns a value indicating whether the message was successfully sent.

4.2 Receiving Messages

To receive messages, we need to create a Subscription and handle incoming data using a FragmentHandler. Here is an example of receiving messages:

import io.aeron.Aeron;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.FragmentHandler;

public class Subscriber {

    private static final String CHANNEL = "aeron:udp?endpoint=localhost:40123";
    private static final int STREAM_ID = 10;

    public static void main(String[] args) {

        MediaDriver mediaDriver = MediaDriver.launchEmbedded();
        Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()));
        Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);

        FragmentHandler fragmentHandler = (buffer, offset, length, header) -> {
            String messageBytes = buffer.getStringWithoutLengthUtf8(offset, length);
            System.out.println("Received message: " + messageBytes);
        };

        while (true) {
            subscription.poll(fragmentHandler, 1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }
}

In this example, we first launch the Media Driver embedded within the application using MediaDriver.launchEmbedded(). Then, we create an Aeron instance by connecting to the Media Driver with Aeron.connect(). We set up a Subscription on the same channel and stream ID used by the publisher with aeron.addSubscription(CHANNEL, STREAM_ID). The Subscription represents the endpoint where messages will be received.

We then define a FragmentHandler that processes incoming messages. The handler extracts the message bytes from the buffer and prints the received message. The subscription.poll(fragmentHandler, 1) method is called in a loop to continuously check for new messages and process them using the fragment handler.

Output of Running the Application:

Publisher output: The publisher continuously sends the message “Hello from Java Code Geeks!” every second.

Fig 1: Publisher Output for Java Aeron UDP Messaging Example
Fig 1: Publisher Output for Java Aeron UDP Messaging Example

Subscriber output: The subscriber continuously polls for new messages and prints them as they are received.

Received message: Hello from Java Code Geeks!
Received message: Hello from Java Code Geeks!
Received message: Hello from Java Code Geeks!
Received message: Hello from Java Code Geeks!
Received message: Hello from Java Code Geeks!
Received message: Hello from Java Code Geeks!

This setup demonstrates basic UDP messaging using Aeron, showcasing the simplicity of the Aeron messaging framework.

5. Conclusion

In this article, we explored the capabilities of Aeron for high-performance UDP messaging. We delved into setting up Aeron, understanding its key components such as the Media Driver and Aeron API Client, and demonstrated how to send and receive messages efficiently.

For further details, you can refer to the Aeron documentation and explore additional features and configurations.

6. Download the Source Code

This article covered Java Aeron UDP messaging.

Download
You can download the full source code of this example here: Java Aeron UDP messaging

Omozegie Aziegbe

Omos Aziegbe is a technical writer and web/application developer with a BSc in Computer Science and Software Engineering from the University of Bedfordshire. Specializing in Java enterprise applications with the Jakarta EE framework, Omos also works with HTML5, CSS, and JavaScript for web development. As a freelance web developer, Omos combines technical expertise with research and writing on topics such as software engineering, programming, web application development, computer science, and technology.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button