This article covers practical JMS producer and consumer patterns using both the native API and Spring JMS integration.

Point-to-Point Producer

  ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

try (Connection connection = factory.createConnection()) {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("orders");
    MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    TextMessage message = session.createTextMessage("Order #12345 created");
    message.setStringProperty("orderId", "12345");
    message.setStringProperty("eventType", "ORDER_CREATED");
    producer.send(message);
}
  

Point-to-Point Consumer

  try (Connection connection = factory.createConnection()) {
    connection.start();
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue queue = session.createQueue("orders");
    MessageConsumer consumer = session.createConsumer(queue);

    Message message = consumer.receive(5000);  // 5s timeout
    if (message instanceof TextMessage textMessage) {
        processOrder(textMessage.getText());
        message.acknowledge();
    }
}
  

Pub/Sub

  // Publisher
Topic topic = session.createTopic("order.events");
MessageProducer producer = session.createProducer(topic);
TextMessage msg = session.createTextMessage("Order shipped");
msg.setStringProperty("event", "ORDER_SHIPPED");
producer.send(msg);

// Subscriber (durable)
MessageConsumer consumer = session.createDurableSubscriber(topic, "shipping-service");
consumer.setMessageListener(message -> {
    try {
        processEvent((TextMessage) message);
    } catch (JMSException e) {
        log.error("Failed to process", e);
    }
});
  

Async Message Listener

  connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("tasks");

session.createConsumer(queue).setMessageListener(message -> {
    try {
        if (message instanceof TextMessage text) {
            processTask(text.getText());
        }
    } catch (JMSException e) {
        log.error("Processing failed", e);
    }
});
  

Spring JMS

  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  
  spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
  
  @Service
public class OrderProducer {
    private final JmsTemplate jmsTemplate;

    public void sendOrderEvent(OrderEvent event) {
        jmsTemplate.convertAndSend("orders", event);
    }

    public void publishEvent(OrderEvent event) {
        jmsTemplate.convertAndSend(new ActiveMQTopic("order.events"), event);
    }
}

@Component
public class OrderConsumer {
    @JmsListener(destination = "orders", containerFactory = "queueFactory")
    public void handleOrder(OrderEvent event) {
        processOrder(event);
    }

    @JmsListener(destination = "order.events", containerFactory = "topicFactory",
                 subscription = "inventory-service")
    public void handleOrderEvent(OrderEvent event) {
        updateInventory(event);
    }
}
  

Transactional Messaging

  @Transactional
public void createOrderWithEvent(CreateOrderRequest request) {
    Order order = orderRepository.save(new Order(request));
    jmsTemplate.convertAndSend("orders", new OrderEvent(order.getId(), "CREATED"));
    // Both DB save and JMS send commit or rollback together
}
  

Configure JMS session transacted:

  @Bean
public JmsListenerContainerFactory<?> queueFactory(ConnectionFactory cf) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(cf);
    factory.setSessionTransacted(true);
    factory.setConcurrency("3-10");
    return factory;
}
  

Message Selectors

Filter messages at the broker level:

  MessageConsumer consumer = session.createConsumer(queue, "eventType = 'ORDER_CREATED'");
MessageConsumer highPriority = session.createConsumer(queue, "priority > 5");
  

Best Practices

  • Use Spring JMS for cleaner integration with Spring Boot applications
  • Set concurrency range (3-10) based on processing capacity
  • Use durable subscribers for pub/sub when message loss is unacceptable
  • Combine JMS with database transactions for exactly-once processing
  • Use message selectors to route messages to specialized consumers