Kafka Record Patterns for Data Replication
Imagine going down to your local milkshake bar and signing a contract with the owner so that you could purchase bespoke drinks at a set price. Let’s say you agreed on fresh milk with 3.5% fat and one tablespoon of chocolate powder, per 500ml of milk. Putting that into a table might look like this:
| PK  | contract_number | start      | fat_content | chocolate_powder |
|-----|-----------------|------------|-------------|------------------|
| 100 | 12345678        | 2021-01-01 | 3.5%        | 1 tbsp           |
After a few weeks, your tastebuds become a little desensitised and you decide you want to add some more chocolate powder. The owner is agile, so he adjusts the contract, meaning we need to add a few columns in order to track validity:
| PK  | contract_number | contract_from | start      | end        | fat_content | chocolate_powder |
|-----|-----------------|---------------|------------|------------|-------------|------------------|
| 100 | 12345678        | 2021-01-01    | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp           |
| 101 | 12345678        | 2021-01-01    | 2021-02-01 | 9999-12-31 | 3.5%        | 2 tbsp           |
Note two things: 1) this table is not normalised and 2) I used a low date (year 0001) and high date (year 9999) for the start of the first row and the end of the last row.
In reality we would probably normalise this data. For the sake of this example, I won’t because it will make it more readable as I add more information below.
The low and high dates are there so that I can always find data, regardless of the date I query with. I don't need to know the contract termination date, which differs for every contract, simply to ask what the latest recipe is for a given contract number:
```sql
select *
from contracts
where contract_number = '12345678'
  and '9999-12-31' between start and end;
--> returns the row with primary key 101
```
After a few more weeks, I realise that I need to reduce my calorific intake, but I’m a complete chocoholic. We agree to reduce the fat content:
| PK  | contract_number | contract_from | start      | end        | fat_content | chocolate_powder |
|-----|-----------------|---------------|------------|------------|-------------|------------------|
| 100 | 12345678        | 2021-01-01    | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp           |
| 101 | 12345678        | 2021-01-01    | 2021-02-01 | 2021-02-28 | 3.5%        | 2 tbsp           |
| 102 | 12345678        | 2021-01-01    | 2021-03-01 | 9999-12-31 | 0.8%        | 2 tbsp           |
At some point I get bored of milkshakes and I terminate the contract, but because I never purchased a milkshake with 0.8% fat, the owner lets me terminate it with a date in the past, say 2021-02-14, so that we can delete the last row:
| PK  | contract_number | contract_from | contract_to | start      | end        | fat_content | chocolate_powder |
|-----|-----------------|---------------|-------------|------------|------------|-------------|------------------|
| 100 | 12345678        | 2021-01-01    | 2021-02-14  | 0001-01-01 | 2021-01-31 | 3.5%        | 1 tbsp           |
| 101 | 12345678        | 2021-01-01    | 2021-02-14  | 2021-02-01 | 9999-12-31 | 3.5%        | 2 tbsp           |
Note that whether or not we "shorten" the end date is a design choice. We might want to do that so that this data can no longer be found for dates after the contract termination. It depends on requirements more than anything.
What has all this got to do with Kafka, and data replication?
Imagine a self-contained microservice which needs to have an up-to-date copy of this data in memory, in order to run lightning fast. Imagine you want that cache to be distributed across all of your service instances (Kubernetes pods). How about the following 7 lines of Kotlin code that use the nifty Kafka Streams API:
```kotlin
val builder = StreamsBuilder()
val globalStore = Materialized.`as`(globalStoreName)
// global, so that every pod has access to all data from all partitions:
builder.globalTable(CONTRACTS_TOPIC, globalStore)
val streams = KafkaStreams(builder.build(), props)
streams.start()
val globalBausteinView = streams.store(fromNameAndType(globalStoreName, ...)) // store type elided here

// REST Handler:
val contractJson = globalBausteinView.get(contractNumber)
```
We need to publish the contract data to the topic used as the input, but before we do that, let’s think about the keys we use, in order to have the data survive log compaction. It would be no good to publish three records, each using the contract number as the key, because as soon as the topic were compacted, only the data from the last row would survive, and any service replicating from scratch would have an incomplete dataset. The solution is to include the start date in the key, e.g. “12345678::2021-02-01”.
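To make that keying scheme concrete, here is a minimal producer sketch. It assumes a plain KafkaProducer with String serialisers, a topic named "contracts" and a JSON string payload; the topic name and payload shape are illustrative, not taken from the original system.

```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")
        put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    }
    KafkaProducer<String, String>(props).use { producer ->
        // key = contractNumber::start, so that each validity window survives log compaction
        val key = "12345678::2021-02-01"
        val value = """{"cn":"12345678","from":"2021-01-01","to":"2025-12-31",""" +
                """"start":"2021-02-01","end":"2025-12-31","fatContent":3.5,"choc":2}"""
        producer.send(ProducerRecord("contracts", key, value)) // hypothetical topic name
        producer.flush()
    }
}
```

Because each validity window gets its own key, all of the windows for a contract survive compaction, and a service replicating from scratch still sees the full history.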
We have a number of options regarding the values (payload). Let’s work through the examples.
(Note: initially contracts are valid for 5 years, so the contract_to column always has a value)
1) Denormalised Table, Variation 1 – One Event per Attribute Combination
| Use Case | PK | contract_number | contract_from | contract_to | start | end | fat content | chocolate powder | records emitted |
|---|---|---|---|---|---|---|---|---|---|
| Contract Creation | 100 | 12345678 | 2021-01-01 | 2025-12-31 | 0001-01-01 | 9999-12-31 | 3.5% | 1 tbsp | Key: 12345678::2021-01-01, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-01-01", end: "2025-12-31", fatContent: 3.5, choc: 1} |
| Change choc powder | 101 | 12345678 | 2021-01-01 | 2025-12-31 | 0001-01-01 | 2021-01-31 | 3.5% | 1 tbsp | Key: 12345678::2021-01-01, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-01-01", end: "2021-01-31", fatContent: 3.5, choc: 1} |
|  | 102 | 12345678 | 2021-01-01 | 2025-12-31 | 2021-02-01 | 9999-12-31 | 3.5% | 2 tbsp | Key: 12345678::2021-02-01, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-02-01", end: "2025-12-31", fatContent: 3.5, choc: 2} |
| Change fat content | 101 | 12345678 | 2021-01-01 | 2025-12-31 | 0001-01-01 | 2021-01-31 | 3.5% | 1 tbsp | none – no changes made |
|  | 102 | 12345678 | 2021-01-01 | 2025-12-31 | 2021-02-01 | 2021-02-28 | 3.5% | 2 tbsp | Key: 12345678::2021-02-01, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-02-01", end: "2021-02-28", fatContent: 3.5, choc: 2} |
|  | 103 | 12345678 | 2021-01-01 | 2025-12-31 | 2021-03-01 | 9999-12-31 | 0.8% | 2 tbsp | Key: 12345678::2021-03-01, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", start: "2021-03-01", end: "2025-12-31", fatContent: 0.8, choc: 2} |
| Contract Termination | 101 | 12345678 | 2021-01-01 | 2021-02-14 | 0001-01-01 | 2021-01-31 | 3.5% | 1 tbsp | Key: 12345678::2021-01-01, Value: {cn: 12345678, from: "2021-01-01", to: "2021-02-14", start: "2021-01-01", end: "2021-01-31", fatContent: 3.5, choc: 1} |
|  | 102 | 12345678 | 2021-01-01 | 2021-02-14 | 2021-02-01 | 2021-02-14 | 3.5% | 2 tbsp | Key: 12345678::2021-02-01, Value: {cn: 12345678, from: "2021-01-01", to: "2021-02-14", start: "2021-02-01", end: "2021-02-14", fatContent: 3.5, choc: 2} |
|  | 103 | deleted |  |  |  |  |  |  | Key: 12345678::2021-03-01, Value: null (tombstone record) |
Note how the key and start/end dates are not the ugly technical dates but are limited to the actual contract validity. That is a design choice: I chose not to expose technical details.
In this variant, we publish a record for the "lowest common denominators" in terms of validity: there is an event for each time window in which the values are constant. Each change leads to a new record.
Imagine viewing the validities of the values separately, as they might be if we normalised the table:
| Value | January | February | March | April… |
|---|---|---|---|---|
| Milk Fat Content | 3.5 | 3.5 | 0.8 | … |
| Chocolate Powder | 1 | 2 | 2 | … |
| Resulting time windows with constant values | 3.5 & 1 | 3.5 & 2 | 0.8 & 2 | … |
Each change leads to a new row in the denormalised table and hence a new record in Kafka. The three events that are published are visible on that bottom row.
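To illustrate how those constant-value windows can be derived from the per-attribute validities, here is a small sketch. The Validity and Window types and the buildWindows helper are hypothetical, not part of the original article.

```kotlin
import java.time.LocalDate

data class Validity<T>(val start: LocalDate, val end: LocalDate, val value: T)
data class Window(val start: LocalDate, val end: LocalDate, val fatContent: Double, val choc: Int)

fun buildWindows(fat: List<Validity<Double>>, choc: List<Validity<Int>>): List<Window> {
    // every date on which any attribute starts a new validity is a window boundary
    val boundaries = (fat.map { it.start } + choc.map { it.start }).distinct().sorted()
    return boundaries.mapIndexed { i, start ->
        val end = if (i < boundaries.size - 1) boundaries[i + 1].minusDays(1)
                  else maxOf(fat.last().end, choc.last().end)
        Window(
            start, end,
            fatContent = fat.first { start in it.start..it.end }.value,
            choc = choc.first { start in it.start..it.end }.value
        )
    }
}

fun main() {
    val fat = listOf(
        Validity(LocalDate.parse("2021-01-01"), LocalDate.parse("2021-02-28"), 3.5),
        Validity(LocalDate.parse("2021-03-01"), LocalDate.parse("2025-12-31"), 0.8)
    )
    val choc = listOf(
        Validity(LocalDate.parse("2021-01-01"), LocalDate.parse("2021-01-31"), 1),
        Validity(LocalDate.parse("2021-02-01"), LocalDate.parse("2025-12-31"), 2)
    )
    // prints three windows: January (3.5 & 1), February (3.5 & 2), March onwards (0.8 & 2)
    buildWindows(fat, choc).forEach(::println)
}
```

Each resulting window then becomes one Kafka record, keyed by contract number and window start date.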
As an alternative, we could publish one event per contract, with validities inside the payload, as follows.
2) Denormalised Table, Variation 2 – One Event per Contract
| Use Case | records emitted |
|---|---|
| Contract Creation | Key: 12345678, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", fatContent: [ {start: "2021-01-01", end: "2025-12-31", value: 3.5} ], choc: [ {start: "2021-01-01", end: "2025-12-31", value: 1} ] } |
| Change chocolate powder | Key: 12345678, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31", fatContent: [ {start: "2021-01-01", end: "2025-12-31", value: 3.5} ], choc: [ {start: "2021-01-01", end: "2021-01-31", value: 1}, {start: "2021-02-01", end: "2025-12-31", value: 2} ] } |
With this variation, we end up having to publish a list of values together with their validities.
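As a sketch of what such a payload might look like in code (the class and field names are illustrative; in practice the object would be serialised to JSON by whatever serialiser you use):

```kotlin
import java.time.LocalDate

// one record per contract; each attribute is carried as a list of validity-scoped values
data class ValidValue<T>(val start: LocalDate, val end: LocalDate, val value: T)

data class ContractEvent(
    val cn: String,
    val from: LocalDate,
    val to: LocalDate,
    val fatContent: List<ValidValue<Double>>,
    val choc: List<ValidValue<Int>>
)

fun main() {
    // the record emitted after the chocolate powder change
    val event = ContractEvent(
        cn = "12345678",
        from = LocalDate.parse("2021-01-01"),
        to = LocalDate.parse("2025-12-31"),
        fatContent = listOf(ValidValue(LocalDate.parse("2021-01-01"), LocalDate.parse("2025-12-31"), 3.5)),
        choc = listOf(
            ValidValue(LocalDate.parse("2021-01-01"), LocalDate.parse("2021-01-31"), 1),
            ValidValue(LocalDate.parse("2021-02-01"), LocalDate.parse("2025-12-31"), 2)
        )
    )
    println(event) // would be serialised and sent with key "12345678"
}
```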
3) Normalised Table, Each Attribute on its own Topic
The next solution is to publish each attribute on its own topic.
| Use Case | records emitted |
|---|---|
| Contract Creation | Topic: Contract → Key: 12345678, Value: {cn: 12345678, from: "2021-01-01", to: "2025-12-31"}; Topic: Fat Content → Key: 12345678::2021-01-01, Value: {start: "2021-01-01", end: "2025-12-31", value: 3.5}; Topic: Chocolate Powder → Key: 12345678::2021-01-01, Value: {start: "2021-01-01", end: "2025-12-31", value: 1} |
| Change choc powder | Topic: Chocolate Powder → Key: 12345678::2021-01-01, Value: {start: "2021-01-01", end: "2021-01-31", value: 1}; Key: 12345678::2021-02-01, Value: {start: "2021-02-01", end: "2025-12-31", value: 2} |
| Change fat content | Topic: Fat Content → Key: 12345678::2021-01-01, Value: {start: "2021-01-01", end: "2021-02-28", value: 3.5}; Key: 12345678::2021-03-01, Value: {start: "2021-03-01", end: "2025-12-31", value: 0.8} |
| Contract Termination | Topic: Contract → Key: 12345678, Value: {cn: 12345678, from: "2021-01-01", to: "2021-02-14"}; Topic: Fat Content → Key: 12345678::2021-01-01, Value: {start: "2021-01-01", end: "2021-02-14", value: 3.5}; Key: 12345678::2021-03-01, Value: null (tombstone record); Topic: Chocolate Powder → Key: 12345678::2021-01-01 → no change, so no record emitted; Key: 12345678::2021-02-01, Value: {start: "2021-02-01", end: "2021-02-14", value: 2} |
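On the consuming side, this variation means the service has to stitch a contract back together from three stores. The following is a hedged sketch of that read-side assembly: it assumes String keys and values, and that one global store per topic has already been materialised with builder.globalTable(...), as in the Kafka Streams snippet earlier. The store and function names are made up.

```kotlin
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

fun readContract(
    contractStore: ReadOnlyKeyValueStore<String, String>,
    fatContentStore: ReadOnlyKeyValueStore<String, String>,
    chocolatePowderStore: ReadOnlyKeyValueStore<String, String>,
    contractNumber: String
): Triple<String?, List<String>, List<String>> {

    // attribute records are keyed "contractNumber::start"; with String serdes the keys sort
    // lexicographically, so a range scan over the prefix returns all validities for the contract
    fun ReadOnlyKeyValueStore<String, String>.validitiesOf(cn: String): List<String> =
        range("$cn::", "$cn::\uFFFF").use { iter -> iter.asSequence().map { it.value }.toList() }

    return Triple(
        contractStore.get(contractNumber),           // the contract header record
        fatContentStore.validitiesOf(contractNumber),    // all fat content validities
        chocolatePowderStore.validitiesOf(contractNumber) // all chocolate powder validities
    )
}
```

The price of the normalised topics is exactly this join at read time, plus the fact that the three topics are only eventually consistent with each other.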
4) Verticalised Table, One Topic for all Attributes
The final solution is to use a verticalised table in order to store the data. This has the advantage that you can dynamically add new attributes, and in fact each contract could have different attributes. This is akin to a schemaless document. The publication of records in Kafka becomes quite generic.
| Use Case | records emitted |
|---|---|
| Contract Creation | Key: 12345678::fatContent::2021-01-01, Value: {start: "2021-01-01", end: "2025-12-31", value: 3.5}; Key: 12345678::chocolatePowder::2021-01-01, Value: {start: "2021-01-01", end: "2025-12-31", value: 1} |
| Change choc powder | Key: 12345678::fatContent::2021-01-01 → no change, no record emitted; Key: 12345678::chocolatePowder::2021-01-01, Value: {start: "2021-01-01", end: "2021-01-31", value: 1}; Key: 12345678::chocolatePowder::2021-02-01, Value: {start: "2021-02-01", end: "2025-12-31", value: 2} |
| Change fat content | Key: 12345678::fatContent::2021-01-01, Value: {start: "2021-01-01", end: "2021-02-28", value: 3.5}; Key: 12345678::fatContent::2021-03-01, Value: {start: "2021-03-01", end: "2025-12-31", value: 0.8}; Key: 12345678::chocolatePowder::2021-01-01 → no change, no record emitted; Key: 12345678::chocolatePowder::2021-02-01 → no change, no record emitted |
| Contract Termination | Key: 12345678::fatContent::2021-01-01, Value: {start: "2021-01-01", end: "2021-02-14", value: 3.5}; Key: 12345678::fatContent::2021-03-01, Value: null (tombstone record); Key: 12345678::chocolatePowder::2021-01-01 → no change, no record emitted; Key: 12345678::chocolatePowder::2021-02-01, Value: {start: "2021-02-01", end: "2021-02-14", value: 2} |
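A tiny sketch shows how generic the publication becomes with the verticalised model. The key pattern matches the table above, while the AttributeValue class and recordKey helper are illustrative names, not from the original article.

```kotlin
import java.time.LocalDate

// every attribute, whatever its name, uses the same key pattern and the same payload shape
data class AttributeValue(val start: LocalDate, val end: LocalDate, val value: Any?)

fun recordKey(contractNumber: String, attributeName: String, start: LocalDate) =
    "$contractNumber::$attributeName::$start"

fun main() {
    val key = recordKey("12345678", "chocolatePowder", LocalDate.parse("2021-02-01"))
    val payload = AttributeValue(LocalDate.parse("2021-02-01"), LocalDate.parse("2025-12-31"), 2)
    // a new attribute, say "vanillaExtract", needs no schema change at all:
    // it simply becomes another key under the same contract
    println("$key -> $payload")
}
```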
My favourite is the first solution, as I find it to be the closest to the functional business requirements.
Another way to choose which solution to use might be to calculate the effect that the solution has on data volume (storage in Kafka; transport through your landscape; storage in replicates).
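As a purely illustrative aid for that kind of comparison (all numbers below are made-up placeholders, not measurements), a back-of-envelope estimate might multiply the number of records a variation emits by its average serialised record size and by the number of places the data ends up:

```kotlin
// total bytes ≈ records emitted × average record size × (1 for the Kafka topic + N replicating services)
fun estimateVolumeBytes(recordsEmitted: Long, avgRecordSizeBytes: Long, replicateCount: Int): Long =
    recordsEmitted * avgRecordSizeBytes * (1L + replicateCount)

fun main() {
    // hypothetical portfolio: variation 1 emits many small records, variation 2 fewer but larger ones
    println(estimateVolumeBytes(recordsEmitted = 10_000_000, avgRecordSizeBytes = 300, replicateCount = 5))
    println(estimateVolumeBytes(recordsEmitted = 1_000_000, avgRecordSizeBytes = 1_200, replicateCount = 5))
}
```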
If you have other solutions, please get in touch.