Crossing the Streams: Rethinking Stream Processing with Kafka Streams and KSQL
Viktor Gamov (@gamussa), Confluent | #NYCKafka

https://twitter.com/gAmUssA/status/1048258981595111424

Streaming is the toolset for dealing with events as they move!


A streaming platform offers high throughput, continuous computation, and API-based clustering, sitting between Java apps / Kafka Streams on one side and a serving layer (Cassandra, Elastic, etc.) on the other.

Stream Processing by Analogy:

$ cat < in.txt | grep "ksql" | tr a-z A-Z > out.txt

In Kafka terms: the Connect API plays the role of cat < in.txt and > out.txt (getting data in and out), stream processing plays the role of grep and tr (transforming data in flight), and the Kafka cluster is the pipe connecting them.
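A rough Kafka Streams rendering of the same pipeline, as a minimal sketch; the topic names in.txt and out.txt stand in for the files:

final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("in.txt")              // cat < in.txt
       .filter((key, line) -> line.contains("ksql"))  // grep "ksql"
       .mapValues(line -> line.toUpperCase())         // tr a-z A-Z
       .to("out.txt");                                // > out.txt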

Streaming Platform Architecture:
- Kafka brokers, coordinated by ZooKeeper nodes
- Applications using a native client library
- Applications using Kafka Streams
- KSQL (itself built on Kafka Streams)
- Schema Registry
- Kafka Connect
- REST Proxy behind a load balancer

https://twitter.com/monitoring_king/status/1048264580743479296

"// in-memory store, not persistent Map<String, Integer> groupByCounts = new HashMap!<>(); try (KafkaConsumer<String, String> consumer = new KafkaConsumer!<>(consumerProperties()); KafkaProducer<String, Integer> producer = new KafkaProducer!<>(producerProperties())) { consumer.subscribe(Arrays.asList("A", "B")); while (true) { "// consumer poll loop ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); Integer count = groupByCounts.get(key); if (count "== null) { count = 0; } count += 1; "// actually doing something useful groupByCounts.put(key, count); } if (counter"++ % sendInterval "== 0) { for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) { ProducerRecord<String, Integer> producerRecord = new ProducerRecord!<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue()); producer.send(producerRecord); } consumer.commitSync(); } } } @gamussa #NYCKafka @ @confluentinc

"// in-memory store, not persistent Map<String, Integer> groupByCounts = new HashMap!<>(); try (KafkaConsumer<String, String> consumer = new KafkaConsumer!<>(consumerProperties()); KafkaProducer<String, Integer> producer = new KafkaProducer!<>(producerProperties())) { consumer.subscribe(Arrays.asList("A", "B")); while (true) { "// consumer poll loop ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); Integer count = groupByCounts.get(key); if (count "== null) { count = 0; } count += 1; groupByCounts.put(key, count); } } @gamussa #NYCKafka @ @confluentinc

"// in-memory store, not persistent Map<String, Integer> groupByCounts = new HashMap!<>(); try (KafkaConsumer<String, String> consumer = new KafkaConsumer!<>(consumerProperties()); KafkaProducer<String, Integer> producer = new KafkaProducer!<>(producerProperties())) { consumer.subscribe(Arrays.asList("A", "B")); while (true) { "// consumer poll loop ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); Integer count = groupByCounts.get(key); if (count "== null) { count = 0; } count += 1; groupByCounts.put(key, count); } } @gamussa #NYCKafka @ @confluentinc

"// in-memory store, not persistent Map<String, Integer> groupByCounts = new HashMap!<>(); try (KafkaConsumer<String, String> consumer = new KafkaConsumer!<>(consumerProperties()); KafkaProducer<String, Integer> producer = new KafkaProducer!<>(producerProperties())) { consumer.subscribe(Arrays.asList("A", "B")); while (true) { "// consumer poll loop ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); Integer count = groupByCounts.get(key); if (count "== null) { count = 0; } count += 1; groupByCounts.put(key, count); } } @gamussa #NYCKafka @ @confluentinc

"// in-memory store, not persistent Map<String, Integer> groupByCounts = new HashMap!<>(); try (KafkaConsumer<String, String> consumer = new KafkaConsumer!<>(consumerProperties()); KafkaProducer<String, Integer> producer = new KafkaProducer!<>(producerProperties())) { consumer.subscribe(Arrays.asList("A", "B")); while (true) { "// consumer poll loop ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); Integer count = groupByCounts.get(key); if (count "== null) { count = 0; } count += 1; groupByCounts.put(key, count); } } @gamussa #NYCKafka @ @confluentinc

while (true) { "// consumer poll loop ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); Integer count = groupByCounts.get(key); if (count "== null) { count = 0; } count += 1; groupByCounts.put(key, count); } } @gamussa #NYCKafka @ @confluentinc

while (true) { "// consumer poll loop ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); Integer count = groupByCounts.get(key); if (count "== null) { count = 0; } count += 1; !// actually doing something useful groupByCounts.put(key, count); } } @gamussa #NYCKafka @ @confluentinc

if (counter"++ % sendInterval "== 0) { for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) { ProducerRecord<String, Integer> producerRecord = new ProducerRecord!<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue()); producer.send(producerRecord); } consumer.commitSync(); } } } @gamussa #NYCKafka @ @confluentinc

if (counter"++ % sendInterval "== 0) { for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) { ProducerRecord<String, Integer> producerRecord = new ProducerRecord!<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue()); producer.send(producerRecord); } consumer.commitSync(); } } } @gamussa #NYCKafka @ @confluentinc

if (counter"++ % sendInterval "== 0) { for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) { ProducerRecord<String, Integer> producerRecord = new ProducerRecord!<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue()); producer.send(producerRecord); } consumer.commitSync(); } } } @gamussa #NYCKafka @ @confluentinc

LET’S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT’S GOOD, EXCEPT IT SUCKS

SO LET ME WRITE THE FRAMEWORK. THAT’S WHY IT MIGHT BE REALLY GOOD

Every framework wants to be, when it grows up:
- Scalable
- Elastic
- Fault-tolerant
- Stateful
- Distributed

final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));
// actual work
stream.groupByKey()
      .count()
      .toStream()
      .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = streamsBuilder.build();
final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
kafkaStreams.start();
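Here streamsProperties() is another helper left off the slide; a minimal sketch, with the application id and broker address as assumptions:

private static Properties streamsProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-by-example");  // assumed app id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    return props;
}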



https://twitter.com/157rahul/status/1050505569746841600

The log is a simple idea: messages are added at the end of the log, with new messages at one end and old ones at the other.

Consumers have a position all of their own: George, Fred, and Sally each scan the log from their own offset, anywhere between old and new.

Only sequential access: read to an offset and scan from old to new.
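In the consumer API this shows up as seeking to an offset and scanning forward; a minimal sketch (the topic name and offset are illustrative):

TopicPartition partition = new TopicPartition("my-topic", 0); // hypothetical topic
consumer.assign(Collections.singletonList(partition));
consumer.seek(partition, 42L); // jump to a stored offset
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // scan forward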

Shard data to get scalability: producers (1), (2), (3) send messages to different partitions, and the partitions live on different machines in a cluster.
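By default the producer shards by key, so records with the same key land in the same partition; a minimal sketch (the topic and keys are illustrative):

// records with the same key always go to the same partition
producer.send(new ProducerRecord<>("page-views", "user-42", "home"));   // hypothetical topic/keys
producer.send(new ProducerRecord<>("page-views", "user-42", "search")); // same partition as above
producer.send(new ProducerRecord<>("page-views", "user-7", "home"));    // likely a different partition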

Consumers are organized into consumer groups, and the consumer group coordinator assigns topic partitions to the members of each group.

Linearly Scalable Architecture
Single topic:
- Many producer machines
- Many consumer machines
- Many broker machines
No bottleneck!

Talk is cheap! Show me code! https://cnfl.io/streams-movie-demo

As developers, we want to build APPS, not INFRASTRUCTURE


the KAFKA STREAMS API is a JAVA API to BUILD REAL-TIME APPLICATIONS

App + Streams API. Not running inside brokers!

Same app, many instances: App, App, App, each embedding the Streams API. Brokers? Nope!

Before: your job runs in a processing cluster with a shared database, feeding a dashboard.

After: an app with the Streams API feeds the dashboard directly.

this means you can DEPLOY your app ANYWHERE using WHATEVER TECHNOLOGY YOU WANT

So many places to run your app! ...and many more...

Things Kafka Streams does:
- Powerful processing incl. filters, transforms, joins, aggregations, windowing
- Supports streams and tables
- Elastic, scalable, fault-tolerant
- Exactly-once processing
- Event-time processing
- Kafka security integration
- Runs everywhere
- Open source, with enterprise support
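As one example of windowing, a hedged sketch of a tumbling-window count over the same input (the topic name and window size are illustrative):

final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("A")                        // hypothetical input topic
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))  // 5-minute tumbling windows
       .count()                                            // count per key per window
       .toStream()
       .foreach((windowedKey, count) ->                    // windows use record (event) time by default
               System.out.println(windowedKey + " -> " + count));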

Table-Stream Duality

Each update appended to the stream yields a new table state:

Stream of updates:     Resulting table state:
("Gwen", 1)            Gwen 1
("Matthias", 1)        Gwen 1, Matthias 1
("Gwen", 2)            Gwen 2, Matthias 1
("Viktor", 1)          Gwen 2, Matthias 1, Viktor 1

The stream is the table's changelog: replaying it from the beginning rebuilds the table.
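In Kafka Streams the duality is directly expressible; a minimal sketch (the topic names are illustrative):

final StreamsBuilder builder = new StreamsBuilder();

// read the changelog as a table: later values for a key overwrite earlier ones
final KTable<String, Long> table =
        builder.table("counts-changelog", Consumed.with(Serdes.String(), Serdes.Long()));

// turn the table back into a stream of updates
table.toStream().to("counts-updates", Produced.with(Serdes.String(), Serdes.Long()));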

Do you think that’s a table you are querying?

Talk is cheap! Show me code!

What’s next?

https://twitter.com/IDispose/status/1048602857191170054

KSQL #FTW. Four ways to interact with it:
1. UI
2. CLI (the ksql> prompt)
3. REST (POST /query)
4. Headless
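A hedged sketch of option 3, posting a query to a KSQL server's REST endpoint from plain Java (the server address, content type, and query are assumptions; check your KSQL version's API docs):

URL url = new URL("http://localhost:8088/query"); // assumed KSQL server address
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8");
conn.setDoOutput(true);
String body = "{\"ksql\": \"SELECT * FROM my_stream;\", \"streamsProperties\": {}}";
conn.getOutputStream().write(body.getBytes(StandardCharsets.UTF_8));
// the response is a stream of JSON rows; read it line by line
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
    reader.lines().forEach(System.out::println);
}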

Interaction with Kafka: Kafka stores the data; KSQL and JVM applications using Kafka Streams do the processing. Neither one runs on the Kafka brokers.

Fault-Tolerance, powered by Kafka

Standing on the shoulders of Streaming Giants:
- Producer and Consumer APIs at the base (maximum flexibility)
- Kafka Streams, powered by the Producer and Consumer APIs
- KSQL and KSQL UDFs, powered by Kafka Streams (maximum ease of use)

One last thing…

https://kafka-summit.org (registration code: Gamov30)

Thanks! @gamussa, viktor@confluent.io. We are hiring! https://www.confluent.io/careers/