diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index 0e65814e5de9..2c6e9c58b608 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -112,20 +112,21 @@ void DflyCmd::ReplicaInfo::Cancel() {
   // Update state and cancel context.
   replica_state = SyncState::CANCELLED;
   cntx.Cancel();
-  // Wait for tasks to finish.
   shard_set->RunBlockingInParallel([this](EngineShard* shard) {
+    VLOG(2) << "Disconnecting flow " << shard->shard_id();
+
     FlowInfo* flow = &flows[shard->shard_id()];
     if (flow->cleanup) {
       flow->cleanup();
     }
-
+    VLOG(2) << "After flow cleanup " << shard->shard_id();
     flow->full_sync_fb.JoinIfNeeded();
     flow->conn = nullptr;
   });
-  // Wait for error handler to quit.
   cntx.JoinErrorHandler();
+  VLOG(1) << "Disconnecting replica " << address << ":" << listening_port;
 }
 
 DflyCmd::DflyCmd(ServerFamily* server_family) : sf_(server_family) {
@@ -598,10 +599,8 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
 }
 
 void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
-  // Shard can be null for io thread.
-  if (shard != nullptr) {
-    flow->saver->StopSnapshotInShard(shard);
-  }
+  DCHECK(shard);
+  flow->saver->StopFullSyncInShard(shard);
 
   // Wait for full sync to finish.
   flow->full_sync_fb.JoinIfNeeded();
diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 3fbd9969f029..6fb0bb6b01a6 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -601,6 +601,7 @@ void EngineShard::RemoveContTx(Transaction* tx) {
 }
 
 void EngineShard::Heartbeat() {
+  DVLOG(2) << " Heartbeat";
   DCHECK(namespaces.IsInitialized());
 
   CacheStats();
diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h
index 89b96178d24c..8ad8fc361930 100644
--- a/src/server/engine_shard.h
+++ b/src/server/engine_shard.h
@@ -56,6 +56,10 @@ class EngineShard {
     return shard_;
   }
 
+  bool IsMyThread() const {
+    return this == shard_;
+  }
+
   ShardId shard_id() const {
     return shard_id_;
   }
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index 4670b007d174..1d25c496bf4c 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -1324,7 +1324,7 @@ void RdbSaver::Impl::StartIncrementalSnapshotting(Context* cntx, EngineShard* sh
 }
 
 void RdbSaver::Impl::StopSnapshotting(EngineShard* shard) {
-  GetSnapshot(shard)->Stop();
+  GetSnapshot(shard)->Finalize();
 }
 
 void RdbSaver::Impl::Cancel() {
@@ -1334,7 +1334,7 @@ void RdbSaver::Impl::Cancel() {
   auto& snapshot = GetSnapshot(shard);
   if (snapshot)
-    snapshot->Cancel();
+    snapshot->StopChannel();
 
   dfly::SliceSnapshot::DbRecord rec;
   while (channel_.Pop(rec)) {
@@ -1479,7 +1479,7 @@ void RdbSaver::StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard
   impl_->StartIncrementalSnapshotting(cntx, shard, start_lsn);
 }
 
-void RdbSaver::StopSnapshotInShard(EngineShard* shard) {
+void RdbSaver::StopFullSyncInShard(EngineShard* shard) {
   impl_->StopSnapshotting(shard);
 }
 
diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h
index 4a485c63fbbd..86f056d5b177 100644
--- a/src/server/rdb_save.h
+++ b/src/server/rdb_save.h
@@ -92,8 +92,8 @@ class RdbSaver {
   // Send only the incremental snapshot since start_lsn.
   void StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard, LSN start_lsn);
 
-  // Stops serialization in journal streaming mode in the shard's thread.
-  void StopSnapshotInShard(EngineShard* shard);
+  // Stops full-sync serialization for replication in the shard's thread.
+  void StopFullSyncInShard(EngineShard* shard);
 
   // Stores auxiliary (meta) values and header_info
   std::error_code SaveHeader(const GlobalData& header_info);
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index fa508002bb47..e6674f41aabc 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -90,79 +90,41 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
   snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] {
     IterateBucketsFb(cll, stream_journal);
     db_slice_->UnregisterOnChange(snapshot_version_);
-    if (cll->IsCancelled()) {
-      Cancel();
-    } else if (!stream_journal) {
-      CloseRecordChannel();
+    // We stop the channel if we are performing backups (non-streaming) or the full sync failed.
+    // For a successful full sync we keep the channel open in order to switch to streaming mode.
+    if (cll->IsCancelled() || !stream_journal) {
+      StopChannel();
     }
   });
 }
 
 void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn) {
-  auto* journal = db_slice_->shard_owner()->journal();
-  DCHECK(journal);
-
   serializer_ = std::make_unique<RdbSerializer>(compression_mode_);
 
-  snapshot_fb_ =
-      fb2::Fiber("incremental_snapshot", [this, journal, cntx, lsn = start_lsn]() mutable {
-        DCHECK(lsn <= journal->GetLsn()) << "The replica tried to sync from the future.";
-
-        VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
-
-        // The replica sends the LSN of the next entry is wants to receive.
-        while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
-          serializer_->WriteJournalEntry(journal->GetEntry(lsn));
-          PushSerializedToChannel(false);
-          lsn++;
-        }
-
-        VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
-
-        // This check is safe, but it is not trivially safe.
-        // We rely here on the fact that JournalSlice::AddLogRecord can
-        // only preempt while holding the callback lock.
-        // That guarantees that if we have processed the last LSN the callback
-        // will only be added after JournalSlice::AddLogRecord has finished
-        // iterating its callbacks and we won't process the record twice.
-        // We have to make sure we don't preempt ourselves before registering the callback!
-
-        // GetLsn() is always the next lsn that we expect to create.
-        if (journal->GetLsn() == lsn) {
-          {
-            FiberAtomicGuard fg;
-            serializer_->SendFullSyncCut();
-          }
-          auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
-          journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
-          PushSerializedToChannel(true);
-        } else {
-          // We stopped but we didn't manage to send the whole stream.
-          cntx->ReportError(
-              std::make_error_code(errc::state_not_recoverable),
-              absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
-                           " was dropped from the buffer. Current lsn=", journal->GetLsn()));
-          Cancel();
-        }
-      });
+  snapshot_fb_ = fb2::Fiber("incremental_snapshot", [cntx, start_lsn, this] {
+    this->SwitchIncrementalFb(cntx, start_lsn);
+  });
 }
 
