RabbitMQ supports several messaging patterns for different use cases — from simple task distribution to request-reply RPC and reliable dead letter handling.

Work Queue (Competing Consumers)

Distribute tasks among multiple workers:

  // Producer
channel.queueDeclare("tasks", true, false, false, null);
for (String task : tasks) {
    channel.basicPublish("", "tasks", MessageProperties.PERSISTENT_TEXT_PLAIN, task.getBytes());
}

// Worker (multiple instances)
channel.basicQos(1);  // fair dispatch
channel.basicConsume("tasks", false, (tag, delivery) -> {
    processTask(new String(delivery.getBody()));
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});
  

Fair dispatch ensures workers with short tasks are not starved.

Publish/Subscribe

Broadcast events to multiple consumers:

  channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT, true);
channel.queueDeclare("", false, true, true, null);  // anonymous, auto-delete
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");

channel.basicConsume(queueName, true, (tag, delivery) -> {
    log.info("Received: {}", new String(delivery.getBody()));
}, tag -> {});
  

Routing (Selective Subscribers)

  channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT, true);
String severity = "error";
channel.basicPublish("direct_logs", severity, null, message.getBytes());

// Bind only to "error" and "warning"
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");
  

RPC (Request-Reply)

  // Client
String callbackQueue = channel.queueDeclare().getQueue();
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .correlationId(correlationId)
    .replyTo(callbackQueue)
    .build();
channel.basicPublish("", "rpc_queue", props, request.getBytes());

// Wait for response on callback queue
channel.basicConsume(callbackQueue, true, (tag, delivery) -> {
    if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
        response = new String(delivery.getBody());
    }
}, tag -> {});

// Server
channel.basicConsume("rpc_queue", false, (tag, delivery) -> {
    String response = processRpc(new String(delivery.getBody()));
    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
        .correlationId(delivery.getProperties().getCorrelationId())
        .build();
    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes());
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, tag -> {});
  

Spring alternative:

  @RabbitListener(queues = "rpc.queue")
public String handleRpc(String request) { return processRequest(request); }

String response = rabbitTemplate.convertSendAndReceive("rpc.queue", request);
  

Dead Letter Queue Pattern

  // Retry with backoff via TTL + DLX
Map<String, Object> retryArgs = Map.of(
    "x-dead-letter-exchange", "retry.exchange",
    "x-message-ttl", 30000  // retry after 30s
);
channel.queueDeclare("orders.retry", true, false, false, retryArgs);
channel.queueBind("orders.retry", "retry.exchange", "order.retry");
channel.queueBind("orders.queue", "retry.exchange", "order.process");
  

Idempotent Consumer

Prevent duplicate processing:

  @RabbitListener(queues = "orders.queue")
public void handleOrder(OrderEvent event, Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    if (processedIds.contains(event.id())) {
        channel.basicAck(tag, false);  // already processed
        return;
    }
    processOrder(event);
    processedIds.add(event.id());
    channel.basicAck(tag, false);
}
  

Best Practices

  • Use work queues with basicQos(1) for fair task distribution
  • Implement idempotent consumers to handle duplicate deliveries
  • Use RPC pattern sparingly — prefer async event-driven for decoupling
  • Configure retry with DLX + TTL rather than in-process retry loops
  • Monitor queue depth — growing queues indicate consumer bottleneck