diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 7ab869b2915..ad7edaaed24 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1478,9 +1478,12 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, wal.GetPreSyncSize() > 0) { synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); } + auto writer = wal.ReleaseWriter(); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "deleting log %" PRIu64 " from logs_\n", wal.number); - logs_to_free_.push_back(wal.ReleaseWriter()); + "deleting log %" PRIu64 + " from logs_. Last Seq number of the WAL is %" PRIu64 "\n", + wal.number, writer->GetLastSequence()); + logs_to_free_.push_back(writer); it = logs_.erase(it); } else { wal.FinishSync(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 1f604168abf..30380712767 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1822,7 +1822,7 @@ class DBImpl : public DB { IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size); + LogFileNumberSize& log_file_number_size, int caller_id); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index d86a5d4bd42..d6289155dc7 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -319,9 +319,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ could have changed while we were waiting. continue; } + auto writer = log.ReleaseWriter(); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "deleting log %" PRIu64 " from logs_\n", log.number); - logs_to_free_.push_back(log.ReleaseWriter()); + "deleting log %" PRIu64 + " from logs_, last seq number of WAL %" PRIu64 "\n", + log.number, writer->GetLastSequence()); + logs_to_free_.push_back(writer); logs_.pop_front(); } // Current log cannot be obsolete. @@ -496,7 +499,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { for (const auto w : state.logs_to_free) { // TODO: maybe check the return value of Close. ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Close log %" PRIu64 " from logs_\n", w->get_log_number()); + "Close log %" PRIu64 + " from logs_, last Seq number in WAL %" PRIu64 "\n", + w->get_log_number(), w->GetLastSequence()); auto s = w->Close(); s.PermitUncheckedError(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 28dceb3c0f1..ad5a1de3a99 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1735,7 +1735,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, assert(log_writer->get_log_number() == log_file_number_size.number); impl->mutex_.AssertHeld(); s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size, - log_file_number_size); + log_file_number_size, 0); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 5a497299c01..86a2f71d27a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1412,14 +1412,23 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size) { + LogFileNumberSize& log_file_number_size, + int caller_id) { assert(log_size != nullptr); if (log_writer->file()->GetFileSize() == 0) { ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Start writing to WAL: [%" PRIu64 "]", + "Start writing to WAL: [%" PRIu64 " ]", log_writer->get_log_number()); } + if (log_writer->get_log_number() != logs_.back().number) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "] CallerId: %d", + log_writer->get_log_number(), logs_.back().number, caller_id); + } Slice log_entry = WriteBatchInternal::Contents(&merged_batch); + SequenceNumber seq = WriteBatchInternal::Sequence(&merged_batch); + log_writer->SetLastSequence(seq); *log_size = log_entry.size(); // When two_write_queues_ WriteToWAL has to be protected from concurretn calls // from the two queues anyway and log_write_mutex_ is already held. Otherwise @@ -1472,7 +1481,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, - log_file_number_size); + log_file_number_size, 1); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1535,12 +1544,6 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); } - if (log_writer->get_log_number() != logs_.back().number) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "]", - log_writer->get_log_number(), logs_.back().number); - } - return io_s; } @@ -1580,7 +1583,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, - log_file_number_size); + log_file_number_size, 2); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; diff --git a/db/log_writer.cc b/db/log_writer.cc index e2e596596aa..2d6a3ba1ba4 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -29,6 +29,7 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); } + last_seq_ = 0; } Writer::~Writer() { diff --git a/db/log_writer.h b/db/log_writer.h index 1a91b21994d..71da305d739 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -92,11 +92,16 @@ class Writer { bool TEST_BufferIsEmpty(); + void SetLastSequence(SequenceNumber seq) { last_seq_ = seq; } + + SequenceNumber GetLastSequence() const { return last_seq_; } + private: std::unique_ptr dest_; size_t block_offset_; // Current offset in block uint64_t log_number_; bool recycle_log_files_; + SequenceNumber last_seq_; // crc32c values for all supported record types. These are // pre-computed to reduce the overhead of computing the crc of the