Event Streaming with Kafka Streams

A presentation at Scale By the Bay in November 2020 by Viktor Gamov

Slide 1

Event Streaming with Kafka Streams. Scale By The Bay 2020 / Online. @gAmUssA | #bythebay | @confluentinc

Slide 2

Slide 3

Preface

Slide 4

Stream Processing is the toolset for dealing with events as they move!

Slide 5

Diagram: an event streaming platform (API-based clustering, continuous computation) with Java apps built on Kafka Streams or KSQL feeding a serving layer (microservices, Elastic, etc.).

Slide 6

Apache Kafka® Event Streaming Platform 101

Slide 7

Diagram: event streaming platform architecture. Applications connect via KSQL, native client libraries, or Kafka Streams; a load balancer fronts the REST Proxy; Schema Registry, Kafka Connect, the Kafka brokers, and ZooKeeper nodes complete the platform.

Slide 8

The log is a simple idea: messages are added at the end of the log, so new records sit at one end and old records at the other.

Slide 9

Consumers have a position all of their own: Ricardo, Robin, and Viktor each scan the log from their own spot between old and new, independently of one another.
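
Offsets are what make this work: each consumer group commits its own position, so several readers can scan the same log independently. A minimal sketch, where the topic "events", the group ids, and the broker address are all assumptions, not from the deck:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical names throughout; change group.id to "robin" or "ricardo"
// for a second, fully independent position on the same log.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "viktor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  TopicPartition tp = new TopicPartition("events", 0);
  consumer.assign(List.of(tp));
  consumer.seekToBeginning(List.of(tp)); // start the scan at the "old" end
  consumer.poll(Duration.ofSeconds(1));
  System.out.println("viktor is at offset " + consumer.position(tp));
}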

Slide 11

Shard data to get scalability: producers (1, 2, 3) send messages to different partitions, and partitions live on different machines across a cluster.
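
To make the sharding concrete: the default partitioner hashes the record key, so every message with the same key lands in the same partition, while different keys spread across the cluster. A sketch, with the topic name and broker address as assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed address
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
  // Same key => same partition; "user-1" and "user-2" may hash to
  // partitions on different machines.
  producer.send(new ProducerRecord<>("events", "user-1", "clicked"));
  producer.send(new ProducerRecord<>("events", "user-2", "scrolled"));
}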

Slide 12

// in-memory store, not persistent
Map<String, Integer> groupByCounts = new HashMap<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
     KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {
  consumer.subscribe(Arrays.asList("A", "B"));
  while (true) { // consumer poll loop
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
      String key = record.key();
      Integer count = groupByCounts.get(key);
      if (count == null) {
        count = 0;
      }
      count += 1;
      groupByCounts.put(key, count);
    }
  }
  // the producer side of this loop continues on slide 18
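
The consumerProperties() and producerProperties() helpers are called but never shown on the slides. A minimal sketch of what they might contain, with the broker address and group id as assumptions (auto-commit is disabled because the code commits manually on slide 18):

static Properties consumerProperties() {
  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-by-counts-app");     // assumed
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // we call commitSync() ourselves
  return props;
}

static Properties producerProperties() {
  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
  return props;
}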

Slide 16

while (true) { // consumer poll loop
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
  for (ConsumerRecord<String, String> record : records) {
    String key = record.key();
    Integer count = groupByCounts.get(key);
    if (count == null) {
      count = 0;
    }
    count += 1;
    // actually doing something useful
    groupByCounts.put(key, count);
  }
}

Slide 18

    // counter and sendInterval are declared outside this fragment
    if (counter++ % sendInterval == 0) {
      for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
        ProducerRecord<String, Integer> producerRecord =
            new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
        producer.send(producerRecord);
      }
      consumer.commitSync();
    }
  }
}

Slide 21

https://twitter.com/monitoring_king/status/1048264580743479296

Slide 22

LET'S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT'S GOOD, EXCEPT IT SUCKS

Slide 23

SO LET ME SHOW KAFKA STREAMS. THAT WAY IT MIGHT BE REALLY GOOD

Slide 24

Talk is cheap! Show me code!

Slide 25

final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));
// actual work
stream.groupByKey()
      .count()
      .toStream()
      .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = streamsBuilder.build();
final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
kafkaStreams.start();
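
streamsProperties() is referenced but not shown; a plausible minimal version, with the application id and broker address as assumptions. The application.id doubles as the consumer group id and prefixes the names of internal topics:

static Properties streamsProperties() {
  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-by-counts-app"); // assumed
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
  return props;
}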

Slide 29

Slide 30

Every framework wants to be, when it grows up: scalable, elastic, stateful, fault-tolerant, distributed.

Slide 31

Where do I put my compute?

Slide 32

Where do I put my state?

Slide 33

The actual question is: Where is my code?

Slide 34

Slide 35

the KAFKA STREAMS API is a JAVA API to BUILD REAL-TIME APPLICATIONS

Slide 36

Diagram: an app with the Streams API embedded. Not running inside brokers!

Slide 37

Same app, many instances: each instance embeds the Streams API. Brokers? Nope!

Slide 38

Before: a processing cluster and a shared database sit between your job and the dashboard.

Slide 39

After: your app, with the Streams API inside, feeds the dashboard directly.

Slide 40

this means you can DEPLOY your app ANYWHERE using WHATEVER TECHNOLOGY YOU WANT

Slide 41

So many places to run your app! …and many more…

Slide 42

Things Kafka Streams does:
- Powerful processing, incl. filters, transforms, joins, aggregations, windowing
- Supports streams and tables
- Exactly-once processing
- Event-time processing
- Elastic, scalable, fault-tolerant
- Kafka security integration
- Runs everywhere
- Open source
- Enterprise support
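
As a sketch of the windowing and event-time items above (topic names are assumptions, and TimeWindows.of is the API in the Kafka versions current as of this 2020 talk), a per-key count over five-minute windows:

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // windows assigned by event time
       .count()
       .toStream()
       // flatten the windowed key into "key@windowStart" for the output topic
       .map((windowedKey, count) -> KeyValue.pair(
           windowedKey.key() + "@" + windowedKey.window().start(), count))
       .to("windowed-counts", Produced.with(Serdes.String(), Serdes.Long()));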

Slide 43

Want to learn more?

Slide 44

Learn Kafka. Start building with Apache Kafka at Confluent Developer: developer.confluent.io. Watch the full version at https://gamov.dev/developer.

Slide 45

Slide 46