-void SliceSnapshot::Stop() {
-  // Wait for serialization to finish in any case.
-  Join();
+// Called only for the replication use case.
+void SliceSnapshot::Finalize() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
+  DCHECK(journal_cb_id_);
 
-  if (journal_cb_id_) {
-    auto* journal = db_slice_->shard_owner()->journal();
-    serializer_->SendJournalOffset(journal->GetLsn());
-    journal->UnregisterOnChange(journal_cb_id_);
-  }
+  // Wait for serialization to finish in any case.
+  snapshot_fb_.JoinIfNeeded();
+
+  auto* journal = db_slice_->shard_owner()->journal();
+  serializer_->SendJournalOffset(journal->GetLsn());
+  journal->UnregisterOnChange(journal_cb_id_);
+  journal_cb_id_ = 0;
 
   PushSerializedToChannel(true);
   CloseRecordChannel();
 }
 
-void SliceSnapshot::Cancel() {
-  VLOG(1) << "SliceSnapshot::Cancel";
+void SliceSnapshot::StopChannel() {
+  VLOG(1) << "SliceSnapshot::StopChannel";
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
 
   // Cancel() might be called multiple times from different fibers of the same thread, but we
   // should unregister the callback only once.
@@ -175,11 +137,6 @@ void SliceSnapshot::Cancel() {
   CloseRecordChannel();
 }
 
-void SliceSnapshot::Join() {
-  // Fiber could have already been joined by Stop.
-  snapshot_fb_.JoinIfNeeded();
-}
-
 // The algorithm is to go over all the buckets and serialize those with
 // version < snapshot_version_. In order to serialize each physical bucket exactly once we update
 // bucket version to snapshot_version_ once it has been serialized.
@@ -251,6 +208,49 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
           << stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls;
 }
 
+void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
+  auto* journal = db_slice_->shard_owner()->journal();
+  DCHECK(journal);
+  DCHECK_LE(lsn, journal->GetLsn()) << "The replica tried to sync from the future.";
+
+  VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
+
+  // The replica sends the LSN of the next entry it wants to receive.
+  while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
+    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
+    PushSerializedToChannel(false);
+    lsn++;
+  }
+
+  VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
+
+  // This check is safe, but it is not trivially safe.
+  // We rely here on the fact that JournalSlice::AddLogRecord can
+  // only preempt while holding the callback lock.
+  // That guarantees that if we have processed the last LSN the callback
+  // will only be added after JournalSlice::AddLogRecord has finished
+  // iterating its callbacks and we won't process the record twice.
+  // We have to make sure we don't preempt ourselves before registering the callback!
+
+  // GetLsn() is always the next lsn that we expect to create.
+  if (journal->GetLsn() == lsn) {
+    {
+      FiberAtomicGuard fg;
+      serializer_->SendFullSyncCut();
+    }
+    auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
+    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    PushSerializedToChannel(true);
+  } else {
+    // We stopped but we didn't manage to send the whole stream.
+    cntx->ReportError(
+        std::make_error_code(errc::state_not_recoverable),
+        absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
+                     " was dropped from the buffer. Current lsn=", journal->GetLsn()));
+    StopChannel();
+  }
+}
+
 bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
   ++stats_.savecb_calls;
