Using Oracle AQ via Java 8 Streams

One of the most awesome features of the Oracle database is Oracle AQ: Oracle Database Advanced Queuing. The AQ API implements a full-fledged, transactional messaging system directly in the database.

In a classic architecture where the database is at the center of your system, with multiple applications (some written in Java, others in Perl or PL/SQL, etc.) accessing the same database, using AQ for inter-process communication is just great. If you’re more on the Java EE side, you might purchase a Java-based MQ solution, and put that message bus / middleware at the center of your system architecture. But why not use the database instead?

How to use the PL/SQL AQ API with jOOQ

The PL/SQL API for AQ message enqueuing and dequeuing is rather simple, and it can be accessed very easily from Java using jOOQ’s OracleDSL.DBMS_AQ API.

The queue configuration used here would look something like:

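CREATE OR REPLACE TYPE message_t AS OBJECT (
  ID         NUMBER(7),
  title      VARCHAR2(100 CHAR)
)
/

BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table => 'message_aq_t',
    queue_payload_type => 'message_t'
  );
  DBMS_AQADM.CREATE_QUEUE(
    queue_name => 'message_q',
    queue_table => 'message_aq_t'
  );
  DBMS_AQADM.START_QUEUE(
    queue_name => 'message_q'
  );
END;
/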

And the jOOQ code generator would generate the useful classes with all the type information directly associated with them (simplified example):

class Queues {
    static final Queue<MessageTRecord> MESSAGE_Q = 
        new QueueImpl<>("MESSAGE_Q", MESSAGE_T);
}

class MessageTRecord {
    void setId(Integer id) { ... }
    Integer getId() { ... }
    void setTitle(String title) { ... }
    String getTitle() { ... }
    MessageTRecord(
        Integer id, String title
    ) { ... }
}

These classes can then be used to enqueue and dequeue messages type safely directly on the generated queue references:

// The jOOQ configuration
Configuration c = ...

// Enqueue a message
DBMS_AQ.enqueue(c, MESSAGE_Q, 
    new MessageTRecord(1, "test"));

// Dequeue it again
MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);

Easy, isn’t it?

Now, let’s leverage Java 8 features

A message queue is nothing other than an infinite (blocking) stream of messages. Since Java 8, we have a formidable API for such message streams, the Stream API.

This is why we have added (for the upcoming jOOQ 3.8) a new API that combines the existing jOOQ AQ API with Java 8 Streams:

// The jOOQ configuration
Configuration c = ...

DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);

The above stream pipeline will listen on the MESSAGE_Q queue, consume all messages, filter out those whose title is not "test", and print the remaining messages.
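
To make the "queue as an infinite, blocking stream" idea concrete without an Oracle database, here is a minimal JDK-only sketch. It is not the jOOQ implementation: a LinkedBlockingQueue stands in for the AQ queue, and the names BlockingStreamSketch, Message, dequeueStream and firstWithTitle are hypothetical, chosen only for this illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BlockingStreamSketch {

    // Hypothetical stand-in for the generated MessageTRecord; not a jOOQ class.
    static final class Message {
        final int id;
        final String title;

        Message(int id, String title) {
            this.id = id;
            this.title = title;
        }

        @Override
        public String toString() {
            return "Message(" + id + ", " + title + ")";
        }
    }

    // An infinite stream: every pull blocks in take() until a message is available.
    static <T> Stream<T> dequeueStream(BlockingQueue<T> queue) {
        return Stream.generate(() -> {
            try {
                return queue.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    // Consumes messages until `count` of them carry the given title.
    // limit() is what lets this otherwise infinite pipeline terminate.
    static List<Message> firstWithTitle(
        BlockingQueue<Message> queue, String title, int count
    ) {
        return dequeueStream(queue)
            .filter(m -> title.equals(m.title))
            .limit(count)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        queue.add(new Message(1, "test"));
        queue.add(new Message(2, "other"));
        queue.add(new Message(3, "test"));

        firstWithTitle(queue, "test", 2).forEach(System.out::println);
    }
}
```

Without the limit() call, the pipeline would simply block on the empty queue after the third message, which is exactly the listening behaviour described above.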

Blocking streams

The interesting thing is the fact that this is a blocking, infinite stream. As long as there is no new message in the queue, the stream pipeline processing will simply block on the queue, waiting for new messages. This is not an issue for sequential streams, but when calling Stream.parallel(), what happens then?

jOOQ will consume each message in a transaction. A jOOQ 3.8 transaction runs in a ForkJoinPool.ManagedBlocker:

static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        volatile T result;

        public T get() {
            try {
                ForkJoinPool.managedBlock(new ManagedBlocker() {
                    public boolean block() {
                        result = supplier.get();
                        return true;
                    }

                    public boolean isReleasable() {
                        return result != null;
                    }
                });
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return result;
        }
    };
}

This isn’t a lot of magic. A ManagedBlocker runs some special code when it is run by a ForkJoinWorkerThread, making sure the thread’s ForkJoinPool won’t suffer from thread exhaustion and thus from deadlocks. For more info, read this interesting article here: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health

Or this Stack Overflow answer: http://stackoverflow.com/a/35272153/521799
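
For readers who want to try the ManagedBlocker mechanism in isolation, the following sketch adapts the queue-taking pattern shown in the ForkJoinPool.ManagedBlocker Javadoc. It uses only the JDK; the class ManagedBlockerSketch and its methods take and demo are hypothetical names, and a LinkedBlockingQueue again stands in for the AQ queue. The pool is deliberately created with a parallelism of 1 to show that a blocked worker does not prevent the task from completing once a message arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ManagedBlockerSketch {

    // Tells the pool that the current worker may block while waiting for an item.
    static final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null)
                item = queue.take();

            // true: no further blocking is necessary
            return true;
        }

        @Override
        public boolean isReleasable() {
            // Try a non-blocking poll first, so the pool can skip compensation
            return item != null || (item = queue.poll()) != null;
        }

        E item() {
            return item;
        }
    }

    // Blocking take that cooperates with the ForkJoinPool of the calling thread.
    static <E> E take(BlockingQueue<E> queue) throws InterruptedException {
        QueueTaker<E> taker = new QueueTaker<>(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.item();
    }

    // Submits a consumer to a single-worker pool, then produces one message.
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ForkJoinPool pool = new ForkJoinPool(1);

        try {
            Callable<String> consumer = () -> take(queue);
            Future<String> pending = pool.submit(consumer);
            queue.put("hello");
            return pending.get(5, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello"
    }
}
```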

So, if you want a super-fast parallel AQ dequeuing process, just run:

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...

DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .parallel()
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);

And you’ll have several threads that will dequeue messages in parallel.

Don’t want to wait for jOOQ 3.8?

No problem. Use the current version and wrap the dequeue operation in your own Stream:

Stream<MessageTRecord> stream = Stream.generate(() ->
    DSL.using(config).transactionResult(c ->
        dequeue(c, MESSAGE_Q)
    )
);

Bonus: Asynchronous dequeuing

While we were at it, another very nice feature of queuing systems is their asynchronicity. With Java 8, a very useful type to model (and compose) asynchronous algorithms is the CompletionStage, and its default implementation, the CompletableFuture, which again executes tasks in the ForkJoinPool.

Using jOOQ 3.8, you can again simply call:

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...

CompletionStage<MessageTRecord> stage =
    DBMS_AQ.dequeueAsync(c, MESSAGE_Q)
           .thenCompose(m -> ...);
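
As a rough, JDK-only illustration of how such a stage composes, the sketch below chains two asynchronous dequeue operations with thenCompose. It is an assumption-laden analogue and not jOOQ code: AsyncDequeueSketch, its dequeueAsync method and demo are hypothetical names, and a LinkedBlockingQueue replaces the AQ queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AsyncDequeueSketch {

    // Hypothetical analogue of an asynchronous dequeue: the blocking take()
    // runs on CompletableFuture's default asynchronous executor.
    // Note: this simple version blocks that thread without a ManagedBlocker.
    static <T> CompletionStage<T> dequeueAsync(BlockingQueue<T> queue) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return queue.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    // Dequeues one message and, once it has arrived, dequeues a second one.
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        CompletionStage<String> stage =
            dequeueAsync(queue).thenCompose(first ->
                dequeueAsync(queue).thenApply(second -> first + "," + second));

        // Messages are produced only after the pipeline has been set up
        queue.add("a");
        queue.add("b");

        try {
            return stage.toCompletableFuture().get(5, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "a,b"
    }
}
```

The calling thread never blocks while the pipeline is assembled; it only waits at the very end, when it asks for the final result.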

Stay tuned for another article on the jOOQ blog soon, where we look into more sophisticated use-cases for asynchronous, blocking SQL statements with jOOQ 3.8 and Java 8.

Reference: Using Oracle AQ via Java 8 Streams from our JCG partner Lukas Eder at the JAVA, SQL, AND JOOQ blog.

Lukas Eder

Lukas is a Java and SQL enthusiast developer. He created the Data Geekery GmbH. He is the creator of jOOQ, a comprehensive SQL library for Java, and he is blogging mostly about these three topics: Java, SQL and jOOQ.