chore: cosmetic changes around Snapshot functions #3652
Changes from 1 commit

@@ -90,79 +90,40 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
   snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] {
     IterateBucketsFb(cll, stream_journal);
     db_slice_->UnregisterOnChange(snapshot_version_);
-    if (cll->IsCancelled()) {
-      Cancel();
-    } else if (!stream_journal) {
-      CloseRecordChannel();
+    // We stop the channel if we are performing backups (non-streaming) or full sync failed.
+    // For a successful full-sync we keep the channel in order to switch to streaming mode.
+    if (cll->IsCancelled() || !stream_journal) {
+      StopChannel();
     }
   });
 }

 void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn) {
   auto* journal = db_slice_->shard_owner()->journal();
   DCHECK(journal);

   serializer_ = std::make_unique<RdbSerializer>(compression_mode_);

-  snapshot_fb_ =
-      fb2::Fiber("incremental_snapshot", [this, journal, cntx, lsn = start_lsn]() mutable {
-        DCHECK(lsn <= journal->GetLsn()) << "The replica tried to sync from the future.";
-
-        VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
-
-        // The replica sends the LSN of the next entry it wants to receive.
-        while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
-          serializer_->WriteJournalEntry(journal->GetEntry(lsn));
-          PushSerializedToChannel(false);
-          lsn++;
-        }
-
-        VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
-
-        // This check is safe, but it is not trivially safe.
-        // We rely here on the fact that JournalSlice::AddLogRecord can
-        // only preempt while holding the callback lock.
-        // That guarantees that if we have processed the last LSN the callback
-        // will only be added after JournalSlice::AddLogRecord has finished
-        // iterating its callbacks and we won't process the record twice.
-        // We have to make sure we don't preempt ourselves before registering the callback!
-
-        // GetLsn() is always the next lsn that we expect to create.
-        if (journal->GetLsn() == lsn) {
-          {
-            FiberAtomicGuard fg;
-            serializer_->SendFullSyncCut();
-          }
-          auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
-          journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
-          PushSerializedToChannel(true);
-        } else {
-          // We stopped but we didn't manage to send the whole stream.
-          cntx->ReportError(
-              std::make_error_code(errc::state_not_recoverable),
-              absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
-                           " was dropped from the buffer. Current lsn=", journal->GetLsn()));
-          Cancel();
-        }
-      });
+  snapshot_fb_ = fb2::Fiber("incremental_snapshot", [cntx, start_lsn, this] {
+    this->SwitchIncrementalFb(cntx, start_lsn);
+  });
 }

-void SliceSnapshot::Stop() {
-  // Wait for serialization to finish in any case.
-  Join();
+void SliceSnapshot::Finalize() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
+  DCHECK(journal_cb_id_);  // Called only for streaming use-case.

Review comment: more dchecks to help enforce invariants. (romange marked this conversation as resolved.)

-  if (journal_cb_id_) {
-    auto* journal = db_slice_->shard_owner()->journal();
-    serializer_->SendJournalOffset(journal->GetLsn());
-    journal->UnregisterOnChange(journal_cb_id_);
-  }
+  // Wait for serialization to finish in any case.
+  snapshot_fb_.JoinIfNeeded();
+
+  auto* journal = db_slice_->shard_owner()->journal();
+  serializer_->SendJournalOffset(journal->GetLsn());
+  journal->UnregisterOnChange(journal_cb_id_);
+  journal_cb_id_ = 0;

   PushSerializedToChannel(true);
   CloseRecordChannel();
 }

-void SliceSnapshot::Cancel() {
-  VLOG(1) << "SliceSnapshot::Cancel";
+void SliceSnapshot::StopChannel() {
+  VLOG(1) << "SliceSnapshot::StopChannel";
+  DCHECK(db_slice_->shard_owner()->IsMyThread());

   // Cancel() might be called multiple times from different fibers of the same thread, but we
   // should unregister the callback only once.

@@ -175,11 +136,6 @@ void SliceSnapshot::Cancel() {
   CloseRecordChannel();
 }

-void SliceSnapshot::Join() {
-  // Fiber could have already been joined by Stop.
-  snapshot_fb_.JoinIfNeeded();
-}

 // The algorithm is to go over all the buckets and serialize those with
 // version < snapshot_version_. In order to serialize each physical bucket exactly once we update
 // bucket version to snapshot_version_ once it has been serialized.

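As a rough illustration of the versioning scheme this comment describes (a sketch with hypothetical helper names, not code from the repository):

    // Hypothetical sketch of the bucket-versioning idea: each physical bucket
    // is serialized at most once, because its version is bumped to
    // snapshot_version_ immediately after serialization.
    for (PrimeTable::bucket_iterator bucket : AllPhysicalBuckets()) {  // hypothetical helper
      if (bucket.GetVersion() < snapshot_version_) {
        SerializeBucket(db_index, bucket);
        bucket.SetVersion(snapshot_version_);  // marks it as already serialized
      }
    }
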
@@ -251,6 +207,49 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
         << stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls;
 }

+void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
+  auto* journal = db_slice_->shard_owner()->journal();
+  DCHECK(journal);
+  DCHECK_LE(lsn, journal->GetLsn()) << "The replica tried to sync from the future.";
+
+  VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
+
+  // The replica sends the LSN of the next entry it wants to receive.
+  while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
+    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
+    PushSerializedToChannel(false);
+    lsn++;
+  }

Review comment on lines +219 to +223: could you do it in a FOR cycle? It would be more readable.
Author reply: I just moved the code and I deliberately did not want to modify it in this PR.
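For illustration only, the reviewer's FOR-cycle suggestion might look like the sketch below (not applied in this PR; note that lsn must remain visible after the loop, so only the loop driver moves into the for header):

    // Hypothetical for-loop form of the copy loop above.
    for (; !cntx->IsCancelled() && journal->IsLSNInBuffer(lsn); ++lsn) {
      serializer_->WriteJournalEntry(journal->GetEntry(lsn));
      PushSerializedToChannel(false);
    }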
+
+  VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
+
+  // This check is safe, but it is not trivially safe.
+  // We rely here on the fact that JournalSlice::AddLogRecord can
+  // only preempt while holding the callback lock.
+  // That guarantees that if we have processed the last LSN the callback
+  // will only be added after JournalSlice::AddLogRecord has finished
+  // iterating its callbacks and we won't process the record twice.
+  // We have to make sure we don't preempt ourselves before registering the callback!
+
+  // GetLsn() is always the next lsn that we expect to create.
+  if (journal->GetLsn() == lsn) {
+    {
+      FiberAtomicGuard fg;
+      serializer_->SendFullSyncCut();
+    }
+    auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);

Review comment: simple lambda is better than bind.
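For reference, the plain-lambda form the reviewer suggests could look like this sketch; it assumes RegisterOnChange accepts any callable matching OnJournalEntry's (const journal::JournalItem&, bool) signature, which this diff does not show:

    // Hypothetical lambda replacing absl::bind_front; captures `this` explicitly.
    journal_cb_id_ = journal->RegisterOnChange(
        [this](const journal::JournalItem& item, bool await) { OnJournalEntry(item, await); });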
+    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    PushSerializedToChannel(true);
+  } else {
+    // We stopped but we didn't manage to send the whole stream.
+    cntx->ReportError(
+        std::make_error_code(errc::state_not_recoverable),
+        absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
+                     " was dropped from the buffer. Current lsn=", journal->GetLsn()));
+    StopChannel();
+  }
+}

 bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
   ++stats_.savecb_calls;

@@ -288,7 +287,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
   while (!it.is_done()) {
     ++result;
-    // might preempt
+    // might preempt due to bid value serialization.
     SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
     ++it;
   }

@@ -349,7 +348,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
     return false;

   // Flush any of the leftovers to avoid interleavings
-  const auto serialized = Serialize();
+  size_t serialized = Serialize();

   // Bucket serialization might have accumulated some delayed values.
   // Because we can finally block in this function, we'll await and serialize them

@@ -359,7 +358,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
     delayed_entries_.pop_back();
   }

-  const auto total_serialized = Serialize() + serialized;
+  size_t total_serialized = Serialize() + serialized;
   return total_serialized > 0;
 }

@@ -401,6 +400,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
 }

 void SliceSnapshot::CloseRecordChannel() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
   std::unique_lock lk(db_slice_->GetSerializationMutex());

   CHECK(!serialize_bucket_running_);

Author comment: here is probably the only functional change I made, but if you look at the implementation of Cancel/StopChannel, you will see it fits both use cases. Now, with the rename, its meaning has shifted, so we can just call the same function.
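Based only on the hunks visible above, the consolidated StopChannel could look roughly like the following sketch; the lines hidden by the diff context are assumptions, so treat it as an illustration rather than the actual implementation:

    void SliceSnapshot::StopChannel() {
      VLOG(1) << "SliceSnapshot::StopChannel";
      DCHECK(db_slice_->shard_owner()->IsMyThread());

      // May be called multiple times from different fibers of the same thread,
      // but the journal callback must be unregistered only once.
      if (journal_cb_id_) {
        db_slice_->shard_owner()->journal()->UnregisterOnChange(journal_cb_id_);
        journal_cb_id_ = 0;
      }

      CloseRecordChannel();
    }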