@@ -288,7 +288,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
   while (!it.is_done()) {
     ++result;
-    // might preempt
+    // might preempt due to big value serialization.
     SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
     ++it;
   }
@@ -349,7 +349,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
     return false;
 
   // Flush any of the leftovers to avoid interleavings
-  const auto serialized = Serialize();
+  size_t serialized = Serialize();
 
   // Bucket serialization might have accumulated some delayed values.
   // Because we can finally block in this function, we'll await and serialize them
@@ -359,7 +359,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
     delayed_entries_.pop_back();
   }
 
-  const auto total_serialized = Serialize() + serialized;
+  size_t total_serialized = Serialize() + serialized;
   return total_serialized > 0;
 }
 
@@ -401,6 +401,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
 }
 
 void SliceSnapshot::CloseRecordChannel() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
   std::unique_lock lk(db_slice_->GetSerializationMutex());
 
   CHECK(!serialize_bucket_running_);
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index ba5e083a8ecb..79e9243148b9 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -79,22 +79,30 @@ class SliceSnapshot {
   // called.
   void StartIncremental(Context* cntx, LSN start_lsn);
 
-  // Stop snapshot. Only needs to be called for journal streaming mode.
-  void Stop();
+  // Finalizes the snapshot. Only called for replication.
+  // Blocking. Must be called from the snapshot thread.
+  void Finalize();
 
-  // Wait for iteration fiber to stop.
-  void Join();
-
-  // Force stop. Needs to be called together with cancelling the context.
+  // Stops the channel. Needs to be called together with cancelling the context.
   // Snapshot can't always react to cancellation in streaming mode because the
   // iteration fiber might have finished running by then.
-  void Cancel();
+  // Blocking. Must be called from the snapshot thread.
+  void StopChannel();
+
+  // Waits for a regular, non-journal snapshot to finish.
+  // Called only for non-replication, backup use cases.
+  void Join() {
+    snapshot_fb_.JoinIfNeeded();
+  }
 
  private:
-  // Main fiber that iterates over all buckets in the db slice
+  // Main snapshotting fiber that iterates over all buckets in the db slice
   // and submits them to SerializeBucket.
   void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);
 
+  // A fiber function that switches to the incremental mode.
+  void SwitchIncrementalFb(Context* cntx, LSN lsn);
+
   // Called on traversing cursor by IterateBucketsFb.
   bool BucketSaveCb(PrimeIterator it);
 
@@ -117,10 +125,11 @@ class SliceSnapshot {
   // Push serializer's internal buffer to channel.
   // Push regardless of buffer size if force is true.
-  // Return if pushed.
+  // Returns true if pushed. Can block. Called from the snapshot thread.
  bool PushSerializedToChannel(bool force);
 
-  // Helper function that flushes the serialized items into the RecordStream
+  // Helper function that flushes the serialized items into the RecordStream.
+  // Can block on the channel.
   using FlushState = SerializerBase::FlushState;
   size_t Serialize(FlushState flush_state = FlushState::kFlushMidEntry);