JMS Architecture
A JMS communication mainly consists of the components below. Producers and consumers are collectively called clients.
- JMS Provider
JMS Provider is the messaging system that implements the JMS API.
Apache ActiveMQ, RabbitMQ, HornetQ and OpenJMS are some example JMS providers.
- Producer: the sender of the message
- Consumer: the receiver of the message
The involvement of these three members is shown in the diagram below.
ActiveMQ as a JMS Provider
As mentioned before, ActiveMQ is a JMS provider that implements the JMS API.
The three classes below show how to implement the above architecture using ActiveMQ as an embedded broker.
Broker
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
/**
 * Starts an embedded ActiveMQ broker on {@link #BROKER_URI} and then opens
 * and starts a client connection to it.
 */
public class AMQService {
    /** Address the embedded broker binds to; clients connect to the same URI. */
    public static final String BROKER_URI = "tcp://localhost:61616";
    private static ActiveMQConnectionFactory connectionFactory;
    private static Connection connection;
    // Not read anywhere in this class; kept because the field is package-visible.
    static BufferedReader keyboard = new BufferedReader(new InputStreamReader(System.in));

    /**
     * Starts the broker and, only if that succeeded, the client connection.
     * Any failure from either step is reported here, in one place.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            startBroker();
            startListener();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a broker with a single transport connector bound to
     * {@link #BROKER_URI} and starts it.
     *
     * @throws Exception if the URI is malformed or the broker cannot be
     *         configured or started; propagated so the caller does not go on
     *         to connect to a broker that is not running
     */
    private static void startBroker() throws Exception {
        BrokerService broker = new BrokerService();
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI(BROKER_URI));
        broker.addConnector(connector);
        broker.start();
        System.out.println("Message broker started at " + BROKER_URI);
    }

    /**
     * Opens a connection to the broker at {@link #BROKER_URI} and starts it.
     *
     * @throws JMSException if the connection cannot be created or started; a
     *         connection that was created but failed to start is closed first
     */
    private static void startListener() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(BROKER_URI);
        Connection opened = connectionFactory.createConnection();
        boolean started = false;
        try {
            opened.start();
            started = true;
        } finally {
            if (!started) {
                try {
                    opened.close();
                } catch (JMSException ignored) {
                    // The failure from start() is the one worth reporting.
                }
            }
        }
        connection = opened;
        System.out.println("Listener started....");
    }
}
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
/**
 * Starts an embedded ActiveMQ broker on {@link #BROKER_URI} and then opens
 * and starts a client connection to it.
 */
public class AMQService {
    /** Address the embedded broker binds to; clients connect to the same URI. */
    public static final String BROKER_URI = "tcp://localhost:61616";
    private static ActiveMQConnectionFactory connectionFactory;
    private static Connection connection;
    // Not read anywhere in this class; kept because the field is package-visible.
    static BufferedReader keyboard = new BufferedReader(new InputStreamReader(System.in));

