The Kafka Java client provides KafkaProducer and KafkaConsumer for publishing and consuming records. Spring Kafka wraps both in higher-level abstractions such as KafkaTemplate and @KafkaListener.

Producer

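// Imports assume kafka-clients plus Spring Kafka's JsonSerializer; OrderEvent is the application's own type
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");                // required by the idempotent producer
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);   // no duplicates from retried sends

// close() waits for in-flight sends, so the callback runs before the block exits
try (KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props)) {
    OrderEvent event = new OrderEvent("order-123", "CREATED", Instant.now());
    // The order ID is the record key, so events for one order go to the same partition
    ProducerRecord<String, OrderEvent> record =
        new ProducerRecord<>("orders", event.orderId(), event);

    // send() is asynchronous; the callback reports the outcome
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            log.error("Failed to send", exception);
        } else {
            log.info("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
        }
    });
}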
  

Producer Configuration

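Property            Recommended  Description
acks                all          Wait for all in-sync replicas (ISR) to acknowledge the write
retries             3            Retry sends that fail with transient errors
enable.idempotence  true         Prevent duplicates caused by producer retries (exactly-once also requires transactions)
batch.size          16384        Maximum size of a per-partition batch, in bytes
linger.ms           5            Time in milliseconds to wait for more records before sending a batch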
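
The settings in the table can also be supplied under their literal property names, which is convenient when they come from a file or environment. The following is a minimal sketch using only java.util.Properties; the class and method names are illustrative, not part of the Kafka client, and the values are the ones recommended above rather than tuned for any workload.

```java
import java.util.Properties;

final class ProducerSettingsSketch {
    /** Builds producer settings using the literal property names from the table. */
    static Properties recommended(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");                // wait for all in-sync replicas
        props.setProperty("retries", "3");               // retry transient failures
        props.setProperty("enable.idempotence", "true"); // no duplicates from retries
        props.setProperty("batch.size", "16384");        // bytes per partition batch
        props.setProperty("linger.ms", "5");             // wait up to 5 ms to fill a batch
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; serializers still have to be added before creating a producer
        recommended("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The Kafka client parses string values for numeric and boolean settings, so the result can be passed to a KafkaProducer constructor once the serializer classes are added.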

Consumer

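// Imports assume kafka-clients plus Spring Kafka's JsonDeserializer; OrderEvent is the application's own type
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Tell the JSON deserializer which class to build and which package to trust
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderEvent.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, OrderEvent.class.getPackageName());

try (KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders"));

    // running is assumed to be a volatile boolean that is cleared on shutdown
    while (running) {
        ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, OrderEvent> record : records) {
            processOrder(record.value());
        }
        // Commit only after the whole batch is processed (at-least-once)
        consumer.commitSync();
    }
}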
  

Spring Kafka

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  
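@Service
public class OrderProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Constructor injection: the final field must be assigned, and Spring supplies the template
    public OrderProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(OrderEvent event) {
        // Keyed by order ID so events for one order go to the same partition
        kafkaTemplate.send("orders", event.orderId(), event);
    }
}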

@Component
public class OrderConsumer {
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void handle(ConsumerRecord<String, OrderEvent> record) {
        processOrder(record.value());
    }
}
  

Consumer Groups

  Topic "orders" (3 partitions)
  Consumer Group "order-processor":
    Consumer 1 → Partition 0
    Consumer 2 → Partition 1
    Consumer 3 → Partition 2
  

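Within a group, each partition is assigned to exactly one consumer, although a single consumer may own several partitions. Adding consumers therefore increases parallelism only up to the partition count; any consumers beyond that number sit idle.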
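
The effect of group size on parallelism can be illustrated with a small simulation. This is a simplified round-robin model written in plain Java, not one of Kafka's actual assignors, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignmentSketch {
    /**
     * Spreads partitions 0..partitionCount-1 over consumerCount consumers in
     * round-robin order. Index i of the result holds the partitions of consumer i.
     */
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            // Each partition goes to exactly one consumer
            assignment.get(p % consumerCount).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 2)); // [[0, 2], [1]]
        System.out.println(assign(3, 4)); // [[0], [1], [2], []]
    }
}
```

With three partitions, the fourth consumer receives an empty assignment, which is why a group larger than the partition count adds no throughput.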

Error Handling

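// kafkaTemplate is an injected KafkaTemplate<String, OrderEvent> field;
// RetryableException stands for the application's own unchecked exception type
@KafkaListener(topics = "orders")
public void handle(OrderEvent event) {
    try {
        processOrder(event);
    } catch (RetryableException e) {
        throw e;  // rethrow so the container's error handler retries the record
    } catch (Exception e) {
        // Not retryable (poison message): log the cause and send to the dead letter topic
        log.error("Sending order {} to dead letter topic", event.orderId(), e);
        kafkaTemplate.send("orders-dlt", event.orderId(), event);
    }
}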
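
The control flow behind this pattern (retry transient failures a bounded number of times, then divert the message) can be modeled without Kafka. The sketch below is plain Java and is not Spring Kafka's error handler API; RetryableException here is a hypothetical marker type, and an in-memory list stands in for the dead letter topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RetryThenDeadLetterSketch {
    /** Hypothetical marker for failures worth retrying (not a Kafka or Spring type). */
    static class RetryableException extends RuntimeException {
        RetryableException(String message) { super(message); }
    }

    /**
     * Runs handler on message up to maxAttempts times. Retryable failures are retried;
     * any other failure, or running out of attempts, moves the message to deadLetters.
     * Returns true if the handler eventually succeeded.
     */
    static <T> boolean process(T message, Consumer<T> handler, int maxAttempts, List<T> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RetryableException e) {
                // Transient failure: loop around and try again
            } catch (RuntimeException e) {
                break; // Poison message: retrying will not help
            }
        }
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        process("order-123", m -> { throw new IllegalStateException("malformed"); }, 3, deadLetters);
        System.out.println(deadLetters); // [order-123]
    }
}
```

In a real listener the retry loop normally lives in the container's error handler rather than in application code, so the listener only needs to decide which exceptions to rethrow.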
  

Best Practices

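  • Use manual commits (enable.auto.commit=false) and commit only after processing, for at-least-once delivery
  • Enable the idempotent producer to avoid duplicates from retries; exactly-once delivery additionally requires transactions
  • Keep the number of consumers in a group ≤ the number of partitions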
  • Use dead letter topics for poison messages
  • Monitor consumer lag with Kafka tools or Micrometer metrics
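
Consumer lag itself is simple arithmetic: for each partition, the log end offset minus the group's committed offset. The sketch below shows only that calculation in plain Java; the class name is illustrative, the offsets are passed in as maps rather than fetched from a broker, and treating a partition with no committed offset as starting from 0 is an assumption made for the example.

```java
import java.util.Map;
import java.util.TreeMap;

final class ConsumerLagSketch {
    /**
     * Lag per partition = log end offset - committed offset, never below zero.
     * Partitions without a committed offset are treated as committed at 0 (assumption).
     */
    static Map<Integer, Long> lag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        Map<Integer, Long> result = new TreeMap<>();
        endOffsets.forEach((partition, end) ->
            result.put(partition, Math.max(0L, end - committed.getOrDefault(partition, 0L))));
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = Map.of(0, 120L, 1, 80L);
        Map<Integer, Long> committed = Map.of(0, 100L, 1, 80L);
        System.out.println(lag(endOffsets, committed)); // {0=20, 1=0}
    }
}
```

A lag that keeps growing on one partition usually means its consumer is slower than the producers writing to it, which is the signal worth alerting on.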