By "data observation lag" I refer to the phenomenon where the data is present in CDC log tables but we don't read it yet. We may introduce such a lag intentionally in order to minimize the chance that an out-of-order write-to-the-past appears which our query window will miss - this is why the "confidence window" concept exists in scylla-cdc-java and scylla-cdc-go. But this lag may also appear unintentionally, as a side effect of library/application design/implementation. Such unintentional lag appears in the source connector during CDC generation switches (which most commonly happen on Scylla cluster topology changes).
Currently the design is roughly as follows: there are a number of "worker" processes and a single "master" process.
Each worker periodically queries a subset of streams in the current generation. Roughly every 60 seconds (configurable; I'll call this `offset_flush_interval`), each worker saves its offsets to some kind of persistent storage (there is one offset per stream, denoting that the worker has read all changes in that stream up to this offset).
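For illustration, a minimal sketch of such a worker loop, assuming offsets are simply timestamps; `WorkerSketch`, `OffsetStorage` and `queryNextWindow` are hypothetical stand-ins, not the connector's real types:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the worker loop described above; every name here is illustrative,
// not the connector's actual API. Offsets are modelled as plain timestamps.
class WorkerSketch {
    interface OffsetStorage { void save(Map<String, Instant> offsetPerStream); }

    private static final Duration OFFSET_FLUSH_INTERVAL = Duration.ofSeconds(60);

    static void run(List<String> assignedStreams, OffsetStorage storage) throws InterruptedException {
        Map<String, Instant> offsets = new HashMap<>(); // one offset per stream
        Instant lastFlush = Instant.now();
        while (true) {
            for (String stream : assignedStreams) {
                Instant windowEnd = queryNextWindow(stream); // read CDC rows for this stream
                offsets.put(stream, windowEnd);              // "everything up to windowEnd is read"
            }
            if (Duration.between(lastFlush, Instant.now()).compareTo(OFFSET_FLUSH_INTERVAL) >= 0) {
                storage.save(offsets);                       // periodic flush (offset_flush_interval)
                lastFlush = Instant.now();
            }
            Thread.sleep(1_000);
        }
    }

    private static Instant queryNextWindow(String stream) {
        // Placeholder: query the CDC log table for this stream and return the end of the
        // queried window (in reality bounded by the confidence window).
        return Instant.now();
    }
}
```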
The master queries the CDC generations table(s) roughly every 30 seconds (configurable; I'll call this `generation_fetch_interval` - in code it's called `sleepBeforeGenerationDoneMs`, but I don't like that name) to check whether any new generations have appeared. If it sees a generation succeeding the currently operating one, it reads the offsets of all workers from the persistent storage. Once it sees that all offsets are >= the timestamp of the succeeding generation, it shuts down the workers and starts new ones which query streams from the new generation.
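A similarly hedged sketch of the master's periodic check; `GenerationsTable`, `OffsetStorage` and `WorkerPool` are hypothetical stand-ins for whatever the connector actually uses:

```java
import java.time.Instant;
import java.util.Collection;

// Sketch of the master's periodic generation check described above; the types and
// method names are hypothetical stand-ins, not the connector's real API.
class MasterSketch {
    interface GenerationsTable { Instant nextGenerationTimestampAfter(Instant current); } // null if none
    interface OffsetStorage { Collection<Instant> loadAllWorkerOffsets(); }
    interface WorkerPool {
        Instant currentGenerationTimestamp();
        void restartWithGeneration(Instant generationTimestamp);
    }

    // Called roughly every generation_fetch_interval (~30 s).
    static void checkForGenerationSwitch(GenerationsTable table, OffsetStorage storage, WorkerPool pool) {
        Instant next = table.nextGenerationTimestampAfter(pool.currentGenerationTimestamp());
        if (next == null) {
            return; // no successor generation yet
        }
        boolean allWorkersCaughtUp = storage.loadAllWorkerOffsets().stream()
                .allMatch(offset -> !offset.isBefore(next)); // i.e. offset >= generation timestamp
        if (allWorkersCaughtUp) {
            // The old generation has been fully consumed: stop the old workers and start
            // new ones that query the new generation's streams.
            pool.restartWithGeneration(next);
        }
        // Otherwise do nothing; the check simply repeats after generation_fetch_interval.
    }
}
```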
This design may introduce a huge, unnecessary data observation lag. New generations appear in the generation table(s) roughly `2 * ring_delay` before they start operating, where `ring_delay` is a Scylla configuration parameter that nobody ever changes (except in tests) and is equal to 30s. So in practice new generations appear 60s before they start operating - in terms of the clock of the Scylla node which creates the generation, and since we can probably safely assume that the clocks of all our processes fit within a few-seconds interval, also roughly in terms of the clock of our master process. This means that the master knows about a generation very early (say, 50s before it starts operating) and can take steps to get rid of the observation lag.
Consider the following example scenario with the current design. Let `X` be some time point.

- At `X - 2s` the master queries the generations table and sees no new generations.
- At `X - 1s` the workers store their offsets, each offset equal to `X - 1s`.
- At `X` a new generation appears in the tables with timestamp `X + 60s` (so that's when it starts operating).
- At `X + 28s` and `X + 58s` the master queries the generations table and sees the new generation, but does nothing because the offsets are still < the new generation's timestamp (`X - 1s < X + 60s`).
- At `X + 59s` each worker stores its offsets, each offset equal to `X + 59s`.
- At `X + 58s + 30s` and `X + 58s + 60s` the master queries the generations table and again sees the new generation, but as before does nothing (`X + 59s < X + 60s`).
- At `X + 59s + 60s` each worker again stores its offsets, each offset equal to `X + 59s + 60s`.
- At `X + 58s + 90s` the master finally sees that all stored offsets are >= `X + 60s` (the generation's timestamp), so it performs the switch.

So new workers are created at `X + 58s + 90s`, but the generation started operating at `X + 60s`. We get a ~90s lag (`90s - epsilon`, where `epsilon = 2s` in my example) before we start observing data from the new generation!
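To generalize the example: in the worst case the switch waits for one more offset flush after the generation's timestamp and then one more master poll after that flush, so (ignoring the confidence window and assuming workers read close to real time) the lag is bounded roughly by the sum of the two intervals:

```java
// Back-of-the-envelope bound on the lag in the current design (illustration only),
// using the intervals from the example above.
class LagBound {
    public static void main(String[] args) {
        long offsetFlushIntervalMs = 60_000;     // worker flush period
        long generationFetchIntervalMs = 30_000; // master poll period
        // Worst case: up to one full flush period passes after the generation's timestamp
        // before the first offsets >= that timestamp are persisted, and then up to one full
        // poll period before the master notices them.
        long worstCaseLagMs = offsetFlushIntervalMs + generationFetchIntervalMs;
        System.out.println("worst-case observation lag ~ " + worstCaseLagMs / 1000 + "s"); // ~90s
    }
}
```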
This doesn't have to be the case. Consider the following alternative design (and I'm sure there are many more different/better designs):
- As soon as the master sees a new generation in the table (at `X + 28s` in the above example), it tells the existing workers that they should query no further than the generation's timestamp (`X + 60s`).
- For each worker, as soon as it queries the last window (the window which intersects the `X + 60s` time point), it persists its offsets and informs the master.
- As soon as the master learns that each worker has queried its last window, it creates new workers.
Then the observation lag is independent of `offset_flush_interval`, because the workers will truncate this interval when they learn about a new generation (they'll do the last flush earlier than usual). Furthermore, if `generation_fetch_interval < 2 * ring_delay = 60s`, the master will learn about the new generation before it starts operating. Then the observation lag will depend only on each worker's querying frequency, the confidence window, and the communication delays between master and workers; thus, assuming the communication delay is small, the lag will be roughly the same as if no generation switch had been performed.
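For concreteness, a rough sketch of the coordination in this alternative design; every name here is hypothetical, and a real implementation would reuse whatever master-worker channel the connector already has:

```java
import java.time.Instant;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of the alternative design; all names are hypothetical stand-ins.
class EagerSwitchSketch {
    interface Worker {
        String id();
        // Tell the worker not to query past `bound`; once it has queried the last window
        // intersecting `bound`, it persists its offsets and invokes `done`.
        void setUpperBound(Instant bound, Runnable done);
    }
    interface WorkerPool {
        Collection<Worker> currentWorkers();
        void startWorkersForGeneration(Instant generationTimestamp);
    }

    private final Set<String> finishedWorkers = ConcurrentHashMap.newKeySet();

    // Called by the master as soon as it sees a new generation in the table, which can be
    // well before the generation starts operating.
    void onNewGeneration(Instant generationTimestamp, WorkerPool pool) {
        Collection<Worker> workers = pool.currentWorkers();
        int total = workers.size();
        for (Worker w : workers) {
            w.setUpperBound(generationTimestamp, () -> {
                finishedWorkers.add(w.id());
                // Real code would guard against triggering the switch more than once.
                if (finishedWorkers.size() == total) {
                    // Every worker has read (and persisted) everything up to the generation's
                    // timestamp, so start workers for the new generation right away instead of
                    // waiting for the next periodic offset flush + master poll.
                    pool.startWorkersForGeneration(generationTimestamp);
                }
            });
        }
    }
}
```

The callback could just as well be replaced by workers writing a "done" marker next to their offsets in the same persistent storage the master already reads; the important part is only that the switch is triggered by "all workers reached the generation's timestamp" rather than by the periodic flush/poll cycle.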
By "data observation lag" I refer to the phenomenon where the data is present in CDC log tables but we don't read it yet. We may introduce such a lag intentionally in order to minimize the chance that an out-of-order write-to-the-past appears which our query window will miss - this is why the "confidence window" concept exists in scylla-cdc-java and scylla-cdc-go. But this lag may also appear unintentionally, as a side effect of library/application design/implementation. Such unintentional lag appears in the source connector during CDC generation switches (which most commonly happen on Scylla cluster topology changes).
Currently the design is roughly as follows. There is a number of "worker" processes and there is a "master" process.
Each worker periodically queries a subset of streams in the current generation. Each worker, roughly each 60 seconds (configurable, I'll call this
offset_flush_interval
) saves its offsets to some kind of persistent storage (there is one offset per stream, denoting that the worker has read all changes up to this offset in that stream).The master periodically queries the CDC generations table(s) to check if there are any new generations roughly each 30 seconds (configurable, I'll call this
generation_fetch_interval
; in code it's calledsleepBeforeGenerationDoneMs
but I don't like this name). If it sees that there is a generation succeeding the currently operating one, it queries the offsets of all workers from the persistent storage. When it sees that all offsets are>=
than the timestamp of the succeeding generation, it turns off the workers and starts new ones which query streams from the new generation.This design may introduce a huge data observation lag which is unnecessary. New generations appear in the generation table(s) roughly
2 * ring_delay
before they start operating, wherering_delay
is a Scylla configuration parameter that nobody ever changes (except in tests) and is equal to30s
. So in practice new generations appear60s
before they start operating, speaking in terms of the clock of the Scylla node which creates the generation, and we can probably safely assume that the clocks of all our processes fit within a few-seconds interval, so we can speak in terms of the clock of our master process. This means that the master knows about a generation very early (say,50s
before it starts operating) and can take steps to get rid of the observation lag.Consider the following example scenario with the current design. Let
X
be some time point.X - 2s
the master queries the generations table and sees no new generations.X - 1s
the workers store their offsets, each offset equal toX - 1s
.X
a new generation appears in the tables with timestampX + 60s
(so that's when it starts operating).X + 28s
andX + 58s
the master queries the generations table and sees a new generation, but does not do anything because the offsets are still<
than the new generation's timestamp (X - 1s < X + 60s
).X + 59s
each worker stores its offsets, each offset equal toX + 59s
.X + 58s + 30s
andX + 58s + 60s
the master queries the generations table and sees that there is a new generation, but as before, does nothing (X + 59s < X + 60s
).X + 59s + 60s
each worker again stores its offsets, each offset equal toX + 59s + 60s
.X + 58s + 90s
the master finally sees that all stored offsets are>=
thanX + 60s
(the generation timestamp) so it performs the switch.So new workers are created at
X + 58s + 90s
, but the generation started operating atX + 60s
. We get a~90s
lag (90s - epsilon
, whereepsilon = 2s
in my example) before we start observing data from the new generation!This doesn't have to be the case. Consider the following alternative design (and I'm sure there are many more different/better designs):
X + 28s
in the above example), it tells the existing workers that they should query no further than the generation's timestamp (X + 60s
).X + 60s
time point), it persists its offsets and informs the master.Then the observation lag is independent of
offset_flush_interval
because the workers will truncate this interval when they learn about a new generation (they'll do the last flush earlier than usual). Furthermore, ifgeneration_fetch_interval < 2 * ring_delay = 60s
, the master will learn about the new generation before it starts operating. Then the observation lag will depend only on the querying frequency of each worker, the confidence window, and the communication delays between master and workers; thus, assuming that the communication delay is small, the lag will be roughly the same as if no generation switch was performed.The text was updated successfully, but these errors were encountered: