Writing JMS Application, JMS Producer and JMS Consumer

1. WRITING JMS APPLICATION

The following are the basic steps to write a JMS client:

  • Create/Look up a ConnectionFactory object.
  • Use the ConnectionFactory to create a Connection.
  • Use the Connection to create one or more Session objects.
  • Create/Look up one or more Destination objects.
  • Use a Session and a Destination to create MessageProducer and MessageConsumer objects.
  • Send and receive messages.

To write any JMS client, we first need a reference to a ConnectionFactory object, which encapsulates the necessary configuration properties for creating connections to the MQ broker. We can easily create such an object by instantiating it explicitly as follows:

ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();

This object may also be obtained through a JNDI lookup, which we shall discuss later in this chapter. Since we have created the object ourselves, we must also configure it. The minimal configuration specifies the port number and the name or address of the machine where the broker is running. We do this as follows:

((com.sun.messaging.ConnectionFactory) cf).setProperty(com.sun.messaging.ConnectionConfiguration.imqAddressList, "172.16.5.81:7676");
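Hard-coding the broker address is convenient for a first example, but it ties the client to one machine. The following is a minimal sketch of how the host:port string could be assembled from settings instead. The class BrokerAddress and the system property names broker.host and broker.port are hypothetical and are not part of Open MQ; only the default host and port come from the text.

```java
// Hypothetical helper, not part of Open MQ: builds the "host:port" string
// that the text passes as the value of imqAddressList.
final class BrokerAddress {
    // Defaults taken from the text; the property names used below are made up.
    static final String DEFAULT_HOST = "172.16.5.81";
    static final int DEFAULT_PORT = 7676;

    // Validates the two parts and joins them as host:port.
    static String format(String host, int port) {
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return host.trim() + ":" + port;
    }

    // Reads -Dbroker.host=... and -Dbroker.port=... (assumed names),
    // falling back to the defaults above.
    static String fromSystemProperties() {
        String host = System.getProperty("broker.host", DEFAULT_HOST);
        int port = Integer.getInteger("broker.port", DEFAULT_PORT);
        return format(host, port);
    }

    public static void main(String[] args) {
        System.out.println(fromSystemProperties());
    }
}
```

The resulting string could then be passed to setProperty() in place of the literal address.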

The ConnectionFactory object is then used to create connections to Sun’s Open Message Queue JMS provider. The following is an example:

Connection con = cf.createConnection();

A Connection represents an open connection (usually a TCP/IP socket) between a JMS client and a broker. It is responsible for authenticating clients, providing connection metadata, creating Session objects, and so on, which makes it a relatively heavyweight object. It is therefore recommended that a client create only one Connection unless more are unavoidable. A new connection is initially in stopped mode, which means that no messages are delivered to its consumers. Producers, however, may send messages even while the connection is stopped.
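The stopped mode is easy to overlook, so here is a small JDK-only model of it. This is a toy stand-in and not the JMS API: the class StoppedConnectionModel and its methods are hypothetical names chosen to resemble the calls used in this chapter.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy stand-in for a connection plus one queue; not the JMS API.
final class StoppedConnectionModel {
    private final Queue<String> queue = new ArrayDeque<>();
    private boolean started = false; // a new connection starts out stopped

    // Producers may send whether or not the connection has been started.
    void send(String text) {
        queue.add(text);
    }

    // Plays the role of con.start(): enables delivery to consumers.
    void start() {
        started = true;
    }

    // Non-blocking receive: returns null while stopped or when the queue is empty.
    String receiveNoWait() {
        return started ? queue.poll() : null;
    }

    public static void main(String[] args) {
        StoppedConnectionModel con = new StoppedConnectionModel();
        con.send("A test message");
        System.out.println("Before start: " + con.receiveNoWait());
        con.start();
        System.out.println("After start: " + con.receiveNoWait());
    }
}
```

In this model the message sent before start() is not lost; it simply is not handed to the consumer until the connection has been started.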

We then create a Session object:

Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

A Session object represents a single-threaded communication session between a JMS client and a JMS broker. It allows us to create producers/consumers and queues/topics, produce/consume messages, etc.

This portion is common to all JMS clients. Let us now write code that sends messages to a queue using the P2P model.

2. WRITING A P2P PRODUCER

A producer in the P2P model is called a queue sender. Let us write a simple queue sender that produces a simple text message. Since a message destination in the P2P model is called a queue, let us create a Queue object explicitly as follows:

Destination dest = sn.createQueue(args[0]);

This creates a Queue object for the name passed as the first command-line argument (AQueue in the run shown later); the queue is created in the broker if it does not already exist. A Destination object is also a JMS administered object and may likewise be obtained through a JNDI lookup, which we shall discuss later in this chapter.

To send messages to a destination, a queue sender uses a MessageProducer object, which is created by passing a Destination object to the createProducer() method of Session:

MessageProducer mp = sn.createProducer(dest);

It is also possible to create a MessageProducer without supplying a destination. In that case, the destination is specified later, when a message is sent using the send() method. We then create a text message to be sent as follows:

TextMessage tm = sn.createTextMessage();

tm.setText("A test message");

A TextMessage object carries a String as its body, which is set using the setText() method. Finally, the message is sent to the queue using the send() method:

mp.send(tm);

The interesting part is that nothing in this code says who will finally read the message. The producer need not know where or who the consumer is; it just sends messages to the queue AQueue, and what happens to them afterwards is no longer the producer’s concern.

It is a good idea to close the session and the connection, so that the resources held by these objects are released:

sn.close();

con.close();
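If an exception is thrown before these two calls are reached, neither object is closed. The following JDK-only sketch shows how try-with-resources guarantees the clean-up and in which order it happens. The class CloseOrderDemo and its nested Resource class are hypothetical stand-ins for Session and Connection. With a JMS 2.0 API, Connection and Session implement AutoCloseable and can be used the same way; with JMS 1.1, a finally block plays the same role.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for Connection and Session; the names are hypothetical.
final class CloseOrderDemo {
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name + " closed");
        }
    }

    // Opens "connection" then "session", optionally fails while sending,
    // and returns the log of what was closed and caught, in order.
    static List<String> run(boolean failWhileSending) {
        List<String> log = new ArrayList<>();
        try (Resource con = new Resource("connection", log);
             Resource sn = new Resource("session", log)) {
            if (failWhileSending) {
                throw new IllegalStateException("send failed");
            }
        } catch (IllegalStateException e) {
            log.add("caught: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In both runs the session is closed before the connection, and in the failing run both are closed before the exception is handled.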

The entire source code (Sender.java) of the sender is shown below:

import javax.jms.*;

public class Sender {
    public static void main(String[] args) {
        try {
            // Create the factory and point it at the broker.
            ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory) cf).setProperty(
                    com.sun.messaging.ConnectionConfiguration.imqAddressList, "172.16.5.81:7676");
            Connection con = cf.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // The queue name is taken from the first command-line argument.
            Destination dest = sn.createQueue(args[0]);
            MessageProducer mp = sn.createProducer(dest);
            TextMessage tm = sn.createTextMessage();
            tm.setText("A test message");
            mp.send(tm);
            System.out.println("Message sent:");
            // Release the resources held by the session and the connection.
            sn.close();
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.1. Running Example

To send a message, make sure that the MQ broker is running on port 7676 on the machine with IP address 172.16.5.81.

Compile the program as follows (the ; classpath separator shown here is for Windows; use : on Unix-like systems):

javac -cp jms.jar;imq.jar;. Sender.java

The sender can then be executed as follows:

java -cp jms.jar;imq.jar;. Sender AQueue

If everything goes fine, the following message appears on the screen:

Message sent:

The content of the queue may also be verified using the imqcmd utility as follows:

imqcmd query dst -n AQueue -t q -u admin

Supply the default password of the admin user when prompted. A sample output is shown below:

Destination Name              AQueue
Destination Type              Queue
Destination State             RUNNING
Created Administratively      false
Current Number of Messages
    Actual                    1

3. WRITING A P2P SYNCHRONOUS CONSUMER

A consumer in the P2P model is called a queue receiver. Let us now see how to write such a receiver to consume the message sent earlier. Like the producer, it creates Session and Destination objects. It then creates a MessageConsumer:

MessageConsumer mc = sn.createConsumer(dest);

Note that a connection is initially in stopped mode, in which a consumer is not allowed to consume any message from a destination. So, let’s start it:

con.start();

Finally, the message is consumed explicitly by invoking the receive() method. This method returns a plain Message object; there is no separate method for receiving just a TextMessage. So, we cast the result and print the text message’s content:

TextMessage msg = (TextMessage) mc.receive();

System.out.println("Received message: " + msg.getText());

Note that the receive() method blocks the caller if the queue currently holds no message. As a result, the consumer cannot continue its normal flow of execution. However, as soon as a message arrives in the queue, it is delivered to the consumer. This way of consuming messages is called synchronous consumption.
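A consumer that must not wait indefinitely can bound the wait. MessageConsumer also offers receive(long timeout) and receiveNoWait() for this purpose. The following JDK-only sketch models the timed variant with a BlockingQueue standing in for the broker queue; the class TimedReceiveModel is a hypothetical name and is not JMS code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only model of a consumer; the queue stands in for the broker destination.
final class TimedReceiveModel {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void send(String text) {
        queue.add(text);
    }

    // Waits at most timeoutMillis for a message, then gives up and returns null.
    // Unlike JMS, where a timeout of 0 means "wait forever", this model
    // treats 0 as "do not wait".
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        TimedReceiveModel mc = new TimedReceiveModel();
        System.out.println("Empty queue: " + mc.receive(100));
        mc.send("A test message");
        System.out.println("After send: " + mc.receive(100));
    }
}
```

With a timed receive, the caller has to handle a null result, which signals that no message arrived within the given time.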

The complete source code (SynReceiver.java) is shown below:

import javax.jms.*;

public class SynReceiver {
    public static void main(String[] args) {
        try {
            ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory) cf).setProperty(
                    com.sun.messaging.ConnectionConfiguration.imqAddressList, "172.16.5.81:7676");
            Connection con = cf.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest = sn.createQueue(args[0]);
            MessageConsumer mc = sn.createConsumer(dest);
            // Start the connection so that messages can be delivered.
            con.start();
            // Blocks until a message is available in the queue.
            TextMessage msg = (TextMessage) mc.receive();
            System.out.println("Received message: " + msg.getText());
            sn.close();
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Compile the program as follows:

javac -cp jms.jar;imq.jar;. SynReceiver.java

The receiver can then be executed as follows:

java -cp jms.jar;imq.jar;. SynReceiver AQueue

If everything goes fine, the following message appears on the screen:

Received message: A test message

The content of the queue may also be verified using the imqcmd utility as follows:

imqcmd query dst -n AQueue -t q -u admin

Supply the default password of the admin user when prompted. A sample output is shown below:

Destination Name              AQueue
Destination Type              Queue
Destination State             RUNNING
Created Administratively      false
Current Number of Messages
    Actual                    0

It indicates that the message that was produced earlier has been consumed and no longer exists in the queue.
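The change in the message count from 1 to 0 reflects the P2P rule that each message is consumed by only one receiver. Here is a small JDK-only model of that rule. The class PointToPointModel is hypothetical, and its depth() method merely plays the role of the count reported by imqcmd.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of a P2P queue; not the JMS API.
final class PointToPointModel {
    private final Queue<String> queue = new ArrayDeque<>();

    void send(String text) {
        queue.add(text);
    }

    // Removes and returns the next message, or null if the queue is empty.
    String receiveNoWait() {
        return queue.poll();
    }

    // Number of messages currently held in the queue.
    int depth() {
        return queue.size();
    }

    public static void main(String[] args) {
        PointToPointModel q = new PointToPointModel();
        q.send("A test message");
        System.out.println("Messages after send: " + q.depth());
        System.out.println("First receiver got: " + q.receiveNoWait());
        System.out.println("Second receiver got: " + q.receiveNoWait());
        System.out.println("Messages after receive: " + q.depth());
    }
}
```

Once the first receiver has taken the message, a second receiver on the same queue finds nothing.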

4. WRITING A P2P ASYNCHRONOUS CONSUMER

As mentioned in the previous section, the receiver blocks and cannot continue its normal execution if it invokes the receive() method while the queue is empty. This can be avoided by consuming messages asynchronously. In this style of message consumption, the consumer simply sets up a listener and continues its normal work. The listener then waits on behalf of the consumer and is notified when a message appears in the destination.

A message listener must implement the MessageListener interface. Here is a sample listener class:

public class AListener implements MessageListener {

The interface MessageListener has a single method onMessage(), which takes a single parameter Message as follows:

void onMessage(Message message)

The MQ runtime calls this method to deliver a message. We implement the method as follows:

public void onMessage(Message msg) {
    TextMessage tm = (TextMessage) msg;
    try {
        System.out.println("Received: " + tm.getText());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

An asynchronous consumer sets up a message listener using its setMessageListener() method:

mc.setMessageListener(new AListener());

The consumer can then continue its normal work. However, it must remain active to receive messages and therefore must not close the connection and the session. Here is the complete source code (AsynReceiver.java) of the asynchronous consumer:

import javax.jms.*;

public class AsynReceiver {
    public static void main(String[] args) {
        try {
            ConnectionFactory conFactory = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory) conFactory).setProperty(
                    com.sun.messaging.ConnectionConfiguration.imqAddressList, "172.16.5.81:7676");
            Connection con = conFactory.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest = sn.createQueue(args[0]);
            MessageConsumer mc = sn.createConsumer(dest);
            con.start();
            // Register the listener; the session and connection stay open
            // so that messages can still be delivered.
            mc.setMessageListener(new AListener());
            System.out.println("Continuing its own work");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class AListener implements MessageListener {
    // Called by the MQ runtime for each delivered message.
    public void onMessage(Message msg) {
        TextMessage tm = (TextMessage) msg;
        try {
            System.out.println("Received: " + tm.getText());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Compile and run the program as before. Note that this consumer can continue its work after setting up a listener. The message Continuing its own work appears even if there is no message in the queue.
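The callback style can also be modelled with the JDK alone. In the sketch below, a background thread stands in for the MQ runtime and calls the listener for each message. The class AsyncListenerModel, its methods, and the use of Consumer<String> as the listener type are hypothetical choices for illustration and are not JMS code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// JDK-only model of asynchronous delivery; not the JMS API.
final class AsyncListenerModel {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void send(String text) {
        queue.add(text);
    }

    // Starts a daemon thread that hands each message to the listener,
    // much as the provider runtime calls onMessage().
    void setMessageListener(Consumer<String> listener) {
        Thread delivery = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        delivery.setDaemon(true);
        delivery.start();
    }

    // Registers a listener, sends one message, and waits up to one second
    // for the listener to be called. Returns what the listener received.
    static String deliverOne(String text) {
        AsyncListenerModel mc = new AsyncListenerModel();
        CountDownLatch delivered = new CountDownLatch(1);
        String[] received = new String[1];
        mc.setMessageListener(msg -> {
            received[0] = msg;
            delivered.countDown();
        });
        System.out.println("Continuing its own work");
        mc.send(text);
        try {
            delivered.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("Received: " + deliverOne("A test message"));
    }
}
```

The latch is what keeps the calling thread waiting for the delivery in this model, because the delivery thread is a daemon thread. Whether a real provider’s delivery threads keep the JVM alive after main() returns depends on the provider.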

5. WRITING A PUB/SUB PRODUCER

A producer in a pub/sub model is called a topic publisher. Let us write a simple topic publisher that produces a simple text message. The code for this topic publisher will look exactly like a queue sender except that it creates a topic instead of a queue as follows:

Destination dest = sn.createTopic("ATopic");

The complete source code (Publisher.java) is given below:

import javax.jms.*;

public class Publisher {
    public static void main(String[] args) {
        try {
            ConnectionFactory cf = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory) cf).setProperty(
                    com.sun.messaging.ConnectionConfiguration.imqAddressList, "172.16.5.81:7676");
            Connection con = cf.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // The only difference from Sender: a topic instead of a queue.
            Destination dest = sn.createTopic("ATopic");
            MessageProducer mp = sn.createProducer(dest);
            TextMessage tm = sn.createTextMessage();
            tm.setText("A test message");
            mp.send(tm);
            System.out.println("Message sent:");
            sn.close();
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

6. WRITING A PUB/SUB SYNCHRONOUS CONSUMER

A consumer in a pub/sub model is called a topic subscriber. Let us write a simple topic subscriber that consumes a simple text message. The code for this topic subscriber looks exactly like that of a queue receiver, except that it creates a topic instead of a queue as follows:

Destination dest = sn.createTopic("ATopic");

The complete source code (SynSubscriber.java) is given below:

import javax.jms.*;

public class SynSubscriber {
    public static void main(String[] args) {
        try {
            ConnectionFactory conFactory = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory) conFactory).setProperty(
                    com.sun.messaging.ConnectionConfiguration.imqAddressList, "172.16.5.81:7676");
            Connection con = conFactory.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination d = sn.createTopic("ATopic");
            MessageConsumer mc = sn.createConsumer(d);
            con.start();
            // Blocks until a message is published on the topic.
            TextMessage msg = (TextMessage) mc.receive();
            System.out.println("Received message: " + msg.getText());
            sn.close();
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

This program is exactly the same as SynReceiver.java, except that it uses the createTopic() method instead of createQueue().

7. RUNNING THIS EXAMPLE

In the pub/sub model, publishers and subscribers have a timing dependency. A client can consume only those messages that are published after it has subscribed and while it remains active. Moreover, the JMS system delivers a message to all active subscribers. So, we can start multiple subscribers to receive the same message. Open two terminals and start two subscribers as follows:

java -cp imq.jar;jms.jar;. SynSubscriber

The subscribers are now ready to receive a single message on topic ATopic. Now start a Publisher that publishes a text message on the same topic.

java -cp imq.jar;jms.jar;. Publisher

A sample result is shown in Figure 25.6:

8. WRITING A PUB/SUB ASYNCHRONOUS CONSUMER

An asynchronous topic subscriber looks almost like an asynchronous queue receiver, except that the former creates a topic instead of a queue. The complete source code (AsynSubscriber.java) is shown below:

import javax.jms.*;

public class AsynSubscriber {
    public static void main(String[] args) {
        try {
            ConnectionFactory conFactory = new com.sun.messaging.ConnectionFactory();
            ((com.sun.messaging.ConnectionFactory) conFactory).setProperty(
                    com.sun.messaging.ConnectionConfiguration.imqAddressList, "172.16.5.81:7676");
            Connection con = conFactory.createConnection();
            Session sn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest = sn.createTopic("ATopic");
            MessageConsumer mc = sn.createConsumer(dest);
            con.start();
            // Register the listener; the session and connection stay open
            // so that messages can still be delivered.
            mc.setMessageListener(new AListener());
            System.out.println("Continuing its own work");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class AListener implements MessageListener {
    // Called by the MQ runtime for each delivered message.
    public void onMessage(Message msg) {
        TextMessage tm = (TextMessage) msg;
        try {
            System.out.println("Received: " + tm.getText());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Note that this program is exactly the same as AsynReceiver.java, except that it uses the createTopic() method instead of createQueue().

We can now start multiple subscribers and publishers as before.
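The timing dependency between publishers and subscribers can be illustrated with a small JDK-only model. Each subscriber gets its own inbox, and a published message is copied only to the subscribers registered at that moment. The class TopicModel is a hypothetical sketch of this behaviour and is not JMS code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of a topic with non-durable subscribers; not the JMS API.
final class TopicModel {
    private final List<Queue<String>> subscribers = new ArrayList<>();

    // Registers a new subscriber and returns its private inbox.
    Queue<String> subscribe() {
        Queue<String> inbox = new ArrayDeque<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Delivers a copy of the message to every subscriber registered so far.
    void publish(String text) {
        for (Queue<String> inbox : subscribers) {
            inbox.add(text);
        }
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel();
        Queue<String> first = topic.subscribe();
        Queue<String> second = topic.subscribe();
        topic.publish("A test message");
        Queue<String> late = topic.subscribe(); // subscribes after the publish
        System.out.println("First subscriber: " + first.poll());
        System.out.println("Second subscriber: " + second.poll());
        System.out.println("Late subscriber: " + late.poll());
    }
}
```

Both early subscribers receive the message, while the late subscriber receives nothing. This is why the subscribers must be started before the publisher.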

Source: Uttam Kumar Roy (2015), Advanced Java programming, Oxford University Press.
