JMS Producer and Consumer
This article covers practical JMS producer and consumer patterns using both the native API and Spring JMS integration.
Point-to-Point Producer
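// Needs the JMS 2.0 API (javax.jms.* or jakarta.jms.*), where Connection is AutoCloseable,
// plus org.apache.activemq.ActiveMQConnectionFactory.
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
    connection.start();
    // Non-transacted session with automatic acknowledgment
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("orders");
    MessageProducer producer = session.createProducer(queue);
    // PERSISTENT messages survive a broker restart
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    TextMessage message = session.createTextMessage("Order #12345 created");
    // Application properties can later be matched by message selectors
    message.setStringProperty("orderId", "12345");
    message.setStringProperty("eventType", "ORDER_CREATED");
    producer.send(message);
}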
Point-to-Point Consumer
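// Reuses the ConnectionFactory from the producer example
try (Connection connection = factory.createConnection()) {
    connection.start(); // delivery does not begin until the connection is started
    // CLIENT_ACKNOWLEDGE: nothing is acknowledged until acknowledge() is called
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue queue = session.createQueue("orders");
    MessageConsumer consumer = session.createConsumer(queue);
    Message message = consumer.receive(5000); // returns null if nothing arrives within 5 s
    if (message instanceof TextMessage textMessage) { // pattern matching needs Java 16+
        processOrder(textMessage.getText());
        // Acknowledges every message this session has consumed so far
        message.acknowledge();
    }
}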
Pub/Sub
// Publisher
Topic topic = session.createTopic("order.events");
MessageProducer producer = session.createProducer(topic);
TextMessage msg = session.createTextMessage("Order shipped");
msg.setStringProperty("event", "ORDER_SHIPPED");
producer.send(msg);

// Subscriber (durable). The connection needs a client ID, set with
// connection.setClientID(...) before any other use, unless the broker configures one.
MessageConsumer consumer = session.createDurableSubscriber(topic, "shipping-service");
consumer.setMessageListener(message -> {
    try {
        processEvent((TextMessage) message);
    } catch (JMSException e) {
        log.error("Failed to process", e);
    }
});
Async Message Listener
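connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("tasks");
// The listener runs on a provider thread; keep the connection open while it is in use
session.createConsumer(queue).setMessageListener(message -> {
    try {
        if (message instanceof TextMessage text) {
            processTask(text.getText());
        }
    } catch (JMSException e) {
        // With AUTO_ACKNOWLEDGE the message is acknowledged once the listener returns,
        // so a failure that is only logged here is not redelivered
        log.error("Processing failed", e);
    }
});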
Spring JMS
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
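@Service
public class OrderProducer {
    private final JmsTemplate jmsTemplate;

    // Constructor injection; the final field has to be assigned here
    public OrderProducer(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // OrderEvent must be Serializable unless a MessageConverter (e.g. JSON) is configured
    public void sendOrderEvent(OrderEvent event) {
        jmsTemplate.convertAndSend("orders", event);
    }

    // Passing an explicit Topic destination publishes to all subscribers
    public void publishEvent(OrderEvent event) {
        jmsTemplate.convertAndSend(new ActiveMQTopic("order.events"), event);
    }
}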
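@Component
public class OrderConsumer {
    // "queueFactory" is the container factory bean defined under Transactional Messaging
    @JmsListener(destination = "orders", containerFactory = "queueFactory")
    public void handleOrder(OrderEvent event) {
        processOrder(event);
    }

    // "topicFactory" must be a separate factory bean with pubSubDomain = true; for the
    // named subscription to be durable it also needs subscriptionDurable = true and a client ID
    @JmsListener(destination = "order.events", containerFactory = "topicFactory",
            subscription = "inventory-service")
    public void handleOrderEvent(OrderEvent event) {
        updateInventory(event);
    }
}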
Transactional Messaging
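@Transactional
public void createOrderWithEvent(CreateOrderRequest request) {
    Order order = orderRepository.save(new Order(request));
    jmsTemplate.convertAndSend("orders", new OrderEvent(order.getId(), "CREATED"));
    // With jmsTemplate.setSessionTransacted(true), the send joins the surrounding
    // transaction: it is discarded if the method throws and committed right after the
    // database commit. The two commits are not atomic unless a JTA/XA transaction
    // manager coordinates them.
}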
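On the consuming side, enable transacted sessions on the listener container factory so that a message is redelivered when the listener throws: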
@Bean
public JmsListenerContainerFactory<?> queueFactory(ConnectionFactory cf) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(cf);
    factory.setSessionTransacted(true);
    factory.setConcurrency("3-10");
    return factory;
}
Message Selectors
Filter messages at the broker level:
MessageConsumer consumer = session.createConsumer(queue, "eventType = 'ORDER_CREATED'");
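// JMSPriority is the standard priority header (0-9); a bare "priority" would only match
// a custom property of that name
MessageConsumer highPriority = session.createConsumer(queue, "JMSPriority > 5");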
Best Practices
- Use Spring JMS for cleaner integration with Spring Boot applications
- Set concurrency range (3-10) based on processing capacity
- Use durable subscribers for pub/sub when message loss is unacceptable
- Combine JMS with database transactions to keep messages and data consistent; without XA this gives at-least-once delivery, so make consumers idempotent
- Use message selectors to route messages to specialized consumers
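
A transacted or client-acknowledged consumer can receive the same message again after a rollback or a broker failover, so listeners are often written to tolerate duplicates. The following is a minimal sketch of one way to do that, not a definitive implementation. `IdempotentReceiver` and `handleOnce` are hypothetical names, and the in-memory set of processed IDs is an assumption for illustration; a real service would more likely store the IDs in the same database transaction as its business data.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper that runs a handler at most once per message ID. */
final class IdempotentReceiver {
    // IDs of messages that are being handled or were handled successfully.
    // In-memory only (assumption): the set is lost on restart.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler unless the ID has been seen before.
     *
     * @return true if the handler ran, false if the message was a duplicate
     */
    boolean handleOnce(String messageId, String body, Consumer<String> handler) {
        // add() returns false when the ID is already present, so a redelivery is skipped
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a redelivered copy can be retried
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

Inside a listener, the ID could come from `message.getJMSMessageID()` or from a business key such as the `orderId` property set by the producer; which one fits depends on whether producers may resend the same event under a new message ID.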