Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrong replication of Kafka messages happening in the same millisecond #20

Open
avelanarius opened this issue Jul 19, 2020 · 5 comments · May be fixed by #61
Open

Wrong replication of Kafka messages happening in the same millisecond #20

avelanarius opened this issue Jul 19, 2020 · 5 comments · May be fixed by #61

Comments

@avelanarius
Copy link

Overview

When sending Kafka messages quickly, some of them might have the same timestamp (millisecond precision). If multiple messages regarding the same row happen on a single millisecond, the Connector incorrectly applies only the first message (in that millisecond) onto the database, because it relies on the timestamp for determining order.

The environment I was testing the Connector on involved only a single partition. Kafka guarantees preserving the order of messages within a partition (partition offset). The exact scenario can also happen on a multi-partition topic, as many Kafka producers send messages with the same key to the same partition, so messages regarding the same row will end up in the same Kafka topic partition.

As you will see in "Futher investigation" the Connector already receives the messages in correct order (in my testing), but is unable to apply them correctly. A partition offset is also available to determine the correct order.

Reproduction

  1. Start up Confluent, ScyllaDB Sink Connector. Set up Connector with topic t.
  2. Download the input file: input. It consists of 10 operation triplets, which add a row with v = 0, delete the row and add it again with v = 1. Therefore, the final table should contain 10 rows with v = 1:
{"pk":{"int":1},"ck":{"int":1}}${"ks.t.value_schema":{"pk":{"int":1},"ck":{"int":1},"v":{"int":0}}}
{"pk":{"int":1},"ck":{"int":1}}$null
{"pk":{"int":1},"ck":{"int":1}}${"ks.t.value_schema":{"pk":{"int":1},"ck":{"int":1},"v":{"int":1}}}
{"pk":{"int":2},"ck":{"int":2}}${"ks.t.value_schema":{"pk":{"int":2},"ck":{"int":2},"v":{"int":0}}}
{"pk":{"int":2},"ck":{"int":2}}$null
{"pk":{"int":2},"ck":{"int":2}}${"ks.t.value_schema":{"pk":{"int":2},"ck":{"int":2},"v":{"int":1}}}
[... MORE ...]
  1. Using the kafka-avro-console-producer provided by Confluent to write messages from input:
bin/kafka-avro-console-producer --broker-list localhost:9092 --topic t --property parse.key=true \ 
--property key.schema='{"fields":[{"name":"pk","type":["null","int"]},{"name":"ck","type":["null","int"]}],"name":"key_schema","namespace":"ks.t","type":"record"}'  \
--property "key.separator=$" --property value.schema='["null",{"fields":[{"name":"pk","type":["null","int"]},{"name":"ck","type":["null","int"]},{"name":"v","type":["null","int"]}],"name":"value_schema","namespace":"ks.t","type":"record"}]' \
--timeout 100 --request-required-acks 0 < input
  1. Select rows in the destination table:
SELECT * FROM test.t;

Got:

image

Expected: 10 rows with v = 1 and pk, ck from 1 to 10.

Futher investigation

Using kafka-avro-console-consumer I verified the messages were sent in the correct order (value part only shown):

image

After adding additional log statements in the Connector, (surprisingly?) it also received the messages in the correct order:

image

Root cause

boundStatement.setConsistencyLevel(topicConfigs.getConsistencyLevel());
boundStatement.setDefaultTimestamp(topicConfigs.getTimeStamp());
} else {
boundStatement.setConsistencyLevel(this.scyllaDbSinkConnectorConfig.consistencyLevel);
boundStatement.setDefaultTimestamp(record.timestamp());

The Connector uses a setDefaultTimestamp() method with the timestamp of a Kafka message. It is translated into CQL USING TIMESTAMP when executing a query and it prevents execution of next queries with timestamp lesser or equal.

In the reproduction example, row with pk = 1, ck = 1 is missing. It is caused by the DELETE and INSERT operation having the same USING TIMESTAMP, so INSERT is ignored:

image

There is another smaller issue in those lines in ScyllaDbSinkTaskHelper.java: setDefaultTimestamp() expects a epoch timestamp in microseconds, but a millisecond timestamp (from Kafka) is assigned, so the WRITETIME in database is off by a factor of 1000:

image

@hartmut-co-uk
Copy link

hmm since Kafka timestamp is in milliseconds, how about adding the kafka record offset % 1000 (mod 1000) as microseconds?

e.g. with offset of below first kafka msg at 32176 -> 32176 % 1000 = 176

Offset: 32176, CreateTime:1595177491954    {pk...}    -> USING TIMESTAMP 1595177491954176
Offset: 32177, CreateTime:1595177491978.   null.      -> USING TIMESTAMP 1595177491978177
Offset: 32178, CreateTime:1595177491978.   {pk...}    -> USING TIMESTAMP 1595177491978178

@hartmut-co-uk
Copy link

seems the timestamp incorrect unit has already been fixed in #29

@hartmut-co-uk
Copy link

but not yet released?
I was testing with Version 1.0.0 which still has old timestamps...
https://www.confluent.io/hub/scylladb/kafka-connect-scylladb

@hartmut-co-uk
Copy link

I'd create PR for the proposed solution, what do you think @avelanarius?

@hartmut-co-uk
Copy link

hartmut-co-uk commented Jan 3, 2022

changed into draft, didn't consider the order might actually be reversed when the offset happens to be passing **999...*1000 🤐

Will give that some more thought...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants