On this page
Messaging Patterns
RabbitMQ supports several messaging patterns for different use cases — from simple task distribution to request-reply RPC and reliable dead letter handling.
Work Queue (Competing Consumers)
Distribute tasks among multiple workers:
// Producer
channel.queueDeclare("tasks", true, false, false, null);
for (String task : tasks) {
channel.basicPublish("", "tasks", MessageProperties.PERSISTENT_TEXT_PLAIN, task.getBytes());
}
// Worker (multiple instances)
channel.basicQos(1); // fair dispatch
channel.basicConsume("tasks", false, (tag, delivery) -> {
processTask(new String(delivery.getBody()));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});
Fair dispatch ensures workers with short tasks are not starved.
Publish/Subscribe
Broadcast events to multiple consumers:
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT, true);
channel.queueDeclare("", false, true, true, null); // anonymous, auto-delete
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, (tag, delivery) -> {
log.info("Received: {}", new String(delivery.getBody()));
}, tag -> {});
Routing (Selective Subscribers)
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT, true);
String severity = "error";
channel.basicPublish("direct_logs", severity, null, message.getBytes());
// Bind only to "error" and "warning"
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");
RPC (Request-Reply)
// Client
String callbackQueue = channel.queueDeclare().getQueue();
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(correlationId)
.replyTo(callbackQueue)
.build();
channel.basicPublish("", "rpc_queue", props, request.getBytes());
// Wait for response on callback queue
channel.basicConsume(callbackQueue, true, (tag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
response = new String(delivery.getBody());
}
}, tag -> {});
// Server
channel.basicConsume("rpc_queue", false, (tag, delivery) -> {
String response = processRpc(new String(delivery.getBody()));
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});
Spring alternative:
@RabbitListener(queues = "rpc.queue")
public String handleRpc(String request) { return processRequest(request); }
String response = rabbitTemplate.convertSendAndReceive("rpc.queue", request);
Dead Letter Queue Pattern
// Retry with backoff via TTL + DLX
Map<String, Object> retryArgs = Map.of(
"x-dead-letter-exchange", "retry.exchange",
"x-message-ttl", 30000 // retry after 30s
);
channel.queueDeclare("orders.retry", true, false, false, retryArgs);
channel.queueBind("orders.retry", "retry.exchange", "order.retry");
channel.queueBind("orders.queue", "retry.exchange", "order.process");
Idempotent Consumer
Prevent duplicate processing:
@RabbitListener(queues = "orders.queue")
public void handleOrder(OrderEvent event, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
if (processedIds.contains(event.id())) {
channel.basicAck(tag, false); // already processed
return;
}
processOrder(event);
processedIds.add(event.id());
channel.basicAck(tag, false);
}
Best Practices
- Use work queues with
basicQos(1)for fair task distribution - Implement idempotent consumers to handle duplicate deliveries
- Use RPC pattern sparingly — prefer async event-driven for decoupling
- Configure retry with DLX + TTL rather than in-process retry loops
- Monitor queue depth — growing queues indicate consumer bottleneck