Who’s tweeting about #scalalove @GAMUSSA | #SCALALOVE | @CONFLUENTINC
A presentation at Scala Love in April 2020 by Viktor Gamov
I build highly scalable Hello World apps
Apache Kafka: Producer, Consumer, The Log, Connectors, Stream processing
    {
      "reading_ts": "2020-02-14T12:19:27Z",
      "sensor_id": "aa-101",
      "production_line": "w01",
      "widget_type": "acme94",
      "temp_celcius": 23,
      "widget_weight_g": 100
    }
    (Photo by Franck V. on Unsplash)
Streams of events (Time axis)
Stream Processing with ksqlDB
    Stream: widgets
    Stream: widgets_red
Stream Processing with Kafka Streams
    Stream: widgets
        final StreamsBuilder builder = new StreamsBuilder();
        // Read "widgets", keep only the red widgets, write them to "widgets_red".
        builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
               .filter((key, widget) -> widget.getColour().equals("RED"))
               .to("widgets_red", Produced.with(stringSerde, widgetsSerde));
    Stream: widgets_red
Stream Processing with ksqlDB
    Stream: widgets
        CREATE STREAM widgets_red AS
          SELECT * FROM widgets WHERE color='RED';
    Stream: widgets_red
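The filter on this slide can be pictured, outside of Kafka entirely, as a generator that passes through only the matching events. This is a minimal plain-Python sketch of the idea, not ksqlDB or Kafka Streams code; the function name `widgets_red` and the sample events (including their `color` values) are hypothetical and chosen only for illustration.

```python
from typing import Dict, Iterable, Iterator

Widget = Dict[str, object]


def widgets_red(widgets: Iterable[Widget]) -> Iterator[Widget]:
    """Yield only the red widgets, one event at a time (like WHERE color='RED')."""
    for widget in widgets:
        if widget.get("color") == "RED":
            yield widget


# Hypothetical sample events; the colours are made up for this sketch.
sample = [
    {"sensor_id": "aa-101", "color": "RED"},
    {"sensor_id": "aa-102", "color": "BLUE"},
    {"sensor_id": "aa-103", "color": "RED"},
]
red = list(widgets_red(sample))
```

Because the function is a generator, it handles each event as it arrives instead of waiting for the whole input, which is the property the streaming versions on the slides rely on.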
    {
      "reading_ts": "2020-02-14T12:19:27Z",
      "sensor_id": "aa-101",
      "production_line": "w01",
      "widget_type": "acme94",
      "temp_celcius": 23,
      "widget_weight_g": 100
    }
    SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE
    SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP>20
    SELECT * FROM WIDGETS WHERE WEIGHT_G > 120
    CREATE SINK CONNECTOR dw WITH (
      'connector.class' = 'S3Connector',
      'topics' = 'widgets');
    Object store, data warehouse, RDBMS
    (Photo by Franck V. on Unsplash)
ksqlDB: The event streaming database purpose-built for stream processing applications.
Stream Processing with ksqlDB: Source stream
Stream Processing with ksqlDB: Source stream, Analytics
Stream Processing with ksqlDB: Source stream, Applications / Microservices
Stream Processing with ksqlDB: Source stream, Applications / Microservices
    …SUM(TXN_AMT) GROUP BY AC_ID
    AC_ID=42
    AC_ID=42, BALANCE=94.00
DEMO https://gamov.dev/scala-love-demo (Photo by Raoul Droog on Unsplash)
Interacting with ksqlDB (Photo by Tim Mossholder on Unsplash)
ksqlDB - Confluent Control Center
ksqlDB - REST API
ksqlDB - Native client (coming soon)
What else can ksqlDB do? (Photo by Sereja Ris on Unsplash)
Message transformation with ksqlDB
    ORDERS
    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "address": {
        "street": "243 Utah Way",
        "city": "Orange",
        "state": "California"
      }
    }
    On "ordertime": Convert this timestamp to human readable format
    On "address": Drop these address fields
Message transformation with ksqlDB
    ORDERS
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"}}
    ksqlDB
        CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
          SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS
          FROM ORDERS;
Message transformation with ksqlDB
    ORDERS
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"}}
    ksqlDB
        CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
          SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
                 ORDERID, ITEMID, ORDERUNITS
          FROM ORDERS;
    ORDERS_NO_ADDRESS_DATA
    {"order_ts": "2020-02-14 15:10:58", "orderid": 67, "itemid": "Item_9", "orderunits": 5}
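As a rough illustration of what this transformation does to a single message, here is a plain-Python sketch (not ksqlDB code). It rests on two assumptions: the function name `drop_address_and_format_time` is hypothetical, and the sketch formats the payload's `ordertime` field in UTC, whereas the slide's query formats `ROWTIME`, the Kafka record timestamp. The formatted value therefore differs from the one shown on the slide.

```python
from datetime import datetime, timezone

order = {
    "ordertime": 1560070133853,  # epoch milliseconds, as on the slide
    "orderid": 67,
    "itemid": "Item_9",
    "orderunits": 5,
    "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"},
}


def drop_address_and_format_time(event: dict) -> dict:
    """Return a copy without the address and with a human-readable timestamp."""
    seconds = event["ordertime"] // 1000  # milliseconds -> whole seconds
    ts = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return {
        "order_ts": ts.strftime("%Y-%m-%d %H:%M:%S"),
        "orderid": event["orderid"],
        "itemid": event["itemid"],
        "orderunits": event["orderunits"],
    }


transformed = drop_address_and_format_time(order)
```

Selecting the wanted fields explicitly, instead of deleting the unwanted ones, mirrors the `SELECT` list in the query: anything not named is left out of the new stream.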
Lookups and Joins with ksqlDB
    ORDERS
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
Lookups and Joins with ksqlDB
    ITEMS
    {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
    ORDERS
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
Lookups and Joins with ksqlDB
    ITEMS
    {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
    ORDERS
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
    ksqlDB
        CREATE STREAM ORDERS_ENRICHED AS
          SELECT O.*, I.*,
                 O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
          FROM ORDERS O
               INNER JOIN ITEMS I
               ON O.ITEMID = I.ID;
Lookups and Joins with ksqlDB
    ITEMS
    {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
    ORDERS
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
    ksqlDB
        CREATE STREAM ORDERS_ENRICHED AS
          SELECT O.*, I.*,
                 O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
          FROM ORDERS O
               INNER JOIN ITEMS I
               ON O.ITEMID = I.ID;
    ORDERS_ENRICHED
    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "make": "Boyle-McDermott",
      "model": "Apiaceae",
      "unit_cost": 19.9,
      "total_order_value": 99.5
    }
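The enrichment above amounts to looking up each order's item by key and computing a derived field. A minimal plain-Python sketch of that idea follows; it is not ksqlDB code. The function name `enrich` and the use of a dict as the ITEMS lookup are assumptions made for illustration, and the copied fields follow the ORDERS_ENRICHED message shown on the slide.

```python
from typing import Optional

# ITEMS as a lookup keyed by item id (values copied from the slide).
items = {
    "Item_9": {"make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9},
}

order = {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}


def enrich(order: dict, items: dict) -> Optional[dict]:
    """Join one order to its item; return None when no item matches (inner join)."""
    item = items.get(order["itemid"])
    if item is None:
        return None
    enriched = {**order, **item}
    enriched["total_order_value"] = order["orderunits"] * item["unit_cost"]
    return enriched


enriched = enrich(order, items)
```

Returning `None` for an unknown item mirrors the `INNER JOIN`: orders without a matching item produce no output row.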
Connecting ksqlDB to other systems (Photo by Mak on Unsplash)
Connecting ksqlDB to other systems: syslog, Google BigQuery, Amazon S3
Connecting ksqlDB to other systems
    CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
      'connector.class' = 'i.d.c.mysql.MySqlConnector',
      'database.hostname' = 'mysql',
      'table.whitelist' = 'demo.customers');
    syslog
Connecting ksqlDB to other systems
    CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
      'connector.class' = '…ElasticsearchSinkConnector',
      'connection.url' = 'http://elasticsearch:9200',
      'topics' = 'orders');
    Google BigQuery, Amazon S3
Streams & Tables
Streams and Tables
    Kafka topic (k/v bytes)
        {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
        {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
        {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
        {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}
    ksqlDB Stream
        +--------------------+-------+----------+
        |EVENT_TS            |PERSON |LOCATION  |
        +--------------------+-------+----------+
        |2020-02-17 15:22:00 |robin  |Leeds     |
        |2020-02-17 17:23:00 |robin  |London    |
        |2020-02-17 22:23:00 |robin  |Wakefield |
        |2020-02-18 09:00:00 |robin  |Leeds     |
        Stream: Topic + Schema
    ksqlDB Table
        +-------+----------+
        |PERSON |LOCATION  |
        +-------+----------+
        |robin  |Leeds     |
        (the LOCATION value is overwritten as events arrive: Leeds, London, Wakefield, Leeds)
        Table: state for given key
        Topic + Schema
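The stream and table views of the same topic can be sketched in a few lines of plain Python (not ksqlDB code): the stream is the full list of events, while the table keeps only the latest value per key. The helper name `latest_by_key` is hypothetical; the events are the four from the slide.

```python
from typing import Dict, Iterable

# The stream: every event, in arrival order.
movements = [
    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"},
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"},
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"},
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"},
]


def latest_by_key(events: Iterable[dict], key: str, value: str) -> Dict[str, str]:
    """Build the table view: each new event overwrites the state for its key."""
    table: Dict[str, str] = {}
    for event in events:
        table[event[key]] = event[value]
    return table


table = latest_by_key(movements, "person", "location")
```

The stream still holds all four rows, while the table holds a single row for `robin` with the most recent location.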
Stateful aggregations in ksqlDB
    Kafka topic
        {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
        {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
        {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
        {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}
    SELECT PERSON, COUNT(*) FROM MOVEMENTS GROUP BY PERSON;
        +-------+------------------+
        |PERSON | LOCATION_CHANGES |
        +-------+------------------+
        |robin  | 4                |
        (the value steps through 1, 2, 3, 4 as events arrive)
    SELECT PERSON, COUNT_DISTINCT(LOCATION) FROM MOVEMENTS GROUP BY PERSON;
        +-------+------------------+
        |PERSON | UNIQUE_LOCATIONS |
        +-------+------------------+
        |robin  | 3                |
        (the value steps through 1, 2, 3 as events arrive)
    Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION)
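To make the "stateful" part concrete, the following plain-Python sketch (not ksqlDB code) keeps per-person state and emits the updated aggregates after every event, across the entire input with no windowing. The function name `running_aggregates` is hypothetical; the events are the four from the slide.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Set, Tuple

movements = [
    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"},
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"},
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"},
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"},
]


def running_aggregates(events: Iterable[dict]) -> Iterator[Tuple[str, int, int]]:
    """Yield (person, event count, distinct location count) after each event."""
    counts: Dict[str, int] = defaultdict(int)       # state behind COUNT(*)
    seen: Dict[str, Set[str]] = defaultdict(set)    # state behind COUNT_DISTINCT
    for event in events:
        person = event["person"]
        counts[person] += 1
        seen[person].add(event["location"])
        yield person, counts[person], len(seen[person])


updates = list(running_aggregates(movements))
```

The last update carries the final values from the slide: four events and three distinct locations, because the second visit to Leeds raises the count but not the distinct count.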
Pull and Push queries in ksqlDB
                Pull query             Push query
    Tells you:  Point in time value    All value changes
    Exits:      Immediately            Never
Under the covers of ksqlDB (Photo by Vinicius de Moraes on Unsplash)
Kafka, ksqlDB (consume, produce)
Kafka, JVM: ksqlDB, Kafka Streams, RocksDB (consume, produce)
Fully Managed Kafka & KSQL as a Service
Running ksqlDB - self-managed
    DEB, RPM, ZIP, TAR downloads: http://confluent.io/download
    Docker images: confluentinc/ksqldb-server
    ksqlDB Server (JVM process)
    …and many more…
Scaling ksqlDB: Kafka cluster, ksqlDB
Scaling ksqlDB: Kafka cluster, ksqlDB cluster (ksqlDB, ksqlDB, ksqlDB), Work split by partition
Think Applications, not database instances
    Kafka cluster
    ksqlDB cluster: Inventory
    ksqlDB cluster: Orders
    ksqlDB cluster: Fraud
ksqlDB or Kafka Streams? (Photo by Ramiz Dedaković on Unsplash)
ksqlDB Builds on Streams
    ksqlDB
    Kafka Streams
    Consumer, Producer
Want to learn more? CTAs, not CATs (sorry, not sorry) (Photo by Tucker Good on Unsplash)
Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io
Confluent Community Slack group: cnfl.io/slack
Free Books! https://cnfl.io/book-bundle