chore: cosmetic changes around Snapshot functions #3652

Merged · 2 commits · Sep 8, 2024
Changes from 1 commit
7 changes: 4 additions & 3 deletions src/server/dflycmd.cc
@@ -112,20 +112,21 @@ void DflyCmd::ReplicaInfo::Cancel() {
  // Update state and cancel context.
  replica_state = SyncState::CANCELLED;
  cntx.Cancel();

  // Wait for tasks to finish.
  shard_set->RunBlockingInParallel([this](EngineShard* shard) {
    VLOG(2) << "Disconnecting flow " << shard->shard_id();

    FlowInfo* flow = &flows[shard->shard_id()];
    if (flow->cleanup) {
      flow->cleanup();
    }

    VLOG(2) << "After flow cleanup " << shard->shard_id();
    flow->full_sync_fb.JoinIfNeeded();
    flow->conn = nullptr;
  });

  // Wait for error handler to quit.
  cntx.JoinErrorHandler();
  VLOG(1) << "Disconnecting replica " << address << ":" << listening_port;
}

DflyCmd::DflyCmd(ServerFamily* server_family) : sf_(server_family) {
1 change: 1 addition & 0 deletions src/server/engine_shard.cc
@@ -601,6 +601,7 @@ void EngineShard::RemoveContTx(Transaction* tx) {
}

void EngineShard::Heartbeat() {
+  DVLOG(2) << " Heartbeat";
  DCHECK(namespaces.IsInitialized());

  CacheStats();
4 changes: 4 additions & 0 deletions src/server/engine_shard.h
@@ -56,6 +56,10 @@ class EngineShard {
    return shard_;
  }

+  bool IsMyThread() const {
+    return this == shard_;
+  }
+
  ShardId shard_id() const {
    return shard_id_;
  }
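The new IsMyThread() helper compares this against shard_, which the surrounding accessor suggests is the class's thread-local "current shard" pointer. A minimal self-contained sketch of that idiom (toy code, not Dragonfly's actual types):

  #include <cassert>
  #include <thread>

  // Each worker thread records the shard it owns in a thread_local pointer;
  // IsMyThread() then asserts thread affinity for shard-local operations.
  class Shard {
   public:
    static thread_local Shard* current;  // set by the owning thread
    bool IsMyThread() const { return this == current; }
  };

  thread_local Shard* Shard::current = nullptr;

  int main() {
    Shard a, b;
    std::thread owner_of_a([&] {
      Shard::current = &a;      // this thread owns `a`
      assert(a.IsMyThread());   // holds on the owner's thread
      assert(!b.IsMyThread());  // `b` is owned elsewhere
    });
    owner_of_a.join();
    assert(!a.IsMyThread());    // the main thread never set Shard::current
    return 0;
  }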
4 changes: 2 additions & 2 deletions src/server/rdb_save.cc
@@ -1324,7 +1324,7 @@ void RdbSaver::Impl::StartIncrementalSnapshotting(Context* cntx, EngineShard* sh
}

void RdbSaver::Impl::StopSnapshotting(EngineShard* shard) {
-  GetSnapshot(shard)->Stop();
+  GetSnapshot(shard)->Finalize();
}

void RdbSaver::Impl::Cancel() {
@@ -1334,7 +1334,7 @@ void RdbSaver::Impl::Cancel() {

  auto& snapshot = GetSnapshot(shard);
  if (snapshot)
-    snapshot->Cancel();
+    snapshot->StopChannel();

  dfly::SliceSnapshot::DbRecord rec;
  while (channel_.Pop(rec)) {
132 changes: 66 additions & 66 deletions src/server/snapshot.cc
@@ -90,79 +90,40 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
  snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] {
    IterateBucketsFb(cll, stream_journal);
    db_slice_->UnregisterOnChange(snapshot_version_);
-    if (cll->IsCancelled()) {
-      Cancel();
-    } else if (!stream_journal) {
-      CloseRecordChannel();
+    // We stop the channel if we are performing backups (non-streaming) or full sync failed.
+    // For a successful full-sync we keep the channel in order to switch to streaming mode.
+    if (cll->IsCancelled() || !stream_journal) {
Collaborator Author: here is probably the only functional change I did, but if you look at the implementation of Cancel/StopChannel, you will see it fits both use cases. Now, with the rename, its meaning shifted so we can just call the same function.
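For reference, the unified condition covers three cases (derived from the old branches and the comment above):

  cancelled?  stream_journal?  outcome
  yes         any              StopChannel()  (full sync failed)
  no          false            StopChannel()  (one-shot backup; channel no longer needed)
  no          true             keep channel   (switch to streaming mode)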

+      StopChannel();
    }
  });
}

void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn) {
-  auto* journal = db_slice_->shard_owner()->journal();
-  DCHECK(journal);
-
  serializer_ = std::make_unique<RdbSerializer>(compression_mode_);

-  snapshot_fb_ =
-      fb2::Fiber("incremental_snapshot", [this, journal, cntx, lsn = start_lsn]() mutable {
-        DCHECK(lsn <= journal->GetLsn()) << "The replica tried to sync from the future.";
-
-        VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
-
-        // The replica sends the LSN of the next entry it wants to receive.
-        while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
-          serializer_->WriteJournalEntry(journal->GetEntry(lsn));
-          PushSerializedToChannel(false);
-          lsn++;
-        }
-
-        VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
-
-        // This check is safe, but it is not trivially safe.
-        // We rely here on the fact that JournalSlice::AddLogRecord can
-        // only preempt while holding the callback lock.
-        // That guarantees that if we have processed the last LSN the callback
-        // will only be added after JournalSlice::AddLogRecord has finished
-        // iterating its callbacks and we won't process the record twice.
-        // We have to make sure we don't preempt ourselves before registering the callback!
-
-        // GetLsn() is always the next lsn that we expect to create.
-        if (journal->GetLsn() == lsn) {
-          {
-            FiberAtomicGuard fg;
-            serializer_->SendFullSyncCut();
-          }
-          auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
-          journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
-          PushSerializedToChannel(true);
-        } else {
-          // We stopped but we didn't manage to send the whole stream.
-          cntx->ReportError(
-              std::make_error_code(errc::state_not_recoverable),
-              absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
-                           " was dropped from the buffer. Current lsn=", journal->GetLsn()));
-          Cancel();
-        }
-      });
+  snapshot_fb_ = fb2::Fiber("incremental_snapshot", [cntx, start_lsn, this] {
+    this->SwitchIncrementalFb(cntx, start_lsn);
+  });
}

-void SliceSnapshot::Stop() {
-  // Wait for serialization to finish in any case.
-  Join();
+void SliceSnapshot::Finalize() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
Collaborator Author: more DCHECKs to help enforce invariants. (romange marked this conversation as resolved.)

+  DCHECK(journal_cb_id_);  // Called only for streaming use-case.

-  if (journal_cb_id_) {
-    auto* journal = db_slice_->shard_owner()->journal();
-    serializer_->SendJournalOffset(journal->GetLsn());
-    journal->UnregisterOnChange(journal_cb_id_);
-  }
+  // Wait for serialization to finish in any case.
+  snapshot_fb_.JoinIfNeeded();
+
+  auto* journal = db_slice_->shard_owner()->journal();
+  serializer_->SendJournalOffset(journal->GetLsn());
+  journal->UnregisterOnChange(journal_cb_id_);
  journal_cb_id_ = 0;

  PushSerializedToChannel(true);
  CloseRecordChannel();
}

-void SliceSnapshot::Cancel() {
-  VLOG(1) << "SliceSnapshot::Cancel";
+void SliceSnapshot::StopChannel() {
+  VLOG(1) << "SliceSnapshot::StopChannel";
+  DCHECK(db_slice_->shard_owner()->IsMyThread());

  // Cancel() might be called multiple times from different fibers of the same thread, but we
  // should unregister the callback only once.
@@ -175,11 +136,6 @@ void SliceSnapshot::Cancel() {
  CloseRecordChannel();
}

-void SliceSnapshot::Join() {
-  // Fiber could have already been joined by Stop.
-  snapshot_fb_.JoinIfNeeded();
-}

// The algorithm is to go over all the buckets and serialize those with
// version < snapshot_version_. In order to serialize each physical bucket exactly once we update
// bucket version to snapshot_version_ once it has been serialized.
@@ -251,6 +207,49 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut) {
          << stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls;
}

+void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
+  auto* journal = db_slice_->shard_owner()->journal();
+  DCHECK(journal);
+  DCHECK_LE(lsn, journal->GetLsn()) << "The replica tried to sync from the future.";
+
+  VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
+
+  // The replica sends the LSN of the next entry it wants to receive.
+  while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
+    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
+    PushSerializedToChannel(false);
+    lsn++;
+  }
Comment on lines +219 to +223
Contributor: could you do it in a FOR cycle? It would be more readable.
Collaborator Author: I just moved the code and I deliberately did not want to modify it in this PR.
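For illustration, the reviewer's FOR-cycle version would read roughly as follows (a sketch only, reusing the names from the moved code; it was not applied in this PR):

  // Equivalent rewrite of the while loop above as a for statement.
  for (; !cntx->IsCancelled() && journal->IsLSNInBuffer(lsn); ++lsn) {
    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
    PushSerializedToChannel(false);
  }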


VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);

// This check is safe, but it is not trivially safe.
// We rely here on the fact that JournalSlice::AddLogRecord can
// only preempt while holding the callback lock.
// That guarantees that if we have processed the last LSN the callback
// will only be added after JournalSlice::AddLogRecord has finished
// iterating its callbacks and we won't process the record twice.
// We have to make sure we don't preempt ourselves before registering the callback!

// GetLsn() is always the next lsn that we expect to create.
if (journal->GetLsn() == lsn) {
{
FiberAtomicGuard fg;
serializer_->SendFullSyncCut();
}
auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
Contributor: simple lambda is better than bind
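For illustration, the lambda alternative might look like this (a sketch, not applied in this PR; OnJournalEntry's two-argument signature is taken from further down in this file):

  // Explicit lambda instead of absl::bind_front.
  journal_cb_id_ = journal->RegisterOnChange(
      [this](const journal::JournalItem& item, bool await) { OnJournalEntry(item, await); });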

+    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    PushSerializedToChannel(true);
+  } else {
+    // We stopped but we didn't manage to send the whole stream.
+    cntx->ReportError(
+        std::make_error_code(errc::state_not_recoverable),
+        absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
+                     " was dropped from the buffer. Current lsn=", journal->GetLsn()));
+    StopChannel();
+  }
+}

bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
  ++stats_.savecb_calls;

@@ -288,7 +287,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite

  while (!it.is_done()) {
    ++result;
-    // might preempt
+    // might preempt due to big value serialization.
    SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
    ++it;
  }
@@ -349,7 +348,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
    return false;

  // Flush any of the leftovers to avoid interleavings
-  const auto serialized = Serialize();
+  size_t serialized = Serialize();

  // Bucket serialization might have accumulated some delayed values.
  // Because we can finally block in this function, we'll await and serialize them
@@ -359,7 +358,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
    delayed_entries_.pop_back();
  }

-  const auto total_serialized = Serialize() + serialized;
+  size_t total_serialized = Serialize() + serialized;
  return total_serialized > 0;
}

@@ -401,6 +400,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
}

void SliceSnapshot::CloseRecordChannel() {
+  DCHECK(db_slice_->shard_owner()->IsMyThread());
  std::unique_lock lk(db_slice_->GetSerializationMutex());

  CHECK(!serialize_bucket_running_);
29 changes: 19 additions & 10 deletions src/server/snapshot.h
@@ -79,22 +79,30 @@ class SliceSnapshot {
  // called.
  void StartIncremental(Context* cntx, LSN start_lsn);

-  // Stop snapshot. Only needs to be called for journal streaming mode.
-  void Stop();
+  // Finalizes the snapshot. Should only be called for journal streaming mode.
+  // Blocking. Must be called from the Snapshot thread.
+  void Finalize();

-  // Wait for iteration fiber to stop.
-  void Join();
-
-  // Force stop. Needs to be called together with cancelling the context.
+  // Stops channel. Needs to be called together with cancelling the context.
  // Snapshot can't always react to cancellation in streaming mode because the
  // iteration fiber might have finished running by then.
-  void Cancel();
+  // Blocking. Must be called from the Snapshot thread.
+  void StopChannel();

+  // Waits for a regular, non-journal snapshot to finish.
+  // Used in RdbSaver, and can be called from any thread.
+  void Join() {
+    snapshot_fb_.JoinIfNeeded();
+  }

 private:
-  // Main fiber that iterates over all buckets in the db slice
+  // Main snapshotting fiber that iterates over all buckets in the db slice
  // and submits them to SerializeBucket.
  void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);

+  // A fiber function that switches to the incremental mode
+  void SwitchIncrementalFb(Context* cntx, LSN lsn);
+
  // Called on traversing cursor by IterateBucketsFb.
  bool BucketSaveCb(PrimeIterator it);

@@ -117,10 +125,11 @@ class SliceSnapshot {

  // Push serializer's internal buffer to channel.
  // Push regardless of buffer size if force is true.
-  // Return if pushed.
+  // Return true if pushed. Can block. Is called from the snapshot thread.
  bool PushSerializedToChannel(bool force);

-  // Helper function that flushes the serialized items into the RecordStream
+  // Helper function that flushes the serialized items into the RecordStream.
+  // Can block on the channel.
  using FlushState = SerializerBase::FlushState;
  size_t Serialize(FlushState flush_state = FlushState::kFlushMidEntry);
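Taken together, the renamed API separates three exit paths. A rough usage outline, inferred from the header comments above (not code from this PR; the trailing Start() arguments are elided):

  // Backup (non-streaming) save:
  snapshot->Start(/*stream_journal=*/false, cll, ...);
  //                        ... records flow through the channel ...
  snapshot->Join();         // any thread: wait for the iteration fiber

  // Streaming replication (full sync followed by stable sync):
  snapshot->Start(/*stream_journal=*/true, cll, ...);
  //                        ... full sync; channel stays open for streaming ...
  snapshot->Finalize();     // shard thread only: flush, unregister the journal callback, close channel

  // Error path:
  cntx.Cancel();            // cancel the context first
  snapshot->StopChannel();  // shard thread only: unregister the callback, close channel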