Kafka Producers and Consumers
The Kafka Java client provides KafkaProducer and KafkaConsumer for publishing and consuming records. Spring Kafka wraps these with higher-level abstractions.
Producer
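Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// JsonSerializer is not part of kafka-clients; this assumes a JSON serializer
// (such as the one shipped with Spring Kafka) is on the classpath
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");               // wait for all in-sync replicas
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // no duplicates from retries

// try-with-resources closes the producer, which waits for pending sends to complete
try (KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props)) {
    OrderEvent event = new OrderEvent("order-123", "CREATED", Instant.now());
    // Keying by order ID keeps all events for one order on the same partition
    // (with the default partitioner)
    ProducerRecord<String, OrderEvent> record =
            new ProducerRecord<>("orders", event.orderId(), event);
    // send() is asynchronous; the callback runs once the broker responds
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            log.error("Failed to send", exception);
        } else {
            log.info("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
        }
    });
}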
Producer Configuration
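| Property | Recommended | Description |
|---|---|---|
| acks | all | Wait for all in-sync replicas (ISR) to acknowledge the write |
| retries | 3 | Retry transient send failures; must be greater than 0 when idempotence is enabled |
| enable.idempotence | true | Prevent duplicate records caused by producer retries; end-to-end exactly-once additionally requires transactions |
| batch.size | 16384 | Maximum size of a per-partition batch, in bytes |
| linger.ms | 5 | Time in milliseconds to wait for more records before sending a batch |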
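To make the enable.idempotence setting more concrete, the sketch below models how a partition can drop a retried record by tracking a sequence number per producer. It is a toy model under stated assumptions: the class IdempotentLogSketch and its methods are hypothetical, and the real broker logic (which also handles epochs and out-of-order sequences) is considerably more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one partition log; NOT the real broker implementation.
class IdempotentLogSketch {
    private final List<String> log = new ArrayList<>();
    // Highest sequence number appended so far, per producer id.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    /** Appends the record unless this producer already wrote this sequence number. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // duplicate caused by a retry: not appended again
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    /** Returns a copy of the records currently in the log. */
    List<String> contents() {
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        IdempotentLogSketch partition = new IdempotentLogSketch();
        partition.append(7L, 0, "order-123 CREATED");
        // The acknowledgement was lost, so the producer retries the same sequence number.
        partition.append(7L, 0, "order-123 CREATED");
        partition.append(7L, 1, "order-123 PAID");
        System.out.println(partition.contents());
    }
}
```

The retried record is acknowledged but stored only once, which is why idempotence removes duplicates from producer retries without, on its own, giving end-to-end exactly-once processing.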
Consumer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // start from the beginning if no offset is committed
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);       // offsets are committed manually below

try (KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders"));
    // running is assumed to be a flag (for example a volatile boolean) cleared on shutdown
    while (running) {
        ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, OrderEvent> record : records) {
            processOrder(record.value());
        }
        // Manual commit only after the whole batch is processed (at-least-once)
        consumer.commitSync();
    }
}
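Committing after processing gives at-least-once behavior: a crash between processing and commitSync() leads to reprocessing, not loss. The following is a minimal single-partition simulation of that ordering, not the Kafka client API; AtLeastOnceSketch and its crashBeforeCommit parameter are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy single-partition model of "process, then commit"; not the Kafka client API.
class AtLeastOnceSketch {
    private final List<String> partition;   // records in offset order
    private int committedOffset = 0;        // next offset to read after a restart
    final List<String> processed = new ArrayList<>();

    AtLeastOnceSketch(List<String> partition) {
        this.partition = partition;
    }

    /**
     * Processes up to maxRecords starting at the committed offset.
     * If crashBeforeCommit is true the commit is skipped, as if the
     * consumer died between processing and committing.
     */
    void pollAndProcess(int maxRecords, boolean crashBeforeCommit) {
        int end = Math.min(partition.size(), committedOffset + maxRecords);
        for (int offset = committedOffset; offset < end; offset++) {
            processed.add(partition.get(offset));
        }
        if (!crashBeforeCommit) {
            committedOffset = end; // commit only after the whole batch is processed
        }
    }

    int committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        AtLeastOnceSketch consumer = new AtLeastOnceSketch(List.of("a", "b", "c"));
        consumer.pollAndProcess(2, true);   // processed, but the commit never happened
        consumer.pollAndProcess(2, false);  // restart: the same two records are read again
        consumer.pollAndProcess(2, false);
        System.out.println(consumer.processed);
    }
}
```

Because records can be seen more than once, processing logic downstream of such a consumer generally needs to tolerate duplicates.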
Spring Kafka
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
@Service
public class OrderProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Constructor injection; without it the final field is never initialized
    public OrderProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(OrderEvent event) {
        kafkaTemplate.send("orders", event.orderId(), event);
    }
}

@Component
public class OrderConsumer {
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void handle(ConsumerRecord<String, OrderEvent> record) {
        processOrder(record.value());
    }
}
Consumer Groups
Topic "orders" (3 partitions)
Consumer Group "order-processor":
  Consumer 1 → Partition 0
  Consumer 2 → Partition 1
  Consumer 3 → Partition 2
Within a group, each partition is assigned to exactly one consumer at a time, while one consumer may own several partitions. Adding consumers scales parallelism up to the partition count; consumers beyond that number sit idle.
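The effect of group size on parallelism can be illustrated with a simplified round-robin assignment. This is only a sketch: AssignmentSketch is a hypothetical helper, and Kafka's actual assignors (range, round-robin, cooperative sticky) take more into account, such as multiple topics and previous ownership.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a partition assignor; not Kafka's implementation.
class AssignmentSketch {
    /** Spreads partitions 0..partitionCount-1 over the consumers in round-robin order. */
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers share three partitions; one of them owns two.
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 3));
        // Four consumers, three partitions: the fourth consumer gets nothing.
        System.out.println(assign(List.of("consumer-1", "consumer-2", "consumer-3", "consumer-4"), 3));
    }
}
```

With more consumers than partitions, the extra consumer receives an empty assignment, which is why scaling a group beyond the partition count adds no throughput.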
Error Handling
@KafkaListener(topics = "orders")
public void handle(OrderEvent event) {
    try {
        processOrder(event);
    } catch (RetryableException e) {
        throw e; // rethrow so the container's error handler retries the record
    } catch (Exception e) {
        // non-retryable failure: send to the dead letter topic and move on
        kafkaTemplate.send("orders-dlt", event.orderId(), event);
    }
}
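The listener above relies on an error handler to retry retryable failures. The sketch below models the overall flow (bounded retries, then dead-lettering) in plain Java so the control flow is visible. All names are hypothetical: RetryThenDeadLetterSketch, its nested RetryableException, and the in-memory deadLetters list standing in for the orders-dlt topic. In Spring Kafka this role is usually played by a container error handler (for example DefaultErrorHandler with a DeadLetterPublishingRecoverer), which this sketch does not use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of "retry, then dead-letter"; not the Spring Kafka API.
class RetryThenDeadLetterSketch {
    /** Hypothetical marker for failures worth retrying, such as a timeout. */
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    final List<String> deadLetters = new ArrayList<>(); // stands in for the "orders-dlt" topic
    private final int maxAttempts;

    RetryThenDeadLetterSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Runs the processor on the event and returns the number of attempts made. */
    int handle(String event, Consumer<String> processor) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(event);
                return attempt; // success
            } catch (RetryableException e) {
                // transient failure: try again (a real handler would back off here)
            } catch (RuntimeException e) {
                deadLetters.add(event); // poison message: retrying will not help
                return attempt;
            }
        }
        deadLetters.add(event); // retries exhausted
        return maxAttempts;
    }

    public static void main(String[] args) {
        RetryThenDeadLetterSketch handler = new RetryThenDeadLetterSketch(3);
        handler.handle("order-1", e -> { });
        handler.handle("order-2", e -> { throw new RetryableException("broker timeout"); });
        handler.handle("order-3", e -> { throw new IllegalArgumentException("malformed payload"); });
        System.out.println("dead letters: " + handler.deadLetters);
    }
}
```

Separating retryable from non-retryable failures keeps a single poison message from blocking its partition while still giving transient errors a chance to clear.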
Best Practices
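- Use manual commit (enable.auto.commit=false) and commit only after processing for at-least-once processing
- Enable the idempotent producer to avoid duplicates from retries; exactly-once delivery additionally requires transactions
- Keep the number of consumers in a group ≤ the number of partitions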
- Use dead letter topics for poison messages
- Monitor consumer lag with Kafka tools or Micrometer metrics
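For the last point, consumer lag per partition is the distance between the log end offset and the group's committed offset. The sketch below shows only that arithmetic. ConsumerLagSketch is a hypothetical helper, the offsets are passed in as plain maps instead of being fetched from a cluster, and treating a missing committed offset as 0 is an assumption made to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Arithmetic-only sketch; offsets are supplied by the caller, not read from Kafka.
class ConsumerLagSketch {
    /** Lag per partition = log end offset - committed offset, never below zero. */
    static Map<Integer, Long> lag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // Assumption: a partition with no committed offset counts from offset 0.
            long committedOffset = committed.getOrDefault(entry.getKey(), 0L);
            result.put(entry.getKey(), Math.max(0L, entry.getValue() - committedOffset));
        }
        return result;
    }

    /** Sum of the per-partition lag values. */
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long total = 0L;
        for (long value : lag(endOffsets, committed).values()) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = Map.of(0, 100L, 1, 50L);
        Map<Integer, Long> committed = Map.of(0, 90L);
        System.out.println(lag(endOffsets, committed));
        System.out.println(totalLag(endOffsets, committed));
    }
}
```

A lag that keeps growing over time suggests the group cannot keep up with the producers and may need more consumers (up to the partition count) or faster processing.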