Kafka Streams
Kafka Streams is a Java library for building real-time stream processing applications. It reads from Kafka topics, transforms data, and writes results back to Kafka — no separate cluster needed.
Core Concepts
| Concept | Description |
|---|---|
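| KStream | Unbounded stream of independent key-value records (events) |
| KTable | Changelog stream read as a table: each record upserts the latest value for its key, and a null value deletes it |
| GlobalKTable | KTable whose full contents are replicated to every application instance |
| Topology | Directed acyclic graph (DAG) of processing nodes |
| State Store | Local store for stateful operations (RocksDB by default), backed by a changelog topic |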
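The difference between a KStream and a KTable is easiest to see on one sequence of records. The sketch below is a plain-Java model with no Kafka dependency; the class `StreamTableDuality` and its methods are hypothetical names for illustration. Read as a stream, every record counts as an event. Read as a table, each record upserts its key and a `null` value acts as a tombstone that deletes it.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of stream vs. table semantics (no Kafka involved).
final class StreamTableDuality {
    // Helper for building a key-value record; SimpleEntry permits null values.
    static Map.Entry<String, String> entry(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // KStream view: every record is an independent event, nothing is overwritten.
    static int countEvents(List<Map.Entry<String, String>> records) {
        return records.size();
    }

    // KTable view: each record upserts its key; a null value (tombstone) deletes it.
    static Map<String, String> materialize(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                table.remove(r.getKey());
            } else {
                table.put(r.getKey(), r.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = Arrays.asList(
            entry("alice", "NYC"), entry("bob", "LA"),
            entry("alice", "SF"), entry("bob", null));
        System.out.println(countEvents(log)); // 4
        System.out.println(materialize(log)); // {alice=SF}
    }
}
```

The same four records yield four events but only one live table row, which is why a KTable is a natural fit for "current state per key" data such as customer profiles.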
Setup
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency>
```
Word Count Example
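```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("text-input");
KTable<String, Long> wordCounts = textLines
    // Split each line into lowercase words
    .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
    // Re-key by word; this creates an internal repartition topic
    .groupBy((key, word) -> word)
    // Count per word, backed by the "word-count-store" state store
    .count(Materialized.as("word-count-store"));
// Counts are Long, so override the default String value serde when writing out
wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
```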
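To make the word count topology's output concrete, here is a plain-Java model (no Kafka dependency; `WordCountSketch` is a hypothetical name) of what `flatMapValues`, `groupBy`, and `count` compute together. It assumes record caching is disabled, so every input word produces one update to the count table; with caching enabled, Kafka Streams may forward fewer, coalesced updates downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the word count topology (no Kafka involved).
final class WordCountSketch {
    // Processes each line against `store` (a stand-in for "word-count-store") and
    // returns the "word=count" updates the KTable would emit, assuming no caching.
    static List<String> process(List<String> lines, Map<String, Long> store) {
        List<String> updates = new ArrayList<>();
        for (String line : lines) {
            // flatMapValues: same lowercase split as the topology
            for (String word : line.toLowerCase().split("\\W+")) {
                // groupBy(word) + count(): increment the per-word counter
                long count = store.merge(word, 1L, Long::sum);
                updates.add(word + "=" + count);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new LinkedHashMap<>();
        List<String> updates = process(Arrays.asList("Hello, hello world", "world peace"), store);
        System.out.println(updates); // [hello=1, hello=2, world=1, world=2, peace=1]
        System.out.println(store);   // {hello=2, world=2, peace=1}
    }
}
```

Each update is what lands on `word-count-output`: a changelog of running totals, not a single final count per word.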
Stateful Operations
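```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
// Aggregate order totals by customer. Assumes `orders` is a KStream<String, OrderEvent>
// and `orderSerde` is a Serde<OrderEvent> you provide (hypothetical names).
KTable<String, Double> customerTotals = orders
    // Re-key by customer; Grouped supplies serdes for the repartition topic
    .groupBy((key, order) -> order.customerId(), Grouped.with(Serdes.String(), orderSerde))
    .aggregate(
        () -> 0.0,                                             // initializer
        (customerId, order, total) -> total + order.amount(),  // aggregator
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-totals")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );
```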
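The `aggregate` call takes an initializer (the starting value for a key seen for the first time) and an aggregator that folds each record into the key's current value. The plain-Java model below mirrors that contract under stated assumptions: `KeyedAggregator`, `Order`, and `AggregateSketch` are hypothetical names, `KeyedAggregator` only imitates the (key, value, aggregate) shape of Kafka Streams' `Aggregator`, and a `HashMap` stands in for the `customer-totals` state store.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in with the same (key, value, aggregate) shape as Kafka's Aggregator.
interface KeyedAggregator<K, V, A> {
    A apply(K key, V value, A aggregate);
}

// Hypothetical order type used only for this sketch.
final class Order {
    final String customerId;
    final double amount;
    Order(String customerId, double amount) { this.customerId = customerId; this.amount = amount; }
}

final class AggregateSketch {
    // Models groupBy(keyFn) followed by aggregate(initializer, aggregator).
    static <K, V, A> Map<K, A> aggregate(List<V> records, Function<V, K> keyFn,
                                         Supplier<A> initializer, KeyedAggregator<K, V, A> aggregator) {
        Map<K, A> store = new HashMap<>(); // plays the role of the state store
        for (V record : records) {
            K key = keyFn.apply(record);
            // The first record for a key starts from the initializer's value
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, record, current));
        }
        return store;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order("c1", 50.0), new Order("c2", 100.0), new Order("c1", 25.0));
        Map<String, Double> totals = aggregate(orders, o -> o.customerId,
            () -> 0.0, (customerId, order, total) -> total + order.amount);
        System.out.println(totals.get("c1")); // 75.0
        System.out.println(totals.get("c2")); // 100.0
    }
}
```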
Windowed Aggregations
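```java
// Hourly click counts per page: tumbling 1-hour windows with no grace period, so
// records arriving after their window has closed are dropped. Assumes `clicks` is a
// KStream of click events and `clickSerde` is their Serde (hypothetical names).
KTable<Windowed<String>, Long> hourlyCounts = clicks
    .groupBy((key, click) -> click.pageId(), Grouped.with(Serdes.String(), clickSerde))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count();
```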
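`TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))` without `advanceBy` defines tumbling windows: fixed-size, non-overlapping, and aligned to the epoch, so a record with timestamp `ts` falls in the window starting at `ts - (ts % size)`. The plain-Java sketch below models that assignment plus the zero-grace rule. It is a simplification with hypothetical names (`Click`, `TumblingWindowSketch`): it assumes non-negative timestamps and treats a record as late, dropping it, once observed stream time (the largest timestamp seen so far) has reached its window's exclusive end.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical click event: page ID plus event timestamp in epoch millis.
final class Click {
    final String pageId;
    final long timestampMs;
    Click(String pageId, long timestampMs) { this.pageId = pageId; this.timestampMs = timestampMs; }
}

final class TumblingWindowSketch {
    // Epoch-aligned tumbling window start (assumes timestampMs >= 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts clicks per "pageId@windowStart", dropping records whose window has
    // already closed relative to observed stream time (zero grace).
    static Map<String, Long> count(List<Click> clicks, Duration size) {
        long sizeMs = size.toMillis();
        long streamTime = Long.MIN_VALUE;
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Click c : clicks) {
            streamTime = Math.max(streamTime, c.timestampMs);
            long start = windowStart(c.timestampMs, sizeMs);
            long end = start + sizeMs; // exclusive window end
            if (end <= streamTime) {
                continue; // late record: its window is closed and there is no grace period
            }
            counts.merge(c.pageId + "@" + start, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Click> clicks = Arrays.asList(
            new Click("home", 1_000L),
            new Click("home", 3_599_999L),
            new Click("cart", 3_600_000L),
            new Click("home", 10L)); // out of order: its hour has already closed
        System.out.println(count(clicks, Duration.ofHours(1))); // {home@0=2, cart@3600000=1}
    }
}
```

The last click belongs to the first hour but arrives after stream time has moved into the second hour, so with no grace period it is not counted. A non-zero grace period (for example via `TimeWindows.ofSizeAndGrace`) is the usual way to tolerate such out-of-order records.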
Joins
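```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
// Stream-table join (enrichment): each order is matched with the current `customers`
// KTable value for the same key; both inputs must be co-partitioned
KStream<String, OrderEvent> enriched = orders
    .join(customers,
        (order, customer) -> order.withCustomerName(customer.name()));
// Stream-stream join: records with equal keys join when their timestamps are at most
// 5 minutes apart. orderSerde and paymentSerde are hypothetical Serdes for the values.
KStream<String, OrderPayment> joined = orders
    .join(payments,
        (order, payment) -> new OrderPayment(order, payment),
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), orderSerde, paymentSerde));
```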
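The two join flavors differ in what a record is matched against. In a stream-table join, each stream record looks up the table's current value for its key, and with an inner `join` records that find no match are dropped. In a stream-stream join, records with equal keys join when their timestamps fall within the window; `JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))` accepts a difference of up to 5 minutes in either direction. The plain-Java sketch below models both on in-memory lists. `Event` and `JoinSketch` are hypothetical names, and the model ignores arrival order, grace handling, and store retention.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed event with a timestamp in epoch millis.
final class Event {
    final String key;
    final String value;
    final long timestampMs;
    Event(String key, String value, long timestampMs) {
        this.key = key; this.value = value; this.timestampMs = timestampMs;
    }
}

final class JoinSketch {
    // Stream-table (inner) join: look up the table's current value for each key;
    // stream records with no matching table entry are dropped.
    static List<String> enrich(List<Event> stream, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (Event e : stream) {
            String match = table.get(e.key);
            if (match != null) {
                out.add(e.value + "+" + match);
            }
        }
        return out;
    }

    // Stream-stream (inner) join: equal keys and |t1 - t2| <= windowMs (inclusive).
    static List<String> windowedJoin(List<Event> left, List<Event> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key) && Math.abs(l.timestampMs - r.timestampMs) <= windowMs) {
                    out.add(l.value + "+" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        List<Event> orders = Arrays.asList(new Event("k1", "order1", 0L), new Event("k3", "order3", 0L));
        List<Event> payments = Arrays.asList(
            new Event("k1", "payA", fiveMin),     // exactly 5 minutes later: joins
            new Event("k1", "payB", fiveMin + 1), // just outside the window
            new Event("k2", "payC", 0L));         // different key
        System.out.println(windowedJoin(orders, payments, fiveMin)); // [order1+payA]
        Map<String, String> customers = new HashMap<>();
        customers.put("k1", "Alice");
        System.out.println(enrich(orders, customers)); // [order1+Alice]
    }
}
```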
Spring Cloud Stream with Kafka
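```java
import java.util.function.Function;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
// Declare inside a @Configuration class. The Kafka Streams binder names the bindings
// processOrders-in-0 (input) and processOrders-out-0 (output).
@Bean
public Function<KStream<String, OrderEvent>, KStream<String, OrderSummary>> processOrders() {
    return orders -> orders
        .filter((key, order) -> order.amount() > 100)   // keep orders above 100
        .mapValues(order -> new OrderSummary(order.id(), order.amount()));
}
```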
Best Practices
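- Set `application.id` uniquely per stream processing app; it also serves as the consumer group ID and as the prefix for internal topic names
- Keep changelog topics enabled for state stores (the default) so local state can be restored after a failure
- Configure `num.stream.threads` based on partition count: roughly one task is created per input partition, so threads beyond the total number of tasks sit idle
- Handle deserialization errors with a `DeserializationExceptionHandler`
- Monitor state store disk usage; RocksDB can grow large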
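Several of the practices above come down to a handful of Streams properties. The sketch below sets them by their string keys on a plain `java.util.Properties`, which is equivalent to using the `StreamsConfig` constants (for example, `StreamsConfig.NUM_STREAM_THREADS_CONFIG` is `"num.stream.threads"`). `StreamsConfigSketch`, the application ID, and the thread-count heuristic are assumptions for illustration: the heuristic spreads the input partitions evenly over a hypothetical number of instances, rounding up, and `LogAndContinueExceptionHandler` is just one possible handler choice (it logs and skips bad records instead of failing).

```java
import java.util.Properties;

// Hypothetical helper that builds Streams properties following the practices above.
final class StreamsConfigSketch {
    // Heuristic (assumption): spread `partitions` tasks over `instances`, rounding up,
    // since threads beyond the number of tasks would sit idle.
    static int threadsPerInstance(int partitions, int instances) {
        return Math.max(1, (partitions + instances - 1) / instances);
    }

    static Properties build(String applicationId, int partitions, int instances) {
        Properties props = new Properties();
        // Must be unique per app: also the consumer group ID and internal topic prefix
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", "localhost:9092");
        props.put("num.stream.threads", String.valueOf(threadsPerInstance(partitions, instances)));
        // Log and skip records that fail to deserialize instead of stopping the app
        props.put("default.deserialization.exception.handler",
            "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("order-totals", 12, 3); // hypothetical app ID and sizing
        System.out.println(props.getProperty("num.stream.threads")); // 4
    }
}
```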