diff --git a/production/db/core/inc/async_disk_writer.hpp b/production/db/core/inc/async_disk_writer.hpp index 6cb6673e0642..83530e433eae 100644 --- a/production/db/core/inc/async_disk_writer.hpp +++ b/production/db/core/inc/async_disk_writer.hpp @@ -93,7 +93,7 @@ class async_disk_writer_t * Append fdatasync to the in_progress_batch and update batch with file fd so that the file * can be closed once the kernel has processed it. */ - void perform_file_close_operations(int file_fd, file_sequence_t log_seq); + void perform_file_close_operations(int file_fd, log_sequence_t log_seq); /** * Copy any temporary writes (which don't exist in gaia shared memory) into the metadata buffer. @@ -115,7 +115,8 @@ class async_disk_writer_t void map_commit_ts_to_session_decision_eventfd(gaia_txn_id_t commit_ts, int session_decision_eventfd); private: - // Reserve slots in the in_progress batch to be able to append additional operations to it (before it gets submitted to the kernel) + // Reserve slots in the in_progress batch to be able to append additional operations to it + // (before it gets submitted to the kernel) static constexpr size_t c_submit_batch_sqe_count = 3; static constexpr size_t c_single_submission_entry_count = 1; static constexpr size_t c_async_batch_size = 32; @@ -123,14 +124,15 @@ class async_disk_writer_t static inline eventfd_t c_default_flush_eventfd_value = 1; static inline iovec c_default_iov = {static_cast(&c_default_flush_eventfd_value), sizeof(eventfd_t)}; - // eventfd to signal that a batch flush has completed. - // Used to block new writes to disk when a batch is already getting flushed. + // eventfd for signalling that a batch flush has completed. Used to block new writes to disk + // when a batch is already getting flushed. static inline int s_flush_eventfd = -1; - // eventfd to signal that the IO results belonging to a batch are ready to be validated. + // eventfd for signalling that the IO results belonging to a batch are ready to be checked for + // errors. int m_validate_flush_eventfd = -1; - // eventfd to signal that a file is ready to be checkpointed. + // eventfd for signal that a log file is ready to be checkpointed. int m_signal_checkpoint_eventfd = -1; // Keep track of session threads to unblock. diff --git a/production/db/core/inc/async_write_batch.hpp b/production/db/core/inc/async_write_batch.hpp index 7e5ff4a70d7e..7c9271e3f65f 100644 --- a/production/db/core/inc/async_write_batch.hpp +++ b/production/db/core/inc/async_write_batch.hpp @@ -53,9 +53,9 @@ class async_write_batch_t /** * Add file fd to the batch that should be closed once all of its pending writes have finished. 
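Reviewer note on the eventfd handshake introduced above (s_flush_eventfd, m_signal_checkpoint_eventfd): below is a minimal, self-contained sketch of the pattern using illustrative names, not the gaia helpers. One thread posts the sequence number of the last durably closed log file; the checkpoint thread blocks until a value arrives. Note that a plain (non-semaphore) eventfd read returns the sum of all values written since the last read and resets the kernel counter to zero.

    #include <sys/eventfd.h>
    #include <unistd.h>
    #include <cstdint>
    #include <cstdio>
    #include <thread>

    int main()
    {
        // Blocking eventfd: the reader sleeps until a writer posts a value.
        int checkpoint_eventfd = ::eventfd(0, 0);
        if (checkpoint_eventfd == -1)
        {
            std::perror("eventfd");
            return 1;
        }

        std::thread checkpointer([checkpoint_eventfd]() {
            eventfd_t max_log_seq_to_checkpoint = 0;
            // Blocks until the writer signals; the value read is the sum of all
            // writes since the last read, and the kernel counter resets to zero.
            eventfd_read(checkpoint_eventfd, &max_log_seq_to_checkpoint);
            std::printf("checkpoint up to log seq %llu\n",
                        static_cast<unsigned long long>(max_log_seq_to_checkpoint));
        });

        // Writer side: a log file with sequence number 7 has been durably closed.
        eventfd_write(checkpoint_eventfd, 7);

        checkpointer.join();
        ::close(checkpoint_eventfd);
        return 0;
    }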
*/ - void append_file_to_batch(int fd, file_sequence_t log_seq); + void append_file_to_batch(int fd, log_sequence_t log_seq); - file_sequence_t get_max_file_seq_to_close(); + log_sequence_t get_max_log_seq_to_close(); /** * https://man7.org/linux/man-pages/man2/pwritev.2.html diff --git a/production/db/core/inc/db_server.hpp b/production/db/core/inc/db_server.hpp index 196c0b1b671a..628241ea4864 100644 --- a/production/db/core/inc/db_server.hpp +++ b/production/db/core/inc/db_server.hpp @@ -149,10 +149,10 @@ class server_t static inline mapped_data_t s_shared_locators{}; static inline mapped_data_t s_shared_counters{}; - static inline mapped_data_t s_shared_data{}; + static inline s_shared_data{}; static inline mapped_data_t s_shared_id_index{}; static inline index::indexes_t s_global_indexes{}; - static inline std::unique_ptr s_persistent_store{}; + static inline std::shared_ptr s_persistent_store{}; static inline std::unique_ptr s_log_handler{}; // These fields have transaction lifetime. @@ -267,6 +267,8 @@ class server_t // Keep track of the last txn that has been submitted to the async_disk_writer. static inline std::atomic s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id; + static inline std::atomic s_last_checkpointed_commit_ts_lower_bound = c_invalid_gaia_txn_id; + // Keep a track of undecided txns submitted to the async_disk_writer. static inline std::set s_seen_and_undecided_txn_set{}; @@ -443,6 +445,8 @@ class server_t static void recover_persistent_log(); + static void checkpoint_handler(); + static void flush_all_pending_writes(); static void session_handler(int session_socket); diff --git a/production/db/core/inc/log_file.hpp b/production/db/core/inc/log_file.hpp index 3a4b0a674aff..71503dc0986a 100644 --- a/production/db/core/inc/log_file.hpp +++ b/production/db/core/inc/log_file.hpp @@ -29,7 +29,7 @@ namespace persistence class log_file_t { public: - log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size); + log_file_t(const std::string& dir_name, int dir_fd, log_sequence_t log_seq, size_t file_size); /** * Obtain offset to write the next log record at. @@ -51,17 +51,21 @@ class log_file_t /** * Obtain sequence number for the file. */ - file_sequence_t get_file_sequence(); + log_sequence_t get_log_sequence(); private: size_t m_file_size; - file_sequence_t m_file_seq; + log_sequence_t m_log_seq; file_offset_t m_current_offset; std::string m_dir_name; int m_dir_fd; int m_file_fd; std::string m_file_name; inline static constexpr int c_file_permissions = 0666; + + // We reserve space for a header at the end of a log file. + // This serves to signify the end of a file. + inline static constexpr size_t c_reserved_size = sizeof(record_header_t); }; } // namespace persistence diff --git a/production/db/core/inc/log_io.hpp b/production/db/core/inc/log_io.hpp index cb2ca870b9f0..3712610a266a 100644 --- a/production/db/core/inc/log_io.hpp +++ b/production/db/core/inc/log_io.hpp @@ -70,6 +70,11 @@ class log_handler_t */ void create_decision_record(const decision_list_t& txn_decisions); + /** + * Log record to signify end of file. + */ + void create_end_of_file_record(); + /** * Submit async_disk_writer's internal I/O request queue to the kernel for processing. */ @@ -87,21 +92,75 @@ class log_handler_t */ void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd); + /** + * Entry point to start recovery procedure from gaia log files. Checkpointing reuses the same function. 
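The c_reserved_size member added to log_file_t above reserves room for one trailing record header so that an end-of-file record can always be written without growing the file. A small sketch of the capacity arithmetic follows; the header size and file size here are assumptions for illustration (the real values are sizeof(record_header_t) and the configured log file size).

    #include <cassert>
    #include <cstddef>

    // All sizes below are assumptions for illustration.
    constexpr std::size_t c_header_size = 48;
    constexpr std::size_t c_requested_file_size = 4 * 1024 * 1024;

    // Usable capacity excludes the trailing reserved header, so the end-of-file
    // record can always be written without growing the file.
    constexpr std::size_t c_usable_size = c_requested_file_size - c_header_size;

    // A record is appended only if it fits in the usable region; otherwise the
    // file is sealed with an end-of-file record and a new file is opened.
    constexpr bool fits(std::size_t current_offset, std::size_t record_size)
    {
        return current_offset + record_size <= c_usable_size;
    }

    int main()
    {
        std::size_t offset = 0;
        assert(fits(offset, 1024));
        offset += 1024;
        assert(!fits(offset, c_usable_size)); // would spill into the reserved trailer
        return 0;
    }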
+ */ + void recover_from_persistent_log( + std::shared_ptr& persistent_store_manager, + gaia_txn_id_t& last_checkpointed_commit_ts, + log_sequence_t& last_processed_log_seq, + log_sequence_t max_log_seq_to_process, + log_reader_mode_t mode); + + /** + * Destroy all log files with sequence number lesser than or equal to max_log_seq_to_delete. + */ + void truncate_persistent_log(log_sequence_t max_log_seq_to_delete); + + /** + * Set the log sequence counter. + */ + void set_persistent_log_sequence(log_sequence_t log_seq); + private: // TODO: Make log file size configurable. static constexpr uint64_t c_file_size = 4 * 1024 * 1024; - static constexpr std::string_view c_gaia_wal_dir_name = "/wal_dir"; + static constexpr const char c_gaia_wal_dir_name[] = "wal_dir"; static constexpr int c_gaia_wal_dir_permissions = 0755; - static inline std::string s_wal_dir_path{}; + static inline std::filesystem::path s_wal_dir_path{}; static inline int s_dir_fd = -1; // Log file sequence starts from 1. - static inline std::atomic s_file_num = 1; + static inline std::atomic s_file_num = 1; // Keep track of the current log file. std::unique_ptr m_current_file; std::unique_ptr m_async_disk_writer; + + // Recovery & Checkpointing APIs/structures. + + // This map is populated when log files are read during recovery/checkpointing. + // Map txn commit_ts to location of log record header during recovery. + // This index is maintained on a per log file basis. Before moving to the next file + // we assert that this index is empty as all txns have been processed. + // Note that the recovery implementation proceeds in increasing log file order. + std::map m_txn_records_by_commit_ts; + + // This map is populated when log files are read during recovery/checkpointing. + // This map contains the current set of txns that are being processed. (by either recovery or checkpointing) + // Txns are processed one decision record at a time; a single decision record may contain + // multiple txns. + std::map m_decision_records_by_commit_ts; + + gaia_txn_id_t m_max_decided_commit_ts; + + size_t update_iterator(struct record_iterator_t* it); + void validate_checksum(struct record_iterator_t* it); + void map_log_file(struct record_iterator_t* it, int file_fd, log_reader_mode_t recovery_mode); + void unmap_file(void* start, size_t size); + bool is_remaining_file_empty(unsigned char* start, unsigned char* end); + void write_log_record_to_persistent_store( + std::shared_ptr& persistent_store_manager, + read_record_t* record); + void write_records( + std::shared_ptr& persistent_store_manager, + record_iterator_t* it, + gaia_txn_id_t* last_checkpointed_commit_ts); + bool write_log_file_to_persistent_store( + std::shared_ptr& persistent_store_manager, + record_iterator_t* it, + gaia_txn_id_t* last_checkpointed_commit_ts); }; } // namespace persistence diff --git a/production/db/core/inc/persistence_types.hpp b/production/db/core/inc/persistence_types.hpp index d1de2638e8dd..331e94bc400d 100644 --- a/production/db/core/inc/persistence_types.hpp +++ b/production/db/core/inc/persistence_types.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include "gaia/common.hpp" @@ -23,11 +24,30 @@ namespace db namespace persistence { +enum class log_reader_mode_t : uint8_t +{ + not_set = 0x0, + + // Checkpoint mode. + // Does not tolerate any IO failure when reading a log file; any + // IO error is treated as unrecoverable. + checkpoint_fail_on_first_error = 0x1, + + // Recovery mode. + // Stop recovery on first IO error. 
Database will always start and will try to recover as much + // committed data from the log as possible. + // Updates are logged one batch as a time; Persistent batch IO is validated + // first before marking any txn in the batch as durable (and returning a commit decision to the user); + // Thus ignore any txn after the last seen decision timestamp before encountering IO error. + recovery_stop_on_first_error = 0x2, +}; + enum class record_type_t : uint8_t { not_set = 0x0, txn = 0x1, decision = 0x2, + end_of_file = 0x3, }; enum class decision_type_t : uint8_t @@ -46,26 +66,27 @@ struct decision_entry_t // Pair of log file sequence number and file fd. typedef std::vector decision_list_t; -class file_sequence_t : public common::int_type_t +// Persistent log file sequence number. +class log_sequence_t : public common::int_type_t { public: // By default, we should initialize to an invalid value. - constexpr file_sequence_t() + constexpr log_sequence_t() : common::int_type_t() { } - constexpr file_sequence_t(size_t value) + constexpr log_sequence_t(size_t value) : common::int_type_t(value) { } }; static_assert( - sizeof(file_sequence_t) == sizeof(file_sequence_t::value_type), - "file_sequence_t has a different size than its underlying integer type!"); + sizeof(log_sequence_t) == sizeof(log_sequence_t::value_type), + "log_sequence_t has a different size than its underlying integer type!"); -constexpr file_sequence_t c_invalid_file_sequence_number; +constexpr log_sequence_t c_invalid_log_sequence_number; typedef size_t file_offset_t; @@ -81,19 +102,25 @@ typedef uint32_t crc32_t; // This assertion ensures that the default type initialization // matches the value of the invalid constant. static_assert( - c_invalid_file_sequence_number.value() == file_sequence_t::c_default_invalid_value, - "Invalid c_invalid_file_sequence_number initialization!"); + c_invalid_log_sequence_number.value() == log_sequence_t::c_default_invalid_value, + "Invalid c_invalid_log_sequence_number initialization!"); struct log_file_info_t { - file_sequence_t sequence; + log_sequence_t sequence; int file_fd; }; +struct log_file_pointer_t +{ + void* begin; + size_t size; +}; + struct record_header_t { + record_size_t record_size; crc32_t crc; - record_size_t payload_size; record_type_t record_type; gaia_txn_id_t txn_commit_ts; @@ -109,8 +136,86 @@ struct record_header_t char padding[3]; }; -// The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log -// apart from the shared memory objects. +constexpr size_t c_invalid_read_record_size = 0; + +struct read_record_t +{ + struct record_header_t header; + unsigned char payload[]; + + // Record size includes the header size and the payload size. 
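The read_record_t accessors that follow encode the txn record layout: the header, then the deleted IDs, then the serialized objects, with header.record_size covering the whole record. Below is a simplified mirror of that layout, assuming 8-byte IDs; the field names and widths are illustrative, not the real record_header_t.

    #include <cstdint>

    // Simplified mirror of the txn record layout; names and widths are illustrative
    // (see record_header_t / read_record_t for the real definitions).
    struct demo_header_t
    {
        uint32_t record_size;          // header + payload, in bytes
        uint32_t crc;
        uint8_t  record_type;          // txn / decision / end_of_file
        uint64_t txn_commit_ts;
        uint32_t deleted_object_count;
    };

    struct demo_record_t
    {
        demo_header_t header;
        unsigned char payload[];       // flexible array member, as in read_record_t
    };

    // Deleted IDs are written immediately after the header...
    inline const unsigned char* deleted_ids(const demo_record_t* r)
    {
        return r->payload;
    }

    // ...followed by the serialized objects (assuming 8-byte IDs, like gaia_id_t).
    inline const unsigned char* objects(const demo_record_t* r)
    {
        return r->payload + r->header.deleted_object_count * sizeof(uint64_t);
    }

    // The record ends record_size bytes after its start; the next record begins there.
    inline const unsigned char* record_end(const demo_record_t* r)
    {
        return reinterpret_cast<const unsigned char*>(r) + r->header.record_size;
    }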
+ size_t get_record_size() + { + return header.record_size; + } + + unsigned char* get_record() + { + return reinterpret_cast(this); + } + + unsigned char* get_deleted_ids() + { + ASSERT_PRECONDITION(header.record_type == record_header_t::record_type::txn, "Incorrect record type!"); + return reinterpret_cast(payload); + } + + unsigned char* get_objects() + { + ASSERT_PRECONDITION(header.record_type == record_header_t::record_type::txn, "Incorrect record type!"); + return get_deleted_ids() + header.deleted_object_count * sizeof(gaia_id_t); + } + + decision_entry_t* get_decisions() + { + ASSERT_PRECONDITION(header.record_type == record_header_t::record_type::decision, "Incorrect record type!"); + return reinterpret_cast(payload); + } + + unsigned char* get_payload_end() + { + return get_record() + get_record_size(); + } + + static read_record_t* get_record(void* ptr) + { + ASSERT_PRECONDITION(ptr, "Invalid address!"); + return reinterpret_cast(ptr); + } + + bool is_valid() + { + return header.record_size != c_invalid_read_record_size && (header.record_type == record_type_t::txn || header.record_type == record_type_t::decision || header.record_type == record_type_t::end_of_file); + } +}; + +struct record_iterator_t +{ + // Pointer to the current record in a log file. + unsigned char* iterator; + + // Beginning of the log file. + unsigned char* begin; + + // End of log file. + unsigned char* end; + + // End of log file. May not be the same as end. + unsigned char* stop_at; + + // Value returned from the mmap() call on a persistent log file. + void* mapped_data; + size_t map_size; + int file_fd; + + // Recovery mode. + log_reader_mode_t recovery_mode; + + //This flag is set when halt recovery is halted. + bool halt_recovery; +}; + +// This buffer is used to stage non-object data to be written to the log. // Custom information includes // 1) deleted object IDs in a txn. // 2) custom txn headers diff --git a/production/db/core/inc/persistent_store_manager.hpp b/production/db/core/inc/persistent_store_manager.hpp index a5cf355fb057..ae7f996dfdf8 100644 --- a/production/db/core/inc/persistent_store_manager.hpp +++ b/production/db/core/inc/persistent_store_manager.hpp @@ -5,6 +5,7 @@ #pragma once +#include #include #include "gaia/common.hpp" @@ -24,14 +25,16 @@ namespace gaia { namespace db { +namespace persistence +{ -class persistent_store_manager +class persistent_store_manager_t { public: - persistent_store_manager( - gaia::db::counters_t* counters, std::string data_dir); - ~persistent_store_manager(); + persistent_store_manager_t( + gaia::db::counters_t* counters, std::filesystem::path data_dir); + ~persistent_store_manager_t(); /** * Open rocksdb with the correct options. @@ -87,11 +90,61 @@ class persistent_store_manager void reset_log(); + /** + * This API is only used during checkpointing & recovery. + */ + void put( + common::gaia_id_t id, + common::gaia_type_t type, + gaia::common::reference_offset_t num_references, + const gaia::common::gaia_id_t* references, + uint16_t payload_size, + uint16_t data_size, + const char* data); + + /** + * This API is only used during checkpointing & recovery. + */ + void remove(gaia::common::gaia_id_t id_to_remove); + + /** + * Flush rocksdb memory buffer to disk as an SST file. + * The rocksdb memtable is used to hold writes before writing them to an SST file. + * The alternative is the RocksDB SSTFileWriter. Both options require reserving memory to stage writes in + * the SST format before writing to disk. 
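record_iterator_t above is advanced record by record through an mmap'ed log file using header.record_size. A minimal sketch of that walk with an assumed two-field header follows; the real iterator also validates checksums and honors stop_at/halt_recovery.

    #include <cstddef>
    #include <cstdint>
    #include <cstring>

    // Assumed minimal header for illustration; the real one is record_header_t.
    struct demo_header_t
    {
        uint32_t record_size;  // total size: header + payload
        uint8_t  record_type;  // 1 = txn, 2 = decision, 3 = end_of_file
    };

    // Walks a contiguous, mmap'ed log buffer one record at a time, stopping at the
    // end-of-file record, an invalid size, or the end of the mapping.
    inline std::size_t count_records(const unsigned char* begin, const unsigned char* end)
    {
        std::size_t count = 0;
        const unsigned char* cursor = begin;
        while (cursor + sizeof(demo_header_t) <= end)
        {
            demo_header_t header;
            std::memcpy(&header, cursor, sizeof(header));  // the header may be unaligned
            if (header.record_size == 0 || header.record_type == 3)
            {
                break;  // invalid record or end_of_file marker
            }
            ++count;
            cursor += header.record_size;  // advance to the next record
        }
        return count;
    }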
+ * Additionally, in RocksDB, each column family has an individual set of + * SST files and the memtable. Thus, writing to multiple column families will require maintaining multiple SSTFileWriter objects. + * All of this is handled by the memtable already. + * Note that the memtable is only written to during log checkpointing. + */ + void flush(); + + /** + * Get value of a custom key. Used to retain a gaia counter across restarts. + */ + uint64_t get_counter_value(const std::string& key); + + /** + * Update a custom key's value. Used to retain gaia counter across restarts. + */ + void set_counter_value(const std::string& key, uint64_t value_to_write); + + static constexpr char c_data_dir_command_flag[] = "--data-dir"; + static inline const std::filesystem::path c_persistent_store_dir_name{"data"}; + + // Keep track of the last read log file to avoid reading it repeatedly during checkpointing. + static inline const std::string c_last_processed_log_seq_key = "gaia_last_processed_log_seq_key"; + + // Default value for any counters (example: log file sequence number) stored in the persistent store. + // If a key for the counter is not found, the default value is returned. + static inline const uint64_t c_default_counter_value = 0; + private: gaia::db::counters_t* m_counters = nullptr; - std::unique_ptr m_rdb_wrapper; - std::string m_data_dir_path; + std::unique_ptr m_rdb_wrapper; + std::filesystem::path m_data_dir_path; }; +} // namespace persistence } // namespace db } // namespace gaia diff --git a/production/db/core/inc/rdb_object_converter.hpp b/production/db/core/inc/rdb_object_converter.hpp index 3b50ed825f45..72fd2541f2dd 100644 --- a/production/db/core/inc/rdb_object_converter.hpp +++ b/production/db/core/inc/rdb_object_converter.hpp @@ -45,22 +45,8 @@ class string_writer_t m_buffer.reserve(len); } - inline void write_uint64(uint64_t value) - { - write(reinterpret_cast(&value), sizeof(value)); - } - - inline void write_uint32(uint32_t value) - { - write(reinterpret_cast(&value), sizeof(value)); - } - - inline void write_uint16(uint16_t value) - { - write(reinterpret_cast(&value), sizeof(value)); - } - - inline void write_uint8(uint8_t value) + template + inline void write(T value) { write(reinterpret_cast(&value), sizeof(value)); } @@ -110,6 +96,11 @@ class string_reader_t { } + string_reader_t(const char* data, size_t size) + : m_starting_byte(data), m_current_byte(data), m_size(size) + { + } + inline void read_uint64(uint64_t& out) { const char* value_ptr = read(sizeof(uint64_t)); @@ -170,9 +161,15 @@ class string_reader_t }; void encode_object( - const db_object_t* gaia_object, - string_writer_t& key, - string_writer_t& value); + common::gaia_id_t id, + common::gaia_type_t type, + gaia::common::reference_offset_t num_references, + const gaia::common::gaia_id_t* references, + uint16_t payload_size, + uint16_t data_size, + const char* data, + gaia::db::persistence::string_writer_t& key, + gaia::db::persistence::string_writer_t& value); db_object_t* decode_object( const rocksdb::Slice& key, diff --git a/production/db/core/inc/rdb_wrapper.hpp b/production/db/core/inc/rdb_wrapper.hpp index 75097fe52ecd..cc6cfc3efa87 100644 --- a/production/db/core/inc/rdb_wrapper.hpp +++ b/production/db/core/inc/rdb_wrapper.hpp @@ -5,6 +5,7 @@ #pragma once +#include #include #include @@ -16,12 +17,18 @@ namespace gaia { namespace db { +namespace persistence +{ class rdb_wrapper_t { public: - rdb_wrapper_t(std::string dir, const rocksdb::WriteOptions& write_opts, rocksdb::TransactionDBOptions txn_opts) - 
: m_txn_db(nullptr), m_data_dir(std::move(dir)), m_write_options(write_opts), m_txn_options(std::move(txn_opts)) + rdb_wrapper_t( + const std::filesystem::path dir, + const rocksdb::WriteOptions& write_opts, + const rocksdb::ReadOptions& read_opts, + rocksdb::TransactionDBOptions txn_opts) + : m_txn_db(nullptr), m_data_dir(std::move(dir)), m_write_options(write_opts), m_read_options(read_opts), m_txn_options(std::move(txn_opts)) { } @@ -52,12 +59,22 @@ class rdb_wrapper_t void handle_rdb_error(rocksdb::Status status); + void flush(); + + void put(const rocksdb::Slice& key, const rocksdb::Slice& value); + + void remove(const rocksdb::Slice& key); + + void get(const rocksdb::Slice& key, std::string& value); + private: std::unique_ptr m_txn_db; - std::string m_data_dir; - rocksdb::WriteOptions m_write_options; - rocksdb::TransactionDBOptions m_txn_options; + std::filesystem::path m_data_dir; + rocksdb::WriteOptions m_write_options{}; + rocksdb::ReadOptions m_read_options{}; + rocksdb::TransactionDBOptions m_txn_options{}; }; +} // namespace persistence } // namespace db } // namespace gaia diff --git a/production/db/core/src/async_disk_writer.cpp b/production/db/core/src/async_disk_writer.cpp index 6299544ad898..10772fa57231 100644 --- a/production/db/core/src/async_disk_writer.cpp +++ b/production/db/core/src/async_disk_writer.cpp @@ -107,15 +107,15 @@ void async_disk_writer_t::perform_post_completion_maintenance() m_in_flight_batch->validate_next_completion_event(); } - auto max_file_seq_to_close = m_in_flight_batch->get_max_file_seq_to_close(); + auto max_log_seq_to_close = m_in_flight_batch->get_max_log_seq_to_close(); // Post validation, close all files in batch. m_in_flight_batch->close_all_files_in_batch(); // Signal to checkpointing thread the upper bound of files that it can process. - if (max_file_seq_to_close > 0) + if (max_log_seq_to_close > 0) { - signal_eventfd(m_signal_checkpoint_eventfd, max_file_seq_to_close); + signal_eventfd(m_signal_checkpoint_eventfd, max_log_seq_to_close); } const decision_list_t& decisions = m_in_flight_batch->get_decision_batch_entries(); @@ -196,7 +196,7 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ ASSERT_INVARIANT(m_in_flight_batch->get_unsubmitted_entries_count() == 0, "batch size after submission should be 0."); } -void async_disk_writer_t::perform_file_close_operations(int file_fd, file_sequence_t log_seq) +void async_disk_writer_t::perform_file_close_operations(int file_fd, log_sequence_t log_seq) { submit_if_full(file_fd, c_single_submission_entry_count); diff --git a/production/db/core/src/async_write_batch.cpp b/production/db/core/src/async_write_batch.cpp index 46b3205c0fd8..ed2e38361868 100644 --- a/production/db/core/src/async_write_batch.cpp +++ b/production/db/core/src/async_write_batch.cpp @@ -104,20 +104,20 @@ void async_write_batch_t::close_all_files_in_batch() m_files_to_close.clear(); } -void async_write_batch_t::append_file_to_batch(int fd, file_sequence_t log_seq) +void async_write_batch_t::append_file_to_batch(int fd, log_sequence_t log_seq) { // Add file info to the batch that should be closed once all of its pending writes have finished. 
log_file_info_t info{log_seq, fd}; m_files_to_close.push_back(info); } -file_sequence_t async_write_batch_t::get_max_file_seq_to_close() +log_sequence_t async_write_batch_t::get_max_log_seq_to_close() { if (m_files_to_close.size() > 0) { return m_files_to_close.back().sequence; } - return c_invalid_file_sequence_number; + return c_invalid_log_sequence_number; } size_t async_write_batch_t::get_unsubmitted_entries_count() diff --git a/production/db/core/src/db_server.cpp b/production/db/core/src/db_server.cpp index 2866278bd323..0757c307d4f8 100644 --- a/production/db/core/src/db_server.cpp +++ b/production/db/core/src/db_server.cpp @@ -53,6 +53,7 @@ using namespace gaia::db::messages; using namespace gaia::db::memory_manager; using namespace gaia::db::storage; using namespace gaia::db::transactions; +using namespace gaia::db::persistence; using namespace gaia::common; using namespace gaia::common::iterators; using namespace gaia::common::scope_guard; @@ -259,6 +260,10 @@ void server_t::handle_rollback_txn( void server_t::recover_persistent_log() { + // The APIs to delete logs and recover persistent logs require a max log file sequence number to process. + // Passing this argument ensure that all log files are either deleted or recovered. + constexpr log_sequence_t c_max_log_sequence_number(std::numeric_limits::max()); + if (c_use_gaia_log_implementation) { // If persistence is disabled, then this is a no-op. @@ -270,6 +275,32 @@ void server_t::recover_persistent_log() s_log_handler = std::make_unique(s_server_conf.data_dir()); s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd); + + if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup) + { + s_log_handler->truncate_persistent_log(c_max_log_sequence_number); + } + + // Get last processed log. + auto last_processed_log_seq = s_persistent_store->get_counter_value(persistent_store_manager_t::c_last_processed_log_seq_key); + + // Recover only the first time this method gets called. + gaia_txn_id_t last_checkpointed_commit_ts = 0; + s_log_handler->recover_from_persistent_log( + s_persistent_store, + last_checkpointed_commit_ts, + last_processed_log_seq, + c_max_log_sequence_number, + log_reader_mode_t::recovery_stop_on_first_error); + + s_persistent_store->set_counter_value(persistent_store_manager_t::c_last_processed_log_seq_key, last_processed_log_seq); + + s_log_handler->set_persistent_log_sequence(last_processed_log_seq); + + s_log_handler->truncate_persistent_log(c_max_log_sequence_number); + + std::cout << "s_validate_persistence_batch_eventfd = " << s_validate_persistence_batch_eventfd << std::endl; + s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd); } } @@ -899,14 +930,18 @@ void server_t::recover_db() auto cleanup = make_scope_guard([]() { end_startup_txn(); }); begin_startup_txn(); - s_persistent_store = std::make_unique( - get_counters(), s_server_conf.data_dir()); + s_persistent_store = std::make_unique(get_counters(), s_server_conf.data_dir()); if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup) { s_persistent_store->destroy_persistent_store(); } s_persistent_store->open(); + recover_persistent_log(); + + // Flush persistent store buffer to disk. 
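recover_persistent_log() above resumes from the last processed log sequence number stored under c_last_processed_log_seq_key and advances it once recovery finishes. A sketch of that resume pattern against a hypothetical counter store follows; the key name and store type are stand-ins. Re-reading a file after a crash before the watermark is written is tolerated because already-checkpointed records are skipped by commit timestamp.

    #include <cstdint>
    #include <map>
    #include <string>

    // Hypothetical stand-in for the persistent store's counter API
    // (get_counter_value / set_counter_value); a missing key reads as 0.
    struct demo_store_t
    {
        std::map<std::string, uint64_t> counters;

        uint64_t get_counter(const std::string& key) const
        {
            auto it = counters.find(key);
            return (it == counters.end()) ? 0 : it->second;
        }

        void set_counter(const std::string& key, uint64_t value)
        {
            counters[key] = value;
        }
    };

    // Resume pattern: only log files with a sequence number greater than the last
    // processed one are replayed, and the watermark is advanced afterwards.
    inline void recover_and_advance(demo_store_t& store, uint64_t newest_log_seq)
    {
        const std::string key = "last_processed_log_seq";  // illustrative key name
        uint64_t last_processed = store.get_counter(key);
        for (uint64_t seq = last_processed + 1; seq <= newest_log_seq; ++seq)
        {
            // ... replay log file `seq` into the persistent store ...
        }
        store.set_counter(key, newest_log_seq);
    }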
+ s_persistent_store->flush(); + s_persistent_store->recover(); } } @@ -1043,6 +1078,38 @@ void server_t::log_writer_handler() } } +void server_t::checkpoint_handler() +{ + log_sequence_t last_deleted_log_seq = 0; + while (true) + { + // Log sequence number of file ready to be checkpointed. + log_sequence_t max_log_seq_to_checkpoint; + + // Wait for a persistent log file to be closed before checkpointing it. + // This can be achieved via blocking on an eventfd read. + eventfd_read(s_signal_checkpoint_log_eventfd, &max_log_seq_to_checkpoint); + + // Process all existing log files. + log_sequence_t last_processed_log_seq = 0; + s_log_handler->recover_from_persistent_log( + s_persistent_store, + s_last_checkpointed_commit_ts_lower_bound, + last_processed_log_seq, + max_log_seq_to_checkpoint, + log_reader_mode_t::checkpoint_fail_on_first_error); + + s_persistent_store->set_counter_value(persistent_store_manager_t::c_last_processed_log_seq_key, last_processed_log_seq); + + // Flush persistent store buffer to disk. + s_persistent_store->flush(); + + ASSERT_INVARIANT(max_log_seq_to_checkpoint > last_deleted_log_seq, "Log files cannot be deleted out of order"); + s_log_handler->truncate_persistent_log(last_processed_log_seq); + last_deleted_log_seq = max_log_seq_to_checkpoint; + } +} + gaia_txn_id_t server_t::begin_startup_txn() { // Reserve an index in the safe_ts array, so the main thread can execute @@ -3521,10 +3588,13 @@ void server_t::run(server_config_t server_conf) s_signal_decision_eventfd = make_nonblocking_eventfd(); + s_signal_checkpoint_log_eventfd = make_blocking_eventfd(); + auto cleanup_persistence_eventfds = make_scope_guard([]() { close_fd(s_signal_log_write_eventfd); close_fd(s_signal_decision_eventfd); close_fd(s_validate_persistence_batch_eventfd); + close_fd(s_signal_checkpoint_log_eventfd); }); // Block handled signals in this thread and subsequently spawned threads. @@ -3537,10 +3607,12 @@ void server_t::run(server_config_t server_conf) init_shared_memory(); std::thread log_writer_thread; + std::thread checkpoint_thread; if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled) { // Launch persistence thread. log_writer_thread = std::thread(&log_writer_handler); + checkpoint_thread = std::thread(&checkpoint_handler); } // Launch thread to listen for client connections and create session threads. @@ -3559,6 +3631,7 @@ void server_t::run(server_config_t server_conf) if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled) { log_writer_thread.join(); + checkpoint_thread.join(); } // We special-case SIGHUP to force reinitialization of the server. diff --git a/production/db/core/src/exceptions.cpp b/production/db/core/src/exceptions.cpp index 9c2897832ee2..35a208e22220 100644 --- a/production/db/core/src/exceptions.cpp +++ b/production/db/core/src/exceptions.cpp @@ -180,6 +180,16 @@ memory_allocation_error_internal::memory_allocation_error_internal(const std::st m_message = message; } +namespace persistence +{ + +checksum_error_internal::checksum_error_internal(const std::string& message) +{ + m_message = message; +} + +} // namespace persistence + namespace index { diff --git a/production/db/core/src/log_file.cpp b/production/db/core/src/log_file.cpp index a0cf818ace7c..b7e5831cd5fe 100644 --- a/production/db/core/src/log_file.cpp +++ b/production/db/core/src/log_file.cpp @@ -25,17 +25,17 @@ namespace persistence // TODO (Mihir): Use io_uring for fsync, close & fallocate operations in this file. 
// open() operation will remain synchronous, since we need the file fd to perform other async // operations on the file. -log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_seq, size_t size) +log_file_t::log_file_t(const std::string& dir, int dir_fd, log_sequence_t log_seq, size_t size) { m_dir_fd = dir_fd; m_dir_name = dir; - m_file_seq = file_seq; - m_file_size = size; + m_log_seq = log_seq; + m_file_size = size - c_reserved_size; m_current_offset = 0; // open and fallocate depending on size. std::stringstream file_name; - file_name << m_dir_name << "/" << m_file_seq; + file_name << m_dir_name << "/" << m_log_seq; m_file_fd = openat(dir_fd, file_name.str().c_str(), O_WRONLY | O_CREAT, c_file_permissions); if (m_file_fd < 0) { @@ -81,9 +81,9 @@ void log_file_t::allocate(size_t size) m_current_offset += size; } -file_sequence_t log_file_t::get_file_sequence() +log_sequence_t log_file_t::get_log_sequence() { - return m_file_seq; + return m_log_seq; } size_t log_file_t::get_bytes_remaining_after_append(size_t record_size) diff --git a/production/db/core/src/log_io.cpp b/production/db/core/src/log_io.cpp index 0858bd48ca4e..f0ba36baf050 100644 --- a/production/db/core/src/log_io.cpp +++ b/production/db/core/src/log_io.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -32,6 +33,8 @@ #include "mapped_data.hpp" #include "memory_helpers.hpp" #include "memory_types.hpp" +#include "persistent_store_manager.hpp" +#include "rdb_object_converter.hpp" #include "txn_metadata.hpp" using namespace gaia::common; @@ -64,7 +67,8 @@ log_handler_t::log_handler_t(const std::string& wal_dir_path) void log_handler_t::open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd) { - ASSERT_PRECONDITION(validate_flushed_batch_eventfd != -1, "Eventfd to signal post flush maintenance operations invalid!"); + ASSERT_PRECONDITION(validate_flushed_batch_eventfd != -1, "Invalid eventfd!"); + ASSERT_PRECONDITION(signal_checkpoint_eventfd != -1, "Invalid eventfd!"); ASSERT_INVARIANT(s_dir_fd != -1, "Unable to open data directory for persistent log writes."); // Create new log file every time the log_writer gets initialized. @@ -89,6 +93,32 @@ uint32_t calculate_crc32(uint32_t init_crc, const void* data, size_t n) return rocksdb::crc32c::Extend(init_crc, static_cast(data), n); } +void log_handler_t::create_end_of_file_record() +{ + // Create log record header. + record_header_t header; + header.crc = c_crc_initial_value; + + // No need to allocate space in the log file as it is already reserved during log file + // construction. + header.record_size = sizeof(record_header_t); + header.decision_count = 0; + header.txn_commit_ts = c_invalid_gaia_txn_id; + header.record_type = record_type_t::end_of_file; + + // Compute crc. + crc32_t txn_crc = 0; + txn_crc = calculate_crc32(txn_crc, &header, sizeof(record_header_t)); + header.crc = txn_crc; + + // Copy information which needs to survive for the entire batch lifetime into the metadata buffer. + auto header_ptr = m_async_disk_writer->copy_into_metadata_buffer(&header, sizeof(record_header_t), m_current_file->get_file_fd()); + std::vector writes_to_submit = {{header_ptr, sizeof(record_header_t)}}; + + // Append record to async_write_batch. 
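The log_file_t constructor above names each file by its sequence number, opens it relative to the directory fd, and preallocates the full file size so appends never extend the file. A sketch of that open-and-preallocate step follows; posix_fallocate() stands in for whatever allocation call the real code uses, and directory fsync plus async close are elided.

    #include <fcntl.h>
    #include <sys/types.h>
    #include <unistd.h>
    #include <cstddef>
    #include <cstdint>
    #include <stdexcept>
    #include <string>

    // Creates a numbered log file relative to an already-open directory fd and
    // preallocates its full size up front.
    inline int create_preallocated_log_file(int dir_fd, uint64_t log_seq, std::size_t file_size)
    {
        std::string file_name = std::to_string(log_seq);  // the file name is the sequence number
        int fd = ::openat(dir_fd, file_name.c_str(), O_WRONLY | O_CREAT, 0666);
        if (fd < 0)
        {
            throw std::runtime_error("unable to create log file");
        }
        if (::posix_fallocate(fd, 0, static_cast<off_t>(file_size)) != 0)
        {
            ::close(fd);
            throw std::runtime_error("unable to preallocate log file");
        }
        return fd;
    }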
+ m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), m_current_file->get_current_offset(), uring_op_t::pwritev_decision); +} + file_offset_t log_handler_t::allocate_log_space(size_t payload_size) { // For simplicity, we don't break up transaction records across log files. Txn updates @@ -103,7 +133,11 @@ file_offset_t log_handler_t::allocate_log_space(size_t payload_size) } else if (m_current_file->get_bytes_remaining_after_append(payload_size) <= 0) { - m_async_disk_writer->perform_file_close_operations(m_current_file->get_file_fd(), m_current_file->get_file_sequence()); + // No need to allocate space in the log file as it is already reserved during log file + // construction. + create_end_of_file_record(); + + m_async_disk_writer->perform_file_close_operations(m_current_file->get_file_fd(), m_current_file->get_log_sequence()); // One batch writes to a single log file at a time. m_async_disk_writer->submit_and_swap_in_progress_batch(m_current_file->get_file_fd()); @@ -139,7 +173,7 @@ void log_handler_t::create_decision_record(const decision_list_t& txn_decisions) // Create log record header. record_header_t header; header.crc = c_crc_initial_value; - header.payload_size = total_log_space_needed; + header.record_size = total_log_space_needed; header.decision_count = txn_decisions.size(); header.txn_commit_ts = c_invalid_gaia_txn_id; header.record_type = record_type_t::decision; @@ -259,8 +293,18 @@ void log_handler_t::create_txn_record( struct iovec header_entry = {nullptr, 0}; writes_to_submit.push_back(header_entry); - // Create iovec entries. - size_t payload_size = 0; + // Augment payload size with the size of deleted ids. + auto deleted_size = deleted_ids.size() * sizeof(gaia_id_t); + size_t payload_size = deleted_size; + + // Allocate space for deleted writes in the metadata buffer. + if (!deleted_ids.empty()) + { + auto deleted_id_ptr = m_async_disk_writer->copy_into_metadata_buffer(deleted_ids.data(), deleted_size, m_current_file->get_file_fd()); + writes_to_submit.push_back({deleted_id_ptr, deleted_size}); + } + + // Create iovec entries for txn objects. for (size_t i = 0; i < contiguous_address_offsets.size(); i += 2) { auto offset = contiguous_address_offsets.at(i); @@ -270,10 +314,6 @@ void log_handler_t::create_txn_record( writes_to_submit.push_back({ptr, chunk_size}); } - // Augment payload size with the size of deleted ids. - auto deleted_size = deleted_ids.size() * sizeof(gaia_id_t); - payload_size += deleted_size; - // Calculate total log space needed. auto total_log_space_needed = payload_size + sizeof(record_header_t); @@ -283,7 +323,7 @@ void log_handler_t::create_txn_record( // Create header. record_header_t header; header.crc = c_crc_initial_value; - header.payload_size = total_log_space_needed; + header.record_size = total_log_space_needed; header.deleted_object_count = deleted_ids.size(); header.txn_commit_ts = commit_ts; header.record_type = type; @@ -291,15 +331,15 @@ void log_handler_t::create_txn_record( // Calculate CRC. auto txn_crc = calculate_crc32(0, &header, sizeof(record_header_t)); - // Start from 1 to skip CRC calculation for the first entry. - for (size_t i = 1; i < writes_to_submit.size(); i++) + // Augment CRC calculation with set of deleted object IDs. + txn_crc = calculate_crc32(txn_crc, deleted_ids.data(), deleted_size); + + // Skip CRC calculation for the first entry and for all deleted objects. 
+ for (size_t i = 1 + deleted_size; i < writes_to_submit.size(); i++) { txn_crc = calculate_crc32(txn_crc, writes_to_submit.at(i).iov_base, writes_to_submit.at(i).iov_len); } - // Augment CRC calculation with set of deleted object IDs. - txn_crc = calculate_crc32(txn_crc, deleted_ids.data(), deleted_size); - // Update CRC in header before sending it to the async_disk_writer. ASSERT_INVARIANT(txn_crc > 0, "Computed CRC cannot be zero."); header.crc = txn_crc; @@ -311,15 +351,418 @@ void log_handler_t::create_txn_record( writes_to_submit.at(0).iov_base = header_ptr; writes_to_submit.at(0).iov_len = sizeof(record_header_t); - // Allocate space for deleted writes in the metadata buffer. - if (!deleted_ids.empty()) + // Finally send I/O requests to the async_disk_writer. + m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), begin_offset, uring_op_t::pwritev_txn); +} + +void log_handler_t::truncate_persistent_log(log_sequence_t max_log_seq_to_delete) +{ + // Done with recovery/checkpointing, delete all files with sequence number <= + // max_log_seq_to_delete + for (const auto& file : std::filesystem::directory_iterator(s_wal_dir_path)) { - auto deleted_id_ptr = m_async_disk_writer->copy_into_metadata_buffer(deleted_ids.data(), deleted_size, m_current_file->get_file_fd()); - writes_to_submit.push_back({deleted_id_ptr, deleted_size}); + auto filename_str = file.path().filename().string(); + + // Unable to use log_sequence_t here since the from_chars() method only handles int types. + size_t log_seq = c_invalid_log_sequence_number; + const auto result = std::from_chars(filename_str.c_str(), filename_str.c_str() + filename_str.size(), log_seq); + if (result.ec != std::errc{}) + { + throw_system_error("Failed to convert log file name to integer!"); + } + + if (log_seq <= max_log_seq_to_delete) + { + // Throws filesystem::filesystem_error on underlying OS API errors. + // TODO: Revisit whether failure of the filesystem should cause the server to crash. + std::filesystem::remove(file.path()); + } } +} - // Finally send I/O requests to the async_disk_writer. - m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), begin_offset, uring_op_t::pwritev_txn); +void log_handler_t::set_persistent_log_sequence(log_sequence_t log_seq) +{ + // Extremely unlikely but add a check anyway. + ASSERT_PRECONDITION(log_seq != std::numeric_limits::max(), "Log file sequence number is exhausted!"); + s_file_num = log_seq + 1; +} + +void log_handler_t::recover_from_persistent_log( + std::shared_ptr& s_persistent_store, + gaia_txn_id_t& last_checkpointed_commit_ts, + log_sequence_t& last_processed_log_seq, + log_sequence_t max_log_seq_to_process, + log_reader_mode_t mode) +{ + // Only relevant for checkpointing. Recovery doesn't care about the + // 'last_checkpointed_commit_ts' and will reset this field to zero. + // We don't persist txn ids across restarts. + m_max_decided_commit_ts = last_checkpointed_commit_ts; + + // Scan all files and read log records starting from the highest numbered file. + std::vector log_file_sequences; + for (const auto& file : std::filesystem::directory_iterator(s_wal_dir_path)) + { + ASSERT_INVARIANT(file.is_regular_file(), "Only expecting files in persistent log directory."); + + auto filename_str = file.path().filename().string(); + + // The file name is just the log sequence number. 
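truncate_persistent_log() and the recovery scan both parse the numeric file name back into a sequence number with std::from_chars. A sketch of that directory scan follows, returning the sequence numbers in ascending order; skipping non-numeric names is a simplification, whereas the real code treats a parse failure as an error.

    #include <algorithm>
    #include <charconv>
    #include <cstdint>
    #include <filesystem>
    #include <string>
    #include <system_error>
    #include <vector>

    // Collects the numeric log file names under `dir` (each file is named by its
    // sequence number) and returns them in ascending order.
    inline std::vector<uint64_t> list_log_sequences(const std::filesystem::path& dir)
    {
        std::vector<uint64_t> sequences;
        for (const auto& entry : std::filesystem::directory_iterator(dir))
        {
            const std::string name = entry.path().filename().string();
            uint64_t seq = 0;
            auto [ptr, ec] = std::from_chars(name.data(), name.data() + name.size(), seq);
            if (ec != std::errc{} || ptr != name.data() + name.size())
            {
                continue;  // not a numeric log file name
            }
            sequences.push_back(seq);
        }
        std::sort(sequences.begin(), sequences.end());
        return sequences;
    }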
+ size_t log_seq = c_invalid_log_sequence_number; + const auto result = std::from_chars(filename_str.c_str(), filename_str.c_str() + filename_str.size(), log_seq); + if (result.ec != std::errc{}) + { + throw_system_error("Failed to convert log file name to integer!"); + } + log_file_sequences.push_back(log_seq); + } + + // Sort files in ascending order by file name. + std::sort(log_file_sequences.begin(), log_file_sequences.end()); + + // Apply txns from file. + log_sequence_t max_log_seq_to_delete = 0; + std::vector file_fds_to_close; + std::vector file_pointers_to_unmap; + + auto file_cleanup = scope_guard::make_scope_guard([&]() { + for (auto fd : file_fds_to_close) + { + close_fd(fd); + } + + for (log_file_pointer_t entry : file_pointers_to_unmap) + { + unmap_fd_data(entry.begin, entry.size); + } + }); + + for (log_sequence_t log_seq : log_file_sequences) + { + if (log_seq > max_log_seq_to_process) + { + break; + } + + // Ignore already processed files. + if (log_seq <= last_processed_log_seq) + { + continue; + } + + // Open file. + std::filesystem::path file_name = s_wal_dir_path / std::string(log_seq.value); + auto file_fd = open(file_name.c_str(), O_RDONLY); + + if (file_fd == -1) + { + throw_system_error("Unable to open persistent log file."); + } + + // Try to close fd as soon as possible. + auto file_cleanup = scope_guard::make_scope_guard([&]() { + close_fd(file_fd); + }); + + // mmap file. + record_iterator_t it; + map_log_file(&it, file_fd, mode); + + // Try to unmap file as soon as possible. + auto mmap_cleanup = scope_guard::make_scope_guard([&]() { + unmap_fd_data(it.begin, it.map_size); + }); + + // halt_recovery is set to true if an IO error is encountered while reading the log. + bool halt_recovery = write_log_file_to_persistent_store(s_persistent_store, &it, &last_checkpointed_commit_ts); + + // Skip unmapping and closing the file if it has some unprocessed transactions. + if (m_txn_records_by_commit_ts.size() > 0) + { + file_fds_to_close.push_back(file_fd); + file_cleanup.dismiss(); + + file_pointers_to_unmap.push_back({it.mapped_data, it.map_size}); + mmap_cleanup.dismiss(); + } + else if (mode == log_reader_mode_t::checkpoint_fail_on_first_error) + { + if (m_txn_records_by_commit_ts.size() == 0) + { + // Safe to delete this file as it doesn't have any more txns to write to the persistent store. + max_log_seq_to_delete = log_seq; + ASSERT_INVARIANT(m_decision_records_by_commit_ts.size() == 0, "Failed to process all persistent log records."); + } + } + + if (halt_recovery) + { + // TODO: THROW an error based on recovery mode. + + // TODO: Verify whether it is possible to see IO errors in intermediate log files. + // ASSERT_INVARIANT(log_seq == log_file_sequences.back(), "We don't expect IO errors in intermediate log files."); + break; + } + } + + if (log_file_sequences.size() > 0) + { + // Set last_processed_log_seq as max_log_seq_to_delete in Checkpoint mode & to the last + // observed log sequence in the wal directory during recovery. + last_processed_log_seq = (mode == log_reader_mode_t::checkpoint_fail_on_first_error) ? max_log_seq_to_delete : log_file_sequences.back(); + } + + ASSERT_POSTCONDITION(m_decision_records_by_commit_ts.size() == 0, "Failed to process all persistent log records."); +} + +bool log_handler_t::write_log_file_to_persistent_store( + std::shared_ptr& persistent_store_manager, + record_iterator_t* it, + gaia_txn_id_t* last_checkpointed_commit_ts) +{ + // Iterate over records in file and write them to persistent store. 
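map_log_file() above maps each log file read-only and relies on scope guards to close and unmap it unless cleanup is deferred. Below is a stand-alone RAII sketch of that open/fstat/mmap sequence, without the gaia fd helpers or the dismiss logic for files that still hold unprocessed txn records.

    #include <fcntl.h>
    #include <sys/mman.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <cstddef>
    #include <stdexcept>

    // Minimal RAII stand-in for the open/fstat/mmap + scope_guard pattern.
    struct mapped_log_file_t
    {
        int fd = -1;
        void* data = MAP_FAILED;
        std::size_t size = 0;

        explicit mapped_log_file_t(const char* path)
        {
            fd = ::open(path, O_RDONLY);
            if (fd == -1)
            {
                throw std::runtime_error("unable to open log file");
            }
            struct stat st{};
            if (::fstat(fd, &st) == -1 || st.st_size <= 0)
            {
                ::close(fd);
                throw std::runtime_error("unable to stat log file (or file is empty)");
            }
            size = static_cast<std::size_t>(st.st_size);
            data = ::mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
            if (data == MAP_FAILED)
            {
                ::close(fd);
                throw std::runtime_error("unable to map log file");
            }
        }

        mapped_log_file_t(const mapped_log_file_t&) = delete;
        mapped_log_file_t& operator=(const mapped_log_file_t&) = delete;

        ~mapped_log_file_t()
        {
            if (data != MAP_FAILED)
            {
                ::munmap(data, size);
            }
            if (fd != -1)
            {
                ::close(fd);
            }
        }
    };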
+ write_records(persistent_store_manager, it, last_checkpointed_commit_ts); + + // Check that any remaining transactions have commit timestamp greater than the commit ts of the + // txn that was last written to the persistent store. + for (auto entry : m_txn_records_by_commit_ts) + { + ASSERT_INVARIANT(entry.first > *last_checkpointed_commit_ts, "Expected txn to be checkpointed!"); + } + + // The log file should not have any decision records that are yet to be processed. + ASSERT_POSTCONDITION(!it->halt_recovery && m_decision_records_by_commit_ts.size() == 0, "Failed to process all persistent log records in log file."); + + return it->halt_recovery; +} + +void log_handler_t::write_log_record_to_persistent_store( + std::shared_ptr& persistent_store_manager, + read_record_t* record) +{ + ASSERT_PRECONDITION(record->header.record_type == record_type_t::txn, "Expected transaction record."); + + auto obj_ptr = record->get_objects(); + auto end_ptr = record->get_payload_end(); + + while (obj_ptr < end_ptr) + { + // We use string_reader_t here to read the object instead of directly using + // reinterpret_cast since Gaia objects are packed into the log and don't + // respect the alignment requirement for db_object_t. This is done for the sake of + // encoding simplicity and to avoid bloating the size of the log. (Although the + // latter isn't a very strong argument) + // The object metadata occupies 16 bytes: 8 (id) + 4 (type) + 2 (payload_size) + // + 2 (num_references) = 16. + auto obj_ptr_copy = obj_ptr; + ASSERT_INVARIANT(obj_ptr_copy, "Object cannot be null."); + + gaia_id_t id = *reinterpret_cast(obj_ptr_copy); + ASSERT_INVARIANT(id != common::c_invalid_gaia_id, "Recovered id cannot be invalid."); + obj_ptr_copy += sizeof(gaia_id_t); + + gaia_type_t type = *reinterpret_cast(obj_ptr_copy); + ASSERT_INVARIANT(type != common::c_invalid_gaia_type, "Recovered type cannot be invalid."); + obj_ptr_copy += sizeof(gaia_type_t); + + uint16_t payload_size = *reinterpret_cast(obj_ptr_copy); + ASSERT_INVARIANT(payload_size > 0, "Recovered object size should be greater than 0"); + obj_ptr_copy += sizeof(uint16_t); + + reference_offset_t num_references = *reinterpret_cast(obj_ptr_copy); + obj_ptr_copy += sizeof(reference_offset_t); + + auto references = reinterpret_cast(obj_ptr_copy); + auto data = reinterpret_cast(obj_ptr_copy) + num_references * sizeof(gaia::common::gaia_id_t); + + persistent_store_manager->put( + id, + type, + num_references, + references, + payload_size, + payload_size - (num_references * sizeof(gaia::common::gaia_id_t)), + data); + + size_t requested_size = payload_size + c_db_object_header_size; + + size_t allocation_size = memory_manager::calculate_allocation_size_in_slots(requested_size) * c_slot_size_in_bytes; + + obj_ptr += allocation_size; + } + + // Propagate deletes at the very end. 
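write_log_record_to_persistent_store() above walks the packed object stream, reading an 8-byte id, 4-byte type, 2-byte payload_size, and 2-byte num_references before the references and data. The sketch below shows that decode using memcpy so the scalar reads stay well-defined on an unaligned cursor; the field widths follow the comment in the code and should be treated as illustrative.

    #include <cstdint>
    #include <cstring>

    struct demo_object_view_t
    {
        uint64_t id;
        uint32_t type;
        uint16_t payload_size;
        uint16_t num_references;
        const unsigned char* references;  // num_references 8-byte IDs, possibly unaligned
        const char* data;                 // payload_size - num_references * 8 bytes
    };

    // Reads one serialized object from a packed (potentially unaligned) buffer.
    inline demo_object_view_t read_packed_object(const unsigned char* cursor)
    {
        demo_object_view_t obj{};
        std::memcpy(&obj.id, cursor, sizeof(obj.id));
        cursor += sizeof(obj.id);
        std::memcpy(&obj.type, cursor, sizeof(obj.type));
        cursor += sizeof(obj.type);
        std::memcpy(&obj.payload_size, cursor, sizeof(obj.payload_size));
        cursor += sizeof(obj.payload_size);
        std::memcpy(&obj.num_references, cursor, sizeof(obj.num_references));
        cursor += sizeof(obj.num_references);
        obj.references = cursor;
        obj.data = reinterpret_cast<const char*>(cursor) + obj.num_references * sizeof(uint64_t);
        return obj;
    }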
+ auto first_id_ptr = reinterpret_cast(record->get_deleted_ids()); + for (gaia_id_t* id_ptr = first_id_ptr; id_ptr < first_id_ptr + record->header.deleted_object_count; ++id_ptr) + { + ASSERT_INVARIANT(id_ptr, "Deleted ID cannot be null."); + ASSERT_INVARIANT(*id_ptr != c_invalid_gaia_id, "Deleted ID cannot be invalid."); + persistent_store_manager->remove(*id_ptr); + } +} + +void log_handler_t::write_records( + std::shared_ptr& persistent_store_manager, + record_iterator_t* it, + gaia_txn_id_t* last_checkpointed_commit_ts) +{ + size_t record_size = c_invalid_read_record_size; + + while (true) + { + auto current_record_ptr = it->iterator; + record_size = update_iterator(it); + + if (record_size == c_invalid_read_record_size) + { + if (it->halt_recovery || (it->iterator >= it->stop_at)) + { + it->iterator = nullptr; + it->end = nullptr; + break; + } + } + + read_record_t* record = read_record_t::get_record(current_record_ptr); + + if (record->header.record_type == record_type_t::decision) + { + // Obtain decisions. Decisions may not be in commit order, so sort and process them. + auto first_decision_entry = record->get_decisions(); + for (decision_entry_t* decision_entry = first_decision_entry; + decision_entry < first_decision_entry + record->header.decision_count; ++decision_entry) + { + if (decision_entry->commit_ts > *last_checkpointed_commit_ts) + { + ASSERT_INVARIANT(m_txn_records_by_commit_ts.count(decision_entry->commit_ts) > 0, "Transaction record should be written before the decision record."); + m_decision_records_by_commit_ts.insert(std::pair(decision_entry->commit_ts, decision_entry->decision)); + } + } + + // Iterate decisions. + for (auto decision_it = m_decision_records_by_commit_ts.cbegin(); decision_it != m_decision_records_by_commit_ts.cend();) + { + ASSERT_INVARIANT(m_txn_records_by_commit_ts.count(decision_it->first) > 0, "Transaction record should be written before the decision record."); + + auto txn_it = m_txn_records_by_commit_ts.find(decision_it->first); + + // Only perform recovery and checkpointing for committed transactions. + if (decision_it->second == decision_type_t::commit) + { + // Txn record is safe to be written to rocksdb at this point, since checksums for both + // the txn & decision record were validated and we asserted that the txn record is written + // before the decision record in the wal. + write_log_record_to_persistent_store(persistent_store_manager, read_record_t::get_record(txn_it->second)); + } + + // Update 'last_checkpointed_commit_ts' in memory so it can later be written to persistent store. + *last_checkpointed_commit_ts = std::max(*last_checkpointed_commit_ts, decision_it->first); + txn_it = m_txn_records_by_commit_ts.erase(txn_it); + decision_it = m_decision_records_by_commit_ts.erase(decision_it); + } + } + else if (record->header.record_type == record_type_t::txn) + { + // Skip over records that have already been checkpointed. 
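The decision-record branch of write_records() buffers txn records by commit_ts and applies them only once a matching commit decision is seen, advancing the checkpoint watermark as it goes. A condensed sketch of that matching step follows, with stand-in containers and an apply callback; the real code also asserts that the txn record precedes its decision in the log.

    #include <algorithm>
    #include <cstdint>
    #include <map>

    enum class demo_decision_t : uint8_t
    {
        commit = 1,
        abort = 2,
    };

    // Txn records are buffered by commit_ts as they are read; a later decision
    // record determines whether each buffered txn is applied.
    inline uint64_t apply_decisions(
        std::map<uint64_t, const void*>& txn_records_by_commit_ts,
        std::map<uint64_t, demo_decision_t>& decisions_by_commit_ts,
        uint64_t last_checkpointed_commit_ts,
        void (*apply_txn)(const void* txn_record))
    {
        for (auto decision_it = decisions_by_commit_ts.cbegin();
             decision_it != decisions_by_commit_ts.cend();)
        {
            // The log writer guarantees that a txn record precedes its decision.
            auto txn_it = txn_records_by_commit_ts.find(decision_it->first);
            if (txn_it != txn_records_by_commit_ts.end())
            {
                if (decision_it->second == demo_decision_t::commit)
                {
                    apply_txn(txn_it->second);  // only committed txns are replayed
                }
                txn_records_by_commit_ts.erase(txn_it);
            }
            // Whether committed or aborted, this decision is now fully processed.
            last_checkpointed_commit_ts = std::max(last_checkpointed_commit_ts, decision_it->first);
            decision_it = decisions_by_commit_ts.erase(decision_it);
        }
        return last_checkpointed_commit_ts;
    }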
+ if (record->header.txn_commit_ts <= *last_checkpointed_commit_ts) + { + continue; + } + m_txn_records_by_commit_ts.insert(std::pair(record->header.txn_commit_ts, current_record_ptr)); + } + else if (record->header.record_type == record_type_t::end_of_file) + { + it->iterator = nullptr; + it->end = nullptr; + break; + } + } +} + +size_t log_handler_t::update_iterator(struct record_iterator_t* it) +{ + size_t record_size = c_invalid_read_record_size; + if (it->iterator < it->stop_at) + { + validate_checksum(it); + record_size = read_record_t::get_record(it->iterator)->get_record_size(); + it->iterator += record_size; + } + return record_size; +} + +void log_handler_t::validate_checksum(struct record_iterator_t* it) +{ + auto destination = read_record_t::get_record(it->iterator); + + // Basic checks for record. + bool is_record_valid = destination && destination->is_valid(); + if (!is_record_valid) + { + // Stop processing the current log file. + it->stop_at = it->iterator; + it->halt_recovery = true; + + if (it->recovery_mode == log_reader_mode_t::checkpoint_fail_on_first_error) + { + throw std::runtime_error("Log record is invalid!"); + } + + return; + } + + // CRC calculation requires manipulating the recovered record header's CRC; + // So create a copy of the header to avoid modifying the log once it has been written. + record_header_t header_copy; + memcpy(&header_copy, &destination->header, sizeof(record_header_t)); + + auto expected_crc = header_copy.crc; + header_copy.crc = c_crc_initial_value; + + // First calculate CRC of header. + auto crc = calculate_crc32(0, &header_copy, sizeof(record_header_t)); + + // Calculate payload CRC. + crc = calculate_crc32(crc, destination->payload, header_copy.record_size - sizeof(record_header_t)); + + if (crc != expected_crc) + { + if (it->recovery_mode == log_reader_mode_t::checkpoint_fail_on_first_error) + { + throw checksum_error_internal("Record checksum match failed!"); + } + + if (it->recovery_mode == log_reader_mode_t::recovery_stop_on_first_error) + { + it->halt_recovery = true; + } + } +} + +void log_handler_t::map_log_file(struct record_iterator_t* it, int file_fd, log_reader_mode_t recovery_mode) +{ + read_record_t* wal_record; + + size_t size = get_fd_size(file_fd); + ASSERT_INVARIANT(size > 0, "Empty log file!"); + gaia::common::map_fd_data( + wal_record, + size, + PROT_READ, + MAP_SHARED, + file_fd, + 0); + + *it = (struct record_iterator_t){ + .iterator = (unsigned char*)wal_record, + .end = (unsigned char*)wal_record + size, + .stop_at = (unsigned char*)wal_record + size, + .begin = (unsigned char*)wal_record, + .mapped_data = wal_record, + .map_size = size, + .file_fd = file_fd, + .recovery_mode = recovery_mode, + .halt_recovery = false}; } } // namespace persistence diff --git a/production/db/core/src/persistent_store_manager.cpp b/production/db/core/src/persistent_store_manager.cpp index c59a6e00b429..9e6ee149b09a 100644 --- a/production/db/core/src/persistent_store_manager.cpp +++ b/production/db/core/src/persistent_store_manager.cpp @@ -5,6 +5,7 @@ #include "persistent_store_manager.hpp" +#include #include #include @@ -24,26 +25,32 @@ using namespace std; using namespace gaia::db; -using namespace gaia::db::persistence; using namespace gaia::common; using namespace rocksdb; -persistent_store_manager::persistent_store_manager( - gaia::db::counters_t* counters, std::string data_dir) - : m_counters(counters), m_data_dir_path(std::move(data_dir)) +namespace gaia +{ +namespace db +{ +namespace persistence +{ + 
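validate_checksum() above copies the header, resets its crc field to the initial value, and recomputes the checksum over the header copy followed by the payload. The sketch below mirrors that shape with a plain bitwise CRC-32; the real code uses RocksDB's crc32c (a different polynomial), and the assumed initial value of 0 and two-field header are illustrative.

    #include <cstddef>
    #include <cstdint>
    #include <cstring>

    // Plain bitwise CRC-32, for illustration only (not crc32c).
    inline uint32_t demo_crc32(uint32_t crc, const unsigned char* data, std::size_t n)
    {
        crc = ~crc;
        for (std::size_t i = 0; i < n; ++i)
        {
            crc ^= data[i];
            for (int k = 0; k < 8; ++k)
            {
                crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
            }
        }
        return ~crc;
    }

    // Illustrative two-field header; the real record_header_t has more fields.
    struct demo_header_t
    {
        uint32_t record_size;  // header + payload
        uint32_t crc;
    };

    // Verification mirrors record creation: extract the stored CRC, reset the crc
    // field to its assumed initial value, and recompute over header copy + payload.
    inline bool verify_record(const unsigned char* record)
    {
        demo_header_t header_copy;
        std::memcpy(&header_copy, record, sizeof(header_copy));

        const uint32_t expected_crc = header_copy.crc;
        header_copy.crc = 0;  // assumed initial value used at write time

        uint32_t crc = demo_crc32(0, reinterpret_cast<const unsigned char*>(&header_copy), sizeof(header_copy));
        crc = demo_crc32(crc, record + sizeof(header_copy), header_copy.record_size - sizeof(header_copy));
        return crc == expected_crc;
    }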
+persistent_store_manager_t::persistent_store_manager_t( + gaia::db::counters_t* counters, std::filesystem::path data_dir) + : m_counters(counters), m_data_dir_path(data_dir) { rocksdb::WriteOptions write_options{}; write_options.sync = true; rocksdb::TransactionDBOptions transaction_db_options{}; - m_rdb_wrapper = make_unique(m_data_dir_path, write_options, transaction_db_options); + m_rdb_wrapper = make_unique(m_data_dir_path, write_options, transaction_db_options); } -persistent_store_manager::~persistent_store_manager() +persistent_store_manager_t::~persistent_store_manager_t() { close(); } -void persistent_store_manager::open() +void persistent_store_manager_t::open() { rocksdb::TransactionDBOptions options{}; rocksdb::Options init_options{}; @@ -94,29 +101,29 @@ void persistent_store_manager::open() m_rdb_wrapper->open_txn_db(init_options, options); } -void persistent_store_manager::close() +void persistent_store_manager_t::close() { m_rdb_wrapper->close(); } -void persistent_store_manager::append_wal_commit_marker(const std::string& txn_name) +void persistent_store_manager_t::append_wal_commit_marker(const std::string& txn_name) { m_rdb_wrapper->commit(txn_name); } -std::string persistent_store_manager::begin_txn(gaia_txn_id_t txn_id) +std::string persistent_store_manager_t::begin_txn(gaia_txn_id_t txn_id) { rocksdb::WriteOptions write_options{}; rocksdb::TransactionOptions txn_options{}; return m_rdb_wrapper->begin_txn(write_options, txn_options, txn_id); } -void persistent_store_manager::append_wal_rollback_marker(const std::string& txn_name) +void persistent_store_manager_t::append_wal_rollback_marker(const std::string& txn_name) { m_rdb_wrapper->rollback(txn_name); } -void persistent_store_manager::prepare_wal_for_write(gaia::db::txn_log_t* log, const std::string& txn_name) +void persistent_store_manager_t::prepare_wal_for_write(gaia::db::txn_log_t* log, const std::string& txn_name) { ASSERT_PRECONDITION(log, "Transaction log is null!"); // The key_count variable represents the number of puts + deletes. @@ -130,7 +137,7 @@ void persistent_store_manager::prepare_wal_for_write(gaia::db::txn_log_t* log, c { // Encode key to be deleted. string_writer_t key; - key.write_uint64(lr->deleted_id); + key.write(lr->deleted_id); txn->Delete(key.to_slice()); key_count++; } @@ -138,13 +145,24 @@ void persistent_store_manager::prepare_wal_for_write(gaia::db::txn_log_t* log, c { string_writer_t key; string_writer_t value; - db_object_t* obj = offset_to_ptr(lr->new_offset); - if (!obj) + db_object_t* gaia_object = offset_to_ptr(lr->new_offset); + if (!gaia_object) { // Object was deleted in current transaction. continue; } - encode_object(obj, key, value); + + // TODO(Mihir): Reuse the Put API after new persistence path is enabled. + encode_object( + gaia_object->id, + gaia_object->type, + gaia_object->num_references, + gaia_object->references(), + gaia_object->payload_size, + gaia_object->data_size(), + gaia_object->data(), + key, + value); // Gaia objects encoded as key-value slices shouldn't be empty. ASSERT_INVARIANT( key.get_current_position() != 0 && value.get_current_position() != 0, @@ -171,7 +189,7 @@ void persistent_store_manager::prepare_wal_for_write(gaia::db::txn_log_t* log, c * Todo(Mihir) Note that, for now we skip validating the existence of object references on recovery, * since these aren't validated during object creation either. 
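prepare_wal_for_write() above now serializes scalar fields through the single templated string_writer_t::write<T>() instead of the old write_uint64/write_uint32/... overloads. A minimal sketch of that template follows, with a hypothetical buffer class; the real string_writer_t also exposes to_slice(), reserve(), and position tracking.

    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <type_traits>

    class demo_string_writer_t
    {
    public:
        // Any trivially copyable value is appended to the buffer byte for byte.
        template <typename T>
        void write(const T& value)
        {
            static_assert(std::is_trivially_copyable<T>::value, "write<T>() requires a trivially copyable type");
            write_bytes(reinterpret_cast<const char*>(&value), sizeof(value));
        }

        void write_bytes(const char* data, std::size_t size)
        {
            m_buffer.append(data, size);
        }

        const std::string& buffer() const
        {
            return m_buffer;
        }

    private:
        std::string m_buffer;
    };

    // Usage: the same call now works for 64-, 32-, 16-, and 8-bit integers.
    // demo_string_writer_t w;
    // w.write(uint64_t{42});
    // w.write(uint32_t{7});
    // w.write(uint8_t{1});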
*/ -void persistent_store_manager::recover() +void persistent_store_manager_t::recover() { auto it = std::unique_ptr(m_rdb_wrapper->get_iterator()); gaia_id_t max_id = 0; @@ -179,6 +197,12 @@ void persistent_store_manager::recover() for (it->SeekToFirst(); it->Valid(); it->Next()) { + // An optimization to forego reading already recovered logs. + if (it->key().compare(c_last_processed_log_seq_key) == 0) + { + continue; + } + db_object_t* recovered_object = decode_object(it->key(), it->value()); if (recovered_object->type > max_type_id && recovered_object->type < c_system_table_reserved_range_start) { @@ -190,13 +214,90 @@ void persistent_store_manager::recover() max_id = recovered_object->id; } } + // Check for any errors found during the scan m_rdb_wrapper->handle_rdb_error(it->status()); m_counters->last_id = max_id; m_counters->last_type_id = max_type_id; } -void persistent_store_manager::destroy_persistent_store() +void persistent_store_manager_t::destroy_persistent_store() { m_rdb_wrapper->destroy_persistent_store(); } + +uint64_t persistent_store_manager_t::get_counter_value(const std::string& key) +{ + std::string value; + m_rdb_wrapper->get(key, value); + if (value.empty()) + { + return 0; + } + + gaia::db::persistence::string_reader_t value_reader(value); + + uint64_t result; + value_reader.read_uint64(result); + return result; +} + +void persistent_store_manager_t::flush() +{ + m_rdb_wrapper->flush(); +} + +void persistent_store_manager_t::set_counter_value(const std::string& key_to_write, uint64_t value_to_write) +{ + string_writer_t value; + value.write(value_to_write); + + ASSERT_POSTCONDITION( + value.get_current_position() != 0, "Encoded value should not be empty!"); + m_rdb_wrapper->put(key_to_write, value.to_slice()); +} + +void persistent_store_manager_t::put( + common::gaia_id_t id, + common::gaia_type_t type, + gaia::common::reference_offset_t num_references, + const gaia::common::gaia_id_t* references, + uint16_t payload_size, + uint16_t data_size, + const char* data) +{ + string_writer_t key; + string_writer_t value; + + encode_object( + id, + type, + num_references, + references, + payload_size, + data_size, + data, + key, + value); + + // Gaia objects encoded as key-value slices shouldn't be empty. + ASSERT_POSTCONDITION( + key.get_current_position() != 0 && value.get_current_position() != 0, + "Encoded key & value should not be empty!"); + + // Don't need RocksDB txn context here. + m_rdb_wrapper->put(key.to_slice(), value.to_slice()); +} + +void persistent_store_manager_t::remove(gaia::common::gaia_id_t id_to_remove) +{ + // Encode key to be deleted. 
+ string_writer_t key; + key.write(id_to_remove); + ASSERT_POSTCONDITION(key.get_current_position() != 0, "Encoded key should not be empty!"); + m_rdb_wrapper->remove(key.to_slice()); +} + +} // namespace persistence +} // namespace db +} // namespace gaia diff --git a/production/db/core/src/rdb_object_converter.cpp b/production/db/core/src/rdb_object_converter.cpp index 31c9ad4cf4c8..52bd8ca3db48 100644 --- a/production/db/core/src/rdb_object_converter.cpp +++ b/production/db/core/src/rdb_object_converter.cpp @@ -19,40 +19,42 @@ namespace gaia { namespace db { +namespace persistence +{ /** * Format: * Key: id (uint64) * Value: type, reference_count, payload_size, payload */ -void gaia::db::persistence::encode_object( - const db_object_t* gaia_object, +void encode_object( + common::gaia_id_t id, + common::gaia_type_t type, + gaia::common::reference_offset_t num_references, + const gaia::common::gaia_id_t* references, + uint16_t payload_size, + uint16_t data_size, + const char* data, gaia::db::persistence::string_writer_t& key, gaia::db::persistence::string_writer_t& value) { // Create key. - key.write_uint64(gaia_object->id); + key.write(id); // Create value. - value.write_uint32(gaia_object->type); - value.write_uint16(gaia_object->num_references); - value.write_uint16(gaia_object->payload_size); + value.write(type); + value.write(num_references); + value.write(payload_size); - auto reference_arr_ptr = gaia_object->references(); - for (int i = 0; i < gaia_object->num_references; i++) + for (auto ref_ptr = references; ref_ptr < references + num_references; ++ref_ptr) { - // Encode all references. - value.write_uint64(*reference_arr_ptr); - reference_arr_ptr++; + value.write(*ref_ptr); } - size_t references_size = gaia_object->num_references * sizeof(gaia_id_t); - size_t data_size = gaia_object->payload_size - references_size; - const char* data_ptr = gaia_object->payload + references_size; - value.write(data_ptr, data_size); + value.write(data, data_size); } -db_object_t* gaia::db::persistence::decode_object( +db_object_t* decode_object( const rocksdb::Slice& key, const rocksdb::Slice& value) { @@ -94,5 +96,6 @@ db_object_t* gaia::db::persistence::decode_object( return db_object; } +} // namespace persistence } // namespace db } // namespace gaia diff --git a/production/db/core/src/rdb_wrapper.cpp b/production/db/core/src/rdb_wrapper.cpp index fea3a0379f79..b470b5189b6d 100644 --- a/production/db/core/src/rdb_wrapper.cpp +++ b/production/db/core/src/rdb_wrapper.cpp @@ -14,6 +14,7 @@ #include #include +#include "gaia_internal/common/common.hpp" #include "gaia_internal/common/retail_assert.hpp" #include "gaia_internal/db/db_types.hpp" #include "gaia_internal/db/persistent_store_error.hpp" @@ -24,6 +25,8 @@ namespace gaia { namespace db { +namespace persistence +{ static const std::string c_message_rocksdb_not_initialized = "RocksDB database is not initialized."; @@ -132,5 +135,38 @@ void rdb_wrapper_t::handle_rdb_error(rocksdb::Status status) } } +void rdb_wrapper_t::flush() +{ + rocksdb::FlushOptions options{}; + m_txn_db->Flush(options); +} + +void rdb_wrapper_t::put(const rocksdb::Slice& key, const rocksdb::Slice& value) +{ + m_txn_db->Put(m_write_options, key, value); +} + +void rdb_wrapper_t::remove(const rocksdb::Slice& key) +{ + m_txn_db->Delete(m_write_options, key); +} + +void rdb_wrapper_t::get(const rocksdb::Slice& key, std::string& value) +{ + rocksdb::Status status = m_txn_db->Get(m_read_options, key, &value); + + // Handle key not found error. 
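+    // A missing key is not an error for callers such as get_counter_value(),
+    // which treat an empty value as "not present".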
+ if (status.IsNotFound()) + { + value = c_empty_string; + } + // For all other errors, throw persistent_store_error. + else if (!status.ok()) + { + handle_rdb_error(status); + } +} + +} // namespace persistence } // namespace db } // namespace gaia diff --git a/production/db/core/tests/test_rdb_object_converter.cpp b/production/db/core/tests/test_rdb_object_converter.cpp index 4813b51833b0..1be31b2f8e78 100644 --- a/production/db/core/tests/test_rdb_object_converter.cpp +++ b/production/db/core/tests/test_rdb_object_converter.cpp @@ -26,15 +26,15 @@ TEST(write_read_assert, basic) ASSERT_EQ(empty_payload.length(), 0); // Fill write buffer - writer.write_uint64(e1); + writer.write(e1); EXPECT_EQ(8, writer.get_current_position()); - writer.write_uint64(e2); + writer.write(e2); EXPECT_EQ(16, writer.get_current_position()); - writer.write_uint32(e3); + writer.write(e3); writer.write(payload.c_str(), payload.length()); - writer.write_uint32(e5); - writer.write_uint8(e6); - writer.write_uint8(e7); + writer.write(e5); + writer.write(e6); + writer.write(e7); writer.write(empty_payload.c_str(), empty_payload.length()); @@ -78,7 +78,7 @@ TEST(write_read_assert, payload_edge_case) std::string empty_payload = ""; writer.write(empty_payload.data(), empty_payload.length()); - writer.write_uint8(e); + writer.write(e); rocksdb::Slice s = writer.to_slice(); string_reader_t reader(s); diff --git a/production/inc/gaia/exceptions.hpp b/production/inc/gaia/exceptions.hpp index ddc4eca7296a..be83868e71c9 100644 --- a/production/inc/gaia/exceptions.hpp +++ b/production/inc/gaia/exceptions.hpp @@ -397,6 +397,23 @@ class pre_commit_validation_failure : public common::gaia_exception { }; +namespace persistence +{ +/** + * \addtogroup persistence + * @{ + */ + +/** + * Thrown to indicate a checksum validation error when reading persistent log records. + */ +class checksum_error : public common::gaia_exception +{ +}; + +/*@}*/ +} // namespace persistence + namespace index { /** diff --git a/production/inc/gaia_internal/common/fd_helpers.hpp b/production/inc/gaia_internal/common/fd_helpers.hpp index c9d401110cbd..7243407337f3 100644 --- a/production/inc/gaia_internal/common/fd_helpers.hpp +++ b/production/inc/gaia_internal/common/fd_helpers.hpp @@ -238,5 +238,20 @@ inline int make_nonblocking_eventfd() return eventfd; } +/** + * Create an eventfd without any special flags. 
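+ * Unlike make_nonblocking_eventfd() above, reads on the returned eventfd block
+ * until its counter becomes nonzero.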
+ */
+inline int make_blocking_eventfd()
+{
+    int eventfd = ::eventfd(0, 0);
+    if (eventfd == -1)
+    {
+        int err = errno;
+        const char* reason = ::explain_eventfd(0, 0);
+        throw system_error(reason, err);
+    }
+    return eventfd;
+}
+
 } // namespace common
 } // namespace gaia
diff --git a/production/inc/gaia_internal/db/db_object.hpp b/production/inc/gaia_internal/db/db_object.hpp
index f95966b72c22..6d1fc15d8a00 100644
--- a/production/inc/gaia_internal/db/db_object.hpp
+++ b/production/inc/gaia_internal/db/db_object.hpp
@@ -58,6 +58,11 @@ struct alignas(gaia::db::memory_manager::c_allocation_alignment) db_object_t
         return reinterpret_cast<const gaia::common::gaia_id_t*>(payload);
     }
 
+    [[nodiscard]] uint16_t data_size() const
+    {
+        return payload_size - (num_references * sizeof(gaia::common::gaia_id_t));
+    }
+
     friend std::ostream& operator<<(std::ostream& os, const db_object_t& o)
     {
         os << "id: "
@@ -97,6 +102,25 @@ struct alignas(gaia::db::memory_manager::c_allocation_alignment) db_object_t
     }
 };
 
+// struct db_recovered_object_t
+// {
+//     gaia::common::gaia_id_t id;
+//     gaia::common::gaia_type_t type;
+//     uint16_t payload_size;
+//     gaia::common::reference_offset_t num_references;
+//     char payload[];
+
+//     [[nodiscard]] const char* data() const
+//     {
+//         return payload + num_references * sizeof(gaia::common::gaia_id_t);
+//     }
+
+//     [[nodiscard]] const gaia::common::gaia_id_t* references() const
+//     {
+//         return reinterpret_cast<const gaia::common::gaia_id_t*>(payload);
+//     }
+// };
+
 // Because the type is explicitly aligned to a granularity larger than
 // its nominal size, we cannot use sizeof() to compute the size of the
 // object header!
diff --git a/production/inc/gaia_internal/exceptions.hpp b/production/inc/gaia_internal/exceptions.hpp
index 56c007e947a4..4dff7d85af8a 100644
--- a/production/inc/gaia_internal/exceptions.hpp
+++ b/production/inc/gaia_internal/exceptions.hpp
@@ -305,6 +305,17 @@ class memory_allocation_error_internal : public memory_allocation_error
     explicit memory_allocation_error_internal(const std::string& message);
 };
 
+namespace persistence
+{
+
+class checksum_error_internal : public checksum_error
+{
+public:
+    explicit checksum_error_internal(const std::string& message);
+};
+
+} // namespace persistence
+
 namespace index
 {
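For reference, the new checksum exceptions follow the existing public/internal exception split: persistence code throws the internal type with a message, while callers catch the public checksum_error it derives from. A minimal sketch of that pattern, assuming the types live under gaia::db::persistence as the surrounding namespaces suggest; the validate_record_checksum() helper and its CRC parameters are illustrative and not part of this change:

#include <cstddef>
#include <cstdint>
#include <sstream>

#include "gaia_internal/exceptions.hpp"

// Hypothetical call site in the log reader; only the throw site is meant to be representative.
void validate_record_checksum(uint32_t expected_crc, uint32_t actual_crc, std::size_t record_offset)
{
    if (expected_crc != actual_crc)
    {
        std::stringstream message;
        message << "Checksum mismatch while reading log record at offset " << record_offset << ".";
        throw gaia::db::persistence::checksum_error_internal(message.str());
    }
}

Code above the persistence layer would catch gaia::db::persistence::checksum_error, keeping the internal type an implementation detail.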