JMS Patterns with ActiveMQ
Quite some time ago, before I started this blog, I used to have a Geocities (now Yahoo!) homepage where I would write articles about stuff I did, much like this blog here. One such article, “Java Patterns for MQ Series Clients”, described various types of clients that could be written against an MQ-Series server. Unlike most of the other stuff I wrote there, it proved to be somewhat popular; a couple of people actually liked it enough to send me email about it.
I was working on a Java/MQ-Series project at the time, and I wrote it as part of learning the MQ-Series Java API. Although the JMS specification was out, and a JMS wrapper around the MQ-Series API was also available, it was still quite new and relatively untested, so a decision was made to code directly against the MQ-Series Java API (which itself was a wrapper over the MQ-Series C API) instead. Since then, no JMS project has come my way, so, even though I had read about the JMS API and was familiar with the concepts (having worked on its predecessor product), I had never worked with JMS. Since I mostly learn by doing, I figured that it would be instructive to try and code up the same scenarios described in my old article using JMS. This blog post is a result of that effort.
For the server, I chose Apache ActiveMQ, a free and open source JMS implementation. Setting it up was quite simple. All I had to do was download the latest stable binary distribution (4.1.1 in my case), untar it into a directory, then run its startup script to bring up a TCP listener on port 61616.
sujit@sirocco:~/tmp$ tar xvzf apache-activemq-4.1.1.tar.gz
sujit@sirocco:~/tmp$ cd apache-activemq-4.1.1
sujit@sirocco:~/tmp/apache-activemq-4.1.1$ bin/activemq
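The diagram below describes my basic setup. There is a server which loops forever, reading messages off a request queue, doing some processing, and writing responses to a response queue. Clients communicate with the server by writing messages to the request queue, and optionally reading messages off the response queue. The destinations are named as queues and I refer to them as such throughout, but note that the code creates them with createTopic(), so strictly speaking this is the JMS publish/subscribe style rather than point-to-point.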
The code for the server is shown below. It reads a (text) message from the request queue, prepends “Processed” to it, and places it on the response queue. The init() method opens the connection and session and attaches a consumer and a producer to the two destinations, and the destroy() method closes the session and the connection.
// MessageServer.java
package com.mycompany.mqpatterns;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MessageServer implements MessageListener {

  private final Log log = LogFactory.getLog(getClass());

  private Connection connection;
  private Session session;
  private MessageConsumer consumer;
  private MessageProducer producer;

  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory =
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request and response queues
    Topic request = session.createTopic("request.queue");
    Topic response = session.createTopic("response.queue");
    // and attach a consumer and producer to them
    consumer = session.createConsumer(request);
    consumer.setMessageListener(this);
    producer = session.createProducer(response);
    // start your engines...
    connection.start();
  }

  public void destroy() throws Exception {
    session.close();
    connection.close();
  }

  public void onMessage(Message message) {
    try {
      if (message instanceof TextMessage) {
        String messageText = ((TextMessage) message).getText();
        log.debug("Server: Got request [" + messageText + "]");
        Message responseMessage =
          session.createTextMessage("Processed " + messageText);
        if (message.getJMSCorrelationID() != null) {
          // pass it through
          responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
        }
        producer.send(responseMessage);
      }
    } catch (JMSException e) {
      log.error(e);
    }
  }
}
On re-reading my original article, I found that there were actually only 3 basic patterns which I had covered, not 5, so I wrote implementations for these 3 patterns, which are described below. Much of the code in the init() and destroy() methods is boilerplate that just sets up and takes down the connections to the queues.
- Fire And Forget
- Pseudo Synchronous
- Asynchronous with Callback
Fire And Forget
A real-life analog for this would be writing a letter and dropping it in the mail. You (the client) do not really know when and if it will reach its destination, and you do not expect a response or an acknowledgement from the receiver. The code for this is shown below:
// FireAndForgetClient.java
package com.mycompany.mqpatterns;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class FireAndForgetClient {

  private final Log log = LogFactory.getLog(getClass());

  private Connection connection;
  private Session session;
  private MessageProducer producer;

  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory =
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request queue (this client never reads a response)
    Topic request = session.createTopic("request.queue");
    // and attach a producer to it
    producer = session.createProducer(request);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    // and start your engines...
    connection.start();
  }

  public void destroy() throws Exception {
    session.close();
    connection.close();
  }

  public void sendMessage(String messageText) throws Exception {
    producer.send(session.createTextMessage(messageText));
  }
}
Pseudo Synchronous
For all practical purposes, this client is synchronous. I guess the pseudo is only there to emphasise the fact that it is based on a medium that is inherently asynchronous. A real life analog to this would be a (half-duplex) telephone conversation.
// PseudoSynchronousClient.java
package com.mycompany.mqpatterns;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class PseudoSynchronousClient {

  private final Log log = LogFactory.getLog(getClass());

  private Connection connection;
  private Session session;
  private MessageProducer producer;
  private MessageConsumer consumer;

  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory =
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request and response queues
    Topic request = session.createTopic("request.queue");
    Topic response = session.createTopic("response.queue");
    // and attach a consumer and producer to them
    producer = session.createProducer(request);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    consumer = session.createConsumer(response);
    // and start your engines...
    connection.start();
  }

  public void destroy() throws Exception {
    session.close();
    connection.close();
  }

  public String sendMessage(String messageText) throws Exception {
    try {
      log.info("Client: Send request [" + messageText + "]");
      producer.send(session.createTextMessage(messageText));
      // blocks until a message arrives on the response queue
      Message response = consumer.receive();
      String responseText = ((TextMessage) response).getText();
      log.info("Client: Got response [" + responseText + "]");
      return responseText;
    } catch (JMSException e) {
      log.error("JMS Exception on client", e);
      // no response could be obtained
      return null;
    }
  }
}
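One caveat with the client above: consumer.receive() with no arguments blocks until a message arrives, so a lost response hangs the caller. JMS also offers MessageConsumer.receive(long timeout), which returns null if nothing arrives in time. The sketch below illustrates that send-then-wait-with-a-timeout shape using plain java.util.concurrent queues as an in-memory stand-in for the broker, so that it runs without ActiveMQ. The class name BoundedWaitClient, the echo "server" thread and the timeout values are all assumptions made for illustration and are not part of the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the request/response destinations; this is not JMS.
class BoundedWaitClient {

  private final BlockingQueue<String> request;
  private final BlockingQueue<String> response;

  BoundedWaitClient(BlockingQueue<String> request,
      BlockingQueue<String> response) {
    this.request = request;
    this.response = response;
  }

  // Sends a request and waits at most timeoutMillis for the reply.
  // Returns null on timeout, which is also what receive(long) does in JMS.
  String sendMessage(String text, long timeoutMillis)
      throws InterruptedException {
    request.put(text);
    return response.poll(timeoutMillis, TimeUnit.MILLISECONDS);
  }

  // Starts a daemon thread that plays the server: it takes a request,
  // prepends "Processed " and puts the result on the response queue.
  static Thread startEchoServer(final BlockingQueue<String> request,
      final BlockingQueue<String> response) {
    Thread t = new Thread(new Runnable() {
      public void run() {
        try {
          while (true) {
            response.put("Processed " + request.take());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // stop when interrupted
        }
      }
    });
    t.setDaemon(true);
    t.start();
    return t;
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> request = new LinkedBlockingQueue<String>();
    BlockingQueue<String> response = new LinkedBlockingQueue<String>();
    startEchoServer(request, response);
    BoundedWaitClient client = new BoundedWaitClient(request, response);
    System.out.println(client.sendMessage("fox", 1000));
  }
}
```

With a timeout, the caller has to decide what a null return means (retry, report an error, give up), which is the price of not blocking forever.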
Asynchronous With Callback
Like the Fire and Forget pattern, this is asynchronous, but unlike it, the client can specify that some action should be triggered when the server is done processing the request. A real-life analog for this case would be mailing a letter with a return receipt. The example I use here is to have an acknowledgement sent back to the client, which the client can store in a local database (an in-memory HashMap in this case). The caller will send a message to the server through the client and get back the message Id. The caller can then check on the status of the message.
The request and response messages are tied together using the JMS Correlation Id. We generate a UUID when the message is sent and put it in the JMS Correlation Id. If the server finds that the incoming message contains a JMS Correlation Id, it passes it through in its response.
// AsynchronousWithCallbackClient.java
package com.mycompany.mqpatterns;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AsynchronousWithCallbackClient implements MessageListener {

  private final Log log = LogFactory.getLog(getClass());

  private Connection connection;
  private Session session;
  private MessageProducer producer;
  private MessageConsumer consumer;

  // to store acknowledgements as they are received. Should be replaced
  // by a more persistent mechanism
  private Map<String,String> messageStatus = new HashMap<String,String>();

  public void init() throws Exception {
    // set 'er up
    ActiveMQConnectionFactory connectionFactory =
      new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = connectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // create our request and response queues
    Topic request = session.createTopic("request.queue");
    Topic response = session.createTopic("response.queue");
    // and attach a consumer and producer to them
    producer = session.createProducer(request);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    consumer = session.createConsumer(response);
    consumer.setMessageListener(this);
    // and start your engines...
    connection.start();
  }

  public void destroy() throws Exception {
    session.close();
    connection.close();
  }

  public String sendMessage(String messageText) throws Exception {
    TextMessage message = session.createTextMessage(messageText);
    String messageId = UUID.randomUUID().toString();
    message.setJMSCorrelationID(messageId);
    producer.send(message);
    return messageId;
  }

  public String getStatus(String correlationId) {
    synchronized(this) {
      if (messageStatus.containsKey(correlationId)) {
        String status = messageStatus.get(correlationId);
        messageStatus.remove(correlationId);
        return status;
      } else {
        return null;
      }
    }
  }

  public void onMessage(Message message) {
    synchronized(this) {
      try {
        if (message instanceof TextMessage) {
          String originalMessageId = message.getJMSCorrelationID();
          String responseText = ((TextMessage) message).getText();
          messageStatus.put(originalMessageId, responseText);
        }
      } catch (JMSException e) {
        log.error("JMS Exception encountered on client", e);
      }
    }
  }
}
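The synchronized blocks in getStatus() and onMessage() are there to protect the HashMap, which is written by the JMS listener thread and read by the caller's thread. One possible alternative, sketched here in isolation from JMS, is to keep the acknowledgements in a ConcurrentHashMap and use its atomic remove() for the check-and-clear step. The class name AckStore and its method names are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper: holds response texts keyed by correlation id.
class AckStore {

  private final ConcurrentMap<String, String> messageStatus =
      new ConcurrentHashMap<String, String>();

  // Would be called from onMessage() with the correlation id and the
  // response text. ConcurrentHashMap rejects null keys and values, so
  // responses that lack either one are ignored here.
  void record(String correlationId, String responseText) {
    if (correlationId != null && responseText != null) {
      messageStatus.put(correlationId, responseText);
    }
  }

  // Returns the stored status once and forgets it, or null if no
  // response has been recorded for this id.
  String take(String correlationId) {
    if (correlationId == null) {
      return null;
    }
    return messageStatus.remove(correlationId);
  }
}
```

The behavior matches getStatus() above in that a status can be read only once. Note that the original HashMap version would also store a response that arrives without a correlation id (under a null key), whereas this sketch drops it.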
Test Harness
To test these clients, I set up a JUnit test as follows.
// JmsClientPatternsTest.java
package com.mycompany.mqpatterns;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class JmsClientPatternsTest {

  private final Log log = LogFactory.getLog(getClass());

  private static MessageServer server;
  private static String[] messages = StringUtils.split(
    "The quick brown fox jumped over the lazy dog");

  private FireAndForgetClient fireAndForgetClient;
  private PseudoSynchronousClient pseudoSynchronousClient;
  private AsynchronousWithCallbackClient asyncWithCallbackClient;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    server = new MessageServer();
    server.init();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    server.destroy();
  }

  @Before
  public void setUp() throws Exception {
    fireAndForgetClient = new FireAndForgetClient();
    fireAndForgetClient.init();
    pseudoSynchronousClient = new PseudoSynchronousClient();
    pseudoSynchronousClient.init();
    asyncWithCallbackClient = new AsynchronousWithCallbackClient();
    asyncWithCallbackClient.init();
  }

  @After
  public void tearDown() throws Exception {
    fireAndForgetClient.destroy();
    pseudoSynchronousClient.destroy();
    asyncWithCallbackClient.destroy();
  }

  @Test
  public void testFireAndForgetClient() throws Exception {
    for (String message : messages) {
      fireAndForgetClient.sendMessage(message);
    }
  }

  @Test
  public void testPseudoSynchronousClient() throws Exception {
    for (String message : messages) {
      String response = pseudoSynchronousClient.sendMessage(message);
      log.debug("response=" + response);
    }
  }

  @Test
  public void testAsynchronousWithCallbackClient() throws Exception {
    // typed list, so that the for-each over String below compiles
    List<String> sentMessageIds = new ArrayList<String>();
    for (String message : messages) {
      String messageId = asyncWithCallbackClient.sendMessage(message);
      sentMessageIds.add(messageId);
    }
    // give the server time to respond
    Thread.sleep(2000);
    for (String sentMessageId : sentMessageIds) {
      String response = asyncWithCallbackClient.getStatus(sentMessageId);
      log.debug("response[" + sentMessageId + "]=" + response);
    }
  }
}
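The Thread.sleep(2000) in the last test is a guess at how long the server needs to respond: too short and getStatus() returns null, too long and the test wastes time. A small polling helper is one way around this. The sketch below is a generic version that depends only on the JDK; the class name Poller, the method name awaitNonNull and the interval values are hypothetical and are not part of the test above.

```java
import java.util.concurrent.Callable;

// Hypothetical test helper: repeats a lookup until it yields a non-null
// value or the timeout expires.
class Poller {

  static <T> T awaitNonNull(Callable<T> lookup, long timeoutMillis,
      long intervalMillis) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      T value = lookup.call();
      if (value != null) {
        return value;
      }
      if (System.currentTimeMillis() >= deadline) {
        return null; // gave up; the caller decides if this is a failure
      }
      Thread.sleep(intervalMillis);
    }
  }
}
```

In the test, the lookup would be a Callable<String> that wraps asyncWithCallbackClient.getStatus(sentMessageId), with the 2 seconds used as the upper bound rather than as a fixed wait.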
Conclusion
Compared to what I remember of working with the Java API for MQ-Series, JMS is easier to work with. The API provides the MessageListener interface, whose onMessage(Message) method can be implemented in both the client and the server. With the Java API to MQ-Series, the behavior of tying a listener process to a queue had to be configured in the server. Also, the queues had to be manually defined prior to being accessed from client code. It is possible that enterprise setups would follow the same strategy with JMS, but I don’t know enough about either MQ-Series or ActiveMQ administration to comment intelligently on that.
Obviously there is much more to JMS than this, but the JMS API is fairly simple and compact and can be easily learned. My take on the complexity of the JMS API is that it is similar to JDBC. You need to learn a few basic things to be able to start coding immediately, and the rest you can learn as you go. I found the simplicity of the API to be very attractive.
However, more than the JMS API itself, what I really like is the idea of neatly decoupling two components by putting a pair of queues in between. There are many applications that inherently lend themselves to this sort of design, and these applications typically scale better than those which don’t.
Often, people get the design right, but end up implementing a home-grown, thread-based solution for the application. Since the average Java programmer is generally not very well versed in thread programming, this can lead to a maintenance bottleneck, with only one or two people in a group who know enough to debug and fix problems in the code. JMS provides the tools by which such designs can be implemented with no thread-based application code.
Thus, for a Java programmer, I think there is a high benefit to cost ratio in favor of learning JMS. Not only is it easy to learn, but it opens your eyes to patterns which take advantage of inherent asynchronicity in an application, and provides you tools to very easily implement these patterns.
(Via Salmon Run.)

Hello!
First of all thank you for this article. But there’s something I don’t have very clear.
In a scenario where the client wants to invoke a remote process or service and needs a response to be sure it all worked fine, why use asynchronous messaging? I know why you would use it when you don’t need a reply, but if you DO need it, isn’t it better to use synchronous communication instead of two-way JMS? It’s not that I disagree, it’s just that I want to learn why and when to use JMS instead of, for example, SOAP if I’ll have to wait for the response anyway. Is it just for the sake of decoupling? I admit that could be enough.
Thank you very much.
Antonio
I would like to say that after reading this article, I am very much impressed. Please keep on posting more articles.
In PseudoSynchronousClient example, I believe you have to attach the producer and consumer to the same queue. In other words, the consumer should read from the request.queue instead of response.queue.
Topic request = session.createTopic("request.queue");
// Topic response = session.createTopic("request.queue");
The sample hangs if you try to attach the consumer to response.queue
Nice article. You might want to check http://camel.apache.org/ as it provides much more ‘Enterprise Integration Patterns’