From 8ba34cb6e5dd7f1423a557ca508509390fbc2322 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Thu, 5 Sep 2024 10:54:41 +0300
Subject: [PATCH 1/2] chore: cosmetic changes around Snapshot functions

Some renames and added comments. Refactored StartIncremental into a
separate function without any functional changes.

Signed-off-by: Roman Gershman
---
 src/server/dflycmd.cc      |   7 +-
 src/server/engine_shard.cc |   1 +
 src/server/engine_shard.h  |   4 ++
 src/server/rdb_save.cc     |   4 +-
 src/server/snapshot.cc     | 132 ++++++++++++++++++-------------------
 src/server/snapshot.h      |  29 +++++---
 6 files changed, 96 insertions(+), 81 deletions(-)

diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index 0e65814e5de9..adf9475a1b4c 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -112,20 +112,21 @@ void DflyCmd::ReplicaInfo::Cancel() {
   // Update state and cancel context.
   replica_state = SyncState::CANCELLED;
   cntx.Cancel();
 
-  // Wait for tasks to finish.
   shard_set->RunBlockingInParallel([this](EngineShard* shard) {
+    VLOG(2) << "Disconnecting flow " << shard->shard_id();
+
     FlowInfo* flow = &flows[shard->shard_id()];
     if (flow->cleanup) {
       flow->cleanup();
     }
-
+    VLOG(2) << "After flow cleanup " << shard->shard_id();
     flow->full_sync_fb.JoinIfNeeded();
     flow->conn = nullptr;
   });
-
   // Wait for error handler to quit.
   cntx.JoinErrorHandler();
+  VLOG(1) << "Disconnecting replica " << address << ":" << listening_port;
 }
 
 DflyCmd::DflyCmd(ServerFamily* server_family) : sf_(server_family) {
diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 3fbd9969f029..6fb0bb6b01a6 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -601,6 +601,7 @@ void EngineShard::RemoveContTx(Transaction* tx) {
 }
 
 void EngineShard::Heartbeat() {
+  DVLOG(2) << " Heartbeat";
   DCHECK(namespaces.IsInitialized());
 
   CacheStats();
diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h
index 89b96178d24c..8ad8fc361930 100644
--- a/src/server/engine_shard.h
+++ b/src/server/engine_shard.h
@@ -56,6 +56,10 @@ class EngineShard {
     return shard_;
   }
 
+  bool IsMyThread() const {
+    return this == shard_;
+  }
+
   ShardId shard_id() const {
     return shard_id_;
   }
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index 4670b007d174..44afdeb05e6e 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -1324,7 +1324,7 @@ void RdbSaver::Impl::StartIncrementalSnapshotting(Context* cntx, EngineShard* sh
 }
 
 void RdbSaver::Impl::StopSnapshotting(EngineShard* shard) {
-  GetSnapshot(shard)->Stop();
+  GetSnapshot(shard)->Finalize();
 }
 
 void RdbSaver::Impl::Cancel() {
@@ -1334,7 +1334,7 @@ void RdbSaver::Impl::Cancel() {
 
   auto& snapshot = GetSnapshot(shard);
   if (snapshot)
-    snapshot->Cancel();
+    snapshot->StopChannel();
 
   dfly::SliceSnapshot::DbRecord rec;
   while (channel_.Pop(rec)) {
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index fa508002bb47..2d9982b5c0a5 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -90,79 +90,40 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
   snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] {
     IterateBucketsFb(cll, stream_journal);
     db_slice_->UnregisterOnChange(snapshot_version_);
-    if (cll->IsCancelled()) {
-      Cancel();
-    } else if (!stream_journal) {
-      CloseRecordChannel();
+    // We stop the channel if we are performing backups (non-streaming) or full sync failed.
+    // For a successful full-sync we keep the channel in order to switch to streaming mode.
+    if (cll->IsCancelled() || !stream_journal) {
+      StopChannel();
     }
   });
 }
 
 void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn) {
-  auto* journal = db_slice_->shard_owner()->journal();
-  DCHECK(journal);
-  serializer_ = std::make_unique<RdbSerializer>(compression_mode_);
-  snapshot_fb_ =
-      fb2::Fiber("incremental_snapshot", [this, journal, cntx, lsn = start_lsn]() mutable {
-        DCHECK(lsn <= journal->GetLsn()) << "The replica tried to sync from the future.";
-
-        VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
-
-        // The replica sends the LSN of the next entry is wants to receive.
-        while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
-          serializer_->WriteJournalEntry(journal->GetEntry(lsn));
-          PushSerializedToChannel(false);
-          lsn++;
-        }
-
-        VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
-
-        // This check is safe, but it is not trivially safe.
-        // We rely here on the fact that JournalSlice::AddLogRecord can
-        // only preempt while holding the callback lock.
-        // That guarantees that if we have processed the last LSN the callback
-        // will only be added after JournalSlice::AddLogRecord has finished
-        // iterating its callbacks and we won't process the record twice.
-        // We have to make sure we don't preempt ourselves before registering the callback!
-
-        // GetLsn() is always the next lsn that we expect to create.
-        if (journal->GetLsn() == lsn) {
-          {
-            FiberAtomicGuard fg;
-            serializer_->SendFullSyncCut();
-          }
-          auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
-          journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
-          PushSerializedToChannel(true);
-        } else {
-          // We stopped but we didn't manage to send the whole stream.
-          cntx->ReportError(
-              std::make_error_code(errc::state_not_recoverable),
-              absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
-                           " was dropped from the buffer. Current lsn=", journal->GetLsn()));
-          Cancel();
-        }
-      });
+  snapshot_fb_ = fb2::Fiber("incremental_snapshot", [cntx, start_lsn, this] {
+    this->SwitchIncrementalFb(cntx, start_lsn);
+  });
 }
 
-void SliceSnapshot::Stop() {
-  // Wait for serialization to finish in any case.
-  Join();
+void SliceSnapshot::Finalize() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
+  DCHECK(journal_cb_id_);  // Called only for streaming use-case.
 
-  if (journal_cb_id_) {
-    auto* journal = db_slice_->shard_owner()->journal();
-    serializer_->SendJournalOffset(journal->GetLsn());
-    journal->UnregisterOnChange(journal_cb_id_);
-  }
+  // Wait for serialization to finish in any case.
+  snapshot_fb_.JoinIfNeeded();
+  auto* journal = db_slice_->shard_owner()->journal();
+  serializer_->SendJournalOffset(journal->GetLsn());
+  journal->UnregisterOnChange(journal_cb_id_);
+  journal_cb_id_ = 0;
 
   PushSerializedToChannel(true);
   CloseRecordChannel();
 }
 
-void SliceSnapshot::Cancel() {
-  VLOG(1) << "SliceSnapshot::Cancel";
+void SliceSnapshot::StopChannel() {
+  VLOG(1) << "SliceSnapshot::StopChannel";
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
 
   // Cancel() might be called multiple times from different fibers of the same thread, but we
   // should unregister the callback only once.
@@ -175,11 +136,6 @@ void SliceSnapshot::Cancel() {
   CloseRecordChannel();
 }
 
-void SliceSnapshot::Join() {
-  // Fiber could have already been joined by Stop.
-  snapshot_fb_.JoinIfNeeded();
-}
-
 // The algorithm is to go over all the buckets and serialize those with
 // version < snapshot_version_. In order to serialize each physical bucket exactly once we update
 // bucket version to snapshot_version_ once it has been serialized.
@@ -251,6 +207,49 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
           << stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls;
 }
 
+void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
+  auto* journal = db_slice_->shard_owner()->journal();
+  DCHECK(journal);
+  DCHECK_LE(lsn, journal->GetLsn()) << "The replica tried to sync from the future.";
+
+  VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
+
+  // The replica sends the LSN of the next entry it wants to receive.
+  while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
+    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
+    PushSerializedToChannel(false);
+    lsn++;
+  }
+
+  VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
+
+  // This check is safe, but it is not trivially safe.
+  // We rely here on the fact that JournalSlice::AddLogRecord can
+  // only preempt while holding the callback lock.
+  // That guarantees that if we have processed the last LSN the callback
+  // will only be added after JournalSlice::AddLogRecord has finished
+  // iterating its callbacks and we won't process the record twice.
+  // We have to make sure we don't preempt ourselves before registering the callback!
+
+  // GetLsn() is always the next lsn that we expect to create.
+  if (journal->GetLsn() == lsn) {
+    {
+      FiberAtomicGuard fg;
+      serializer_->SendFullSyncCut();
+    }
+    auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
+    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    PushSerializedToChannel(true);
+  } else {
+    // We stopped but we didn't manage to send the whole stream.
+    cntx->ReportError(
+        std::make_error_code(errc::state_not_recoverable),
+        absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
+                     " was dropped from the buffer. Current lsn=", journal->GetLsn()));
+    StopChannel();
+  }
+}
+
 bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
   ++stats_.savecb_calls;
 
@@ -288,7 +287,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
 
   while (!it.is_done()) {
     ++result;
-    // might preempt
+    // might preempt due to bid value serialization.
     SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
     ++it;
   }
@@ -349,7 +348,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
     return false;
 
   // Flush any of the leftovers to avoid interleavings
-  const auto serialized = Serialize();
+  size_t serialized = Serialize();
 
   // Bucket serialization might have accumulated some delayed values.
   // Because we can finally block in this function, we'll await and serialize them
@@ -359,7 +358,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
     delayed_entries_.pop_back();
  }
 
-  const auto total_serialized = Serialize() + serialized;
+  size_t total_serialized = Serialize() + serialized;
   return total_serialized > 0;
 }
 
@@ -401,6 +400,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
 }
 
 void SliceSnapshot::CloseRecordChannel() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
   std::unique_lock lk(db_slice_->GetSerializationMutex());
 
   CHECK(!serialize_bucket_running_);
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index ba5e083a8ecb..3a305426524f 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -79,22 +79,30 @@ class SliceSnapshot {
   // called.
   void StartIncremental(Context* cntx, LSN start_lsn);
 
-  // Stop snapshot. Only needs to be called for journal streaming mode.
-  void Stop();
+  // Finalizes the snapshot. Should only be called for journal streaming mode.
+  // Blocking. Must be called from the Snapshot thread.
+  void Finalize();
 
-  // Wait for iteration fiber to stop.
-  void Join();
-
-  // Force stop. Needs to be called together with cancelling the context.
+  // Stops the channel. Needs to be called together with cancelling the context.
   // Snapshot can't always react to cancellation in streaming mode because the
   // iteration fiber might have finished running by then.
-  void Cancel();
+  // Blocking. Must be called from the Snapshot thread.
+  void StopChannel();
+
+  // Waits for a regular, non-journal snapshot to finish.
+  // Used in RdbSaver, and can be called from any thread.
+  void Join() {
+    snapshot_fb_.JoinIfNeeded();
+  }
 
  private:
-  // Main fiber that iterates over all buckets in the db slice
+  // Main snapshotting fiber that iterates over all buckets in the db slice
   // and submits them to SerializeBucket.
   void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);
 
+  // A fiber function that switches to incremental mode.
+  void SwitchIncrementalFb(Context* cntx, LSN lsn);
+
   // Called on traversing cursor by IterateBucketsFb.
   bool BucketSaveCb(PrimeIterator it);
 
@@ -117,10 +125,11 @@ class SliceSnapshot {
 
   // Push serializer's internal buffer to channel.
   // Push regardless of buffer size if force is true.
-  // Return if pushed.
+  // Return true if pushed. Can block. Called from the snapshot thread.
   bool PushSerializedToChannel(bool force);
 
-  // Helper function that flushes the serialized items into the RecordStream
+  // Helper function that flushes the serialized items into the RecordStream.
+  // Can block on the channel.
   using FlushState = SerializerBase::FlushState;
   size_t Serialize(FlushState flush_state = FlushState::kFlushMidEntry);
 

From 3c911c21fa9fe3fac8ba64bc65fb999cd5d03ab6 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Fri, 6 Sep 2024 09:48:52 +0300
Subject: [PATCH 2/2] chore: fix comments

---
 src/server/dflycmd.cc  | 6 ++----
 src/server/rdb_save.cc | 2 +-
 src/server/rdb_save.h  | 4 ++--
 src/server/snapshot.cc | 5 +++--
 src/server/snapshot.h  | 4 ++--
 5 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index adf9475a1b4c..2c6e9c58b608 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -599,10 +599,8 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
 }
 
 void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
-  // Shard can be null for io thread.
-  if (shard != nullptr) {
-    flow->saver->StopSnapshotInShard(shard);
-  }
+  DCHECK(shard);
+  flow->saver->StopFullSyncInShard(shard);
 
   // Wait for full sync to finish.
   flow->full_sync_fb.JoinIfNeeded();
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index 44afdeb05e6e..1d25c496bf4c 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -1479,7 +1479,7 @@ void RdbSaver::StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard
   impl_->StartIncrementalSnapshotting(cntx, shard, start_lsn);
 }
 
-void RdbSaver::StopSnapshotInShard(EngineShard* shard) {
+void RdbSaver::StopFullSyncInShard(EngineShard* shard) {
   impl_->StopSnapshotting(shard);
 }
 
diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h
index 4a485c63fbbd..86f056d5b177 100644
--- a/src/server/rdb_save.h
+++ b/src/server/rdb_save.h
@@ -92,8 +92,8 @@ class RdbSaver {
   // Send only the incremental snapshot since start_lsn.
   void StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard, LSN start_lsn);
 
-  // Stops serialization in journal streaming mode in the shard's thread.
-  void StopSnapshotInShard(EngineShard* shard);
+  // Stops full-sync serialization for replication in the shard's thread.
+  void StopFullSyncInShard(EngineShard* shard);
 
   // Stores auxiliary (meta) values and header_info
   std::error_code SaveHeader(const GlobalData& header_info);
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 2d9982b5c0a5..e6674f41aabc 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -106,9 +106,10 @@ void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn) {
   });
 }
 
+// Called only for the replication use case.
 void SliceSnapshot::Finalize() {
   DCHECK(db_slice_->shard_owner()->IsMyThread());
-  DCHECK(journal_cb_id_);  // Called only for streaming use-case.
+  DCHECK(journal_cb_id_);
 
   // Wait for serialization to finish in any case.
   snapshot_fb_.JoinIfNeeded();
@@ -287,7 +288,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
 
   while (!it.is_done()) {
     ++result;
-    // might preempt due to bid value serialization.
+    // might preempt due to big value serialization.
     SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
     ++it;
   }
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index 3a305426524f..79e9243148b9 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -79,7 +79,7 @@ class SliceSnapshot {
   // called.
   void StartIncremental(Context* cntx, LSN start_lsn);
 
-  // Finalizes the snapshot. Should only be called for journal streaming mode.
+  // Finalizes the snapshot. Only called for replication.
   // Blocking. Must be called from the Snapshot thread.
   void Finalize();
 
@@ -90,7 +90,7 @@ class SliceSnapshot {
   void StopChannel();
 
   // Waits for a regular, non-journal snapshot to finish.
-  // Used in RdbSaver, and can be called from any thread.
+  // Called only for non-replication, backup use cases.
   void Join() {
     snapshot_fb_.JoinIfNeeded();
   }
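
For reviewers, a brief sketch of the call sequences these two patches leave in place. It is illustrative only: it reuses just the names visible in the diffs above, elides all RdbSaver/DflyCmd plumbing, and is written as C++ comments rather than verbatim code from the repository.

// Replication, full sync (per shard): serialize all buckets, then keep the
// channel open so the flow can switch to journal streaming.
//   snapshot->Start(/*stream_journal=*/true, cll, ...);   // runs IterateBucketsFb
//   ...                                                   // stable-state streaming
//   snapshot->Finalize();   // shard thread: sends the journal offset, unregisters
//                           // the journal callback, closes the channel
//
// Replication, partial sync (per shard): replay buffered journal entries.
//   snapshot->StartIncremental(cntx, start_lsn);          // runs SwitchIncrementalFb
//   snapshot->Finalize();                                 // as above, once streaming stops
//
// Backup / RDB save (per shard): no journal streaming involved.
//   snapshot->Start(/*stream_journal=*/false, cll, ...);  // channel closed by the fiber itself
//   snapshot->Join();                                     // any thread: waits for the fiber
//
// Error path (paired with cancelling the context):
//   snapshot->StopChannel();                              // shard thread: unregister + close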