    /**
     * Starts the broker and, only if that succeeded, the client connection.
     * Any failure from either step is reported here, in one place.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            startBroker();
            startListener();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a broker with a single transport connector bound to
     * {@link #BROKER_URI} and starts it.
     *
     * @throws Exception if the URI is malformed or the broker cannot be
     *         configured or started; propagated so the caller does not go on
     *         to connect to a broker that is not running
     */
    private static void startBroker() throws Exception {
        BrokerService broker = new BrokerService();
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI(BROKER_URI));
        broker.addConnector(connector);
        broker.start();
        System.out.println("Message broker started at " + BROKER_URI);
    }

    /**
     * Opens a connection to the broker at {@link #BROKER_URI} and starts it.
     *
     * @throws JMSException if the connection cannot be created or started; a
     *         connection that was created but failed to start is closed first
     */
    private static void startListener() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(BROKER_URI);
        Connection opened = connectionFactory.createConnection();
        boolean started = false;
        try {
            opened.start();
            started = true;
        } finally {
            if (!started) {
                try {
                    opened.close();
                } catch (JMSException ignored) {
                    // The failure from start() is the one worth reporting.
                }
            }
        }
        connection = opened;
        System.out.println("Listener started....");
    }
}
Producer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Minimal JMS queue producer that connects to the broker at
 * {@code AMQService.BROKER_URI}.
 *
 * <p>Call {@link #create(String)} first, then {@link #sendMessage(String)},
 * and {@link #close()} when done.
 */
public class Producer {
    private Connection connection;
    private Session session;
    private MessageProducer messageProducer;

    /**
     * Opens a connection, a non-transacted auto-acknowledge session, and a
     * producer for the named queue.
     *
     * @param destinationName name of the queue to send to
     * @throws JMSException if any JMS object cannot be created; the connection
     *         opened so far is closed before the exception propagates
     */
    public void create(String destinationName) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                AMQService.BROKER_URI);
        Connection opened = connectionFactory.createConnection();
        boolean ready = false;
        try {
            Session openedSession = opened.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = openedSession.createQueue(destinationName);
            messageProducer = openedSession.createProducer(destination);
            session = openedSession;
            connection = opened;
            ready = true;
        } finally {
            if (!ready) {
                // Do not leak a half-initialised connection.
                try {
                    opened.close();
                } catch (JMSException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Closes the connection. Does nothing if {@link #create(String)} has not
     * succeeded or the producer is already closed.
     *
     * @throws JMSException if closing the connection fails
     */
    public void close() throws JMSException {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } finally {
            connection = null;
            session = null;
            messageProducer = null;
        }
    }

    /**
     * Sends the given text as a {@link TextMessage} to the queue.
     *
     * @param message text to send
     * @throws JMSException if the message cannot be created or sent
     * @throws IllegalStateException if the producer has not been created
     */
    public void sendMessage(String message) throws JMSException {
        if (session == null || messageProducer == null) {
            throw new IllegalStateException("Producer is not created; call create() first");
        }
        TextMessage textMessage = session.createTextMessage(message);
        messageProducer.send(textMessage);
        System.out.println("Producer sent: " + message);
    }
}
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Minimal JMS queue producer that connects to the broker at
 * {@code AMQService.BROKER_URI}.
 *
 * <p>Call {@link #create(String)} first, then {@link #sendMessage(String)},
 * and {@link #close()} when done.
 */
public class Producer {
    private Connection connection;
    private Session session;
    private MessageProducer messageProducer;

    /**
     * Opens a connection, a non-transacted auto-acknowledge session, and a
     * producer for the named queue.
     *
     * @param destinationName name of the queue to send to
     * @throws JMSException if any JMS object cannot be created; the connection
     *         opened so far is closed before the exception propagates
     */
    public void create(String destinationName) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                AMQService.BROKER_URI);
        Connection opened = connectionFactory.createConnection();
        boolean ready = false;
        try {
            Session openedSession = opened.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = openedSession.createQueue(destinationName);
            messageProducer = openedSession.createProducer(destination);
            session = openedSession;
            connection = opened;
            ready = true;
        } finally {
            if (!ready) {
                // Do not leak a half-initialised connection.
                try {
                    opened.close();
                } catch (JMSException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Closes the connection. Does nothing if {@link #create(String)} has not
     * succeeded or the producer is already closed.
     *
     * @throws JMSException if closing the connection fails
     */
    public void close() throws JMSException {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } finally {
            connection = null;
            session = null;
            messageProducer = null;
        }
    }

    /**
     * Sends the given text as a {@link TextMessage} to the queue.
     *
     * @param message text to send
     * @throws JMSException if the message cannot be created or sent
     * @throws IllegalStateException if the producer has not been created
     */
    public void sendMessage(String message) throws JMSException {
        if (session == null || messageProducer == null) {
            throw new IllegalStateException("Producer is not created; call create() first");
        }
        TextMessage textMessage = session.createTextMessage(message);
        messageProducer.send(textMessage);
        System.out.println("Producer sent: " + message);
    }
}
Consumer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Minimal JMS queue consumer that connects to the broker at
 * {@code AMQService.BROKER_URI}.
 *
 * <p>Call {@link #create(String)} first, then {@link #getMessage(int)},
 * and {@link #close()} when done.
 */
public class Consumer {
    private Connection connection;
    private Session session;
    private MessageConsumer messageConsumer;

    /**
     * Opens a connection, a non-transacted auto-acknowledge session, and a
     * consumer for the named queue, then starts the connection.
     *
     * @param destinationName name of the queue to receive from
     * @throws JMSException if any JMS object cannot be created or the
     *         connection cannot be started; the connection opened so far is
     *         closed before the exception propagates
     */
    public void create(String destinationName) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                AMQService.BROKER_URI);
        Connection opened = connectionFactory.createConnection();
        boolean ready = false;
        try {
            Session openedSession = opened.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = openedSession.createQueue(destinationName);
            messageConsumer = openedSession.createConsumer(destination);
            opened.start();
            session = openedSession;
            connection = opened;
            ready = true;
        } finally {
            if (!ready) {
                // Do not leak a half-initialised connection.
                try {
                    opened.close();
                } catch (JMSException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
        }
    }

    /**
     * Closes the connection. Does nothing if {@link #create(String)} has not
     * succeeded or the consumer is already closed.
     *
     * @throws JMSException if closing the connection fails
     */
    public void close() throws JMSException {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } finally {
            connection = null;
            session = null;
            messageConsumer = null;
        }
    }

    /**
     * Receives one text message from the queue.
     *
     * @param timeout value passed to {@code MessageConsumer.receive}
     *        (milliseconds per the JMS API)
     * @return the text of the received message, or an empty string when
     *         {@code receive} returned no message
     * @throws JMSException if receiving fails or the received message is not
     *         a {@link TextMessage}
     * @throws IllegalStateException if the consumer has not been created
     */
    public String getMessage(int timeout) throws JMSException {
        if (messageConsumer == null) {
            throw new IllegalStateException("Consumer is not created; call create() first");
        }
        Message message = messageConsumer.receive(timeout);
        if (message == null) {
            // Same output as before for the no-message case: one empty line.
            System.out.println("");
            return "";
        }
        if (!(message instanceof TextMessage)) {
            throw new JMSException("Expected a TextMessage but received "
                    + message.getClass().getName());
        }
        String text = ((TextMessage) message).getText();
        System.out.println("Consumer received: " + text);
        return text;
    }
}
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
private Connection connection;
private Session session;
private MessageConsumer messageConsumer;
public void create(String destinationName) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
AMQService.BROKER_URI);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);
messageConsumer = session.createConsumer(destination);
connection.start();
}
public void close() throws JMSException {
connection.close();
}
public String getMessage(int timeout) throws JMSException {
String text = "";
Message message = messageConsumer.receive(timeout);
if (message != null) {
TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
System.out.println("Consumer received: " + text);
} else {
text = "
System.out.println(text);
}
return text;
}
}
You can use the class below to test the consumer–producer communication described above.
import javax.jms.JMSException;
/**
 * Sends one text message through a {@link Producer} and checks that a
 * {@link Consumer} on the same queue receives the same text.
 */
public class Test {
    private static final String QUEUE_NAME = "queue1";

    /**
     * Runs the round trip and prints {@code Success...} or {@code Fail...}.
     * Both clients are closed on every path once they have been created.
     *
     * @param args ignored
     * @throws JMSException if creating or closing a client fails
     */
    public static void main(String[] args) throws JMSException {
        Producer producer = new Producer();
        producer.create(QUEUE_NAME);
        try {
            Consumer consumer = new Consumer();
            consumer.create(QUEUE_NAME);
            try {
                String message = "Hello my consumers....";
                producer.sendMessage(message);
                String receivedMessage = consumer.getMessage(1000);
                String status = message.equals(receivedMessage) ? "Success..." : "Fail...";
                System.out.println(status);
            } catch (JMSException e) {
                System.out.println("JMS Exception occurred");
                // Keep the cause visible instead of discarding it.
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        } finally {
            // Runs even if the consumer could not be created or closed.
            producer.close();
        }
    }
}
/**
 * Sends one text message through a {@link Producer} and checks that a
 * {@link Consumer} on the same queue receives the same text.
 */
public class Test {
    private static final String QUEUE_NAME = "queue1";

    /**
     * Runs the round trip and prints {@code Success...} or {@code Fail...}.
     * Both clients are closed on every path once they have been created.
     *
     * @param args ignored
     * @throws JMSException if creating or closing a client fails
     */
    public static void main(String[] args) throws JMSException {
        Producer producer = new Producer();
        producer.create(QUEUE_NAME);
        try {
            Consumer consumer = new Consumer();
            consumer.create(QUEUE_NAME);
            try {
                String message = "Hello my consumers....";
                producer.sendMessage(message);
                String receivedMessage = consumer.getMessage(1000);
                String status = message.equals(receivedMessage) ? "Success..." : "Fail...";
                System.out.println(status);
            } catch (JMSException e) {
                System.out.println("JMS Exception occurred");
                // Keep the cause visible instead of discarding it.
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        } finally {
            // Runs even if the consumer could not be created or closed.
            producer.close();
        }
    }
}