Kafka Stream Joins
KTable-KTable Join
In Kafka Streams, a KTable is the abstraction of a changelog stream, which represents a table that is continuously updated as new events arrive. A KTable is essentially a set of key-value pairs in which each key is unique and each value is the most recent update for that key. A KTable can be used to represent a materialized view of a Kafka topic or of a database table.
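To make the "latest value per key" idea concrete, here is a minimal plain-Java sketch, with no Kafka dependency, that replays a changelog into an in-memory map. The class and method names are hypothetical, and treating a null value as a deletion mirrors Kafka's tombstone convention. It models the semantics only; it is not how Kafka Streams actually stores tables.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical names, not Kafka Streams code): folds a changelog into a table.
final class ChangelogModel {
    // One changelog record; a null value marks a deletion (a "tombstone").
    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the changelog in order and returns the resulting table state.
    static Map<String, String> materialize(List<Change> changelog) {
        Map<String, String> table = new HashMap<>();
        for (Change change : changelog) {
            if (change.value == null) {
                table.remove(change.key);             // tombstone: the key is deleted
            } else {
                table.put(change.key, change.value);  // upsert: the latest value wins
            }
        }
        return table;
    }
}
```

Replaying a=1, b=2, a=3 and then a tombstone for b leaves a table that contains only a=3.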
KTable#join() joins a KTable with another KTable; to join a KStream with a KTable, you call join() (or leftJoin()) on the KStream instead. Joining two KTable instances produces a new KTable that combines the two tables by key. The join is evaluated per key, and only records whose keys appear in both tables are joined.
The join() method returns a new KTable that contains the join result. This result is kept up to date incrementally: whenever either input table receives an update for a key, the join looks up the current value for that key in the other table and, if both values exist, emits an updated result record for that key.
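A KTable-KTable inner join is driven by updates: a change on either side is matched against the other side's current value for the same key. The plain-Java sketch below models that behavior with two in-memory maps. It is an illustration under simplifying assumptions (hypothetical class names, String keys and values, deletions ignored), not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (hypothetical, not Kafka code) of an inner KTable-KTable join on the primary key.
// Deletions (tombstones) are deliberately left out to keep the sketch short.
final class TableTableJoinModel {
    final Map<String, String> left = new HashMap<>();
    final Map<String, String> right = new HashMap<>();
    final List<String> emitted = new ArrayList<>(); // "key: joinedValue" entries, in emit order
    private final BinaryOperator<String> joiner;

    TableTableJoinModel(BinaryOperator<String> joiner) {
        this.joiner = joiner;
    }

    // An update on the left side triggers a lookup on the right side for the same key.
    void updateLeft(String key, String value) {
        left.put(key, value);
        emitIfMatched(key);
    }

    // An update on the right side triggers a lookup on the left side for the same key.
    void updateRight(String key, String value) {
        right.put(key, value);
        emitIfMatched(key);
    }

    // Inner join: a result is emitted only when both sides currently hold a value for the key.
    private void emitIfMatched(String key) {
        String l = left.get(key);
        String r = right.get(key);
        if (l != null && r != null) {
            emitted.add(key + ": " + joiner.apply(l, r));
        }
    }
}
```

For example, updating the left side of k1 (no output yet), then the right side of k1, then the left side of k1 again produces two results for k1, while a right-only update for k2 produces none. In a real application, record caching may collapse several updates to the same key before they reach the join, so fewer intermediate results can appear.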
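import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
StreamsBuilder builder = new StreamsBuilder();
// Sample records; note that table() drops records with a null key, such as the left one below
KTable<String, String> table1 = builder.table("kafka-left-topic", Consumed.with(Serdes.String(), Serdes.String())); // Key=null, Value=10:00:00AM_GMT
KTable<String, String> table2 = builder.table("kafka-right-topic", Consumed.with(Serdes.String(), Serdes.String())); // Key=10:00:00AM, Value=task_1_completed
// Inner join on the primary key: a result is emitted only for keys present in both tables
KTable<String, String> joinedTable = table1.join(table2,
    (value1, value2) -> value1 + "," + value2);
joinedTable.toStream().foreach((key, value) -> System.out.println(key + ": " + value));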
In this example, we first create two KTable instances by reading from the topics “kafka-left-topic” and “kafka-right-topic”. We then join the two tables on their keys, concatenating the values for each matching key with a comma as separator. Finally, we convert the resulting KTable to a KStream with the toStream() method and print each record with foreach(). Note that the sample records in the comments have different keys, so this primary-key join would produce no output for them.
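Even if the keys of the two KTables differ, we can still join them with a foreign-key join (available since Kafka 2.4). A foreign-key extractor derives the right table’s key from each value of the left table, and the result is keyed by the left table’s key.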
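import java.util.function.Function;
// Derives the right table's key from a left-table value, e.g. "10:00:00AM_GMT" -> "10:00:00AM"
Function<String, String> foreignKeyExtractor = value1 -> value1.split("_")[0];
// The result is keyed by table1's key; value1 comes from table1 and value2 from table2
KTable<String, String> fkJoinedTable = table1.join(table2, foreignKeyExtractor,
    (value1, value2) -> value1 + "," + value2
);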
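The foreign-key join can be modeled in plain Java as a snapshot computation over two maps. This sketch uses hypothetical names and a hypothetical left-table key, task-1, and applies the same extraction rule as the example (split the value on "_" and keep the first part). It computes the join once over fixed inputs, whereas Kafka Streams maintains the same result incrementally as either table changes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model (hypothetical, not Kafka code) of an inner foreign-key table-table join.
final class ForeignKeyJoinModel {
    static Map<String, String> join(Map<String, String> left,
                                    Map<String, String> right,
                                    Function<String, String> foreignKeyExtractor,
                                    BiFunction<String, String, String> joiner) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> row : left.entrySet()) {
            // Derive the right table's key from the left row's VALUE
            String foreignKey = foreignKeyExtractor.apply(row.getValue());
            String rightValue = right.get(foreignKey);
            if (rightValue != null) { // inner join: left rows without a match are dropped
                // The result keeps the LEFT table's key
                result.put(row.getKey(), joiner.apply(row.getValue(), rightValue));
            }
        }
        return result;
    }
}
```

With a left row task-1 → 10:00:00AM_GMT and a right row 10:00:00AM → task_1_completed, the extractor yields 10:00:00AM, so the result contains task-1 → 10:00:00AM_GMT,task_1_completed; a left row whose extracted key has no match produces nothing.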
KTable-KStream Join
Joining a KStream with a KTable is useful when you want to enrich each record of the stream with the current information stored in the table. Here is an example of such a join using the Kafka Streams API in Java.
Suppose we have a KStream of orders and a KTable of customers, declared as follows:
KStream<String, GenericRecord> ordersStream = ...
KTable<String, GenericRecord> customersTable = ...
We want to join ordersStream with customersTable on the customer_id field of each order, to enrich the orders with customer information. Assuming customersTable is keyed by customer ID, here is how we can achieve this:
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
// Key mapper that extracts the customer_id from each order (selectKey takes (key, value) -> newKey)
KeyValueMapper<String, GenericRecord, String> customerIdExtractor =
    (orderKey, order) -> order.get("customer_id").toString();
// Re-key the orders by customer_id FIRST: a KStream-KTable join matches on the stream's key,
// and Kafka Streams repartitions the re-keyed stream automatically before the join
KStream<String, EnrichedOrder> enrichedOrdersStream = ordersStream
    .selectKey(customerIdExtractor)
    // Left join: customer is null when no matching customer exists, so EnrichedOrder must handle that
    .leftJoin(customersTable, (order, customer) -> new EnrichedOrder(order, customer));
In this example, selectKey() first re-keys each order by its customer_id, using customerIdExtractor, which receives the order's current key and value and returns the new key. This step must come before the join, not after it: a KStream-KTable join always matches on the stream's key, so the orders have to be keyed by customer ID to find their customers. Because the key changes, Kafka Streams automatically repartitions the re-keyed stream so that each order is processed on the same partition as its customer's record. The leftJoin() method then takes two arguments: the KTable to join with (customersTable) and a value joiner that specifies how to combine the values of the joined records (here, an EnrichedOrder object that holds both the order and the customer data).
Because this is a left join, every order produces an output record; when no customer with that ID exists, the joiner receives null for the customer. Updates to customersTable do not produce output on their own: each order is joined with the customer's value as it is at the moment the order is processed.
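Here is a plain-Java model of the re-key-then-left-join flow, with orders simplified to (orderId, customerId) pairs and the customers table simplified to a map from customer ID to a name. All names and data are hypothetical. The sketch shows two properties of a stream-table left join: every order produces exactly one output record, keyed by customer ID, and an order without a matching customer is still emitted, with a null customer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical, not Kafka code) of selectKey() followed by a stream-table leftJoin().
final class StreamTableLeftJoinModel {
    // orders: (orderId, customerId) pairs in arrival order; customers: the table's current state
    static List<String> enrich(List<Map.Entry<String, String>> orders, Map<String, String> customers) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> order : orders) {
            String newKey = order.getValue();        // like selectKey(): customer_id becomes the key
            String customer = customers.get(newKey); // lookup against the table's current value
            // Left join: one output per order; customer is null (printed as "null") when unmatched
            out.add(newKey + " -> " + order.getKey() + "/" + customer);
        }
        return out;
    }
}
```

Only the stream side drives the loop, which reflects why changes to the table alone never produce output in this kind of join.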
The result, enrichedOrdersStream, is a KStream rather than a KTable: a stream-table join produces one output record per incoming order, and because a customer usually places many orders, the output contains many records with the same key. If you need a KTable, group the records by key with the groupByKey() method and combine the values of each key with the reduce() method; with a reducer that keeps the newer value, the resulting table holds the latest enriched order per customer.
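The effect of groupByKey().reduce() can be modeled with Map.merge, which combines a new value with the existing value for the same key. To stay self-contained, this plain-Java sketch uses hypothetical integer order amounts instead of EnrichedOrder objects, and the class name is hypothetical as well.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (hypothetical, not Kafka code) of groupByKey().reduce(): records with the
// same key are folded together with the reducer, leaving one current value per key.
final class GroupReduceModel {
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> stream,
                                            BinaryOperator<Integer> reducer) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : stream) {
            // The first record for a key is stored as-is; later ones are combined as reducer(old, new)
            table.merge(record.getKey(), record.getValue(), reducer);
        }
        return table;
    }
}
```

A summing reducer turns c1=10, c2=5, c1=7 into c1=17, c2=5, while (oldValue, newValue) -> newValue keeps only the latest value (c1=7, c2=5). In the DSL, the "keep the latest" variant would look roughly like enrichedOrdersStream.groupByKey().reduce((oldValue, newValue) -> newValue), given serdes that can handle EnrichedOrder; KStream#toTable() (Kafka 2.5 and later) is a shorter way to get a latest-value-per-key table.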