Event Streaming with Kafka Streams
Scale By The Bay 2020 / Online
@gAmUssA | #bythebay | @confluentinc

Preface

Stream processing is the toolset for dealing with events as they move!

The big picture: the event streaming platform carries the events, Java apps with Kafka Streams or KSQL do the continuous computation, and the results feed a serving layer (microservices, Elastic, etc.).

Apache Kafka® Event Streaming Platform 101

Event streaming platform architecture: applications talk to the Kafka brokers through a native client library, Kafka Streams, or KSQL (or via a load balancer and the REST Proxy), with Schema Registry, Kafka Connect, and ZooKeeper nodes rounding out the platform.

The log is a simple idea: messages are added at the end of the log, so the newest records sit at the tail and the oldest at the head.

Consumers have a position all of their own: Ricardo, Robin, and Viktor can each scan the same log, from old to new, at a different offset.
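A minimal sketch of that idea in the consumer API (the "events" topic, the manual partition assignment, and the reuse of the deck's consumerProperties() helper are assumptions; imports are elided as elsewhere in the deck):

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties())) {
  TopicPartition tp = new TopicPartition("events", 0);
  consumer.assign(Collections.singletonList(tp));

  consumer.seekToBeginning(Collections.singletonList(tp)); // rewind to "old" and rescan
  consumer.poll(Duration.ofSeconds(1));
  // each consumer group tracks its own position, independent of everyone else's
  System.out.println("my position: " + consumer.position(tp));
}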

Shard data to get scalability: producers send messages to different partitions, and partitions live on different machines in the cluster.
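A sketch of how keys drive the sharding (topic, keys, and a String-valued variant of the producerProperties() helper are assumptions):

try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties())) {
  // the default partitioner roughly computes murmur2(keyBytes) % numPartitions,
  // so records with the same key always land in the same partition, in order
  producer.send(new ProducerRecord<>("payments", "alice", "debit 10"));
  producer.send(new ProducerRecord<>("payments", "alice", "credit 5")); // same partition as above
  producer.send(new ProducerRecord<>("payments", "bob", "debit 7"));    // possibly another machine
}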

First, the "plain consumer" approach: a hand-rolled group-by count built from a consumer, a producer, and an in-memory map.

// in-memory store, not persistent
Map<String, Integer> groupByCounts = new HashMap<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
     KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {
  consumer.subscribe(Arrays.asList("A", "B"));
  int counter = 0;              // poll iterations so far (declaration elided on the slides)
  final int sendInterval = 100; // flush frequency; the slides never showed the actual value
  while (true) { // consumer poll loop
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
      String key = record.key();
      Integer count = groupByCounts.get(key);
      if (count == null) {
        count = 0;
      }
      count += 1; // actually doing something useful
      groupByCounts.put(key, count);
    }
    // periodically publish the counts and commit offsets
    if (counter++ % sendInterval == 0) {
      for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
        ProducerRecord<String, Integer> producerRecord =
            new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
        producer.send(producerRecord);
      }
      consumer.commitSync();
    }
  }
}
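The code above calls consumerProperties() and producerProperties() without ever showing them; a minimal sketch of what they plausibly contain (the broker address and group id are placeholders):

static Properties consumerProperties() {
  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-by-counter");
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // we commit manually after sending
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  return props;
}

static Properties producerProperties() {
  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); // counts are Integer
  return props;
}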

https://twitter.com/monitoring_king/status/1048264580743479296

LET'S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT'S GOOD, EXCEPT IT SUCKS

SO LET ME SHOW KAFKA STREAMS. THAT WAY IT MIGHT BE REALLY GOOD

Talk is cheap! Show me code!

And now the same group-by count in Kafka Streams:

final StreamsBuilder streamsBuilder = new StreamsBuilder();
// input values are Strings; the counts only become Longs after the aggregation
final KStream<String, String> stream = streamsBuilder.stream(Arrays.asList("A", "B"));

// actual work
stream.groupByKey()
      .count()
      .toStream()
      .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));

final Topology topology = streamsBuilder.build();
final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
kafkaStreams.start();
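Like its consumer counterpart, streamsProperties() never appears on the slides; a minimal sketch, with placeholder values:

static Properties streamsProperties() {
  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-by-counter"); // doubles as the consumer group id
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  return props;
}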

Every framework wants to be scalable, elastic, stateful, fault-tolerant, and distributed when it grows up.

Where do I put my compute?

Where do I put my state?

The actual question is: where is my code?

the KAFKA STREAMS API is a JAVA API to BUILD REAL-TIME APPLICATIONS

The Streams API runs embedded in your app. It is not running inside the brokers!

Same app, many instances: each instance embeds the Streams API, and none of them run on the brokers. Brokers? Nope!

Before: your job runs on a processing cluster and writes to a shared database that backs the dashboard.

After: a single app with the embedded Streams API serves the dashboard directly.
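One reason the app can serve the dashboard itself: Kafka Streams state is queryable in-process via interactive queries. A sketch, assuming the count() above had been materialized with a name, e.g. .count(Materialized.as("counts")):

ReadOnlyKeyValueStore<String, Long> store =
    kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));
Long countForA = store.get("A"); // serve this straight to the dashboard, no shared database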

this means you can DEPLOY your app ANYWHERE using WHATEVER TECHNOLOGY YOU WANT

So many places to run your app! …and many more…

Things Kafka Streams does:
Open source, with enterprise support
Powerful processing, incl. filters, transforms, joins, aggregations, windowing
Runs everywhere
Supports streams and tables
Elastic, scalable, fault-tolerant
Exactly-once processing
Kafka security integration
Event-time processing
(windowing and exactly-once are sketched below)
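Two of those features in one hedged sketch (the "events" topic is hypothetical, and streamsProperties() is the helper sketched earlier): counting per key over event-time windows, plus the exactly-once configuration switch.

// count per key over 5-minute event-time windows
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> events = builder.stream("events");

final KTable<Windowed<String>, Long> windowedCounts =
    events.groupByKey()
          .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
          .count();

// exactly-once processing is one configuration switch away
final Properties props = streamsProperties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);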

Want to learn more?

Learn Kafka. Start building with Apache Kafka at Confluent Developer: developer.confluent.io
Watch the full version: https://gamov.dev/developer
