Crossing the Streams: Rethinking Stream Processing with KStreams and KSQL

A presentation at Apache Kafka NYC Meetup in January 2019 in New York, NY, USA by Viktor Gamov

Slide 1

Crossing the Streams: Rethinking Stream Processing with Kafka Streams and KSQL @gamussa #NYCKafka @confluentinc

Slide 2

https://twitter.com/gAmUssA/status/1048258981595111424

Slide 3

Streaming is the toolset for dealing with events as they move!

Slide 4

Slide 5

Slide 6

High Throughput Streaming platform

Slide 7

Java Apps / Kafka Streams
High Throughput Streaming platform
Continuous Computation
API based clustering

Slide 8

Java Apps / Kafka Streams
Serving Layer (Cassandra, Elastic, etc.)
High Throughput Streaming platform
Continuous Computation
API based clustering

Slide 9

Stream Processing by Analogy
$ cat < in.txt | grep "ksql" | tr a-z A-Z > out.txt

Slide 10

Stream Processing by Analogy
$ cat < in.txt | grep "ksql" | tr a-z A-Z > out.txt
Kafka Cluster

Slide 11

Stream Processing by Analogy
Connect API, Connect API
$ cat < in.txt | grep "ksql" | tr a-z A-Z > out.txt
Kafka Cluster

Slide 12

Stream Processing by Analogy
Connect API, Stream Processing, Connect API
$ cat < in.txt | grep "ksql" | tr a-z A-Z > out.txt
Kafka Cluster
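
The processing stage of the shell analogy (grep "ksql", then tr a-z A-Z) can be sketched in plain Java. This is only an illustration using java.util.stream, not the Kafka Streams API; the class name PipelineAnalogy and the sample lines are made up for the example.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical illustration of: grep "ksql" | tr a-z A-Z
class PipelineAnalogy {

    // Keeps lines containing "ksql" (the grep step), then upper-cases them (the tr step).
    // For plain ASCII input, toUpperCase behaves like tr a-z A-Z.
    static List<String> process(List<String> lines) {
        return lines.stream()
                .filter(line -> line.contains("ksql"))
                .map(line -> line.toUpperCase(Locale.ROOT))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for in.txt; in the analogy this data would arrive via the Connect API.
        List<String> in = List.of("ksql is fun", "plain line", "try ksql today");
        process(in).forEach(System.out::println);
    }
}
```

In the analogy, reading in.txt and writing out.txt correspond to the Connect API labels, and the pipes between the commands correspond to the Kafka Cluster label.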

Slide 13

Streaming Platform Architecture

Slide 14

Streaming Platform Architecture
- Kafka Brokers

Slide 15

Streaming Platform Architecture
- Kafka Brokers
- Zookeeper Nodes

Slide 16

Streaming Platform Architecture
- Application (Native Client library)
- Kafka Brokers
- Zookeeper Nodes

Slide 17

Streaming Platform Architecture
- Application (Native Client library)
- Schema Registry
- Kafka Brokers
- Zookeeper Nodes

Slide 18

Streaming Platform Architecture
- Application (Native Client library)
- Application (Kafka Streams)
- Schema Registry
- Kafka Brokers
- Zookeeper Nodes

Slide 19

Streaming Platform Architecture
- Application (Native Client library)
- Application (Kafka Streams)
- KSQL (Kafka Streams)
- Schema Registry
- Kafka Brokers
- Zookeeper Nodes

Slide 20

Streaming Platform Architecture
- Application (Native Client library)
- Application (Kafka Streams)
- KSQL (Kafka Streams)
- Schema Registry
- Kafka Brokers
- Kafka Connect
- Zookeeper Nodes

Slide 21

Streaming Platform Architecture
- Application (Native Client library)
- Application (Kafka Streams)
- Application (Load Balancer *, REST Proxy)
- KSQL (Kafka Streams)
- Schema Registry
- Kafka Brokers
- Kafka Connect
- Zookeeper Nodes

Slide 22

Streaming Platform Architecture
- Application (Native Client library)
- Application (Kafka Streams)
- Application (Load Balancer *, REST Proxy)
- KSQL (Kafka Streams)
- Schema Registry
- Kafka Brokers
- Kafka Connect
- Zookeeper Nodes

Slide 23

https://twitter.com/monitoring_king/status/1048264580743479296

Slide 24

    // in-memory store, not persistent
    Map<String, Integer> groupByCounts = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
         KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {

        consumer.subscribe(Arrays.asList("A", "B"));

        while (true) { // consumer poll loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                Integer count = groupByCounts.get(key);
                if (count == null) {
                    count = 0;
                }
                count += 1; // actually doing something useful
                groupByCounts.put(key, count);
            }
            if (counter++ % sendInterval == 0) {
                for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
                    ProducerRecord<String, Integer> producerRecord =
                        new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
                    producer.send(producerRecord);
                }
                consumer.commitSync();
            }
        }
    }

Slide 25

    // in-memory store, not persistent
    Map<String, Integer> groupByCounts = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
         KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {

        consumer.subscribe(Arrays.asList("A", "B"));

        while (true) { // consumer poll loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                Integer count = groupByCounts.get(key);
                if (count == null) {
                    count = 0;
                }
                count += 1;
                groupByCounts.put(key, count);
            }
        }

Slide 26

    // in-memory store, not persistent
    Map<String, Integer> groupByCounts = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
         KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {

        consumer.subscribe(Arrays.asList("A", "B"));

        while (true) { // consumer poll loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                Integer count = groupByCounts.get(key);
                if (count == null) {
                    count = 0;
                }
                count += 1;
                groupByCounts.put(key, count);
            }
        }

Slide 27

    // in-memory store, not persistent
    Map<String, Integer> groupByCounts = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
         KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {

        consumer.subscribe(Arrays.asList("A", "B"));

        while (true) { // consumer poll loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                Integer count = groupByCounts.get(key);
                if (count == null) {
                    count = 0;
                }
                count += 1;
                groupByCounts.put(key, count);
            }
        }

Slide 28

    // in-memory store, not persistent
    Map<String, Integer> groupByCounts = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
         KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {

        consumer.subscribe(Arrays.asList("A", "B"));

        while (true) { // consumer poll loop
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                Integer count = groupByCounts.get(key);
                if (count == null) {
                    count = 0;
                }
                count += 1;
                groupByCounts.put(key, count);
            }
        }

Slide 29

    while (true) { // consumer poll loop
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            String key = record.key();
            Integer count = groupByCounts.get(key);
            if (count == null) {
                count = 0;
            }
            count += 1;
            groupByCounts.put(key, count);
        }
    }

Slide 30

    while (true) { // consumer poll loop
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            String key = record.key();
            Integer count = groupByCounts.get(key);
            if (count == null) {
                count = 0;
            }
            count += 1; // actually doing something useful
            groupByCounts.put(key, count);
        }
    }

Slide 31

            if (counter++ % sendInterval == 0) {
                for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
                    ProducerRecord<String, Integer> producerRecord =
                        new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
                    producer.send(producerRecord);
                }
                consumer.commitSync();
            }
        }
    }

Slide 32

            if (counter++ % sendInterval == 0) {
                for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
                    ProducerRecord<String, Integer> producerRecord =
                        new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
                    producer.send(producerRecord);
                }
                consumer.commitSync();
            }
        }
    }

Slide 33

            if (counter++ % sendInterval == 0) {
                for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
                    ProducerRecord<String, Integer> producerRecord =
                        new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
                    producer.send(producerRecord);
                }
                consumer.commitSync();
            }
        }
    }

Slide 34

LET’S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT’S GOOD, EXCEPT IT SUCKS

Slide 35

SO LET ME WRITE THE FRAMEWORK THAT’S WHY IT MIGHT BE REALLY GOOD

Slide 36

Every framework wants to be when it grows up

Slide 37

Every framework wants to be when it grows up
- Scalable

Slide 38

Every framework wants to be when it grows up
- Scalable
- Elastic

Slide 39

Every framework wants to be when it grows up
- Scalable
- Elastic
- Fault-tolerant

Slide 40

Every framework wants to be when it grows up
- Scalable
- Elastic
- Stateful
- Fault-tolerant

Slide 41

Every framework wants to be when it grows up
- Scalable
- Elastic
- Stateful
- Fault-tolerant
- Distributed

Slide 42

    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));

    stream.groupByKey()
          .count()
          .toStream()
          .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));

    final Topology topology = streamsBuilder.build();
    final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
    kafkaStreams.start();

Slide 43

    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));

    // actual work
    stream.groupByKey()
          .count()
          .toStream()
          .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));

    final Topology topology = streamsBuilder.build();
    final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
    kafkaStreams.start();

Slide 44

    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));

    // actual work
    stream.groupByKey()
          .count()
          .toStream()
          .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));

    final Topology topology = streamsBuilder.build();
    final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
    kafkaStreams.start();

Slide 45

    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));

    // actual work
    stream.groupByKey()
          .count()
          .toStream()
          .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));

    final Topology topology = streamsBuilder.build();
    final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
    kafkaStreams.start();

Slide 46

Slide 47

Slide 48

https://twitter.com/157rahul/status/1050505569746841600

Slide 49

The log is a simple idea
New, Old
Messages are added at the end of the log

Slide 50

Consumers have a position all of their own
New, Old
- George is here (Scan)
- Fred is here (Scan)
- Sally is here (Scan)

Slide 51

Only Sequential Access
Old, New
Read to offset & scan
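
The log slides (append at the end, each consumer keeps its own position, read to an offset and scan) can be modelled with a few lines of plain Java. This is a minimal in-memory sketch, not how Kafka stores data; the class SimpleLog and its methods append, seek and scan are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of an append-only log; not Kafka's implementation.
class SimpleLog {
    private final List<String> messages = new ArrayList<>();
    // Each consumer has a position all of its own.
    private final Map<String, Integer> positions = new HashMap<>();

    // Messages are added at the end of the log; returns the new message's offset.
    int append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // "Read to offset": moves one consumer's position without affecting the others.
    void seek(String consumer, int offset) {
        if (offset < 0 || offset > messages.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        positions.put(consumer, offset);
    }

    // "Scan": reads sequentially from the consumer's position to the end and advances it.
    List<String> scan(String consumer) {
        int from = positions.getOrDefault(consumer, 0);
        List<String> batch = new ArrayList<>(messages.subList(from, messages.size()));
        positions.put(consumer, messages.size());
        return batch;
    }

    public static void main(String[] args) {
        SimpleLog log = new SimpleLog();
        log.append("m0");
        log.append("m1");
        log.append("m2");
        log.seek("Fred", 2);
        System.out.println("George: " + log.scan("George"));
        System.out.println("Fred: " + log.scan("Fred"));
    }
}
```

Because reads never remove anything, George and Fred can scan the same log at different positions without interfering with each other.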

Slide 52

Shard data to get scalability

Slide 53

Shard data to get scalability
- Cluster of machines

Slide 54

Shard data to get scalability
- Producer (1), Producer (2), Producer (3)
- Messages are sent to different partitions
- Cluster of machines
- Partitions live on different machines
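
How messages end up in different partitions can be sketched as a hash of the key modulo the partition count. The sketch below uses String.hashCode() purely as an assumption for illustration (Kafka's default partitioner hashes the serialized key bytes with murmur2 instead); KeyPartitioner, partitionFor and distribute are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical key-based sharding sketch; not Kafka's partitioner.
class KeyPartitioner {

    // Maps a key to a partition index in [0, numPartitions).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Distributes keys over partitions; the same key always lands in the same partition.
    static List<List<String>> distribute(List<String> keys, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, numPartitions)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("Gwen", "Matthias", "Viktor", "Gwen");
        List<List<String>> partitions = distribute(keys, 3);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("partition " + i + ": " + partitions.get(i));
        }
    }
}
```

Keeping all records for one key in one partition preserves per-key ordering while the partitions themselves can live on different machines.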

Slide 55

Slide 56

CONSUMER GROUP COORDINATOR

Slide 57

CONSUMERS
CONSUMER GROUP COORDINATOR

Slide 58

CONSUMERS
CONSUMER GROUP
CONSUMER GROUP COORDINATOR

Slide 59

Linearly Scalable Architecture
Producers
Consumers

Slide 60

Linearly Scalable Architecture
Producers
Single topic:
- Many producer machines
- Many consumer machines
- Many broker machines
Consumers

Slide 61

Linearly Scalable Architecture
Producers
Single topic:
- Many producer machines
- Many consumer machines
- Many broker machines
No Bottleneck!!
Consumers

Slide 62

Talk is cheap! Show me code! https://cnfl.io/streams-movie-demo

Slide 63

As developers, we want to build APPS not INFRASTRUCTURE

Slide 64

Slide 65

the KAFKA STREAMS API is a JAVA API to BUILD REAL-TIME APPLICATIONS

Slide 66

App
Streams API

Slide 67

App
Streams API
Not running inside brokers!

Slide 68

Same app, many instances
App, App, App
Streams API, Streams API, Streams API
Brokers? Nope!

Slide 69

Same app, many instances
App, App, App
Streams API, Streams API, Streams API
Brokers? Nope!

Slide 70

Before
- Processing Cluster
- Your Job
- Shared Database
- Dashboard

Slide 71

After
- APP
- Streams API
- Dashboard

Slide 72

this means you can DEPLOY your app ANYWHERE using WHATEVER TECHNOLOGY YOU WANT

Slide 73

So many places to run your app!
...and many more...

Slide 74

Things Kafka Streams Does
- Enterprise Support
- Open Source
- Powerful Processing incl. Filters, Transforms, Joins, Aggregations, Windowing
- Runs Everywhere
- Supports Streams and Tables
- Elastic, Scalable, Fault-tolerant
- Exactly-Once Processing
- Kafka Security Integration
- Event-Time Processing

Slide 75

Table-Stream Duality

Slide 76

Table-Stream Duality

Slide 77

Slide 78

TABLE | STREAM | TABLE

Slide 79

TABLE | STREAM | TABLE
Gwen 1 | |

Slide 80

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) |

Slide 81

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) | Gwen 1

Slide 82

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) | Gwen 1
Gwen 1, Matthias 1 | |

Slide 83

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) | Gwen 1
Gwen 1, Matthias 1 | (“Matthias”, 1) | Gwen 1, Matthias 1

Slide 84

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) | Gwen 1
Gwen 1, Matthias 1 | (“Matthias”, 1) | Gwen 1, Matthias 1
Gwen 2, Matthias 1 | (“Gwen”, 2) |

Slide 85

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) | Gwen 1
Gwen 1, Matthias 1 | (“Matthias”, 1) | Gwen 1, Matthias 1
Gwen 2, Matthias 1 | (“Gwen”, 2) | Gwen 2, Matthias 1

Slide 86

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) | Gwen 1
Gwen 1, Matthias 1 | (“Matthias”, 1) | Gwen 1, Matthias 1
Gwen 2, Matthias 1 | (“Gwen”, 2) | Gwen 2, Matthias 1
Gwen 2, Matthias 1, Viktor 1 | (“Viktor”, 1) |

Slide 87

TABLE | STREAM | TABLE
Gwen 1 | (“Gwen”, 1) | Gwen 1
Gwen 1, Matthias 1 | (“Matthias”, 1) | Gwen 1, Matthias 1
Gwen 2, Matthias 1 | (“Gwen”, 2) | Gwen 2, Matthias 1
Gwen 2, Matthias 1, Viktor 1 | (“Viktor”, 1) | Gwen 2, Matthias 1, Viktor 1
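
The table-stream duality in this sequence can be reproduced with plain Java collections: every update to a count table is emitted as a changelog record, and replaying that changelog rebuilds the same table. This is a sketch of the idea only, not the Kafka Streams KTable API; TableStreamDuality, toChangelog and toTable are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of table-stream duality using plain collections.
class TableStreamDuality {

    // Table -> stream: counts names and emits every table update as a changelog record.
    static List<Map.Entry<String, Integer>> toChangelog(List<String> names) {
        Map<String, Integer> table = new LinkedHashMap<>();
        List<Map.Entry<String, Integer>> changelog = new ArrayList<>();
        for (String name : names) {
            int count = table.merge(name, 1, Integer::sum);
            changelog.add(Map.entry(name, count));
        }
        return changelog;
    }

    // Stream -> table: replays the changelog; the latest value per key wins.
    static Map<String, Integer> toTable(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : changelog) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        // Same sequence of names as on the slides.
        List<String> names = List.of("Gwen", "Matthias", "Gwen", "Viktor");
        List<Map.Entry<String, Integer>> changelog = toChangelog(names);
        System.out.println("stream: " + changelog);
        System.out.println("table:  " + toTable(changelog));
    }
}
```

Replaying only a prefix of the changelog yields the table as it looked at that point in time, which matches the intermediate rows in the slide build.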

Slide 88

Do you think that’s a table you are querying?

Slide 89

Talk is cheap! Show me code!

Slide 90

What’s next?

Slide 91

https://twitter.com/IDispose/status/1048602857191170054

Slide 92

KSQL #FTW

Slide 93

KSQL #FTW
1 UI

Slide 94

KSQL #FTW
1 UI
2 CLI: ksql>

Slide 95

KSQL #FTW
1 UI
2 CLI: ksql>
3 REST: POST /query

Slide 96

KSQL #FTW
1 UI
2 CLI: ksql>
3 REST: POST /query
4 Headless

Slide 97

Interaction with Kafka
- Kafka (data)

Slide 98

Interaction with Kafka
- KSQL (processing): Does not run on Kafka brokers
- JVM application with Kafka Streams (processing): Does not run on Kafka brokers
- Kafka (data)

Slide 99

Interaction with Kafka
- KSQL (processing): Does not run on Kafka brokers
- JVM application with Kafka Streams (processing): Does not run on Kafka brokers
- Kafka (data)

Slide 100

Fault-Tolerance, powered by Kafka

Slide 101

Fault-Tolerance, powered by Kafka

Slide 102

Fault-Tolerance, powered by Kafka

Slide 103

Standing on the shoulders of Streaming Giants

Slide 104

Standing on the shoulders of Streaming Giants

Slide 105

Standing on the shoulders of Streaming Giants
Producer, Consumer APIs

Slide 106

Standing on the shoulders of Streaming Giants
Kafka Streams
Producer, Consumer APIs

Slide 107

Standing on the shoulders of Streaming Giants
Kafka Streams
Powered by
Producer, Consumer APIs

Slide 108

Standing on the shoulders of Streaming Giants
KSQL, KSQL UDFs
Kafka Streams
Powered by
Producer, Consumer APIs

Slide 109

Standing on the shoulders of Streaming Giants
KSQL, KSQL UDFs
Powered by
Kafka Streams
Powered by
Producer, Consumer APIs

Slide 110

Standing on the shoulders of Streaming Giants
KSQL, KSQL UDFs (Ease of use)
Powered by
Kafka Streams
Powered by
Producer, Consumer APIs (Flexibility)

Slide 111

One last thing…

Slide 112

https://kafka-summit.org Gamov30

Slide 113

Thanks! @gamussa viktor@confluent.io
We are hiring! https://www.confluent.io/careers/