5
Stock Trade Stats Example KStream<String, Trade> source = builder.stream(STOCK_TOPIC); KStream<Windowed<String>, TradeStats> stats = source .groupByKey() .windowedBy(TimeWindows.of(5000).advanceBy(1000)) .aggregate(TradeStats::new, (k, v, tradestats) -> tradestats.add(v), Materialized.<~>as(“trade-aggregates”) .withValueSerde(new TradeStatsSerde())) .toStream() .mapValues(TradeStats::computeAvgPrice);
stats.to(STATS_OUT_TOPIC, Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))); @gamussa
|
#cloudnativenyc
|
@ConfluentINc