Event Streaming with Kafka Streams Scale By The Bay 2020 / Online @gAmUssA | #bythebay | @confluentinc
A presentation at Scale By the Bay in November 2020 by Viktor Gamov
Preface
Stream Processing is the toolset for dealing with events as they move!
[Diagram] An event streaming platform: Java apps with Kafka Streams or KSQL perform continuous computation and feed a serving layer (microservices, Elastic, etc.), all on an API-based clustering platform.
Apache Kafka® Event Streaming Platform 101
[Diagram] Event Streaming Platform Architecture: applications talk to the Kafka brokers through native client libraries, Kafka Streams, or KSQL, alongside a REST Proxy behind a load balancer, Schema Registry, Kafka Connect, and ZooKeeper nodes.
The log is a simple idea: messages are added at the end of the log, new records at one end, old records at the other.
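To make the append-only idea concrete, here is a minimal sketch (not from the slides) that appends two records to a hypothetical topic named "events" on an assumed local broker:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        // each send appends at the "New" end of the log; existing records are never modified
        producer.send(new ProducerRecord<>("events", "key-1", "first"));
        producer.send(new ProducerRecord<>("events", "key-1", "second"));
    }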
Consumers have a position all of their own: Ricardo, Robin, and Viktor each scan the log from their own offset, independently of one another.
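A consumer's position is just an offset that the consumer itself controls. A hedged sketch (reusing the hypothetical "events" topic from above): one consumer can rewind to the beginning of a partition without moving anyone else's position:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "viktor"); // positions are tracked per group
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        TopicPartition tp = new TopicPartition("events", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.seekToBeginning(Collections.singletonList(tp)); // rewind: only this consumer moves
        consumer.poll(Duration.ofSeconds(1));
        System.out.println("my position: " + consumer.position(tp)); // nobody else's position changed
    }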
Shard data to get scalability: producers (1), (2), and (3) send messages to different partitions, and the partitions live on different machines in the cluster.
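The routing rule behind that picture is simple: a keyed message is hashed to a partition. A simplified stand-in (Kafka's default partitioner actually uses murmur2 over the serialized key bytes, not hashCode):

    // simplified illustration of keyed partitioning
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions; // mask keeps the hash non-negative
    }

    // the same key always lands on the same partition, so per-key ordering is preserved
    partitionFor("A", 3); // 2
    partitionFor("A", 3); // always 2 for "A"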
The "by hand" version first: a plain consumer plus producer doing a group-by count. (counter and sendInterval are declared here so the snippet compiles; their values are assumptions.)

    int counter = 0;
    final int sendInterval = 100;

    // in-memory store, not persistent: all counts are lost if this process dies
    Map<String, Integer> groupByCounts = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
         KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {

        consumer.subscribe(Arrays.asList("A", "B"));

        while (true) { // consumer poll loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                Integer count = groupByCounts.get(key);
                if (count == null) {
                    count = 0;
                }
                count += 1; // actually doing something useful
                groupByCounts.put(key, count);
            }
            // every sendInterval-th poll, emit the current counts and commit offsets
            if (counter++ % sendInterval == 0) {
                for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
                    ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>(
                        "group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
                    producer.send(producerRecord);
                }
                consumer.commitSync();
            }
        }
    }
https://twitter.com/monitoring_king/status/1048264580743479296
LET'S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT'S GOOD, EXCEPT IT SUCKS
SO LET ME SHOW KAFKA STREAMS. THAT WAY IT MIGHT BE REALLY GOOD
Talk is cheap! Show me code!
The same program in Kafka Streams:

    final StreamsBuilder streamsBuilder = new StreamsBuilder();

    final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));

    // actual work
    stream.groupByKey()
          .count()
          .toStream()
          .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));

    final Topology topology = streamsBuilder.build();
    final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
    kafkaStreams.start();
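The slides don't show streamsProperties(); a plausible minimal version (the application id, serdes, and broker address are assumptions, not from the talk) would be:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-by-counts-app"); // doubles as the consumer group id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed local broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        // exactly-once processing is a one-line opt-in:
        // props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }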
Every framework wants to be Scalable, Elastic, Stateful, Fault-tolerant, and Distributed when it grows up.
Where do I put my compute?
Where do I put my state?
The actual question is: Where is my code?
the KAFKA STREAMS API is a JAVA API to BUILD REAL-TIME APPLICATIONS
[Diagram] Your app with the Streams API embedded. Not running inside the brokers!
[Diagram] Same app, many instances: each instance embeds the Streams API. Running on the brokers? Nope!
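Scaling out needs no separate cluster manager: every instance started with the same application.id joins the same group, and Kafka rebalances partitions (and their state) across the instances. A hedged sketch of what each instance runs, reusing topology and streamsProperties() from the example above:

    // every instance runs this exact code; the shared application.id makes them one group
    KafkaStreams streams = new KafkaStreams(topology, streamsProperties());
    streams.setStateListener((newState, oldState) ->
        System.out.println(oldState + " -> " + newState)); // shows REBALANCING -> RUNNING as instances come and go
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // clean shutdown hands work back to the group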
[Diagram] Before: your job runs on a processing cluster and writes to a shared database that feeds a dashboard.
[Diagram] After: a single app with the embedded Streams API feeds the dashboard directly.
this means you can DEPLOY your app ANYWHERE using WHATEVER TECHNOLOGY YOU WANT
So many places to run your app! …and many more…
Things Kafka Streams Does:
- Open Source, with Enterprise Support
- Powerful Processing incl. Filters, Transforms, Joins, Aggregations, Windowing (see the sketch below)
- Supports Streams and Tables
- Elastic, Scalable, Fault-tolerant
- Exactly-Once Processing
- Event-Time Processing
- Kafka Security Integration
- Runs Everywhere
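To make "Windowing" and "Event-Time Processing" from the list concrete, here is a hedged sketch (topic names assumed, and it assumes the String default serdes configured earlier) that counts records per key in five-minute windows based on each record's event timestamp:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.WindowedSerdes;

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("A")
           .groupByKey()
           .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // windows are based on event time
           .count()
           .toStream()
           .to("windowed-counts", Produced.with(
               WindowedSerdes.timeWindowedSerdeFrom(String.class), // the key now carries its window
               Serdes.Long()));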
Want to learn more?
Learn Kafka. Start building with Apache Kafka at Confluent Developer: developer.confluent.io. Watch the full version: https://gamov.dev/developer