Thursday, November 2, 2017

JMS with ActiveMQ

JMS(or Java Message Service) API which is a part of java EE platform, provides a way to do communications between computers in a network by sending and receiving objects called messages. JMS API can be used to assure loose coupling, reliability, and asynchronousness of messaging.

JMS Architecture
Members of a JMS communication mainly consists of below components. Producers and Consumers are commonly called as clients.
  1. JMS Provider
    JMS Provider is the messaging system that implements the JMS API.
    Apache ActiveMQ, RabbitMQ, HornetQ and OpenJMS are some example JMS providers.
  2. Producer
  3. Sender of the message
  4. Consumer
  5. Receiver of the message

Involvement of these three members is shown in below diagram.



ActiveMQ as a JMS Provider

As I have mentioned before ActiveMQ is a JMS provider which implements the JMS API.

Below three classes show how to implement the above architecture using an ActiveMQ as an embedded broker.

Broker
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;

public class AMQService {
  public static final String BROKER_URI = "tcp://localhost:61616";
  private static ActiveMQConnectionFactory connectionFactory;
  private static Connection connection;
  static BufferedReader keyboard = new BufferedReader(new InputStreamReader(System.in));

  public static void main(String[] args) {
    try {
      startBroker();
      startListener();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  private static void startBroker() {
    BrokerService broker = new BrokerService();
    TransportConnector connector = new TransportConnector();
    try {
      connector.setUri(new URI(BROKER_URI));
      broker.addConnector(connector);
      broker.start();
      System.out.println("Message broker started at " + BROKER_URI);
    } catch (URISyntaxException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private static void startListener() {
    connectionFactory = new ActiveMQConnectionFactory(BROKER_URI);
    try {
      connection = connectionFactory.createConnection();
      connection.start();
      System.out.println("Listener started....");
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}


Producer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    private Connection connection;
    private Session session;
    private MessageProducer messageProducer;

    public void create(String destinationName) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                AMQService.BROKER_URI);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(destinationName);
        messageProducer = session.createProducer(destination);
    }

    public void close() throws JMSException {
        connection.close();
    }

    public void sendMessage(String message) throws JMSException {
        TextMessage textMessage = session.createTextMessage(message);
        messageProducer.send(textMessage);
        System.out.println("Producer sent: " + message);
    }
}

Consumer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    private Connection connection;
    private Session session;
    private MessageConsumer messageConsumer;

    public void create(String destinationName) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                AMQService.BROKER_URI);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(destinationName);
        messageConsumer = session.createConsumer(destination);
        connection.start();
    }

    public void close() throws JMSException {
        connection.close();
    }

    public String getMessage(int timeout) throws JMSException {
    String text = "";
        Message message = messageConsumer.receive(timeout);

        if (message != null) {
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            System.out.println("Consumer received: " + text);
        } else {
        text = "";
            System.out.println(text);
        }
        return text;
    }
}

You can use below class to test above consumer producer communication.
import javax.jms.JMSException;

public class Test {
private static Producer producer;
private static Consumer consumer;
private static String queueName = "queue1";
public static void main(String[] args) throws JMSException {
producer = new Producer();
producer.create(queueName);

consumer = new Consumer();
consumer.create(queueName);

try {
String message = "Hello my consumers....";
producer.sendMessage(message);

String receivedMessage = consumer.getMessage(1000);
String status = message.equals(receivedMessage) ? "Success..." : "Fail...";
System.out.println(status);
} catch (JMSException e) {
System.out.println("JMS Exception occurred");
}
producer.close();
consumer.close();
}
}