Kafka Streams IQ: "Why do we need a database? We don't need a database!"

A presentation at Moscow JUG in May 2019 in Moscow, Russia by Viktor Gamov

Slide 1

Why do we need a DB? We don't need a database… May 2019 / Moscow, Russia @gamussa | #jugmsk | @confluentinc

Slide 3

Friendly reminder: follow @gamussa 📸🖼🏋 and tag @gamussa with #jugmsk and @confluentinc

Slide 4

Special thanks! @bbejeck @MatthiasJSax

Slide 5

https://cnfl.io/streams-movie-demo

Slide 6

Agenda
○ Kafka Streams 101
○ What is state and why stateful stream processing?
○ Technical Deep Dive on Interactive Queries
○ What does IQ provide?
○ State handling with Kafka Streams
○ How to use IQ?

Slide 7

Kafka Streams 101

Slide 8

Kafka Streams 101: Stream Processing (diagram: Other Systems, Kafka Connect, Kafka Streams, Kafka Connect, Other Systems)

Slide 9

https://twitter.com/monitoring_king/status/1048264580743479296

Slide 10

LET'S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT'S GOOD, EXCEPT IT SUCKS

Slide 11

SO LET ME SHOW KAFKA STREAMS IN A WAY THAT IT MIGHT BE REALLY GOOD

Slide 12

We need a processing cluster like Spark or Flink

Slide 13

I want to write apps, not infrastructure

Slide 14

You can't do distributed stream processing without a cluster

Slide 16

You stay and write your Java apps; I'm going to write ETL jobs

Slide 17

Before (diagram: Processing Cluster running Your Job, Shared Database, Dashboard)

Slide 18

After (diagram: Dashboard and an App with the embedded Streams API)

Slide 19

The Kafka Streams API is a Java API to build real-time applications

Slide 20

App (Streams API): not running inside the brokers!

Slide 21

Same app, many instances (diagram: three App instances, each with the Streams API)
Brokers? Nope!
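Every instance of the same application shares one `application.id` (Kafka Streams also uses it as the consumer group id and as the prefix of internal topics such as changelogs), while `application.server` tells the other instances where this one can be reached for Interactive Queries. The sketch below builds such per-instance configs with plain `java.util.Properties`. The string keys are the ones behind `StreamsConfig.APPLICATION_ID_CONFIG`, `BOOTSTRAP_SERVERS_CONFIG` and `APPLICATION_SERVER_CONFIG`; the helper class, the app id, and the host names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: builds the config for one instance of the same Streams app.
final class InstanceConfig {
    private InstanceConfig() {}

    static Properties forInstance(String host, int port) {
        Properties props = new Properties();
        // Identical on every instance: this is what makes them "the same app".
        props.put("application.id", "trade-stats-app");   // hypothetical app id
        props.put("bootstrap.servers", "broker1:9092");   // hypothetical broker address
        // Different on every instance: the host:port where this instance
        // answers Interactive Queries (published to others via StreamsMetadata).
        props.put("application.server", host + ":" + port);
        return props;
    }
}
```

Each instance would then be started with something like `new KafkaStreams(builder.build(), InstanceConfig.forInstance("host1", 1234))`; no broker-side deployment is involved.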

Slide 22

Stateful Stream Processing
What is State?
○ Anything your application needs to "remember" beyond the scope of a single record

Slide 23

Stateful Stream Processing
Stateless operators:
○ filter, map, flatMap, foreach
Stateful operators:
○ Aggregations
○ Any window operation
○ Joins
○ CEP (complex event processing)
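The difference between the two operator families can be shown without Kafka at all: a stateless operator decides from the current record alone, while a stateful one has to remember something across records. The plain-Java model below (class and method names are hypothetical, and this is not Kafka Streams code) contrasts a `filter`-style check with a running count per key, which is the kind of state Kafka Streams keeps in a local state store.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of stateless vs. stateful processing; not Kafka Streams code.
final class OperatorModel {
    // Stateless: the result depends only on the current record.
    static boolean isLargeTrade(int shares) {
        return shares >= 1_000;
    }

    // Stateful: the result depends on all earlier records with the same key,
    // so the operator must keep this map alive between records.
    private final Map<String, Long> countsBySymbol = new HashMap<>();

    long countTrade(String symbol) {
        return countsBySymbol.merge(symbol, 1L, Long::sum);
    }
}
```

If the process restarts, `isLargeTrade` keeps working unchanged, but the counts in `countsBySymbol` are lost; that is exactly why Kafka Streams backs such state with changelog topics.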

Slide 24

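Stock Trade Stats Example

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

// builder is a StreamsBuilder; STOCK_TOPIC, STATS_OUT_TOPIC, Trade, TradeStats
// and TradeStatsSerde are defined elsewhere in the application.
KStream<String, Trade> source = builder.stream(STOCK_TOPIC);

KStream<Windowed<String>, TradeStats> stats = source
    .groupByKey()
    // hopping windows: 5 s long, advancing every 1 s
    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(1)))
    .aggregate(
        TradeStats::new,                              // initializer
        (k, v, tradestats) -> tradestats.add(v),      // aggregator
        // named window store; this name is what Interactive Queries use
        Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
            .withValueSerde(new TradeStatsSerde()))
    .toStream()
    .mapValues(TradeStats::computeAvgPrice);

stats.to(STATS_OUT_TOPIC,
    Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)));
```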
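The `windowedBy` step defines hopping windows 5 s long that advance every second, so each trade lands in five overlapping windows. Kafka Streams aligns these windows to the epoch, with an inclusive start and an exclusive end. The plain-Java sketch below (hypothetical class name; a model of the assignment rule, not the library's code) lists the window start times that contain a given record timestamp under that assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Model of epoch-aligned hopping windows; not the Kafka Streams implementation.
final class HoppingWindows {
    private HoppingWindows() {}

    // Start times (ms) of all windows [start, start + sizeMs) containing timestampMs.
    // Window starts are multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of advanceMs that is still > timestampMs - sizeMs.
        long first = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a trade at t = 7500 ms with the slide's 5000/1000 settings, this yields the windows starting at 3000, 4000, 5000, 6000 and 7000 ms; a trade at t = 2500 ms only falls into three windows, because no window starts before the epoch.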

Slide 28

Topologies (Processor Topology diagram)
○ builder.stream(…): Source Node
○ source.groupByKey().windowedBy(…).aggregate(…): Processor Node, backed by state stores
○ mapValues(…): Processor Node
○ to(…): Sink Node

Slide 29

Accessing Application State
External DB: query the database
Local State: Interactive Queries
○ Allows you to query the local DB instances
○ The Kafka Streams application is the database
○ Read-only access (no "external" state updates)
○ Works for the DSL or the Processor API

Slide 30

Accessing Application State: "DB to go"
Interactive Queries turn a Kafka Streams application into a queryable, lightweight database

Slide 31

Kafka Streams uses Local State
○ Lightweight embedded DB
○ Local data access
○ No RPC, no network I/O

Slide 32

Scaling Local State
○ Topics are scaled via partitions
○ Kafka Streams scales via tasks
  • 1:1 mapping from input topic partitions to tasks
○ State can be sharded based on partitions
  • Local DB for each shard
  • 1:1 mapping from state shard to task

Slide 33

Partitions, Tasks, and Consumer Groups (diagram: input topic → tasks → result topic)
○ 4 input topic partitions => 4 tasks
○ Each task executes the processor topology
○ One consumer group: can be executed with 1-4 threads on 1-4 machines
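Because there is exactly one task per input partition, the partition count caps the useful parallelism: with four partitions, a fifth thread or instance would sit idle. The sketch below spreads task ids over instances round-robin to make that visible. It is a deliberately simplified stand-in (hypothetical class name); the real Kafka Streams assignor also tries to keep tasks on their previous owners and to balance load.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified round-robin model of spreading tasks over instances (or threads).
final class TaskAssignment {
    private TaskAssignment() {}

    // One task per input partition: task i processes partition i.
    static List<List<Integer>> assign(int numPartitions, int numInstances) {
        List<List<Integer>> tasksPerInstance = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            tasksPerInstance.add(new ArrayList<>());
        }
        for (int task = 0; task < numPartitions; task++) {
            tasksPerInstance.get(task % numInstances).add(task);
        }
        return tasksPerInstance;
    }
}
```

With 4 partitions and 3 instances, the first instance gets tasks 0 and 3; with 5 instances, the last one receives no task at all.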

Slide 34

Scaling with State (diagram: Trade Stats App Instance 1; "no state")

Slide 35

Scaling with State (diagram: Trade Stats App Instances 1 and 2; "no state")

Slide 36

Scaling with State (diagram: Trade Stats App Instances 1, 2, and 3; "no state")

Slide 37

Scaling and Fault Tolerance: Two Sides of the Same Coin

Slide 38

Fault Tolerance (diagram: Trade Stats App Instances 1, 2, and 3)

Slide 39

Fault-Tolerant State (diagram: Input Topic, State Updates, Changelog Topic, Result Topic)

Slide 40

Migrate State (diagram: Trade Stats App Instance 2 restores the state of Instance 1 from the Changelog Topic)
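Every update to a local store is also written to the store's changelog topic, so whichever instance takes over a task can rebuild the store by replaying that topic from the beginning. Because the topic is log compacted, older values for a key may be dropped without changing the restored result. The plain-Java model below (hypothetical names; a `List` of entries stands in for the changelog topic, and tombstones are ignored) shows both halves: restoring by replay, and why compaction is safe.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Model of changelog-based restore; a List of entries stands in for the topic.
final class ChangelogModel {
    private ChangelogModel() {}

    // Replays the changelog in order; later updates overwrite earlier ones.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> store = new LinkedHashMap<>();
        for (Map.Entry<String, Long> update : changelog) {
            store.put(update.getKey(), update.getValue());
        }
        return store;
    }

    // Compaction keeps only the latest value per key, which is exactly what
    // restore() ends up with, so replaying the compacted log gives the same store.
    static List<Map.Entry<String, Long>> compact(List<Map.Entry<String, Long>> changelog) {
        return new ArrayList<>(restore(changelog).entrySet());
    }
}
```

Replaying `[key-A=1, key-B=7, key-A=5]` and replaying its compacted form `[key-A=5, key-B=7]` both produce a store with key-A 5 and key-B 7; the compacted log is just shorter to read.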

Slide 41

Recovery Time
○ Changelog topics are log compacted
○ The size of the changelog topic is linear in the size of the state
○ Large state implies long recovery times

Slide 42

Recovery Overhead
Recovery overhead is proportional to
○ segment-size / state-size
Segment size smaller than state size => reduced overhead (the active segment is never compacted, so up to one segment of uncompacted updates may have to be replayed on restore)
Update the changelog topic's segment size accordingly
○ topic config: segment.bytes (log.segment.bytes is the broker-wide default)
○ the log cleaner interval is important, too
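A rough worked example of the ratio above, assuming the worst case where the active (never compacted) segment is full of updates to keys already in the state, ignoring closed segments the cleaner has not processed yet, and assuming Kafka's default segment size of 1 GiB (1073741824 bytes) for a store holding 100 MiB:

```latex
\text{overhead} \approx \frac{S_{\text{segment}}}{S_{\text{state}}}
  = \frac{1024\,\text{MiB}}{100\,\text{MiB}} \approx 10.2
```

So a restore could read roughly ten times more data than the state itself. Lowering `segment.bytes` on that changelog topic to, say, 64 MiB brings the same ratio down to 0.64. In the DSL, such topic configs can be passed to the changelog through `Materialized#withLoggingEnabled(Map<String, String>)`, for example keyed by `TopicConfig.SEGMENT_BYTES_CONFIG`; treat the concrete sizes here as illustrative, not as recommended values.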

Slide 43

Query your app… not a database

Slide 44

How to use Interactive Queries
○ Query local state
○ Discover remote instances
○ Query remote state
(diagram: three instances whose stores hold key-A 5, key-B 7; key-g 4, key-h 2; and key-X 7, key-Y 3)

Slide 45

Query Local State: get("key-A") (diagram: local store holding key-A 5, key-B 7)

Slide 46

Discover Remote Instances
Kafka Streams application instances are started independently
○ What other instances are there?
○ Which instances hold which data?
(diagram: get("key-A") and get("key-C") sent to instances holding key-A 5, key-B 7 and key-g 4, key-h 2)

Slide 47

Discover Remote Instances
Kafka Streams metadata API to the rescue:
○ KafkaStreams#allMetadata()
○ KafkaStreams#allMetadataForStore(String storeName)
○ KafkaStreams#metadataForKey(String storeName, K key, Serializer<K> keySerializer)
The first two return a collection of StreamsMetadata objects; metadataForKey returns the StreamsMetadata of the instance hosting that key

Slide 48

Discover Remote Instances
StreamsMetadata object:
○ Host information (server/port) according to the application.server config
○ Names of the state stores hosted on that instance
○ TopicPartitions mapped to the store (i.e., its shards)
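Conceptually, finding the instance for one key takes two steps: serialize the key and map it to a partition the same way the producer's partitioner did, then look up which instance currently hosts the task for that partition (the `application.server` value carried in StreamsMetadata). The plain-Java sketch below models those steps. The hash is a stand-in: Kafka's default partitioner applies murmur2 to the serialized key, not `Arrays.hashCode`, so the partition numbers here will not match a real cluster. The class name and the host table are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

// Model of the key -> partition -> owning instance lookup.
final class KeyOwnerLookup {
    private final int numPartitions;
    private final Map<Integer, String> hostByPartition; // partition -> "host:port"

    KeyOwnerLookup(int numPartitions, Map<Integer, String> hostByPartition) {
        this.numPartitions = numPartitions;
        this.hostByPartition = hostByPartition;
    }

    // Stand-in hash over the serialized key: NOT Kafka's murmur2-based partitioner.
    int partitionFor(String key) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // application.server of the instance whose task owns the key's partition.
    String ownerOf(String key) {
        return hostByPartition.get(partitionFor(key));
    }
}
```

The important property is determinism: every instance computes the same owner for the same key, so any instance can route a request without coordination.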

Slide 49

How do we access data on another instance? (diagram: get("key-A") and get("key-C") sent to instances holding key-A 5, key-B 7 and key-g 4, key-h 2; "???")

Slide 50

Query remote state via RPC
Choose whatever you like:
○ REST
○ gRPC
○ Apache Thrift
○ XML-RPC
○ Java RMI
○ SOAP

Slide 51

Query remote state (diagram: three instances configured with application.server=host1:1234, application.server=host2:1234, and application.server=host2:1235, whose stores hold key-A 5, key-B 7; key-g 4, key-h 2; and key-X 7, key-Y 3. Requests get("key-A") and get("key-C") arrive at an instance, and get("key-A") is answered with <key-A,5>)
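Putting the pieces together, each instance can expose one endpoint that either answers from its own store or forwards the request to the owning instance over whichever RPC mechanism you picked. The sketch below keeps the transport abstract: `RemoteClient` is a hypothetical interface you would back with REST, gRPC, or similar; a plain `Map` stands in for the read-only local store (in Kafka Streams you would obtain it from `KafkaStreams#store(...)`); and `ownerOf` stands in for the metadata lookup. All names are illustrative.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical transport: fetch a value for key from the instance at hostPort.
interface RemoteClient {
    Optional<Long> get(String hostPort, String key);
}

// Routes a read either to the local store or to the owning instance.
final class QueryRouter {
    private final String self;                      // this instance's application.server
    private final Map<String, Long> localStore;     // stand-in for a read-only local store
    private final Function<String, String> ownerOf; // key -> owner's host:port
    private final RemoteClient remote;

    QueryRouter(String self, Map<String, Long> localStore,
                Function<String, String> ownerOf, RemoteClient remote) {
        this.self = self;
        this.localStore = localStore;
        this.ownerOf = ownerOf;
        this.remote = remote;
    }

    Optional<Long> get(String key) {
        String owner = ownerOf.apply(key);
        if (self.equals(owner)) {
            // Local hit: served from the embedded store, no network hop.
            return Optional.ofNullable(localStore.get(key));
        }
        // Remote: forward to the instance that hosts the key's shard.
        return remote.get(owner, key);
    }
}
```

Keeping the router independent of the transport means swapping REST for gRPC only changes the `RemoteClient` implementation, not the routing logic.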

Slide 52

Take Home Message
Before IQ ☹
With IQ ☺

Slide 53

Pros
○ There are fewer moving pieces; no external database
○ It enables faster and more efficient use of the application state
○ It provides better isolation
○ It allows for flexibility
○ Mix-and-match local and remote DB

Slide 54

Cons
○ It may involve moving away from a datastore you know and trust
○ You might want to scale storage independently of processing
○ You might need customized queries, specific to some datastores

Slide 55

🛑 Stop! Demo time!

Slide 56

Summary
○ Interactive Queries is a powerful abstraction that simplifies stateful stream processing
○ There are still cases for which an external database/storage might be a better fit; with IQ you can choose!

Slide 57

Summary
○ Music example: https://github.com/confluentinc/examples/blob/master/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
○ Streaming Movie Ratings: https://github.com/confluentinc/demo-scene/tree/master/streams-movie-demo

Slide 58

Thanks!
@gamussa
viktor@confluent.io
We are hiring! https://www.confluent.io/careers/
