Deploying Kafka Streams Applications with Docker and Kubernetes March, 2019 / Portland, OR @gamussa | @pjug | @ConfluentINc
A presentation at Portland JUG in March 2019 in Portland, OR, USA by Viktor Gamov
Deploying Kafka Streams Applications with Docker and Kubernetes March, 2019 / Portland, OR @gamussa | @pjug | @ConfluentINc
2 @gamussa | @PJUG | @ConfluentINc
Raffle, yeah 🚀 Follow @gamussa 📸🖼🏋 Tag @gamussa With @pjug @confluentinc
4 Special thanks! @gwenshap @gamussa @MatthiasJSax | @PJUG | @ConfluentINc
5 Agenda Kafka Streams 101 How do Kafka Streams applications scale? Kubernetes 101 Recommendations for Kafka Streams @gamussa | @pjug | @ConfluentINc
https://gamov.dev/ks-k8s-stocks @gamussa | @PJUG | @ConfluentINc
7 Kafka Streams – 101 Your App @gamussa | @PJUG | @ConfluentINc Other Systems Kafka Connect Kafka Connect Other Systems Kafka Streams
8 Stock Trade Stats Example KStream<String, Trade> source = builder.stream(STOCK_TOPIC); KStream<Windowed<String>, TradeStats> stats = source .groupByKey() .windowedBy(TimeWindows.of(5000).advanceBy(1000)) .aggregate(TradeStats::new, (k, v, tradestats) -> tradestats.add(v), Materialized.<~>as(“trade-aggregates”) .withValueSerde(new TradeStatsSerde())) .toStream() .mapValues(TradeStats::computeAvgPrice); stats.to(STATS_OUT_TOPIC, Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))); @gamussa | @PJUG | @ConfluentINc
9 Stock Trade Stats Example KStream<String, Trade> source = builder.stream(STOCK_TOPIC); KStream<Windowed<String>, TradeStats> stats = source .groupByKey() .windowedBy(TimeWindows.of(5000).advanceBy(1000)) .aggregate(TradeStats::new, (k, v, tradestats) -> tradestats.add(v), Materialized.<~>as(“trade-aggregates”) .withValueSerde(new TradeStatsSerde())) .toStream() .mapValues(TradeStats::computeAvgPrice); stats.to(STATS_OUT_TOPIC, Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))); @gamussa | @PJUG | @ConfluentINc
10 Stock Trade Stats Example KStream<String, Trade> source = builder.stream(STOCK_TOPIC); KStream<Windowed<String>, TradeStats> stats = source .groupByKey() .windowedBy(TimeWindows.of(5000).advanceBy(1000)) .aggregate(TradeStats::new, (k, v, tradestats) -> tradestats.add(v), Materialized.<~>as(“trade-aggregates”) .withValueSerde(new TradeStatsSerde())) .toStream() .mapValues(TradeStats::computeAvgPrice); stats.to(STATS_OUT_TOPIC, Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))); @gamussa | @PJUG | @ConfluentINc
11 Stock Trade Stats Example KStream<String, Trade> source = builder.stream(STOCK_TOPIC); KStream<Windowed<String>, TradeStats> stats = source .groupByKey() .windowedBy(TimeWindows.of(5000).advanceBy(1000)) .aggregate(TradeStats::new, (k, v, tradestats) -> tradestats.add(v), Materialized.<~>as(“trade-aggregates”) .withValueSerde(new TradeStatsSerde())) .toStream() .mapValues(TradeStats::computeAvgPrice); stats.to(STATS_OUT_TOPIC, Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))); @gamussa | @PJUG | @ConfluentINc
12 Topologies builder.stream Source Node state stores source.groupByKey .windowedBy(…) .aggregate(…) Processor Node mapValues() Processor Node to(…) streams Sink Node @gamussa | @PJUG | @ConfluentINc Processor Topology
How Do Kafka Streams Application Scale? @gamussa | @PJUG | @ConfluentINc
14 Partitions, Tasks, and Consumer Groups input topic Task executes processor topology One consumer group: can be executed with 1 - 4 threads on 1 - 4 machines 4 input topic partitions => 4 tasks result topic @gamussa | @PJUG | @ConfluentINc
15 Scaling with State “no state” Trade Stats App Instance 1 @gamussa | @PJUG | @ConfluentINc
Scaling with State “no state” Trade Stats App Trade Stats App Instance 1 @gamussa Instance 2 | @PJUG | @ConfluentINc 16
17 Scaling with State “no state” Trade Stats App Trade Stats App Instance 1 @gamussa Trade Stats App Instance 2 | @PJUG | @ConfluentINc Instance 3
18 Scaling and FaultTolerance Two Sides of Same Coin @gamussa | @pjug | @ConfluentINc
19 Fault-Tolerance Trade Stats App Trade Stats App Instance 1 @gamussa | Instance 2 @PJUG | Trade Stats App @ConfluentINc Instance 3
20 Fault-Tolerant State State Updates Input Topic Changelog Topic Result Topic @gamussa | @PJUG | @ConfluentINc
21 Migrate State Trade Stats App Instance 1 Trade Stats App Instance 2 restore @gamussa | Changelog Topic @PJUG | @ConfluentINc
22 Recovery Time • Changelog topics are log compacted • Size of changelog topic linear in size of state • Large state implies high recovery times @gamussa | @pjug | @ConfluentINc
23 Recovery Overhead Changelog topic State size: 20 GB (per shard) Active Segment Topic size can grow larger if not compacted Segments (default size 1GB) After compaction Min Topic Size: 21 GB (per shard) Active Segment Segments (default size 1GB) @gamussa Recovery overhead about 5% | @PJUG | @ConfluentINc
24 Recovery Overhead Changelog topic State size: 100 MB (per shard) Active Segment Segments (default size 1GB) Min Topic Size: 1.1 GB Compaction Recovery overhead about 1000% Active Segment Each key is stored up to 11 times… Segment (only 100 MB) @gamussa | @PJUG | @ConfluentINc
25 Recovery Overhead • Recovery overhead is proportional to ○ segment-size / state-size • Segment-size is smaller than state-size => reduced overhead • Update changelog topic segment size accordingly ○ topic config: log.segments.bytes ○ log cleaner interval important, too @gamussa | @pjug | @ConfluentINc
26 Kubernetes Fundamentals @gamussa | @PJUG | @ConfluentINc
27 https://twitter.com/sahrizv/status/1018184792611827712 @gamussa | @PJUG | @ConfluentINc
28 Orchestration ●Compute ●Networking ●Storage ●Service Discovery @gamussa | @PJUG | @ConfluentINc
29 Kubernetes ●Schedules and allocates resources ●Networking between Pods ●Storage ●Service Discovery @gamussa | @PJUG | @ConfluentINc
30 Refresher - Kubernetes Architecture kubectl https://thenewstack.io/kubernetes-an-overview/ @gamussa | @PJUG | @ConfluentINc
31 Pod • Basic Unit of Deployment in Kubernetes • A collection of containers sharing: • Namespace • Network • Volumes @gamussa | @pjug | @ConfluentINc
32 Storage • Persistent Volume (PV) & Persistent Volume Claim (PVC) • Both PV and PVC are ‘resources’ @gamussa | @pjug | @ConfluentINc
33 Storage • Persistent Volume (PV) & Persistent Volume Claim (PVC) • PV is a piece of storage that is provisioned dynamic or static of any individual pod that uses the PV @gamussa | @pjug | @ConfluentINc
34 Storage • Persistent Volume (PV) & Persistent Volume Claim (PVC) • PVC is a request for storage by a User @gamussa | @pjug | @ConfluentINc
35 Storage • Persistent Volume (PV) & Persistent Volume Claim (PVC) • PVCs consume PV @gamussa | @pjug | @ConfluentINc
36 Stateful Workloads @gamussa | @PJUG | @ConfluentINc
37 StatefulSet ● Rely on Headless Headless Service Service to provide network identity ● Ideal for highly available stateful Pod-0 Pod-1 Pod-2 Containers Containers Containers Volumes Volumes Volumes workloads @gamussa | @pjug | @ConfluentINc
Recommendations for Kafka Streams @gamussa | @PJUG | @ConfluentINc
39 Stock Stats App Stock Stats App Stock Stats App Kafka Streams Kafka Streams Kafka Streams Instance 1 Instance 2 Instance 3 @gamussa | @PJUG | @ConfluentINc
40 WordCount App WordCount App WordCount App Kafka Streams Kafka Streams Kafka Streams Instance 1 Instance 2 Instance 3 @gamussa | @PJUG | @ConfluentINc
41 StatefulSets are new and complicated. We don’t need them. @gamussa | @PJUG | @ConfluentINc
42 Recovering state takes time. Statelful is faster. @gamussa | @PJUG | @ConfluentINc
43 But I’ll want to scale-out and back anyway. @gamussa | @PJUG | @ConfluentINc
44 @gamussa | @PJUG | @ConfluentINc
45 I don’t really trust my storage admin anyway @gamussa | @PJUG | @ConfluentINc
46 Recommendations: ● Keep changelog shards small ● If you trust your storage: Use StatefulSets ● Use anti-affinity when possible ● Use “parallel” pod management
apiVersion: apps/v1 kind: “Deployment” metadata: name: “streams-stock-stats” spec: replicas: 1 template: spec: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: app operator: In values: - “streams-stock-stats” topologyKey: “kubernetes.io/hostname” containers: - name: kafka-streams-stockstat image: gamussa/kafka-streams-stockstat:latest @gamussa | @PJUG | @ConfluentINc 47
apiVersion: apps/v1 kind: “Deployment” metadata: name: “streams-stock-stats” spec: replicas: 1 template: spec: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: app operator: In values: - “streams-stock-stats” topologyKey: “kubernetes.io/hostname” containers: - name: kafka-streams-stockstat image: gamussa/kafka-streams-stockstat:latest @gamussa | @PJUG | @ConfluentINc 48
apiVersion: apps/v1 kind: “Deployment” metadata: name: “streams-stock-stats” spec: replicas: 1 template: spec: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: app operator: In values: - “streams-stock-stats” topologyKey: “kubernetes.io/hostname” containers: - name: kafka-streams-stockstat image: gamussa/kafka-streams-stockstat:latest @gamussa | @PJUG | @ConfluentINc 49
apiVersion: apps/v1 kind: StatefulSet spec: serviceName: “streams-stock-stats” replicas: 2 podManagementPolicy: “Parallel” template: spec: containers: - name: kafka-streams-stockstat image: kafka-streams-stockstat:latest volumeMounts: - name: rocksdb mountPath: /var/lib/kafka-streams volumeClaimTemplates: - metadata: name: rocksdb spec: accessModes: [ “ReadWriteOnce” ] resources: requests: storage: 1Gi @gamussa | @PJUG | @ConfluentINc 50
apiVersion: apps/v1 kind: StatefulSet spec: serviceName: “streams-stock-stats” replicas: 2 podManagementPolicy: “Parallel” template: spec: containers: - name: kafka-streams-stockstat image: kafka-streams-stockstat:latest volumeMounts: - name: rocksdb mountPath: /var/lib/kafka-streams volumeClaimTemplates: - metadata: name: rocksdb spec: accessModes: [ “ReadWriteOnce” ] resources: requests: storage: 1Gi @gamussa | @PJUG | @ConfluentINc 51
apiVersion: apps/v1 kind: StatefulSet spec: serviceName: “streams-stock-stats” replicas: 2 podManagementPolicy: “Parallel” template: spec: containers: - name: kafka-streams-stockstat image: kafka-streams-stockstat:latest volumeMounts: - name: rocksdb mountPath: /var/lib/kafka-streams volumeClaimTemplates: - metadata: name: rocksdb spec: accessModes: [ “ReadWriteOnce” ] resources: requests: storage: 1Gi @gamussa | @PJUG | @ConfluentINc 52
53 🛑 Stop! Demo time! @gamussa | @pjug | @ConfluentINc
54 Summary Kafka Streams has recoverable state, that gives streams apps easy elasticity and high availability Kubernetes makes it easy to scale applications It also has StatefulSets for applications with state @gamussa | @pjug | @ConfluentINc
55 Summary Now you know how to deploy Kafka Streams on Kubernetes and take advantage on all the scalability and highavailability capabilities @gamussa | @pjug | @ConfluentINc
56 But what about Kafka itself? @gamussa | @pjug | @ConfluentINc
57 Confluent Operator Automate provisioning Scale your Kafkas and CP clusters elastically Monitor SLAs through Confluent Control Center or Prometheus Operate at scale with enterprise support from Confluent @gamussa | @PJUG | @ConfluentINc
58 Resources and Next Steps https://cnfl.io/helm_video https://cnfl.io/cp-helm https://cnfl.io/k8s https://slackpass.io/confluentcommunity #kubernetes @gamussa | @PJUG | @ConfluentINc
59 One more thing…
https://kafka-summit.org Gamov30 @gamussa | @ @tlberglund | #DEVnexus
Thanks! @gamussa viktor@confluent.io We are hiring! https://www.confluent.io/careers/ @gamussa | @pjug @ | @ConfluentINc
62