Kafka Streams is a Java library for building real-time stream processing applications. It reads from Kafka topics, transforms data, and writes results back to Kafka — no separate cluster needed.

Core Concepts

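Concept        Description
KStream        Stream of key-value records; each record is an independent event
KTable         Changelog stream viewed as a table: each record upserts the latest value for its key
GlobalKTable   KTable fully replicated to every application instance (all partitions, not a subset)
Topology       DAG of processing nodes (sources, processors, sinks)
State Store    Local store for stateful operations (RocksDB by default), backed by a changelog topic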
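
To make the table concrete, here is a minimal sketch of how each abstraction is declared with the StreamsBuilder DSL. The topic names (clicks, profiles, countries, clicks-enriched) and the class name are hypothetical, and keys and values are assumed to be plain strings. Topology.describe() prints the resulting DAG, including its state stores, without contacting a broker.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class CoreConceptsSketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> strings = Consumed.with(Serdes.String(), Serdes.String());

        // KStream: every record is an independent event (e.g. a click)
        KStream<String, String> clicks = builder.stream("clicks", strings);
        // KTable: each record upserts the latest value for its key (e.g. a user profile)
        KTable<String, String> profiles = builder.table("profiles", strings);
        // GlobalKTable: every instance keeps a full copy of all partitions;
        // declared here only to show the API
        GlobalKTable<String, String> countries = builder.globalTable("countries", strings);

        // Processing nodes form a DAG: source -> join (table lookup) -> sink
        clicks.join(profiles, (click, profile) -> click + "|" + profile)
              .to("clicks-enriched", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Prints sub-topologies, source/processor/sink nodes and their state stores
        System.out.println(build().describe());
    }
}
```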

Setup

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency>
  

Word Count Example

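import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("text-input");

        KTable<String, Long> wordCounts = textLines
            .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
            .filter((key, word) -> !word.isEmpty()) // split() yields "" if a line starts with punctuation
            .groupBy((key, word) -> word)           // re-key by word; this triggers a repartition
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-count-store"));

        // Counts are Longs, so override the default String value serde on output
        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // register before start()
        streams.start();
    }
}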
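
The word count topology can be exercised without a running broker. This sketch assumes the org.apache.kafka:kafka-streams-test-utils artifact (same version, test scope) is on the classpath for TopologyTestDriver; the class name and the dummy bootstrap address are placeholders. It rebuilds the counting pipeline, including a filter for the empty tokens that split("\\W+") produces when a line starts with punctuation, and returns the latest count per word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountDriverSketch {
    // The word-count pipeline, rebuilt here so the sketch is self-contained
    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("text-input")
            .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
            .filter((key, word) -> !word.isEmpty())
            .groupBy((key, word) -> word)
            .count()
            .toStream()
            .to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    // Pipes each line through the topology in-process and returns the latest count per word
    public static Map<String, Long> run(List<String> lines) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "text-input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "word-count-output", new StringDeserializer(), new LongDeserializer());
            for (String line : lines) {
                in.pipeInput(line); // null key; groupBy re-keys by word
            }
            return out.readKeyValuesToMap(); // later updates for a word overwrite earlier ones
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Hello, hello world", "...world")));
    }
}
```

Because readKeyValuesToMap() keeps only the last record per key, it returns final counts; readKeyValuesToList() would show every intermediate update instead.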
  

Stateful Operations

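// Aggregate order totals by customer.
// Assumes orders is a KStream<String, Order> and orderSerde is a Serde<Order> defined elsewhere (hypothetical).
KTable<String, Double> customerTotals = orders
    // Re-keying by customer forces a repartition; Grouped supplies the serdes for that topic
    .groupBy((key, order) -> order.customerId(), Grouped.with(Serdes.String(), orderSerde))
    .aggregate(
        () -> 0.0,                                             // initial total per customer
        (customerId, order, total) -> total + order.amount(),  // add each order's amount
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-totals")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );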
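
A runnable variant of the aggregation, assuming kafka-streams-test-utils for TopologyTestDriver. To avoid a custom Order serde, each order value is encoded in a hypothetical "customerId,amount" string format; the topic names and class name are likewise placeholders.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class CustomerTotalsSketch {
    // Hypothetical wire format: key = order ID, value = "customerId,amount"
    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
            // Re-key by customer; Grouped gives the repartition topic its serdes
            .groupBy((orderId, csv) -> csv.split(",")[0], Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> 0.0, // initial total
                (customerId, csv, total) -> total + Double.parseDouble(csv.split(",")[1]),
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("customer-totals")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Double()))
            .toStream()
            .to("customer-totals-output", Produced.with(Serdes.String(), Serdes.Double()));
        return builder.build();
    }

    public static Map<String, Double> run(List<KeyValue<String, String>> orders) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-totals-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "orders", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Double> out = driver.createOutputTopic(
                "customer-totals-output", new StringDeserializer(), new DoubleDeserializer());
            for (KeyValue<String, String> order : orders) {
                in.pipeInput(order.key, order.value);
            }
            return out.readKeyValuesToMap(); // latest running total per customer
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(
            KeyValue.pair("o1", "c1,10.0"),
            KeyValue.pair("o2", "c2,7.0"),
            KeyValue.pair("o3", "c1,5.5"))));
    }
}
```

With the sample orders in main, the totals should come out as 15.5 for c1 and 7.0 for c2.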
  

Windowed Aggregations

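// Count clicks per page in tumbling one-hour windows.
// Assumes clicks is a KStream<String, Click> and clickSerde is a Serde<Click> (hypothetical).
KTable<Windowed<String>, Long> hourlyCounts = clicks
    .groupBy((key, click) -> click.pageId(), Grouped.with(Serdes.String(), clickSerde))
    // No grace period: records arriving after their window has closed are dropped
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .count();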
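
The windowing behavior is easiest to see with explicit event timestamps. This sketch assumes kafka-streams-test-utils and hypothetical input where the key is a user ID and the value is a page ID. It flattens each Windowed<String> key into a "pageId@windowStartMs" string so the output can use a plain String serde.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class HourlyClicksSketch {
    // Hypothetical input: key = user ID, value = page ID
    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((userId, pageId) -> pageId, Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .count()
            // Flatten Windowed<String> into "pageId@windowStartMs" for a plain String key
            .toStream((windowed, count) -> windowed.key() + "@" + windowed.window().start())
            .to("hourly-clicks", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    private static Instant minute(long m) {
        return Instant.ofEpochMilli(Duration.ofMinutes(m).toMillis());
    }

    // Pipes four clicks with explicit event times and returns the latest count per window
    public static Map<String, Long> sampleRun() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-clicks-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "clicks", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "hourly-clicks", new StringDeserializer(), new LongDeserializer());
            in.pipeInput("u1", "pageA", minute(10));
            in.pipeInput("u2", "pageB", minute(20));
            in.pipeInput("u3", "pageA", minute(50));
            in.pipeInput("u1", "pageA", minute(65)); // falls into the second hourly window
            return out.readKeyValuesToMap();
        }
    }

    public static void main(String[] args) {
        System.out.println(sampleRun());
    }
}
```

The click at minute 65 lands in the window starting at 3600000 ms, so pageA should end with a count of 2 for the first hour and 1 for the second, while pageB has 1 in the first hour.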
  

Joins

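// Stream-Table join (enrichment): orders must be keyed by customer ID and co-partitioned
// with the customers KTable (same partition count and partitioning strategy)
KStream<String, OrderEvent> enriched = orders
    .join(customers,
        (order, customer) -> order.withCustomerName(customer.name()));

// Stream-Stream join (within a 5-minute window): both streams must share a key such as
// the order ID, so re-key with selectKey() first if needed.
// orderSerde and paymentSerde are assumed Serde instances defined elsewhere (hypothetical).
KStream<String, OrderPayment> joined = orders
    .join(payments,
        (order, payment) -> new OrderPayment(order, payment),
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), orderSerde, paymentSerde));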
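
Both join types can be checked in-process. This sketch assumes kafka-streams-test-utils, String keys and values throughout, and hypothetical topic names. The enrichment stream is keyed by customer ID, while the order and payment streams are keyed by order ID, matching the keying each join requires.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinsSketch {
    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Stream-table join: orders keyed by customer ID look up the customer's current name
        KStream<String, String> ordersByCustomer = builder.stream("orders-by-customer");
        KTable<String, String> customers = builder.table("customers");
        ordersByCustomer.join(customers, (order, name) -> order + " for " + name)
            .to("enriched-orders");

        // Stream-stream join: orders and payments keyed by order ID match within 5 minutes
        KStream<String, String> orders = builder.stream("orders");
        KStream<String, String> payments = builder.stream("payments");
        orders.join(payments,
                (order, payment) -> order + " paid by " + payment,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .to("order-payments");
        return builder.build();
    }

    // Event-time helper: minutes since the epoch, so the window math is easy to follow
    private static Instant at(long minutes) {
        return Instant.ofEpochMilli(Duration.ofMinutes(minutes).toMillis());
    }

    public static List<String> sampleRun() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joins-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            StringSerializer ser = new StringSerializer();
            TestInputTopic<String, String> customers = driver.createInputTopic("customers", ser, ser);
            TestInputTopic<String, String> byCustomer = driver.createInputTopic("orders-by-customer", ser, ser);
            TestInputTopic<String, String> orders = driver.createInputTopic("orders", ser, ser);
            TestInputTopic<String, String> payments = driver.createInputTopic("payments", ser, ser);

            customers.pipeInput("c1", "Alice", at(0));
            byCustomer.pipeInput("c1", "order-1", at(0));
            byCustomer.pipeInput("c2", "order-2", at(0)); // no customer c2: inner join emits nothing
            orders.pipeInput("o1", "order-1", at(1));
            payments.pipeInput("o1", "card", at(3));      // 2 minutes apart: joins
            orders.pipeInput("o3", "order-3", at(4));
            payments.pipeInput("o3", "cash", at(14));     // 10 minutes apart: outside the window

            StringDeserializer de = new StringDeserializer();
            List<String> results = new ArrayList<>(
                driver.createOutputTopic("enriched-orders", de, de).readValuesToList());
            results.addAll(driver.createOutputTopic("order-payments", de, de).readValuesToList());
            return results;
        }
    }

    public static void main(String[] args) {
        System.out.println(sampleRun());
    }
}
```

Only two results should come out: order-2 has no matching customer, and the order-3 payment arrives 10 minutes after the order, outside the 5-minute window.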
  

Spring Cloud Stream (Kafka Streams Binder)

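@Bean
public Function<KStream<String, OrderEvent>, KStream<String, OrderSummary>> processOrders() {
    // Kafka Streams binder (spring-cloud-stream-binder-kafka-streams): input and output
    // bind to processOrders-in-0 and processOrders-out-0 by default
    return orders -> orders
        .filter((key, order) -> order.amount() > 100)
        .mapValues(order -> new OrderSummary(order.id(), order.amount()));
}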
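
Since the bean is just a java.util.function.Function from one KStream to another, its body can be tested without Spring or a broker. This sketch assumes kafka-streams-test-utils and, to stay self-contained, replaces the OrderEvent and OrderSummary types with a Double amount in and a hypothetical "orderId:amount" string out. The topic names stand in for the destinations the binder would normally wire up.

```java
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ProcessOrdersSketch {
    // Same shape as the @Bean: a Function from an input KStream to an output KStream
    static Function<KStream<String, Double>, KStream<String, String>> processOrders() {
        return orders -> orders
            .filter((orderId, amount) -> amount > 100)
            .mapValues((orderId, amount) -> orderId + ":" + amount);
    }

    // Wires the function between two topics by hand (the binder does this in Spring)
    static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Double> input =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Double()));
        processOrders().apply(input)
            .to("order-summaries", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static List<String> run() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "process-orders-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        try (TopologyTestDriver driver = new TopologyTestDriver(topology(), props)) {
            TestInputTopic<String, Double> in = driver.createInputTopic(
                "orders", new StringSerializer(), new DoubleSerializer());
            in.pipeInput("o1", 50.0);
            in.pipeInput("o2", 150.0);
            in.pipeInput("o3", 100.0); // not strictly greater than 100, so filtered out
            return driver.createOutputTopic(
                "order-summaries", new StringDeserializer(), new StringDeserializer())
                .readValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```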
  

Best Practices

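  • Set application.id uniquely per stream processing app; it doubles as the consumer group ID and the prefix for internal topic names
  • Keep changelog logging enabled on state stores (the default) so state can be restored after a failure; num.standby.replicas keeps warm copies for faster failover
  • Size num.stream.threads against the number of tasks, which is driven by input partition counts: threads beyond the total task count across all instances sit idle
  • Handle deserialization errors by setting default.deserialization.exception.handler (for example, LogAndContinueExceptionHandler skips bad records instead of stopping the app)
  • Monitor state store disk usage under state.dir, since RocksDB stores can grow large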
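
The practices above map onto StreamsConfig settings roughly as follows. This is a sketch, not a tuning recommendation: the application ID, thread and replica counts, and state directory path are hypothetical values to adjust for a real deployment.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class StreamsConfigSketch {
    public static Properties productionProps() {
        Properties props = new Properties();
        // Unique per application: it names the consumer group and prefixes internal topics
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Threads beyond the total number of tasks (driven by input partitions) sit idle
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        // Keep a warm copy of each state store on another instance for faster failover
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Replication factor for the changelog and repartition topics Streams creates
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        // Log and skip records that fail to deserialize instead of stopping the app
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        // Where local RocksDB state lives; this is the directory to monitor for disk usage
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        // Constructing StreamsConfig validates types and required keys without contacting Kafka
        StreamsConfig config = new StreamsConfig(productionProps());
        System.out.println(config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
    }
}
```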