Wrong replication of Kafka messages happening in the same millisecond #20
Comments
hmm, since the Kafka timestamp is in milliseconds, how about adding the Kafka record offset % 1000 (mod 1000) as microseconds? e.g. with the offset of the first Kafka msg below at …
seems the incorrect timestamp unit has already been fixed in #29, but not yet released?
I'd create a PR for the proposed solution, what do you think @avelanarius?
Changed into a draft, didn't consider the order might actually be reversed when the offset happens to be crossing a multiple of 1000 (…999 → …000) 🤐 Will give that some more thought...
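A minimal sketch of the idea discussed in this thread, assuming a Kafka Connect `SinkRecord`; `toMicros` is a hypothetical helper, not part of the Connector:

```java
import org.apache.kafka.connect.sink.SinkRecord;

public final class SyntheticTimestamps {
    // Proposed synthetic microsecond timestamp: the millisecond Kafka
    // timestamp scaled to microseconds, plus offset % 1000 to break ties
    // between records produced in the same millisecond. Assumes the
    // record carries a timestamp.
    static long toMicros(SinkRecord record) {
        return record.timestamp() * 1000L + record.kafkaOffset() % 1000L;
    }
    // Caveat raised above: when the offset crosses a multiple of 1000
    // (…999 -> …000) within one millisecond, toMicros() decreases, so two
    // same-millisecond records can come out reversed.
}
```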
Overview
When sending Kafka messages quickly, some of them might have the same timestamp (millisecond precision). If multiple messages regarding the same row happen within a single millisecond, the Connector incorrectly applies only the first message (in that millisecond) onto the database, because it relies on the timestamp to determine order.
The environment I was testing the Connector on involved only a single partition. Kafka guarantees preserving the order of messages within a partition (partition offset). The exact scenario can also happen on a multi-partition topic, as many Kafka producers send messages with the same key to the same partition, so messages regarding the same row will end up in the same Kafka topic partition.
As you will see in "Further investigation", the Connector already receives the messages in the correct order (in my testing), but is unable to apply them correctly. A partition offset is also available to determine the correct order.
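A hedged illustration of the ordering this argues for (the comparator below is mine, not the Connector's):

```java
import java.util.Comparator;
import org.apache.kafka.connect.sink.SinkRecord;

class PartitionOrder {
    // Within a partition the offset is unique even when the millisecond
    // timestamp is not, so ordering by (timestamp, offset) recovers the
    // true per-partition order that the timestamp alone cannot.
    static final Comparator<SinkRecord> BY_TRUE_ORDER =
            Comparator.comparingLong((SinkRecord r) -> r.timestamp())
                      .thenComparingLong(SinkRecord::kafkaOffset);
}
```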
Reproduction
I prepared a file `input`. It consists of 10 operation triplets, which add a row with `v = 0`, delete the row, and add it again with `v = 1`. Therefore, the final table should contain 10 rows with `v = 1`. I used the `kafka-avro-console-producer` provided by Confluent to write the messages from `input`.
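The exact `input` file and producer invocation are not preserved above; as a rough sketch of the same 10 triplets, assuming a topic named `topic` and plain JSON strings in place of the Avro records actually used (a null value stands in for the delete):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceTriplets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10; i++) {
                String key = String.format("{\"pk\": %d, \"ck\": %d}", i, i);
                // Triplet: insert v = 0, delete (null-value tombstone), insert v = 1.
                producer.send(new ProducerRecord<>("topic", key,
                        String.format("{\"pk\": %d, \"ck\": %d, \"v\": 0}", i, i)));
                producer.send(new ProducerRecord<>("topic", key, null));
                producer.send(new ProducerRecord<>("topic", key,
                        String.format("{\"pk\": %d, \"ck\": %d, \"v\": 1}", i, i)));
            }
        }
    }
}
```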
Got:
Expected: 10 rows with `v = 1` and `pk`, `ck` from 1 to 10.
Further investigation
Using `kafka-avro-console-consumer`, I verified the messages were sent in the correct order (value part only shown). After adding additional log statements in the Connector, (surprisingly?) it also received the messages in the correct order.
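The check itself was done with `kafka-avro-console-consumer`; a rough Java equivalent for eyeballing the per-partition order (topic name and bootstrap address are assumptions):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class VerifyOrder {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "verify-order");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic"));
            while (true) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                    // Offsets increase monotonically per partition; timestamps may tie.
                    System.out.printf("offset=%d timestamp=%d value=%s%n",
                            r.offset(), r.timestamp(), r.value());
                }
            }
        }
    }
}
```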
Root cause
`kafka-connect-scylladb/src/main/java/io/connect/scylladb/ScyllaDbSinkTaskHelper.java`, lines 94 to 98 in aa89618
The Connector uses the `setDefaultTimestamp()` method with the timestamp of a Kafka message. It is translated into CQL `USING TIMESTAMP` when executing a query, and it prevents execution of subsequent queries with a lesser or equal timestamp. In the reproduction example, the row with `pk = 1, ck = 1` is missing. It is caused by the `DELETE` and `INSERT` operations having the same `USING TIMESTAMP`, so the `INSERT` is ignored.
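A minimal sketch reproducing the tie, assuming the DataStax Java driver 3.x and a hypothetical table `ks.t(pk int, ck int, v int, PRIMARY KEY (pk, ck))`:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class TimestampTie {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {
            session.execute(
                "INSERT INTO ks.t (pk, ck, v) VALUES (1, 1, 0) USING TIMESTAMP 1000");
            session.execute(
                "DELETE FROM ks.t USING TIMESTAMP 2000 WHERE pk = 1 AND ck = 1");
            // Same timestamp as the DELETE: on a tie the tombstone wins,
            // so this INSERT is ignored and the row stays deleted.
            session.execute(
                "INSERT INTO ks.t (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2000");
            System.out.println(session.execute(
                "SELECT * FROM ks.t WHERE pk = 1 AND ck = 1").all()); // prints []
        }
    }
}
```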
There is another, smaller issue in those lines in `ScyllaDbSinkTaskHelper.java`: `setDefaultTimestamp()` expects an epoch timestamp in microseconds, but a millisecond timestamp (from Kafka) is assigned, so the `WRITETIME` in the database is off by a factor of 1000.
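For the unit issue, the direction of the fix is a millisecond-to-microsecond conversion where the statement timestamp is set; a hedged sketch (the surrounding code is paraphrased, not quoted from the file referenced above):

```java
import com.datastax.driver.core.SimpleStatement;
import org.apache.kafka.connect.sink.SinkRecord;

class TimestampFix {
    static void applyTimestamp(SimpleStatement statement, SinkRecord record) {
        // setDefaultTimestamp() expects microseconds since epoch;
        // SinkRecord.timestamp() is milliseconds, hence the * 1000.
        if (record.timestamp() != null) {
            statement.setDefaultTimestamp(record.timestamp() * 1000L);
        }
    }
}
```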