What's New in JMS 2.0, Part Two—New Messaging Features
by Nigel Deakin
Published May 2013
Learn how to take advantage of new messaging features in JMS 2.0.
This article, which is the second in a two-part series, describes some of the new messaging features introduced in Java Message Service (JMS) 2.0. It assumes a basic familiarity with JMS 1.1.
In Part One, we looked at the new ease-of-use features introduced in JMS 2.0. Here, we look at important new messaging features.
JMS 2.0, which was released in April 2013, is the first update to the JMS specification since version 1.1 was released in 2002. One might think that an API that has remained unchanged for so long has grown moribund and unused. However, if you judge the success of an API standard by the number of different implementations, JMS is one of the most successful APIs around.
In JMS 2.0, the emphasis has been on catching up with ease-of-use improvements that have been made to other enterprise Java technologies in recent years. The opportunity has been taken to introduce a number of new messaging features as well.
JMS 2.0 is part of the Java EE 7 platform and can be used in Java EE Web or EJB applications. It can also be used standalone in a Java SE environment. As I explain below, some of the features are available only in a standalone environment while others are available only in Java EE Web or EJB applications.
Here we discuss five important new messaging features in JMS 2.0.
Multiple Consumers Allowed on the Same Topic Subscription
In JMS 1.1, a subscription on a topic was not permitted to have more than one consumer at a time. This meant that the work of processing messages on a topic subscription could not be shared among multiple threads, connections, or Java Virtual Machines (JVMs), thereby limiting the scalability of the application. This restriction has been removed in JMS 2.0 by the introduction of a new kind of topic subscription called a shared subscription.
Let's review how topic subscriptions worked in JMS 1.1. In Listing 1, the createConsumer method on Session is used to create a nondurable subscription on the specified topic (we'll discuss durable subscriptions in just a moment):
private void createUnsharedConsumer(ConnectionFactory connectionFactory, Topic topic)
        throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer messageConsumer = session.createConsumer(topic);
    connection.start();
    Message message = messageConsumer.receive(10000);
    while (message != null) {
        System.out.println("Message received: " + ((TextMessage) message).getText());
        message = messageConsumer.receive(10000);
    }
    connection.close();
}
Listing 1
In Listing 1, the consumer will receive a copy of every message sent to the topic. However, what if the application takes a long time to process each message? How do we make the application more scalable by sharing the work of processing these messages between, say, two JVMs, with one JVM processing some of the messages and the other JVM processing the remaining messages?
In JMS 1.1, there's no way to do this in a normal Java SE application. (In Java EE, you could do it using a pool of message-driven beans [MDBs]). If you use createConsumer to create a second consumer in a separate JVM (or a separate thread on the same JVM), each consumer will use a separate subscription, and so it will receive a copy of every message received by the topic. That's not what we want. If you think of a "subscription" as a logical entity that receives a copy of every message sent to the topic, then we want the two consumers to use the same subscription.
JMS 2.0 provides a solution. You can create a "shared" nondurable subscription using a new method: createSharedConsumer. This method is available both on Session (for applications using the classic API) and on JMSContext (for applications using the simplified API). Since the two JVMs need to be able to identify the subscription that they need to share, they need to supply a name to identify the shared subscription, as shown in Listing 2.
private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic)
        throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer messageConsumer = session.createSharedConsumer(topic,"mySubscription");
    connection.start();
    Message message = messageConsumer.receive(10000);
    while (message != null) {
        System.out.println("Message received: " + ((TextMessage) message).getText());
        message = messageConsumer.receive(10000);
    }
    connection.close();
}
Listing 2
If you run the code in Listing 2 in two separate JVMs, each message sent to the topic will be delivered to one or the other of the two consumers. This allows them to share the work of processing messages from the subscription.
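The difference between the two kinds of subscription can be made concrete with a toy in-memory model. The sketch below is not the JMS API: the classes Topic and Subscription and their methods are hypothetical stand-ins, and the round-robin choice of consumer is an assumption made for illustration (a real JMS provider decides which consumer receives each message).

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of topic subscriptions. This is NOT the JMS API;
// every class and method name here is hypothetical.
final class SubscriptionModel {

    // A subscription receives a copy of every message published to the topic
    // and hands each copy to exactly one of its consumers.
    static final class Subscription {
        private final List<List<String>> consumers = new ArrayList<>();
        private int next = 0;

        // Adds a consumer and returns the list that records what it receives.
        List<String> addConsumer() {
            List<String> received = new ArrayList<>();
            consumers.add(received);
            return received;
        }

        void deliver(String message) {
            if (consumers.isEmpty()) {
                return; // nobody is listening in this toy model
            }
            // Round-robin is an assumption; a real provider chooses the consumer.
            consumers.get(next).add(message);
            next = (next + 1) % consumers.size();
        }
    }

    static final class Topic {
        private final List<Subscription> subscriptions = new ArrayList<>();

        Subscription subscribe() {
            Subscription subscription = new Subscription();
            subscriptions.add(subscription);
            return subscription;
        }

        // Every subscription gets its own copy of the message.
        void publish(String message) {
            for (Subscription subscription : subscriptions) {
                subscription.deliver(message);
            }
        }
    }

    // Two consumers, each on its own unshared subscription:
    // both consumers see every message.
    static int[] unsharedCounts(int messages) {
        Topic topic = new Topic();
        List<String> a = topic.subscribe().addConsumer();
        List<String> b = topic.subscribe().addConsumer();
        for (int i = 0; i < messages; i++) {
            topic.publish("m" + i);
        }
        return new int[] { a.size(), b.size() };
    }

    // Two consumers on one shared subscription:
    // the messages are split between them.
    static int[] sharedCounts(int messages) {
        Topic topic = new Topic();
        Subscription shared = topic.subscribe();
        List<String> a = shared.addConsumer();
        List<String> b = shared.addConsumer();
        for (int i = 0; i < messages; i++) {
            topic.publish("m" + i);
        }
        return new int[] { a.size(), b.size() };
    }

    public static void main(String[] args) {
        int[] unshared = unsharedCounts(10);
        int[] shared = sharedCounts(10);
        System.out.println("unshared: " + unshared[0] + " and " + unshared[1]);
        System.out.println("shared:   " + shared[0] + " and " + shared[1]);
    }
}
```

In this model the two unshared consumers each record all ten messages, while the two consumers on the shared subscription record ten messages between them.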
The same feature is available to applications that use a durable subscription. In JMS 1.1, a durable subscription was created using the method createDurableSubscriber on Session:
MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"myDurableSub");
This creates a durable subscription called myDurableSub on the specified topic. However, as before, there is no way to share the work of processing the messages on this durable subscription between two JVMs or between two threads on the same JVM. Depending on exactly what you try to do, you'll either get a JMSException or two different subscriptions.
Once again, JMS 2.0 provides a solution to this problem. You can now create a "shared" durable subscription using the new method createSharedDurableConsumer. This method is available both on Session (for applications using the classic API) and on JMSContext (for applications using the simplified API).
MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic,"myDurableSub");
In summary, then, whereas JMS 1.1 defined two different types of topic subscription, JMS 2.0 defines four types, all of which can be created using either the classic or simplified APIs:
- Unshared nondurable subscriptions. These are available in both JMS 1.1 and JMS 2.0 and are created using createConsumer. They can have only a single consumer. Setting the client identifier is optional.
- Unshared durable subscriptions. These are available in both JMS 1.1 and JMS 2.0 and are created using createDurableSubscriber or (in JMS 2.0 only) createDurableConsumer. They can have only a single consumer. Setting the client identifier is compulsory, and the subscription is identified by the combination of the subscription name and client identifier.
- Shared nondurable subscriptions. These are available in JMS 2.0 only and are created using createSharedConsumer. They can have any number of consumers. Setting the client identifier is optional. The subscription is identified by the combination of the subscription name and the client identifier, if it is set.
- Shared durable subscriptions. These are available in JMS 2.0 only and are created using createSharedDurableConsumer. They can have any number of consumers. Setting the client identifier is optional. The subscription is identified by the combination of the subscription name and the client identifier, if it is set.
Delivery Delay
You can now specify a delivery delay on a message. The JMS provider will not deliver the message until after the specified delivery delay has elapsed.
If you're using the classic API, you need to set the delivery delay (in milliseconds) by calling setDeliveryDelay on the MessageProducer prior to sending the message, as shown in Listing 3.
private void sendWithDeliveryDelayClassic(ConnectionFactory connectionFactory,Queue queue)
        throws JMSException {
    // send a message with a delivery delay of 20 seconds
    try (Connection connection = connectionFactory.createConnection();){
        Session session = connection.createSession();
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.setDeliveryDelay(20000);
        TextMessage textMessage = session.createTextMessage("Hello world");
        messageProducer.send(textMessage);
    }
}
Listing 3
If you're using the simplified API, you need to call setDeliveryDelay on the JMSProducer prior to sending the message. This method returns the JMSProducer object, which allows you to create the JMSProducer, set the delivery delay, and send the message all on the same line, as shown in Listing 4.
private void sendWithDeliveryDelaySimplified(ConnectionFactory connectionFactory,Queue queue)
        throws JMSException {
    // send a message with a delivery delay of 20 seconds
    try (JMSContext context = connectionFactory.createContext();){
        context.createProducer().setDeliveryDelay(20000).send(queue,"Hello world");
    }
}
Listing 4
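The effect of a delivery delay can be illustrated with a small model in which the earliest delivery time of a message is the time it was sent plus the delay. The sketch below is not the JMS API: DelayedQueueModel and its methods are hypothetical, and the current time is passed in explicitly (an assumption made so that the example is deterministic rather than dependent on a real clock).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of delivery delay. This is NOT the JMS API; the class and its
// methods are hypothetical, and time is supplied by the caller.
final class DelayedQueueModel {

    private static final class Entry {
        final String body;
        final long deliveryTime; // earliest time at which the message may be delivered

        Entry(String body, long deliveryTime) {
            this.body = body;
            this.deliveryTime = deliveryTime;
        }
    }

    // Messages waiting for delivery, ordered by earliest delivery time.
    private final PriorityQueue<Entry> pending =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deliveryTime));

    // Records a message sent at nowMillis with the given delivery delay.
    void send(String body, long nowMillis, long deliveryDelayMillis) {
        pending.add(new Entry(body, nowMillis + deliveryDelayMillis));
    }

    // Returns (and removes) the messages whose delivery time has been reached.
    List<String> receiveAll(long nowMillis) {
        List<String> delivered = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().deliveryTime <= nowMillis) {
            delivered.add(pending.poll().body);
        }
        return delivered;
    }

    public static void main(String[] args) {
        DelayedQueueModel queue = new DelayedQueueModel();
        queue.send("Hello world", 0, 20000);
        System.out.println("after 10 seconds: " + queue.receiveAll(10000));
        System.out.println("after 20 seconds: " + queue.receiveAll(20000));
    }
}
```

In this model a message sent at time 0 with a delay of 20000 milliseconds is withheld from a consumer that asks at 10000 and handed over to one that asks at 20000.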
Sending Messages Asynchronously
Another new feature of JMS 2.0 is the ability to send a message asynchronously.
This feature is available for applications running in Java SE or the Java EE application client container. It is not available to applications that run in the Java EE Web or EJB container.
Normally, when a persistent message is sent, the send method does not return until the JMS client has sent the message to the server and received a reply to notify the client that the message has been safely received and persisted. We call this a synchronous send.
JMS 2.0 introduces the ability to perform an asynchronous send. When a message is sent asynchronously, the send method sends the message to the server and then returns control to the application without waiting for a reply from the server. Instead of being blocked unproductively while the JMS client waits for a reply, the application can do something useful, such as sending a further message or performing some processing.
When a reply is received back from the server to indicate that the message has been received by the server and persisted, the JMS provider notifies the application by invoking the callback method onCompletion on an application-specified CompletionListener object.
There are two main ways in which you might use an asynchronous send in an application:
- To allow the application to do something else (such as update the display or write to a database) during the interval when it would otherwise be waiting for a reply from the server
- To allow a large number of messages to be sent in succession without waiting for a reply from the server after each message
Listing 5 is an example of how you might implement the first of these using the classic API:
private void asyncSendClassic(ConnectionFactory connectionFactory,Queue queue)
        throws Exception {
    // send a message asynchronously
    try (Connection connection = connectionFactory.createConnection();){
        Session session = connection.createSession();
        MessageProducer messageProducer = session.createProducer(queue);
        TextMessage textMessage = session.createTextMessage("Hello world");
        CountDownLatch latch = new CountDownLatch(1);
        MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
        // pass the same listener instance whose result is checked below
        messageProducer.send(textMessage,myCompletionListener);
        System.out.println("Message sent, now waiting for reply");
        // at this point we can do something else before waiting for the reply
        // this is not shown here
        // now wait for the reply from the server
        latch.await();
        if (myCompletionListener.getException()==null){
            System.out.println("Reply received from server");
        } else {
            throw myCompletionListener.getException();
        }
    }
}
Listing 5
The class MyCompletionListener used in Listing 5 is a separate class provided by the application, which implements the javax.jms.CompletionListener interface, as shown in Listing 6:
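import java.util.concurrent.CountDownLatch;
import javax.jms.CompletionListener;
import javax.jms.Message;

class MyCompletionListener implements CompletionListener {
    CountDownLatch latch;
    Exception exception;

    public MyCompletionListener(CountDownLatch latch) {
        this.latch=latch;
    }

    @Override
    public void onCompletion(Message message) {
        latch.countDown();
    }

    @Override
    public void onException(Message message, Exception exception) {
        // record the exception before releasing the waiting thread,
        // so that getException() cannot be called too early
        this.exception=exception;
        latch.countDown();
    }

    public Exception getException(){
        return exception;
    }
}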
Listing 6
In Listing 5, we use a new method on MessageProducer to send a message without waiting for a reply from the server. This is send(Message message, CompletionListener listener). Using this method to send a message allows the application to do something else while the message is processed in the server. When the application is ready to continue, it uses a java.util.concurrent.CountDownLatch to wait until the reply has been received from the server. When the reply is received, the application can then proceed with the same degree of confidence that the message has been successfully sent that it would have had after a normal synchronous send.
Sending a message asynchronously is slightly simpler if you are using the JMS 2.0 simplified API, as shown in Listing 7:
private void asyncSendSimplified(ConnectionFactory connectionFactory,Queue queue)
        throws Exception {
    // send a message asynchronously
    try (JMSContext context = connectionFactory.createContext();){
        CountDownLatch latch = new CountDownLatch(1);
        MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
        context.createProducer().setAsync(myCompletionListener).send(queue,"Hello world");
        System.out.println("Message sent, now waiting for reply");
        // at this point we can do something else before waiting for the reply
        // this is not shown here
        latch.await();
        if (myCompletionListener.getException()==null){
            System.out.println("Reply received from server");
        } else {
            throw myCompletionListener.getException();
        }
    }
}
Listing 7
In this case, the method setAsync(CompletionListener listener) is called on the JMSProducer prior to calling send(Destination destination, String body). And since the JMSProducer supports method chaining, you can do both on the same line.
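The second use of an asynchronous send mentioned earlier, sending a large number of messages in succession and waiting only once at the end, can be sketched along the following lines. This is a minimal illustration and not JMS code: Listener is a hypothetical stand-in for javax.jms.CompletionListener, FakeProducer is a hypothetical stand-in for a provider that acknowledges each send on another thread, and the rule that an empty message fails is an assumption made purely so that the error path can be exercised.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the "send many, wait once" pattern. NOT the JMS API:
// Listener and FakeProducer are hypothetical stand-ins.
final class BatchAsyncSendSketch {

    // Hypothetical stand-in for javax.jms.CompletionListener.
    interface Listener {
        void onCompletion(String message);
        void onException(String message, Exception exception);
    }

    // Hypothetical stand-in for a provider: replies to each send on another thread.
    static final class FakeProducer implements AutoCloseable {
        private final ExecutorService server = Executors.newSingleThreadExecutor();

        void send(String message, Listener listener) {
            server.execute(() -> {
                if (message.isEmpty()) {
                    // assumption for illustration: empty messages are rejected
                    listener.onException(message, new IllegalArgumentException("empty message"));
                } else {
                    listener.onCompletion(message);
                }
            });
        }

        @Override
        public void close() {
            server.shutdown();
        }
    }

    // Sends all messages without waiting in between, then waits once for
    // all replies. Returns the number of messages that failed.
    static int sendAll(String[] messages) {
        CountDownLatch latch = new CountDownLatch(messages.length);
        Queue<Exception> failures = new ConcurrentLinkedQueue<>();
        Listener listener = new Listener() {
            @Override
            public void onCompletion(String message) {
                latch.countDown();
            }

            @Override
            public void onException(String message, Exception exception) {
                failures.add(exception); // record before releasing the waiter
                latch.countDown();
            }
        };
        try (FakeProducer producer = new FakeProducer()) {
            for (String message : messages) {
                producer.send(message, listener); // returns immediately
            }
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for replies");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for replies", e);
        }
        return failures.size();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + sendAll(new String[] { "a", "b", "c" }));
    }
}
```

The latch is initialized to the number of messages, so the sending thread blocks only once, after the last send, instead of once per message.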
JMSXDeliveryCount
JMS 2.0 allows applications that receive a message to determine how many times the message has been redelivered. This information can be obtained from the message property JMSXDeliveryCount:
int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
JMSXDeliveryCount is not a new property; it was defined in JMS 1.1. However, in JMS 1.1, it was optional for a JMS provider to actually set it, which meant application code that used it was not portable. In JMS 2.0, it becomes mandatory for JMS providers to set this property, allowing portable applications to make use of it.
So why might an application want to know how many times a message has been redelivered?
If a message is being redelivered, this means that a previous attempt to deliver the message failed for some reason. If a message is being redelivered repeatedly, the cause is likely to be a problem in the receiving application. Perhaps the application is able to receive the message but is unable to process it, and so it throws an exception or rolls back the transaction. If there is a prolonged reason why the message cannot be processed, such as the message being "bad" in some way, the same message will be redelivered over and over again, wasting resources and preventing subsequent "good" messages from being processed.
The JMSXDeliveryCount property allows a consuming application to detect that a message has been redelivered multiple times and is, therefore, "bad" in some way. The application can use this information to take some special action (instead of simply triggering yet another redelivery), such as consuming the message and sending it to a separate queue of "bad" messages for administrator action.
Some JMS providers already offer nonstandard facilities to detect messages that have been redelivered repeatedly and divert them to a dead-message queue. While JMS 2.0 does not define how such messages should be handled, the JMSXDeliveryCount property allows applications to implement their own "bad" message handling code in a portable way.
Listing 8 shows a MessageListener that throws a RuntimeException to simulate an error in processing a "bad" message. The MessageListener uses the JMSXDeliveryCount property to detect that a message has been delivered ten times and take a different action.
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
            if (deliveryCount<10){
                // now throw a RuntimeException
                // to simulate a problem processing the message
                // the message will then be redelivered
                throw new RuntimeException("Exception thrown to simulate a bad message");
            } else {
                // message has been delivered ten times,
                // let's do something to prevent endless redeliveries
                // such as sending it to dead message queue
                // details omitted
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
}
Listing 8
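The redelivery behavior that Listing 8 relies on can be traced with a toy model. The sketch below is not the JMS API: RedeliveryModel is hypothetical, the provider loop that redelivers until the listener returns normally is a simplifying assumption, and treating any message whose text starts with "bad" as unprocessable is purely illustrative. As in JMS 2.0, the delivery count is 1 the first time a message is delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of redelivery and "bad" message handling. This is NOT the JMS API;
// the class, its fields, and its methods are hypothetical.
final class RedeliveryModel {

    static final int MAX_DELIVERIES = 10;

    final List<String> processed = new ArrayList<>();
    final List<String> deadMessages = new ArrayList<>();

    // Stand-in for a listener: returns normally when the message is consumed
    // and throws to request a redelivery.
    void onMessage(String body, int deliveryCount) {
        if (deliveryCount >= MAX_DELIVERIES) {
            // give up and divert the message instead of failing yet again
            deadMessages.add(body);
            return;
        }
        if (body.startsWith("bad")) {
            // illustrative assumption: "bad" messages can never be processed
            throw new RuntimeException("cannot process " + body);
        }
        processed.add(body);
    }

    // Stand-in for the provider: redelivers until the listener returns normally.
    // Returns the number of delivery attempts that were needed.
    int deliver(String body) {
        int deliveryCount = 0;
        while (true) {
            deliveryCount++; // 1 on the first delivery, 2 or more on a redelivery
            try {
                onMessage(body, deliveryCount);
                return deliveryCount;
            } catch (RuntimeException e) {
                // the listener failed, so the message will be redelivered
            }
        }
    }

    public static void main(String[] args) {
        RedeliveryModel model = new RedeliveryModel();
        System.out.println("good message attempts: " + model.deliver("good"));
        System.out.println("bad message attempts:  " + model.deliver("bad-1"));
        System.out.println("dead messages: " + model.deadMessages);
    }
}
```

In this model a good message is consumed on its first delivery, while a bad one fails nine times and is diverted on the tenth delivery, which is the point at which the check on the delivery count stops the cycle.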
MDB Configuration Properties
A Java EE application that needs to receive messages asynchronously does so using an MDB, which is configured by specifying a number of configuration properties.
Previous versions of Java EE were distinctly vague about how an MDB was configured. In EJB 3.1, the only configuration properties defined were:
- acknowledgeMode (used only when transactions are bean-managed; can be set to either Auto-acknowledge or Dups-ok-acknowledge)
- messageSelector
- destinationType (can be set to either javax.jms.Queue or javax.jms.Topic)
- subscriptionDurability (used only for topics; can be set to either Durable or NonDurable)
However, EJB 3.1 didn't define how the application should specify which queue or topic the MDB received messages from. It was left to the application server or resource adapter to define a nonstandard way to do this.
EJB 3.1 also didn't define how the subscription name and client identifier should be specified when messages were received from a topic and the subscriptionDurability property was set to Durable. And there was no standard way in EJB 3.1 to specify the connection factory that was used by the MDB to create its connection to the JMS server.
These rather surprising limitations are all addressed in the latest version of Java EE. EJB 3.2 (part of Java EE 7) defines the following additional configuration properties:
- destinationLookup: The JNDI name of an administratively defined Queue or Topic object that represents the queue or topic from which the MDB will receive messages
- connectionFactoryLookup: The JNDI name of an administratively defined ConnectionFactory object that the MDB will use to connect to the JMS provider
- clientId: A client identifier used when the MDB connects to the JMS provider
- subscriptionName: A durable subscription name used when subscriptionDurability is set to Durable
Most application servers supported clientId and subscriptionName anyway, so defining these as standard is simply formalizing existing practice.
Of course, it was always possible to configure the queue or topic used by a JMS MDB, and many (but not all) application servers provided a way to specify the connection factory. However, the way this was done was nonstandard and varied from one application server to another. Application servers remain free to continue to support these nonstandard mechanisms. However, you can be confident that applications that use destinationLookup and connectionFactoryLookup will work with multiple application servers.
Listing 9 shows a JMS MDB that consumes messages from a durable subscription on a topic and uses the new standard properties:
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(
        propertyName = "connectionFactoryLookup", propertyValue = "jms/MyConnectionFactory"),
    @ActivationConfigProperty(
        propertyName = "destinationLookup", propertyValue = "jmq/PriceFeed"),
    @ActivationConfigProperty(
        propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
    @ActivationConfigProperty(
        propertyName = "subscriptionDurability", propertyValue = "Durable"),
    @ActivationConfigProperty(
        propertyName = "subscriptionName", propertyValue = "MySub"),
    @ActivationConfigProperty(
        propertyName = "clientId", propertyValue = "MyClientId") })
public class MyMDB implements MessageListener {
    public void onMessage(Message message) {
        ...
Listing 9
Conclusion
All five of the features described above make messaging easier for Java developers. Taken together with the ease-of-use features discussed in Part One, they represent a major step forward for JMS 2.0, which should continue to flourish as one of the most successful APIs in the Java landscape.
About the Author
Nigel Deakin, a Principal Member of Technical Staff at Oracle, was Specification Lead for JSR 343, Java Message Service 2.0. In addition to his responsibilities for leading the next versions of the JMS specification, he is a member of Oracle's JMS development team, working on Open Message Queue and the GlassFish application server. He has spoken recently at JavaOne in San Francisco, US, and at Devoxx in Antwerp, Belgium, and he is based in Cambridge, UK.