Lessons learned: ActiveMQ, Apache Camel and connection pooling
Every once in a while, I run into an interesting problem related to connections and pooling with ActiveMQ, and today I’d like to discuss something that is not always very clear and could potentially cause you to drink heavily when using ActiveMQ and Camel JMS. Not to say that you won’t want to drink heavily when using ActiveMQ and Camel anyway… in celebration of how delightful integration and messaging become when using them of course.
So first up. Connection pooling.
Sure, you’ve always heard to pool your connections. What does that really mean, and why do you want to do it?
Opening up a connection to an ActiveMQ broker is a relatively expensive operation when compared to other actions like creating a session or consumer. So when sending or receiving messages and generally interacting with the broker, you’d like to reuse existing connections if possible. What you don’t want to do is rely on a JMS library (like Spring JmsTemplate for example) that opens and closes a connection for each send or receive of a message… unless you can pool/cache your connections.
So if we can agree that pooling connections is a good idea, take a look at an example config:
```xml
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
  <property name="maxConnections" value="10" />
  <property name="maximumActiveSessionPerConnection" value="10" />
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
  </property>
</bean>
```
You may even want to use Apache Camel and its wonderful camel-jms component because doing otherwise would just be silly. So maybe you want to set up a JMS config like this:
```xml
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="pooledConnectionFactory" />
  <property name="transacted" value="true" />
  <property name="concurrentConsumers" value="15" />
  <property name="deliveryPersistent" value="true" />
  <property name="requestTimeout" value="10000" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>
```
This config basically means: for consumers, set up 15 concurrent consumers and use (local) transactions; for producers, use PERSISTENT messages; for request-reply, set a timeout of 10000 ms; and cache JMS resources up to the consumer level.
Huge note: If you want a more thorough taste of the configs for the jms component, especially around caching consumers, transactions and more, please take a look at Torsten’s excellent blog on Camel JMS with transactions – lesson learned. Maybe you should also spend some time poking around his blog as he’s got lots of good Camel/ActiveMQ stuff too!
Awesome so far. We have a connection pool of 10 connections, we will expect 10 sessions per connection (for a total of 100 sessions if we needed that…), and 15 concurrent consumers. We should be able to deal with some serious load, right?
Take a look at this route here. It’s simple enough, exposes the activemq component (which will use the jmsConfig from above, so 15 concurrent consumers) and just does some logging:
```java
from("activemq:test.queue")
    .routeId("test.queue.routeId")
    .to("log:org.apache.camel.blog?groupSize=100");
```
Try and run this. You will find your consumers blocked up right away and stack traces will show this beauty:
```
"Camel (camel-1) thread #1 - JmsConsumer[test.queue]" daemon prio=5 tid=7f81eb4bc000 nid=0x10abbb000 in Object.wait() [10abba000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151)
    - locked <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch)
    at org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:146)
    at org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:173)
    at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
    ....
```
How can that possibly be? We have connection pooling… we have sessions set to 10 per connection, so how are we all blocked up on creating new sessions?
The answer is that you’re exhausting the number of sessions, as the stack trace suggests. But how? And how much do I need to drink to resolve this?
Well hold on now. Grab a beer and hear me out.
First understand this. ActiveMQ’s pooling implementation uses commons-pool, and the maximumActiveSessionPerConnection attribute is actually mapped to the maxActive property of the underlying pool. From the docs this means:
“maxActive controls the maximum number of objects (per key) that can allocated by the pool (checked out to client threads, or idle in the pool) at one time.”
The key here is “key” (literally… the ‘per key’ clause of the documentation). In the ActiveMQ implementation, the key is an object that represents 1) whether the session is transacted and 2) what the acknowledgement mode is. In plain terms, you’ll end up with up to “maxActive” sessions for each key that’s used on that connection. So if you have clients that use transactions, no transactions, client-ack, auto-ack, session-transacted, dups-okay, etc., you can start to see that you’d end up with up to “maxActive” sessions for each permutation. With maximumActiveSessionPerConnection set to 10, you could really end up with 10 x 2 x 4 == 80 sessions on a single connection (2 transacted settings times 4 ack modes). This is something to tuck away in the back of your mind.
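To make the arithmetic concrete, here is a small sketch in plain Java. It is not ActiveMQ code: the class and method names are made up for illustration, and it simply assumes (as described above) that a key is the pair of transacted flag and acknowledgement mode, with the four standard JMS ack modes.

```java
import java.util.ArrayList;
import java.util.List;

/** Models the "per key" sizing described above; hypothetical helper, not ActiveMQ code. */
final class SessionKeyMath {

    // The four acknowledgement modes defined as constants on javax.jms.Session.
    static final String[] ACK_MODES = {
        "AUTO_ACKNOWLEDGE", "CLIENT_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE", "SESSION_TRANSACTED"
    };

    /** Every (transacted, ackMode) combination a client could ask for. */
    static List<String> possibleKeys() {
        List<String> keys = new ArrayList<>();
        for (boolean transacted : new boolean[] {false, true}) {
            for (String ackMode : ACK_MODES) {
                keys.add("transacted=" + transacted + ", ack=" + ackMode);
            }
        }
        return keys;
    }

    /** Upper bound on sessions for one connection: the per-key cap times the number of keys. */
    static int worstCaseSessionsPerConnection(int maxActivePerKey) {
        return maxActivePerKey * possibleKeys().size();
    }

    public static void main(String[] args) {
        System.out.println(possibleKeys().size() + " keys");
        System.out.println(worstCaseSessionsPerConnection(10) + " sessions worst case");
    }
}
```

In practice most applications only use one or two of these combinations, so the real number is usually far below the worst case. The point is only that the cap applies per key, not per connection.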
The second key here is that when the camel-jms component sets up consumers, it ends up sharing a single connection among all the consumers specified by the concurrentConsumers setting. This is an interesting point, because camel-jms uses the underlying Spring framework’s DefaultMessageListenerContainer, and unfortunately this restriction comes from that library. So if you have 15 concurrent consumers, they will all share a single connection (even if pooling… it will grab one connection from the pool and hold it). Since all 15 consumers share that connection, the same transacted mode and the same ack mode, they all map to the same key, and you end up trying to create 15 sessions for that one key on that one connection. The pool only allows 10, so the remaining consumers block waiting for a session. And you end up with the above.
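Here is a toy model of that situation, again in plain Java rather than real ActiveMQ or Spring code. It assumes one shared connection, one key, and consumers that hold on to their session once they get it; the Semaphore stands in for the per-key session budget, and all names are hypothetical.

```java
import java.util.concurrent.Semaphore;

/** Toy model of a per-key session cap on one shared connection; not ActiveMQ code. */
final class SharedConnectionModel {

    /**
     * Each consumer tries to take one session from the single key's budget and keeps it.
     * Returns how many consumers could not get a session.
     */
    static int countStarvedConsumers(int maxSessionsPerKey, int concurrentConsumers) {
        Semaphore sessionsForKey = new Semaphore(maxSessionsPerKey);
        int starved = 0;
        for (int consumer = 0; consumer < concurrentConsumers; consumer++) {
            // In the stack trace above the thread waits instead; tryAcquire() returns
            // immediately so this demo can finish and report the shortfall.
            if (!sessionsForKey.tryAcquire()) {
                starved++;
            }
        }
        return starved;
    }

    public static void main(String[] args) {
        System.out.println(countStarvedConsumers(10, 15)); // cap below the consumer count
        System.out.println(countStarvedConsumers(16, 15)); // cap of concurrentConsumers + 1
    }
}
```

With a cap of 10 and 15 consumers, five consumers never get a session, which in the real pool shows up as threads parked in borrowObject.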
So my rule of thumb for avoiding these scenarios:
- Understand exactly what each of your producers and consumers is doing, and what their TX and ACK modes are
- Tune the max sessions param when you NEED to (too many session threads? I dunno...), but always set it to AT LEAST concurrentConsumers+1
- If producers and consumers are producing to/consuming from the same destination, SPLIT UP THE CONNECTION POOL: one pool for consumers, one pool for producers
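The sizing rule can be turned into a tiny sanity check that you might run against your own config values at startup. This is only a sketch in plain Java with hypothetical names; it does not read anything from ActiveMQ or Camel, you pass in the two numbers yourself.

```java
/** Checks the "concurrentConsumers + 1" rule of thumb; hypothetical helper, not a Camel API. */
final class PoolSizingCheck {

    /** Smallest per-connection session cap the rule of thumb allows. */
    static int minimumSessions(int concurrentConsumers) {
        return concurrentConsumers + 1;
    }

    /** True when the configured session cap leaves room for every consumer plus one spare. */
    static boolean isSafe(int maxSessionsPerConnection, int concurrentConsumers) {
        return maxSessionsPerConnection >= minimumSessions(concurrentConsumers);
    }

    public static void main(String[] args) {
        // The values from the configs in this post: 10 sessions, 15 consumers.
        System.out.println(isSafe(10, 15));
        // The same consumers with the cap raised to concurrentConsumers + 1.
        System.out.println(isSafe(16, 15));
    }
}
```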
Dunno how valuable this info will be, but I wanted to jot it down for myself. If someone else finds it valuable, or has questions, let me know in the comments.