diff --git a/production/db/core/CMakeLists.txt b/production/db/core/CMakeLists.txt index 3c5e294e89ec..a41e6ea63db0 100644 --- a/production/db/core/CMakeLists.txt +++ b/production/db/core/CMakeLists.txt @@ -133,7 +133,8 @@ endif() add_library(gaia_db_persistence STATIC src/log_file.cpp src/async_disk_writer.cpp - src/async_write_batch.cpp) + src/async_write_batch.cpp + src/log_io.cpp) configure_gaia_target(gaia_db_persistence) target_include_directories(gaia_db_persistence PRIVATE "${GAIA_DB_CORE_PUBLIC_INCLUDES}" diff --git a/production/db/core/inc/async_disk_writer.hpp b/production/db/core/inc/async_disk_writer.hpp index 94a81d074af8..ce1ec01250ed 100644 --- a/production/db/core/inc/async_disk_writer.hpp +++ b/production/db/core/inc/async_disk_writer.hpp @@ -37,7 +37,7 @@ namespace persistence class async_disk_writer_t { public: - async_disk_writer_t(int validate_flushed_batch_efd, int signal_checkpoint_efd); + async_disk_writer_t(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); ~async_disk_writer_t(); @@ -98,21 +98,14 @@ class async_disk_writer_t /** * Copy any temporary writes (which don't exist in gaia shared memory) into the metadata buffer. */ - unsigned char* copy_into_metadata_buffer(void* source, size_t size, int file_fd); + unsigned char* copy_into_metadata_buffer(const void* source, size_t size, int file_fd); /** * Perform maintenance actions on in_flight batch after all of its IO entries have been processed. */ void perform_post_completion_maintenance(); - void add_decisions_to_batch(decision_list_t& decisions); - - /** - * For each commit ts, keep track of the eventfd which the session thread blocks on. Once the txn - * has been made durable, this eventfd is written to so that the session thread can make progress and - * return commit decision to the client. 
- */ - void map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_efd); + void add_decisions_to_batch(const decision_list_t& decisions); private: // Reserve slots in the in_progress batch to be able to append additional operations to it (before it gets submitted to the kernel) @@ -120,21 +113,18 @@ class async_disk_writer_t static constexpr size_t c_single_submission_entry_count = 1; static constexpr size_t c_async_batch_size = 32; static constexpr size_t c_max_iovec_array_size_bytes = IOV_MAX * sizeof(iovec); - static inline eventfd_t c_default_flush_efd_value = 1; - static inline iovec c_default_iov = {static_cast(&c_default_flush_efd_value), sizeof(eventfd_t)}; + static inline eventfd_t c_default_flush_eventfd_value = 1; + static inline iovec c_default_iov = {static_cast(&c_default_flush_eventfd_value), sizeof(eventfd_t)}; // eventfd to signal that a batch flush has completed. // Used to block new writes to disk when a batch is already getting flushed. - static inline int s_flush_efd = -1; + static inline int s_flush_eventfd = -1; // eventfd to signal that the IO results belonging to a batch are ready to be validated. - int m_validate_flush_efd = -1; + int m_log_flush_eventfd = -1; // eventfd to signal that a file is ready to be checkpointed. - int m_signal_checkpoint_efd = -1; - - // Keep track of session threads to unblock. - std::unordered_map m_ts_to_session_decision_fd_map; + int m_signal_checkpoint_eventfd = -1; // Writes are batched and we maintain two buffers so that writes to a buffer // can still proceed when the other buffer is getting flushed to disk. 
diff --git a/production/db/core/inc/db_client.hpp b/production/db/core/inc/db_client.hpp index 04784fa3f25b..f923a03cb38b 100644 --- a/production/db/core/inc/db_client.hpp +++ b/production/db/core/inc/db_client.hpp @@ -104,6 +104,8 @@ class client_t thread_local static inline mapped_data_t s_private_locators; thread_local static inline gaia::db::index::indexes_t s_local_indexes{}; + thread_local static inline size_t s_txn_memory_size_bytes = 0; + // These fields have session lifetime. thread_local static inline config::session_options_t s_session_options; diff --git a/production/db/core/inc/db_helpers.hpp b/production/db/core/inc/db_helpers.hpp index 3581e137c7a9..95b243ce0e03 100644 --- a/production/db/core/inc/db_helpers.hpp +++ b/production/db/core/inc/db_helpers.hpp @@ -126,10 +126,24 @@ inline void allocate_object( { memory_manager::memory_manager_t* memory_manager = gaia::db::get_memory_manager(); memory_manager::chunk_manager_t* chunk_manager = gaia::db::get_chunk_manager(); + + size_t size_to_allocate = size + c_db_object_header_size; + + if (get_current_txn_memory_size_bytes() != nullptr) + { + if (*get_current_txn_memory_size_bytes() + size_to_allocate <= c_max_txn_memory_size_bytes) + { + *get_current_txn_memory_size_bytes() += size_to_allocate; + } + else + { + throw transaction_memory_limit_exceeded(); + } + } // The allocation can fail either because there is no current chunk, or // because the current chunk is full. - gaia_offset_t object_offset = chunk_manager->allocate(size + c_db_object_header_size); + gaia_offset_t object_offset = chunk_manager->allocate(size_to_allocate); if (!object_offset.is_valid()) { if (chunk_manager->initialized()) @@ -155,8 +169,12 @@ inline void allocate_object( // Initialize the new chunk. chunk_manager->initialize(new_chunk_offset); + // // Before we allocate, persist current chunk ID in txn log, for access + // // on the server in case we crash. 
+ // gaia::db::get_mapped_log()->data()->current_chunk = new_chunk_offset; + // Allocate from new chunk. - object_offset = chunk_manager->allocate(size + c_db_object_header_size); + object_offset = chunk_manager->allocate(size_to_allocate); } ASSERT_POSTCONDITION( diff --git a/production/db/core/inc/db_internal_types.hpp b/production/db/core/inc/db_internal_types.hpp index ec24469b3b8a..d94bed5ee7fe 100644 --- a/production/db/core/inc/db_internal_types.hpp +++ b/production/db/core/inc/db_internal_types.hpp @@ -99,8 +99,16 @@ constexpr size_t c_max_locators = (1ULL << 32) - 1; // similarly optimized by substituting locators for gaia_ids. constexpr size_t c_hash_buckets = 1ULL << 20; -// This is arbitrary, but we need to keep txn logs to a reasonable size. -constexpr size_t c_max_log_records = 1ULL << 20; +// // Track maximum number of new chunks (apart from the one that the txn is already using) +// // that can be allocated per transaction. +// // This sets an upper bound on txn size: 32MB < txn_size < 36MB +// constexpr size_t c_max_chunks_per_txn = 8; + +// This sets an upper bound on the size of the transaction. +constexpr size_t c_max_txn_memory_size_bytes = 64 * 1024 * 1024; + +// 8 chunks can hold up to 8 * (2^16 - 2^8) = 522240 64B objects, +constexpr size_t c_max_log_records = 522240; // This is an array of offsets in the data segment corresponding to object // versions, where each array index is referred to as a "locator." 
@@ -121,6 +129,7 @@ struct txn_log_t { gaia_txn_id_t begin_ts; size_t record_count; + // size_t chunk_count; struct log_record_t { @@ -166,6 +175,7 @@ struct txn_log_t }; log_record_t log_records[c_max_log_records]; + // gaia_offset_t chunks[c_max_chunks_per_txn]; friend std::ostream& operator<<(std::ostream& os, const txn_log_t& l) { diff --git a/production/db/core/inc/db_server.hpp b/production/db/core/inc/db_server.hpp index 525f49e600e0..76f69dceb7cd 100644 --- a/production/db/core/inc/db_server.hpp +++ b/production/db/core/inc/db_server.hpp @@ -21,6 +21,7 @@ #include "gaia_internal/common/generator_iterator.hpp" #include "db_internal_types.hpp" +#include "log_io.hpp" #include "mapped_data.hpp" #include "memory_manager.hpp" #include "messages_generated.h" @@ -107,6 +108,11 @@ class server_t private: static inline server_config_t s_server_conf{}; + // TODO: Delete this once recovery/checkpointing implementation is in. + static inline bool c_use_gaia_log_implementation = false; + + static constexpr uint64_t c_txn_group_timeout_us = 100; + // This is arbitrary but seems like a reasonable starting point (pending benchmarks). static constexpr size_t c_stream_batch_size{1ULL << 10}; @@ -121,6 +127,19 @@ class server_t static inline int s_server_shutdown_eventfd = -1; static inline int s_listening_socket = -1; + // Signals the log writer thread to persist txn updates. + static inline int s_signal_log_write_eventfd = -1; + + // Signals the log writer thread to persist txn decisions. + static inline int s_signal_decision_eventfd = -1; + + // Signals the checkpointing thread to merge log file updates into the LSM store. + static inline int s_signal_checkpoint_log_eventfd = -1; + + // To signal to the persistence thread to check the return values of a batch of async I/O + // operations post batch flush. + static inline int s_do_write_batch_maintenance_eventfd = -1; + // These thread objects are owned by the client dispatch thread. 
static inline std::vector s_session_threads{}; @@ -130,6 +149,7 @@ class server_t static inline mapped_data_t s_shared_id_index{}; static inline index::indexes_t s_global_indexes{}; static inline std::unique_ptr s_persistent_store{}; + static inline std::unique_ptr s_log_handler{}; // These fields have transaction lifetime. thread_local static inline gaia_txn_id_t s_txn_id = c_invalid_gaia_txn_id; @@ -148,7 +168,6 @@ class server_t thread_local static inline messages::session_state_t s_session_state = messages::session_state_t::DISCONNECTED; thread_local static inline bool s_session_shutdown = false; thread_local static inline int s_session_shutdown_eventfd = -1; - thread_local static inline gaia::db::memory_manager::memory_manager_t s_memory_manager{}; thread_local static inline gaia::db::memory_manager::chunk_manager_t s_chunk_manager{}; @@ -239,6 +258,12 @@ class server_t // The current thread's index in `s_safe_ts_per_thread_entries`. thread_local static inline size_t s_safe_ts_index{c_invalid_safe_ts_index}; + // Keep track of the last txn that has been submitted to the async_disk_writer. + static inline std::atomic s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id; + + // Keep a track of undecided txns submitted to the async_disk_writer. + static inline std::set s_txn_decision_not_queued_set{}; + private: // Returns the current value of the given watermark. static inline gaia_txn_id_t get_watermark(watermark_type_t watermark_type) @@ -406,6 +431,15 @@ class server_t static void client_dispatch_handler(const std::string& socket_name); + static void log_writer_handler(); + + static void persist_pending_writes(bool should_wait_for_completion = false); + + static void recover_persistent_log(); + + // Method should only be called on server shutdown. 
+ static void flush_pending_writes_on_server_shutdown(); + static void session_handler(int session_socket); static std::pair get_stream_socket_pair(); diff --git a/production/db/core/inc/db_shared_data.hpp b/production/db/core/inc/db_shared_data.hpp index 8d3544166fd9..7b0c511ff8b8 100644 --- a/production/db/core/inc/db_shared_data.hpp +++ b/production/db/core/inc/db_shared_data.hpp @@ -52,5 +52,7 @@ gaia::db::memory_manager::chunk_manager_t* get_chunk_manager(); // Gets the mapped transaction log for the current session thread. gaia::db::mapped_log_t* get_mapped_log(); +size_t* get_current_txn_memory_size_bytes(); + } // namespace db } // namespace gaia diff --git a/production/db/core/inc/log_file.hpp b/production/db/core/inc/log_file.hpp index dc818103c5dc..cc6f98cdcbc2 100644 --- a/production/db/core/inc/log_file.hpp +++ b/production/db/core/inc/log_file.hpp @@ -34,24 +34,29 @@ class log_file_t /** * Obtain offset to write the next log record at. */ - size_t get_current_offset(); + const file_offset_t get_current_offset(); /** * Get remaining space in persistent log file. */ - size_t get_remaining_bytes_count(size_t record_size); + const size_t get_bytes_remaining_after_append(size_t record_size); /** * Allocate space in persistent log file. */ void allocate(size_t size); - int get_file_fd(); + const int get_file_fd(); + + /** + * Obtain sequence number for the file. + */ + const file_sequence_t get_file_sequence(); private: size_t m_file_size; file_sequence_t m_file_seq; - size_t m_current_offset; + file_offset_t m_current_offset; std::string m_dir_name; int m_dir_fd; int m_file_fd; diff --git a/production/db/core/inc/log_io.hpp b/production/db/core/inc/log_io.hpp new file mode 100644 index 000000000000..227edad3b113 --- /dev/null +++ b/production/db/core/inc/log_io.hpp @@ -0,0 +1,108 @@ +///////////////////////////////////////////// +// Copyright (c) Gaia Platform LLC +// All rights reserved. 
+///////////////////////////////////////////// + +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include "gaia/common.hpp" +#include + +#include "gaia_internal/db/db_object.hpp" + +#include "async_disk_writer.hpp" +#include "db_internal_types.hpp" +#include "log_file.hpp" +#include "memory_manager.hpp" + +namespace gaia +{ +namespace db +{ +namespace persistence +{ + +/* + * Fill the record_header.crc field with c_crc_initial_value when + * computing the checksum: crc32c is vulnerable to 0-prefixing, + * so we make sure the initial bytes are non-zero. + * + * https://stackoverflow.com/questions/2321676/data-length-vs-crc-length + * "From the wikipedia article: "maximal total blocklength is equal to 2^r − 1". That's in bits. + * You don't need to do much research to see that 2^9 - 1 is 511 bits. Using CRC-8, + * multiple messages longer than 64 bytes will have the same CRC checksum value." + * So CRC-16 would have max message size 2^17-1 bits or about 2^14 bytes = 16KB, + * and CRC-32 would have max message size 2^33-1 bits or about 2^30 bytes = 1GB + */ +static constexpr crc32_t c_crc_initial_value = ((uint32_t)-1); + +class log_handler_t +{ +public: + explicit log_handler_t(const std::string& directory_path); + ~log_handler_t(); + void open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); + + /** + * Allocate space in the log file and return starting offset of allocation. + */ + file_offset_t allocate_log_space(size_t payload_size); + + /** + * Create a log record which stores txn information. + */ + void create_txn_record( + gaia_txn_id_t commit_ts, + record_type_t type, + std::vector& object_offsets, + std::vector& deleted_ids); + + /** + * Process the in memory txn_log and submit the processed writes (generated log records) to the async_disk_writer. 
+ */ + void process_txn_log_and_write(int txn_log_fd, gaia_txn_id_t commit_ts); + + /** + * Create a log record which stores decisions for one or more txns. + */ + void create_decision_record(const decision_list_t& txn_decisions); + + /** + * Submit async_disk_writer's internal I/O request queue to the kernel for processing. + */ + void submit_writes(bool should_wait_for_completion); + + /** + * Validate the result of I/O calls submitted to the kernel for processing. + */ + void check_flushed_batch_results_and_do_maintenance(); + +private: + static constexpr char c_gaia_wal_dir_name[] = "/wal_dir"; + static constexpr int c_gaia_wal_dir_permissions = S_IRWXU | (S_IRGRP | S_IROTH | S_IXGRP | S_IXOTH); + static inline std::string s_wal_dir_path{}; + static inline int s_dir_fd = -1; + + // Log file sequence starts from 1. + static inline std::atomic s_file_num = 1; + + // Keep track of the current log file. + std::unique_ptr m_current_file; + + std::unique_ptr m_async_disk_writer; +}; + +} // namespace persistence +} // namespace db +} // namespace gaia diff --git a/production/db/core/inc/persistence_types.hpp b/production/db/core/inc/persistence_types.hpp index 8eb0ebc6eacb..fb247caa06a2 100644 --- a/production/db/core/inc/persistence_types.hpp +++ b/production/db/core/inc/persistence_types.hpp @@ -14,6 +14,7 @@ #include "gaia_internal/common/mmap_helpers.hpp" #include "gaia_internal/common/retail_assert.hpp" +#include #include "gaia_internal/db/db_types.hpp" namespace gaia @@ -23,6 +24,13 @@ namespace db namespace persistence { +enum class record_type_t : size_t +{ + not_set = 0x0, + txn = 0x1, + decision = 0x2, +}; + enum class decision_type_t : uint8_t { undecided = 0, @@ -36,6 +44,20 @@ struct decision_entry_t decision_type_t decision; }; +// Represents start and end offsets of a set of contiguous objects. +// They can span chunks. 
+struct contiguous_offsets_t +{ + gaia_offset_t min_offset; + gaia_offset_t max_offset; +}; + +struct chunk_info_helper_t +{ + common::toposort::graph::node_ptr node_ptr; + contiguous_offsets_t offsets; +}; + // Pair of log file sequence number and file fd. typedef std::vector decision_list_t; @@ -60,6 +82,72 @@ static_assert( constexpr file_sequence_t c_invalid_file_sequence_number; +static constexpr uint64_t c_max_log_file_size_in_bytes = 4 * 1024 * 1024; + +class file_offset_t : public common::int_type_t +{ +public: + // By default, we should initialize to 0. + constexpr file_offset_t() + : common::int_type_t() + { + } + + constexpr file_offset_t(size_t value) + : common::int_type_t(value) + { + } +}; + +static_assert( + sizeof(file_offset_t) == sizeof(file_offset_t::value_type), + "file_offset_t has a different size than its underlying integer type!"); + +// The record size is constrained by the size of the log file. +// We'd never need more than 32 bits for the record size. +class record_size_t : public common::int_type_t +{ +public: + // By default, we should initialize to an invalid value. + constexpr record_size_t() + : common::int_type_t() + { + } + + constexpr record_size_t(uint32_t value) + : common::int_type_t(value) + { + } +}; + +static_assert( + sizeof(record_size_t) == sizeof(record_size_t::value_type), + "record_size_t has a different size than its underlying integer type!"); + +constexpr record_size_t c_invalid_record_size; + +// https://stackoverflow.com/questions/2321676/data-length-vs-crc-length +// "From the wikipedia article: "maximal total blocklength is equal to 2**r − 1". That's in bits. +// So CRC-32 would have max message size 2^33-1 bits or about 2^30 bytes = 1GB +class crc32_t : public common::int_type_t +{ +public: + // By default, we should initialize to 0. 
+ constexpr crc32_t() + : common::int_type_t() + { + } + + constexpr crc32_t(uint32_t value) + : common::int_type_t(value) + { + } +}; + +static_assert( + sizeof(crc32_t) == sizeof(crc32_t::value_type), + "crc32_t has a different size than its underlying integer type!"); + // This assertion ensures that the default type initialization // matches the value of the invalid constant. static_assert( @@ -72,6 +160,27 @@ struct log_file_info_t int file_fd; }; +struct record_header_t +{ + crc32_t crc; + record_size_t payload_size; + record_type_t record_type; + gaia_txn_id_t txn_commit_ts; + + // Stores a count value depending on the record type. + // For a txn record, this represents the count of deleted objects. + // For a decision record, this represents the number of decisions in the record's payload. + union + { + uint32_t deleted_object_count; + uint32_t decision_count; + }; +}; + +static_assert( + sizeof(record_header_t) % 8 == 0, + "record_header_t should be a multiple of 8 bytes!"); + // The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log // apart from the shared memory objects. 
// Custom information includes diff --git a/production/db/core/inc/persistent_store_manager.hpp b/production/db/core/inc/persistent_store_manager.hpp index d9d7b5ad6470..24f8ffb5d3e9 100644 --- a/production/db/core/inc/persistent_store_manager.hpp +++ b/production/db/core/inc/persistent_store_manager.hpp @@ -6,6 +6,7 @@ #pragma once #include +#include #include "gaia/common.hpp" @@ -14,6 +15,7 @@ #include "db_helpers.hpp" #include "db_internal_types.hpp" #include "db_shared_data.hpp" +#include "log_io.hpp" #include "rdb_wrapper.hpp" // This file provides gaia specific functionality to persist writes to & read from diff --git a/production/db/core/inc/txn_metadata.hpp b/production/db/core/inc/txn_metadata.hpp index 87aa8c25899b..1663f1b339bd 100644 --- a/production/db/core/inc/txn_metadata.hpp +++ b/production/db/core/inc/txn_metadata.hpp @@ -107,6 +107,7 @@ class txn_metadata_t inline bool seal_if_uninitialized(); static inline int get_txn_log_fd(gaia_txn_id_t commit_ts); + static inline txn_metadata_entry_t get_entry(gaia_txn_id_t commit_ts); static inline bool seal_uninitialized_ts(gaia_txn_id_t ts); static inline bool invalidate_txn_log_fd(gaia_txn_id_t commit_ts); static inline void set_active_txn_submitted(gaia_txn_id_t begin_ts, gaia_txn_id_t commit_ts); @@ -114,6 +115,8 @@ class txn_metadata_t static inline void update_txn_decision(gaia_txn_id_t commit_ts, bool is_committed); static inline void set_txn_durable(gaia_txn_id_t commit_ts); static inline bool set_txn_gc_complete(gaia_txn_id_t commit_ts); + static inline void* get_txn_metadata_entry_addr(gaia_txn_id_t commit_ts); + static inline uint32_t get_txn_metadata_entry_higher_bits(gaia_txn_id_t commit_ts); static gaia_txn_id_t register_begin_ts(); static gaia_txn_id_t register_commit_ts(gaia_txn_id_t begin_ts, int log_fd); diff --git a/production/db/core/inc/txn_metadata.inc b/production/db/core/inc/txn_metadata.inc index b0274b2ee580..e27944a0ffa6 100644 --- a/production/db/core/inc/txn_metadata.inc +++ 
b/production/db/core/inc/txn_metadata.inc @@ -377,6 +377,27 @@ txn_metadata_entry_t txn_metadata_t::get_entry() return txn_metadata_entry_t{s_txn_metadata_map[m_ts].load()}; } +txn_metadata_entry_t txn_metadata_t::get_entry(gaia_txn_id_t commit_ts) +{ + ASSERT_PRECONDITION(s_txn_metadata_map, "Transaction metadata has not been initialized!"); + + return txn_metadata_entry_t{s_txn_metadata_map[commit_ts].load()}; +} + +void* txn_metadata_t::get_txn_metadata_entry_addr(gaia_txn_id_t commit_ts) +{ + ASSERT_PRECONDITION(s_txn_metadata_map, "Transaction metadata has not been initialized!"); + + return &s_txn_metadata_map[commit_ts]; +} + +uint32_t txn_metadata_t::get_txn_metadata_entry_higher_bits(gaia_txn_id_t commit_ts) +{ + ASSERT_PRECONDITION(s_txn_metadata_map, "Transaction metadata has not been initialized!"); + + return static_cast(s_txn_metadata_map[commit_ts].load() >> 32); +} + void txn_metadata_t::set_entry(txn_metadata_entry_t entry) { s_txn_metadata_map[m_ts].store(entry.get_word()); diff --git a/production/db/core/src/async_disk_writer.cpp b/production/db/core/src/async_disk_writer.cpp index ce2b34055a13..e0eb642d2d61 100644 --- a/production/db/core/src/async_disk_writer.cpp +++ b/production/db/core/src/async_disk_writer.cpp @@ -16,6 +16,7 @@ #include #include "gaia_internal/common/fd_helpers.hpp" +#include "gaia_internal/common/futex_helpers.hpp" #include "gaia_internal/common/retail_assert.hpp" #include "async_write_batch.hpp" @@ -32,17 +33,16 @@ namespace db namespace persistence { -async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_checkpoint_efd) +async_disk_writer_t::async_disk_writer_t(int log_flush_eventfd, int signal_checkpoint_eventfd) { - ASSERT_PRECONDITION(validate_flush_efd >= 0, "Invalid validate flush eventfd"); - ASSERT_PRECONDITION(signal_checkpoint_efd >= 0, "Invalid signal checkpoint eventfd"); + ASSERT_PRECONDITION(log_flush_eventfd >= 0, "Invalid validate flush eventfd"); + 
ASSERT_PRECONDITION(signal_checkpoint_eventfd >= 0, "Invalid signal checkpoint eventfd"); - m_validate_flush_efd = validate_flush_efd; - m_signal_checkpoint_efd = signal_checkpoint_efd; + m_log_flush_eventfd = log_flush_eventfd; + m_signal_checkpoint_eventfd = signal_checkpoint_eventfd; - // Used to block new writes to disk when a batch is already getting flushed. - s_flush_efd = eventfd(1, 0); - if (s_flush_efd == -1) + s_flush_eventfd = eventfd(1, 0); + if (s_flush_eventfd == -1) { const char* reason = ::explain_eventfd(1, 0); throw_system_error(reason); @@ -51,6 +51,15 @@ async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_chec void async_disk_writer_t::open(size_t batch_size) { + if (!m_in_progress_batch) + { + m_in_progress_batch = std::make_unique(); + } + if (!m_in_flight_batch) + { + m_in_flight_batch = std::make_unique(); + } + m_in_progress_batch->setup(batch_size); m_in_flight_batch->setup(batch_size); } @@ -62,7 +71,7 @@ async_disk_writer_t::~async_disk_writer_t() void async_disk_writer_t::teardown() { - close_fd(s_flush_efd); + close_fd(s_flush_eventfd); } void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t user_data) @@ -76,12 +85,7 @@ void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t use throw_system_error(ss.str(), err); } -void async_disk_writer_t::map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_fd) -{ - m_ts_to_session_decision_fd_map.insert(std::pair(commit_ts, session_decision_fd)); -} - -void async_disk_writer_t::add_decisions_to_batch(decision_list_t& decisions) +void async_disk_writer_t::add_decisions_to_batch(const decision_list_t& decisions) { for (const auto& decision : decisions) { @@ -106,7 +110,7 @@ void async_disk_writer_t::perform_post_completion_maintenance() // Signal to checkpointing thread the upper bound of files that it can process. 
if (max_file_seq_to_close > 0) { - signal_eventfd(m_signal_checkpoint_efd, max_file_seq_to_close); + signal_eventfd(m_signal_checkpoint_eventfd, max_file_seq_to_close); } const decision_list_t& decisions = m_in_flight_batch->get_decision_batch_entries(); @@ -115,11 +119,13 @@ void async_disk_writer_t::perform_post_completion_maintenance() // Set durability flags for txn. transactions::txn_metadata_t::set_txn_durable(decision.commit_ts); - // Unblock session thread. - auto itr = m_ts_to_session_decision_fd_map.find(decision.commit_ts); - ASSERT_INVARIANT(itr != m_ts_to_session_decision_fd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts"); - signal_eventfd_single_thread(itr->second); - m_ts_to_session_decision_fd_map.erase(itr); + // Unblock session thread. + void* addr = transactions::txn_metadata_t::get_txn_metadata_entry_addr(decision.commit_ts); + + // Assert since anything else implies a programming issue. + // CAN THIS BE 0?; Is it possible for a wake to occur before a wait? + // It is possible for the Futex call to be missed. + common::futex_wake(addr, 1); } m_in_flight_batch->clear_decision_batch(); @@ -130,10 +136,22 @@ void async_disk_writer_t::submit_and_swap_in_progress_batch(int file_fd, bool sh eventfd_t event_counter; // Block on any pending disk flushes. - eventfd_read(s_flush_efd, &event_counter); + eventfd_read(s_flush_eventfd, &event_counter); + + // m_log_flush_eventfd might already be consumed by the caller. + uint64_t val; + ssize_t bytes_read = ::read(m_log_flush_eventfd, &val, sizeof(val)); + if(bytes_read == -1) + { + ASSERT_INVARIANT(errno == EAGAIN, "Unexpected error! read() is expected to fail with EAGAIN for a non-blocking eventfd."); + } + else + { + ASSERT_INVARIANT(bytes_read == sizeof(val), "Failed to fully read data!"); + // Before submitting the next batch perform any maintenance on the in_flight batch. 
+ perform_post_completion_maintenance(); + } - // Perform any maintenance on the in_flight batch. - perform_post_completion_maintenance(); finish_and_submit_batch(file_fd, should_wait_for_completion); } @@ -146,8 +164,8 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ { swap_batches(); - // Nothing to submit; reset the flush efd that got burnt in submit_and_swap_in_progress_batch() function. - signal_eventfd_single_thread(s_flush_efd); + // Nothing to submit; reset the flush eventfd that got burnt in submit_and_swap_in_progress_batch() function. + signal_eventfd_single_thread(s_flush_eventfd); // Reset metadata buffer. m_metadata_buffer.clear(); @@ -158,8 +176,8 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ m_in_progress_batch->add_fdatasync_op_to_batch(file_fd, get_enum_value(uring_op_t::fdatasync), IOSQE_IO_LINK); // Signal eventfd's as part of batch. - m_in_progress_batch->add_pwritev_op_to_batch(static_cast(&c_default_iov), 1, s_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); - m_in_progress_batch->add_pwritev_op_to_batch(static_cast(&c_default_iov), 1, m_validate_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); + m_in_progress_batch->add_pwritev_op_to_batch(&c_default_iov, 1, m_log_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); + m_in_progress_batch->add_pwritev_op_to_batch(&c_default_iov, 1, s_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); swap_batches(); auto flushed_batch_size = m_in_flight_batch->get_unsubmitted_entries_count(); @@ -187,7 +205,7 @@ void async_disk_writer_t::perform_file_close_operations(int file_fd, file_sequen m_in_progress_batch->append_file_to_batch(file_fd, log_seq); } -unsigned char* async_disk_writer_t::copy_into_metadata_buffer(void* source, size_t size, int file_fd) +unsigned char* 
async_disk_writer_t::copy_into_metadata_buffer(const void* source, size_t size, int file_fd) { auto current_ptr = m_metadata_buffer.get_current_ptr(); ASSERT_PRECONDITION(current_ptr, "Invalid metadata buffer pointer."); diff --git a/production/db/core/src/db_client.cpp b/production/db/core/src/db_client.cpp index 1bf9653cdaf9..571ac9658356 100644 --- a/production/db/core/src/db_client.cpp +++ b/production/db/core/src/db_client.cpp @@ -183,6 +183,9 @@ void client_t::txn_cleanup() // Reset TLS events vector for the next transaction that will run on this thread. s_events.clear(); + + // Reset txn size. + s_txn_memory_size_bytes = 0; } int client_t::get_session_socket(const std::string& socket_name) @@ -402,6 +405,11 @@ void client_t::begin_transaction() s_log.open(log_fd, read_only); cleanup_private_locators.dismiss(); + + // Keep track of every chunk used in a transaction. This helps retain the order in which chunks are + // assigned to a txn; with chunk reuse they can be assigned out of order. 
+ auto& chunk = s_log.data()->chunks[s_log.data()->chunk_count++]; + chunk = static_cast(s_chunk_manager.chunk_offset()); } void client_t::apply_txn_log(int log_fd) diff --git a/production/db/core/src/db_server.cpp b/production/db/core/src/db_server.cpp index f98bbd202458..ed509ec721bb 100644 --- a/production/db/core/src/db_server.cpp +++ b/production/db/core/src/db_server.cpp @@ -27,6 +27,7 @@ #include "gaia_internal/common/scope_guard.hpp" #include "gaia_internal/common/socket_helpers.hpp" #include "gaia_internal/common/system_error.hpp" +#include "gaia_internal/common/futex_helpers.hpp" #include "gaia_internal/db/catalog_core.hpp" #include "gaia_internal/db/db.hpp" #include "gaia_internal/db/db_object.hpp" @@ -255,6 +256,158 @@ void server_t::handle_rollback_txn( txn_rollback(); } +void server_t::recover_persistent_log() +{ + if (s_server_conf.persistence_mode() == persistence_mode_t::e_disabled) + { + return; + } + + if (c_use_gaia_log_implementation) + { + if (!s_log_handler) + { + ASSERT_INVARIANT(s_do_write_batch_maintenance_eventfd >= 0, "Invalid validate flush eventfd."); + s_log_handler = std::make_unique(s_server_conf.data_dir()); + + s_log_handler->open_for_writes(s_do_write_batch_maintenance_eventfd, s_signal_checkpoint_log_eventfd); + } + + if (s_server_conf.persistence_mode() == persistence_mode_t::e_disabled_after_recovery) + { + s_log_handler.reset(); + } + } +} + +void server_t::persist_pending_writes(bool should_wait_for_completion) +{ + ASSERT_PRECONDITION(s_log_handler, "Persistent log handler should be initialized."); + + // This list is needed to enable batching of txn decisions and writing them in one go. + persistence::decision_list_t txn_decisions; + + auto start = std::chrono::steady_clock::now(); + auto end = start; + bool updates_exist = false; + size_t num_scans = 0; + + // Heuristic so we aren't consuming 100% CPU in the while loop scanning for items - even when + // new commit ts don't exist. 
The alternatives is to have a minimum loop time (this could be + // 5-10% of c_txn_group_timeout_us) instead of a minimum number of scans. Further changes here + // pending experimentation. + size_t c_max_consecutive_scans_with_no_updates = 5; + + // Run loop till there are no more updates to consume or there is a timeout. + while (true) + { + // Take snapshot of last allocated timestamp. + gaia_txn_id_t last_allocated_ts = get_last_txn_id(); + + bool uninitialized_ts_seen = false; + + // We don't require safe_ts_t guarantees outside this loop. + for (safe_ts_t ts = s_last_queued_commit_ts_upper_bound.load() + 1; ts <= last_allocated_ts; ++ts) + { + if (txn_metadata_t::is_uninitialized_ts(ts)) + { + uninitialized_ts_seen = true; + continue; + } + + if (txn_metadata_t::is_commit_ts(ts)) + { + // Avoid duplicate writes. + if (s_txn_decision_not_queued_set.count(ts) == 0) + { + updates_exist = true; + int txn_log_fd = txn_metadata_t::get_txn_log_fd(ts); + + s_txn_decision_not_queued_set.insert(ts); + + // Add txn writes to the async_write_batch. Internally writes will get logged to disk when the + // batch becomes full. + s_log_handler->process_txn_log_and_write(txn_log_fd, ts); + } + } + } + + // An uninitialized txn metadata entry could always be used for a commit_ts after the above scan completed. + // Thus update s_last_queued_commit_ts_upper_bound only if no uninitialized ts are observed. + if (!uninitialized_ts_seen) + { + s_last_queued_commit_ts_upper_bound = last_allocated_ts; + } + + // Iterate through commit ts of txn's for which the decision hasn't been sent to the + // async_disk_writer yet and create decision records for them. + auto it = s_txn_decision_not_queued_set.cbegin(); + while (it != s_txn_decision_not_queued_set.cend()) + { + auto ts = *it; + + if (txn_metadata_t::is_txn_validating(ts)) + { + // This ensures that if a txn with commit ts 'X' has been decided then all txn's + // with commit ts < X have also been decided. 
+ break; + } + else + { + // Txn is decided at this point. + updates_exist = true; + auto decision = txn_metadata_t::is_txn_committed(ts) ? persistence::decision_type_t::commit + : persistence::decision_type_t::abort; + // push_back() accepts a braced initializer; emplace_back() cannot deduce its arguments from one. + txn_decisions.push_back({ts, decision}); + + // No uninitialized ts observed. Delete from s_txn_decision_not_queued_set. + if (!uninitialized_ts_seen) + { + // Iterator will advance with this assignment. + it = s_txn_decision_not_queued_set.erase(it); + } + else + { + // Skip any deletions from s_txn_decision_not_queued_set. This + // is done to stop duplicate logging of txn updates. Although this won't + // stop duplicate logging of the decision record - which is tolerated since + // this should be much lower cost than logging updates from all txn's found + // in this scan. + it++; + } + } + } + + end = std::chrono::steady_clock::now(); + + if (updates_exist) + { + // Reset. + num_scans = 0; + } + else + { + num_scans ++; + } + + if (num_scans == c_max_consecutive_scans_with_no_updates || std::chrono::duration_cast(end - start).count() >= c_txn_group_timeout_us) + { + break; + } + } + + if (updates_exist) + { + if (txn_decisions.size() > 0) + { + s_log_handler->create_decision_record(txn_decisions); + } + + // Explicitly submit any remaining writes that are still not logged. 
+ s_log_handler->submit_writes(should_wait_for_completion); + } +} + void server_t::handle_commit_txn( int*, size_t, session_event_t event, const void*, session_state_t old_state, session_state_t new_state) { @@ -784,6 +937,7 @@ void server_t::recover_db() s_persistent_store->destroy_persistent_store(); } s_persistent_store->open(); + recover_persistent_log(); s_persistent_store->recover(); } } @@ -796,6 +950,131 @@ void server_t::recover_db() } } +void server_t::flush_pending_writes_on_server_shutdown() +{ + while (true) + { + bool should_wait_for_completion = true; + persist_pending_writes(should_wait_for_completion); + if (s_txn_decision_not_queued_set.size() == 0) + { + break; + } + } +} + +void server_t::log_writer_handler() +{ + // Set up the epoll loop. + int epoll_fd = ::epoll_create1(0); + if (epoll_fd == -1) + { + throw_system_error(c_message_epoll_create1_failed); + } + + auto epoll_cleanup = make_scope_guard([&]() { close_fd(epoll_fd); }); + int registered_fds[] = {s_signal_log_write_eventfd, s_signal_decision_eventfd, s_do_write_batch_maintenance_eventfd, s_server_shutdown_eventfd}; + + for (int registered_fd : registered_fds) + { + epoll_event ev{}; + ev.events = EPOLLIN; + ev.data.fd = registered_fd; + if (-1 == ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, registered_fd, &ev)) + { + throw_system_error(c_message_epoll_ctl_failed); + } + } + epoll_event events[std::size(registered_fds)]; + + bool shutdown = false; + + while (!shutdown) + { + // Block on shutdown or to receive a new write request. 
+ int ready_fd_count = ::epoll_wait(epoll_fd, events, std::size(events), -1); + if (ready_fd_count == -1) + { + if (errno == EINTR) + { + continue; + } + throw_system_error(c_message_epoll_wait_failed); + } + + for (int i = 0; i < ready_fd_count && !shutdown; ++i) + { + epoll_event ev = events[i]; + + if (ev.events & EPOLLERR) + { + if (ev.data.fd == s_signal_log_write_eventfd) + { + throw_system_error("Txn write eventfd error!"); + } + if (ev.data.fd == s_signal_decision_eventfd) + { + throw_system_error("Decision eventfd error!"); + } + else if (ev.data.fd == s_server_shutdown_eventfd) + { + throw_system_error("Shutdown eventfd error!"); + } + else if (ev.data.fd == s_do_write_batch_maintenance_eventfd) + { + throw_system_error("Validate persistence batch eventfd error!"); + } + else + { + ASSERT_UNREACHABLE(false, c_message_unexpected_fd); + } + } + + ASSERT_INVARIANT(ev.events == EPOLLIN, c_message_unexpected_event_type); + + // Signal to the persistence thread a batch was written to disk and the results of I/O operations + // need to be checked. + if (ev.data.fd == s_do_write_batch_maintenance_eventfd) + { + uint64_t val; + ssize_t bytes_read = ::read(s_do_write_batch_maintenance_eventfd, &val, sizeof(val)); + if (bytes_read == -1) + { + ASSERT_INVARIANT(errno == EAGAIN, "Unexpected error! read() is expected to fail with EAGAIN for a non-blocking eventfd."); + } + else + { + // Validate the results of a previously flushed batch only if read returns successfully. 
+ s_log_handler->check_flushed_batch_results_and_do_maintenance(); + } + + // The argument is the bool should_wait_for_completion, not a timeout; only the shutdown path waits. + persist_pending_writes(false); + } + else if (ev.data.fd == s_signal_log_write_eventfd) + { + consume_eventfd(s_signal_log_write_eventfd); + persist_pending_writes(false); + } + else if (ev.data.fd == s_signal_decision_eventfd) + { + consume_eventfd(s_signal_decision_eventfd); + persist_pending_writes(false); + } + else if (ev.data.fd == s_server_shutdown_eventfd) + { + // Server shutdown: finish persisting any pending writes before exiting. + flush_pending_writes_on_server_shutdown(); + shutdown = true; + } + else + { + // We don't monitor any other fds. + ASSERT_UNREACHABLE(c_message_unexpected_fd); + } + } + } +} + gaia_txn_id_t server_t::begin_startup_txn() { // Reserve an index in the safe_ts array, so the main thread can execute @@ -1185,7 +1464,7 @@ void server_t::client_dispatch_handler(const std::string& socket_name) } else if (ev.data.fd == s_server_shutdown_eventfd) { - consume_eventfd(s_server_shutdown_eventfd); + read_eventfd(s_server_shutdown_eventfd); return; } else @@ -1258,7 +1537,7 @@ void server_t::session_handler(int session_socket) epoll_event events[std::size(fds)]; // Event to signal session-owned threads to terminate. - s_session_shutdown_eventfd = make_eventfd(); + s_session_shutdown_eventfd = make_single_reader_eventfd(); auto owned_threads_cleanup = make_scope_guard([]() { // Signal all session-owned threads to terminate. 
signal_eventfd_multiple_threads(s_session_shutdown_eventfd); @@ -1380,7 +1659,7 @@ void server_t::session_handler(int session_socket) else if (ev.data.fd == s_server_shutdown_eventfd) { ASSERT_INVARIANT(ev.events == EPOLLIN, "Expected EPOLLIN event type!"); - consume_eventfd(s_server_shutdown_eventfd); + read_eventfd(s_server_shutdown_eventfd); event = session_event_t::SERVER_SHUTDOWN; } else @@ -1616,7 +1895,7 @@ void server_t::stream_producer_handler( else if (ev.data.fd == cancel_eventfd) { ASSERT_INVARIANT(ev.events == EPOLLIN, c_message_unexpected_event_type); - consume_eventfd(cancel_eventfd); + read_eventfd(cancel_eventfd); producer_shutdown = true; } else @@ -2683,6 +2962,13 @@ bool server_t::txn_commit() // Register the committing txn under a new commit timestamp. gaia_txn_id_t commit_ts = submit_txn(s_txn_id, log_fd); + if (c_use_gaia_log_implementation) + { + // Signal to the persistence thread to write txn log to disk. The commit_ts value being + // signaled in this case is irrelevant since it is never used by the polling thread. + eventfd_write(s_signal_log_write_eventfd, static_cast(commit_ts)); + } + // This is only used for persistence. std::string txn_name; @@ -2704,25 +2990,69 @@ bool server_t::txn_commit() // Update the txn metadata with our commit decision. txn_metadata_t::update_txn_decision(commit_ts, is_committed); - // Persist the commit decision. - // REVIEW: We can return a decision to the client asynchronously with the - // decision being persisted (because the decision can be reconstructed from - // the durable log itself, without the decision record). - if (s_persistent_store) + ASSERT_INVARIANT(txn_metadata_t::is_txn_decided(commit_ts), "Txn decision not updated!"); + + if (c_use_gaia_log_implementation) { - // Mark txn as durable in metadata so we can GC the txn log. - // We only mark it durable after validation to simplify the - // state transitions: - // TXN_VALIDATING -> TXN_DECIDED -> TXN_DURABLE. 
- txn_metadata_t::set_txn_durable(commit_ts); + // Signal to the persistence thread to write txn decision to disk. + // Use another decision fd to not lose events. + eventfd_write(s_signal_decision_eventfd, static_cast(commit_ts)); - if (is_committed) + if (s_log_handler) { - s_persistent_store->append_wal_commit_marker(txn_name); + void* addr = transactions::txn_metadata_t::get_txn_metadata_entry_addr(commit_ts); + + // We are only interested in whether the persistence bit is set. + // futex_wait() will only wait if the int32_t at addr matches the expected value. + // The addr points to the last 32 bits of the txn metadata entry for commit_ts. + // expected_val is only relevant if durable bit is unset. + uint32_t expected_val = 0; + txn_metadata_entry_t txn_entry{transactions::txn_metadata_t::get_entry(commit_ts)}; + + if (!txn_entry.is_durable()) + { + + expected_val = static_cast(txn_entry >> 32); + } + else + { + // Txn is durable. Exit. + return is_committed; + } + + + // Wait if address pointed to by futex is the same as expected_val. + // Only want to wait if expected_val has the persistence bit unset. + // What if it has lost the futex wake? + // Futex wake can be lost if log writer thread marks the txn entry durable and then issues a wake call before futex_wait + // is even called by commit path. + // In that case, the futex_wait call will simply fail since expected_val is now different from value at addr. + // Futex WAKE only called after txn entry is updated. + // If we reach this point in the code, expected_val has durable bit unset. + common::futex_wait(addr, expected_val); + + ASSERT_POSTCONDITION(txn_metadata_t::is_txn_durable(commit_ts), "Txn should be durable."); } - else + } + else + { + // Persist the commit decision. + if (s_persistent_store) { - s_persistent_store->append_wal_rollback_marker(txn_name); + // Mark txn as durable in metadata so we can GC the txn log. 
+ // We only mark it durable after validation to simplify the + // state transitions: + // TXN_VALIDATING -> TXN_DECIDED -> TXN_DURABLE. + txn_metadata_t::set_txn_durable(commit_ts); + + if (is_committed) + { + s_persistent_store->append_wal_commit_marker(txn_name); + } + else + { + s_persistent_store->append_wal_rollback_marker(txn_name); + } } } @@ -3175,7 +3505,7 @@ void server_t::run(server_config_t server_conf) while (true) { // Create eventfd shutdown event. - s_server_shutdown_eventfd = make_eventfd(); + s_server_shutdown_eventfd = make_single_reader_eventfd(); auto cleanup_shutdown_eventfd = make_scope_guard([]() { // We can't close this fd until all readers and writers have exited. // The only readers are the client dispatch thread and the session @@ -3185,6 +3515,25 @@ void server_t::run(server_config_t server_conf) close_fd(s_server_shutdown_eventfd); }); + // This eventfd does not require semaphore semantics since only a single batch can be flushed to disk at a time so we expect + // the eventfd counter to never exceed 1. + // The persistence batch should only be validated after a batch has finished flushing (or this will lead to + // premature cleaning of metadata in the in-flight persistence batch) + s_do_write_batch_maintenance_eventfd = make_multi_reader_eventfd(); + + // For the below two eventfd's, use non-semaphore semantics. With non-semaphore semantics, + // multiple threads could concurrently increment the eventfd to a value > 1 before it's + // serviced by the epoll loop, but a single read will reset the eventfd to 0, effectively + // debouncing the multiple notifications. 
+ s_signal_log_write_eventfd = make_multi_reader_eventfd(); + s_signal_decision_eventfd = make_multi_reader_eventfd(); + + auto cleanup_persistence_eventfds = make_scope_guard([]() { + close_fd(s_signal_log_write_eventfd); + close_fd(s_signal_decision_eventfd); + close_fd(s_do_write_batch_maintenance_eventfd); + }); + // Block handled signals in this thread and subsequently spawned threads. sigset_t handled_signals = mask_signals(); @@ -3194,6 +3543,14 @@ void server_t::run(server_config_t server_conf) init_shared_memory(); + std::thread log_writer_thread; + if (!(s_server_conf.persistence_mode() == persistence_mode_t::e_disabled || + s_server_conf.persistence_mode() == persistence_mode_t::e_disabled_after_recovery)) + { + // Launch persistence thread. + log_writer_thread = std::thread(&log_writer_handler); + } + // Launch thread to listen for client connections and create session threads. std::thread client_dispatch_thread(client_dispatch_handler, server_conf.instance_name()); @@ -3207,6 +3564,12 @@ void server_t::run(server_config_t server_conf) // We shouldn't get here unless the signal handler thread has caught a signal. ASSERT_INVARIANT(caught_signal != 0, "A signal should have been caught!"); + if (!(s_server_conf.persistence_mode() == persistence_mode_t::e_disabled || + s_server_conf.persistence_mode() == persistence_mode_t::e_disabled_after_recovery)) + { + log_writer_thread.join(); + } + // We special-case SIGHUP to force reinitialization of the server. 
// This is only enabled if persistence is disabled, because otherwise // data would disappear on reset, only to reappear when the database is diff --git a/production/db/core/src/db_shared_data_client.cpp b/production/db/core/src/db_shared_data_client.cpp index 3070bde9bc83..91ff9bc66d2e 100644 --- a/production/db/core/src/db_shared_data_client.cpp +++ b/production/db/core/src/db_shared_data_client.cpp @@ -29,6 +29,11 @@ gaia::db::locators_t* gaia::db::get_locators_for_allocator() return gaia::db::get_locators(); } +size_t* get_current_txn_memory_size_bytes() +{ + return &gaia::db::client_t::s_txn_memory_size_bytes; +} + gaia::db::counters_t* gaia::db::get_counters() { // Since we don't use this accessor in the client itself, we can assert that diff --git a/production/db/core/src/db_shared_data_server.cpp b/production/db/core/src/db_shared_data_server.cpp index 39fe4548e678..ada7999cf4f5 100644 --- a/production/db/core/src/db_shared_data_server.cpp +++ b/production/db/core/src/db_shared_data_server.cpp @@ -31,6 +31,11 @@ gaia::db::counters_t* gaia::db::get_counters() return gaia::db::server_t::s_shared_counters.data(); } +size_t* get_current_txn_memory_size_bytes() +{ + return nullptr; +} + gaia::db::data_t* gaia::db::get_data() { // Since we don't use this accessor in the server itself, we can assert that diff --git a/production/db/core/src/exceptions.cpp b/production/db/core/src/exceptions.cpp index bf47b699cc5c..9577152eb1a0 100644 --- a/production/db/core/src/exceptions.cpp +++ b/production/db/core/src/exceptions.cpp @@ -52,6 +52,11 @@ out_of_memory_internal::out_of_memory_internal() m_message = "Out of memory."; } +transaction_memory_limit_exceeded_internal::transaction_memory_limit_exceeded_internal() +{ + m_message = "Transaction exceeded its memory limit."; +} + system_object_limit_exceeded_internal::system_object_limit_exceeded_internal() { m_message = "System object limit exceeded."; diff --git a/production/db/core/src/log_file.cpp 
b/production/db/core/src/log_file.cpp index 0156358fea02..a668cf7d3338 100644 --- a/production/db/core/src/log_file.cpp +++ b/production/db/core/src/log_file.cpp @@ -66,12 +66,12 @@ log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_ } } -size_t log_file_t::get_current_offset() +const file_offset_t log_file_t::get_current_offset() { return m_current_offset; } -int log_file_t::get_file_fd() +const int log_file_t::get_file_fd() { return m_file_fd; } @@ -81,7 +81,12 @@ void log_file_t::allocate(size_t size) m_current_offset += size; } -size_t log_file_t::get_remaining_bytes_count(size_t record_size) +const file_sequence_t log_file_t::get_file_sequence() +{ + return m_file_seq; +} + +const size_t log_file_t::get_bytes_remaining_after_append(size_t record_size) { ASSERT_INVARIANT(m_file_size > 0, "Preallocated file size should be greater than 0."); if (m_file_size < (m_current_offset + record_size)) diff --git a/production/db/core/src/log_io.cpp b/production/db/core/src/log_io.cpp new file mode 100644 index 000000000000..fd45ae96c713 --- /dev/null +++ b/production/db/core/src/log_io.cpp @@ -0,0 +1,362 @@ +///////////////////////////////////////////// +// Copyright (c) Gaia Platform LLC +// All rights reserved. 
+///////////////////////////////////////////// + +#include "log_io.hpp" + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "liburing.h" + +#include "gaia_internal/common/retail_assert.hpp" +#include "gaia_internal/common/scope_guard.hpp" +#include "gaia_internal/db/db_types.hpp" +#include +#include + +#include "crc32c.h" +#include "db_helpers.hpp" +#include "db_internal_types.hpp" +#include "db_object_helpers.hpp" +#include "log_file.hpp" +#include "mapped_data.hpp" +#include "memory_helpers.hpp" +#include "memory_types.hpp" +#include "txn_metadata.hpp" +#include "persistence_types.hpp" + +using namespace gaia::common; +using namespace gaia::db::memory_manager; +using namespace gaia::db; +using namespace toposort; + +namespace gaia +{ +namespace db +{ +namespace persistence +{ + +log_handler_t::log_handler_t(const std::string& wal_dir_path) +{ + auto dirpath = wal_dir_path; + ASSERT_PRECONDITION(!dirpath.empty(), "Gaia persistent directory path shouldn't be empty."); + s_wal_dir_path = dirpath.append(c_gaia_wal_dir_name); + + if (-1 == mkdir(s_wal_dir_path.c_str(), c_gaia_wal_dir_permissions) && errno != EEXIST) + { + throw_system_error("Unable to create persistent log directory"); + } + + // Keep the directory fd: open_for_writes() asserts on s_dir_fd and the destructor closes it. + s_dir_fd = open(s_wal_dir_path.c_str(), O_DIRECTORY); + if (-1 == s_dir_fd) + { + throw_system_error("Unable to open persistent log directory."); + } +} + +void log_handler_t::open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd) +{ + ASSERT_PRECONDITION(validate_flushed_batch_eventfd != -1, "Eventfd to signal post flush maintenance operations invalid!"); + ASSERT_INVARIANT(s_dir_fd != -1, "Unable to open data directory for persistent log writes."); + + // Create new log file every time the log_writer gets initialized. 
+ m_async_disk_writer = std::make_unique(validate_flushed_batch_eventfd, signal_checkpoint_eventfd); + + m_async_disk_writer->open(); +} + +log_handler_t::~log_handler_t() +{ + close_fd(s_dir_fd); +} + +// Currently using the rocksdb impl. +// Todo(Mihir) - Research other crc libs. +uint32_t calculate_crc32(uint32_t init_crc, const void* data, size_t n) +{ + // This implementation uses the CRC32 instruction from the SSE4 (SSE4.2) instruction set if it is available. + // Otherwise, it defaults to a 4 table based lookup implementation. + // Here is an old benchmark that compares various crc implementations including the two used by rocks. + // https://www.strchr.com/crc32_popcnt + return rocksdb::crc32c::Extend(init_crc, static_cast(data), n); +} + +file_offset_t log_handler_t::allocate_log_space(size_t payload_size) +{ + // For simplicity, we don't break up transaction records across log files. Txn updates + // which don't fit in the current file are written to the next one. + // If a transaction is larger than the log file size, then the entire txn is written to the next log file. + // Another simplification is that an async_write_batch contains only writes belonging to a single log file. + if (!m_current_file) + { + auto file_size = (payload_size > c_max_log_file_size_in_bytes) ? payload_size : c_max_log_file_size_in_bytes; + m_current_file.reset(); + m_current_file = std::make_unique(s_wal_dir_path, s_dir_fd, s_file_num, file_size); + } + else if (m_current_file->get_bytes_remaining_after_append(payload_size) <= 0) + { + m_async_disk_writer->perform_file_close_operations(m_current_file->get_file_fd(), m_current_file->get_file_sequence()); + + // One batch writes to a single log file at a time. + m_async_disk_writer->submit_and_swap_in_progress_batch(m_current_file->get_file_fd()); + + m_current_file.reset(); + + // Open new file. + s_file_num++; + auto file_size = (payload_size > c_max_log_file_size_in_bytes) ? 
payload_size : c_max_log_file_size_in_bytes; + m_current_file = std::make_unique(s_wal_dir_path, s_dir_fd, s_file_num, file_size); + } + + auto current_offset = m_current_file->get_current_offset(); + m_current_file->allocate(payload_size); + + // Return starting offset of the allocation. + return current_offset; +} + +void log_handler_t::create_decision_record(const decision_list_t& txn_decisions) +{ + ASSERT_PRECONDITION(!txn_decisions.empty(), "Decision record cannot have empty payload."); + + // Track decisions per batch. + m_async_disk_writer->add_decisions_to_batch(txn_decisions); + + // Create decision record and enqueue a pwrite() request for the same. + std::vector writes_to_submit; + size_t txn_decision_size = txn_decisions.size() * (sizeof(decision_entry_t)); + size_t total_log_space_needed = txn_decision_size + sizeof(record_header_t); + + // Remember where the reserved region starts; the file's current offset already points past it. + auto begin_offset = allocate_log_space(total_log_space_needed); + + // Create log record header. + record_header_t header{}; + header.crc = c_crc_initial_value; + header.payload_size = total_log_space_needed; + header.decision_count = txn_decisions.size(); + header.txn_commit_ts = c_invalid_gaia_txn_id; + header.record_type = record_type_t::decision; + + // Compute crc. + crc32_t txn_crc = 0; + txn_crc = calculate_crc32(txn_crc, &header, sizeof(record_header_t)); + txn_crc = calculate_crc32(txn_crc, txn_decisions.data(), txn_decision_size); + + ASSERT_INVARIANT(txn_crc != 0, "CRC cannot be zero."); + header.crc = txn_crc; + + // Copy information which needs to survive for the entire batch lifetime into the metadata buffer. 
+ auto header_ptr = m_async_disk_writer->copy_into_metadata_buffer(&header, sizeof(record_header_t), m_current_file->get_file_fd()); + auto txn_decisions_ptr = m_async_disk_writer->copy_into_metadata_buffer(txn_decisions.data(), txn_decision_size, m_current_file->get_file_fd()); + + writes_to_submit.push_back({header_ptr, sizeof(record_header_t)}); + writes_to_submit.push_back({txn_decisions_ptr, txn_decision_size}); + + // Write at the start of the reserved region, matching create_txn_record(). + m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), begin_offset, uring_op_t::pwritev_decision); +} + +void create_node_for_topsort( + graph& chunk_offset_graph, + std::unordered_map& chunk_info_map, + chunk_offset_t chunk_offset, + gaia_offset_t gaia_offset) +{ + if (chunk_info_map.count(chunk_offset) == 0) + { + auto node_ptr = chunk_offset_graph.create_node(std::move(chunk_offset)); + chunk_info_helper_t chunk_info_helper; + chunk_info_helper.node_ptr = node_ptr; + chunk_info_helper.offsets.min_offset = gaia_offset; + chunk_info_helper.offsets.max_offset = gaia_offset; + chunk_info_map.insert(std::pair(chunk_offset, chunk_info_helper)); + } +} + +void log_handler_t::process_txn_log_and_write(int txn_log_fd, gaia_txn_id_t commit_ts) +{ + // Map in memory txn_log. + mapped_log_t log; + log.open(txn_log_fd); + + std::vector deleted_ids; + + // The txn log records are sorted by locator but preserve the order of multiple updates to the + // same locator. Use topsort to find the 'almost' original chunk order from the relative chunk + // ordering present in the sorted txn log. If updates for a certain locator aren't made across + // chunks, then the order in which chunks are written to the persistent log is not important. + graph chunk_offset_graph; + + // Used to avoid dups in graph. 
+ std::unordered_map chunk_info_map; + std::set> dependencies; + + for (size_t i = 0; i < log.data()->record_count; i++) + { + auto lr = log.data()->log_records[i]; + + if (lr.operation() == gaia_operation_t::remove) + { + deleted_ids.push_back(offset_to_ptr(lr.old_offset)->id); + } + else + { + chunk_offset_t chunk_offset = memory_manager::chunk_from_offset(lr.new_offset); + create_node_for_topsort(chunk_offset_graph, chunk_info_map, chunk_offset, lr.new_offset); + + // Look ahead only while a next record exists, so log_records[i + 1] stays in bounds. + if (i + 1 < log.data()->record_count) + { + auto lr_plus_1 = log.data()->log_records[i + 1]; + chunk_offset_t chunk_offset_plus_1 = memory_manager::chunk_from_offset(lr_plus_1.new_offset); + + if (lr.locator == lr_plus_1.locator) + { + // Locator objects across two different chunks. + if (chunk_offset != chunk_offset_plus_1) + { + // Create node. + create_node_for_topsort(chunk_offset_graph, chunk_info_map, chunk_offset_plus_1, lr_plus_1.new_offset); + + // Add dependency. Record it in the set so the same edge is only added once. + auto node_ptr_plus_1 = chunk_info_map.find(chunk_offset_plus_1)->second; + if (dependencies.insert({chunk_offset_plus_1, chunk_offset}).second) + { + node_ptr_plus_1.node_ptr->depends_on(chunk_info_map.find(chunk_offset)->second.node_ptr); + } + } + } + } + + // Simply update min & max offset for the chunk. + auto it = chunk_info_map.find(chunk_offset); + if (lr.new_offset < it->second.offsets.min_offset) + { + it->second.offsets.min_offset = lr.new_offset; + } + else if (lr.new_offset > it->second.offsets.max_offset) + { + it->second.offsets.max_offset = lr.new_offset; + } + } + } + + // Get chunk order. 
+ graph::node_list nodes = chunk_offset_graph.sort().sorted_nodes; + + std::vector contiguous_offsets; + + for (graph::node_ptr node_ptr : nodes) + { + auto it = chunk_info_map.find(node_ptr.get()->data); + auto end_offset = it->second.offsets.max_offset; + auto payload_size = offset_to_ptr(end_offset)->payload_size + c_db_object_header_size; + size_t allocation_size = memory_manager::calculate_allocation_size_in_slots(payload_size) * c_slot_size_in_bytes; + contiguous_offsets_t offset_pair{it->second.offsets.min_offset, end_offset + allocation_size}; + contiguous_offsets.push_back(offset_pair); + } + + if (deleted_ids.size() > 0 || contiguous_offsets.size() > 0) + { + create_txn_record(commit_ts, record_type_t::txn, contiguous_offsets, deleted_ids); + } +} + +void log_handler_t::check_flushed_batch_results_and_do_maintenance() +{ + m_async_disk_writer->perform_post_completion_maintenance(); +} + +void log_handler_t::submit_writes(bool should_wait_for_completion) +{ + m_async_disk_writer->submit_and_swap_in_progress_batch(m_current_file->get_file_fd(), should_wait_for_completion); +} + +void log_handler_t::create_txn_record( + gaia_txn_id_t commit_ts, + record_type_t type, + std::vector& contiguous_offsets, + std::vector& deleted_ids) +{ + ASSERT_PRECONDITION(!deleted_ids.empty() || !contiguous_offsets.empty(), "Txn record cannot have empty payload."); + + std::vector writes_to_submit; + + // Reserve iovec to store header for the log record. + struct iovec header_entry = {nullptr, 0}; + writes_to_submit.push_back(header_entry); + + // Create iovec entries. + size_t payload_size = 0; + for (auto offset_pair : contiguous_offsets) + { + auto ptr = offset_to_ptr(offset_pair.min_offset); + auto chunk_size = offset_pair.max_offset - offset_pair.min_offset; + payload_size += chunk_size; + writes_to_submit.push_back({ptr, chunk_size}); + } + + // Augment payload size with the size of deleted ids. 
+ auto deleted_size = deleted_ids.size() * sizeof(gaia_id_t); + payload_size += deleted_size; + + // Calculate total log space needed. + auto total_log_space_needed = payload_size + sizeof(record_header_t); + + // Allocate log space. + auto begin_offset = allocate_log_space(total_log_space_needed); + + // Create header. Value-initialize it so the CRC and the bytes written to disk never cover indeterminate fields. + record_header_t header{}; + header.crc = c_crc_initial_value; + header.payload_size = total_log_space_needed; + header.deleted_object_count = deleted_ids.size(); + header.txn_commit_ts = commit_ts; + header.record_type = type; + + // Calculate CRC. + auto txn_crc = calculate_crc32(0, &header, sizeof(record_header_t)); + + // Start from 1 to skip CRC calculation for the first entry. + for (size_t i = 1; i < writes_to_submit.size(); i++) + { + txn_crc = calculate_crc32(txn_crc, writes_to_submit.at(i).iov_base, writes_to_submit.at(i).iov_len); + } + + // Augment CRC calculation with set of deleted object IDs. + txn_crc = calculate_crc32(txn_crc, deleted_ids.data(), deleted_size); + + // Update CRC in header before sending it to the async_disk_writer. + ASSERT_INVARIANT(txn_crc != 0, "Computed CRC cannot be zero."); + header.crc = txn_crc; + + // Copy the header into the metadata buffer as it needs to survive the lifetime of the async_write_batch it is a part of. + auto header_ptr = m_async_disk_writer->copy_into_metadata_buffer(&header, sizeof(record_header_t), m_current_file->get_file_fd()); + + // Update the first iovec entry with the header information. + writes_to_submit.at(0).iov_base = header_ptr; + writes_to_submit.at(0).iov_len = sizeof(record_header_t); + + // Allocate space for deleted writes in the metadata buffer. 
+ if (!deleted_ids.empty()) + { + auto deleted_id_ptr = m_async_disk_writer->copy_into_metadata_buffer(deleted_ids.data(), deleted_size, m_current_file->get_file_fd()); + writes_to_submit.push_back({deleted_id_ptr, deleted_size}); + } + + // Finally send I/O requests to the async_disk_writer. 
+ m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), begin_offset, uring_op_t::pwritev_txn); +} + +} // namespace persistence +} // namespace db +} // namespace gaia diff --git a/production/db/core/src/persistent_store_manager.cpp b/production/db/core/src/persistent_store_manager.cpp index 31e30b7f324e..a315a4f3e0bc 100644 --- a/production/db/core/src/persistent_store_manager.cpp +++ b/production/db/core/src/persistent_store_manager.cpp @@ -9,12 +9,15 @@ #include +#include "gaia_internal/common/retail_assert.hpp" #include "gaia_internal/common/system_table_types.hpp" #include "gaia_internal/db/db_object.hpp" #include "gaia_internal/db/db_types.hpp" #include "db_helpers.hpp" #include "db_internal_types.hpp" +#include "log_io.hpp" +#include "persistence_types.hpp" #include "rdb_object_converter.hpp" #include "rdb_wrapper.hpp" diff --git a/production/db/inc/memory_manager/memory_helpers.hpp b/production/db/inc/memory_manager/memory_helpers.hpp index d5e01d0a8388..3849cd43b018 100644 --- a/production/db/inc/memory_manager/memory_helpers.hpp +++ b/production/db/inc/memory_manager/memory_helpers.hpp @@ -27,6 +27,8 @@ inline gaia_offset_t offset_from_chunk_and_slot( inline void* page_address_from_offset(gaia_offset_t offset); +inline size_t calculate_allocation_size_in_slots(size_t allocation_size_in_bytes); + // Converts a slot offset to its bitmap index. 
inline size_t slot_to_bit_index(slot_offset_t slot_offset); diff --git a/production/db/inc/memory_manager/memory_helpers.inc b/production/db/inc/memory_manager/memory_helpers.inc index 48ae88e682cc..f46202f3f6d0 100644 --- a/production/db/inc/memory_manager/memory_helpers.inc +++ b/production/db/inc/memory_manager/memory_helpers.inc @@ -23,6 +23,27 @@ slot_offset_t slot_from_offset(gaia_offset_t offset) return static_cast(offset & mask); } +size_t calculate_allocation_size_in_slots(size_t allocation_size_in_bytes) +{ + ASSERT_PRECONDITION(allocation_size_in_bytes > 0, "Requested allocation size cannot be 0!"); + + // Check before converting to slot units to avoid overflow. + ASSERT_PRECONDITION( + allocation_size_in_bytes <= (c_max_allocation_size_in_slots * c_slot_size_in_bytes), + "Requested allocation size exceeds maximum allocation size of 64KB!"); + + // Calculate allocation size in slot units. +#ifdef DEBUG + // Round up allocation to a page so we can mprotect() it. + size_t allocation_size_in_pages = (allocation_size_in_bytes + c_page_size_in_bytes - 1) / c_page_size_in_bytes; + size_t allocation_size_in_slots = allocation_size_in_pages * (c_page_size_in_bytes / c_slot_size_in_bytes); +#else + size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes; +#endif + + return allocation_size_in_slots; +} + gaia_offset_t offset_from_chunk_and_slot( chunk_offset_t chunk_offset, slot_offset_t slot_offset) { diff --git a/production/db/memory_manager/src/chunk_manager.cpp b/production/db/memory_manager/src/chunk_manager.cpp index 2b7508baee86..fe02690f50b6 100644 --- a/production/db/memory_manager/src/chunk_manager.cpp +++ b/production/db/memory_manager/src/chunk_manager.cpp @@ -91,21 +91,7 @@ gaia_offset_t chunk_manager_t::allocate( get_state() == chunk_state_t::in_use, "Objects can only be allocated from a chunk in the IN_USE state!"); - ASSERT_PRECONDITION(allocation_size_in_bytes > 0, "Requested allocation size 
cannot be 0!"); - - // Check before converting to slot units to avoid overflow. - ASSERT_PRECONDITION( - allocation_size_in_bytes <= (c_max_allocation_size_in_slots * c_slot_size_in_bytes), - "Requested allocation size exceeds maximum allocation size of 64KB!"); - - // Calculate allocation size in slot units. -#ifdef DEBUG - // Round up allocation to a page so we can mprotect() it. - size_t allocation_size_in_pages = (allocation_size_in_bytes + c_page_size_in_bytes - 1) / c_page_size_in_bytes; - size_t allocation_size_in_slots = allocation_size_in_pages * (c_page_size_in_bytes / c_slot_size_in_bytes); -#else - size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes; -#endif + size_t allocation_size_in_slots = calculate_allocation_size_in_slots(allocation_size_in_bytes); // Ensure that the new allocation doesn't overflow the chunk. if (m_metadata->min_unallocated_slot_offset() + allocation_size_in_slots > c_last_slot_offset) diff --git a/production/inc/gaia/exceptions.hpp b/production/inc/gaia/exceptions.hpp index c60c6cb67b9e..05d715ae6850 100644 --- a/production/inc/gaia/exceptions.hpp +++ b/production/inc/gaia/exceptions.hpp @@ -293,6 +293,16 @@ class out_of_memory : public common::gaia_exception { }; +/** + * \brief The transaction exceeded its memory limit. + * + * A transaction can use at most 64MB of memory. + */ +class transaction_memory_limit_exceeded : public common::gaia_exception +{ +}; + + /** * \brief The transaction tried to create more objects than are permitted in the system. 
* diff --git a/production/inc/gaia_internal/common/fd_helpers.hpp b/production/inc/gaia_internal/common/fd_helpers.hpp index 8323d65fff19..e0b9ce8ccd70 100644 --- a/production/inc/gaia_internal/common/fd_helpers.hpp +++ b/production/inc/gaia_internal/common/fd_helpers.hpp @@ -169,7 +169,7 @@ inline size_t read_fd_at_offset( return bytes_read; } -inline int make_eventfd() +inline int make_single_reader_eventfd() { // Create eventfd shutdown event. // Linux is non-POSIX-compliant and sometimes marks an fd as readable @@ -197,6 +197,21 @@ inline int make_eventfd() return eventfd; } +/** + * Create an eventfd only with the EFD_NONBLOCK flag. + */ +inline int make_multi_reader_eventfd() +{ + int eventfd = ::eventfd(0, EFD_NONBLOCK); + if (eventfd == -1) + { + int err = errno; + const char* reason = ::explain_eventfd(0, EFD_NONBLOCK); + throw system_error(reason, err); + } + return eventfd; +} + inline void signal_eventfd(int eventfd, uint64_t efd_counter_val) { ssize_t bytes_written = ::write(eventfd, &efd_counter_val, sizeof(efd_counter_val)); @@ -227,9 +242,11 @@ inline void signal_eventfd_multiple_threads(int eventfd) signal_eventfd(eventfd, c_max_semaphore_count); } -inline void consume_eventfd(int eventfd) +/** + * Simply return eventfd counter value. The returned value may be greater than 1. + */ +inline uint64_t consume_eventfd(int eventfd) { - // We should always read the value 1 from a semaphore eventfd. uint64_t val; ssize_t bytes_read = ::read(eventfd, &val, sizeof(val)); if (bytes_read == -1) @@ -239,6 +256,14 @@ inline void consume_eventfd(int eventfd) throw system_error(reason, err); } ASSERT_POSTCONDITION(bytes_read == sizeof(val), "Failed to fully read data!"); + return val; +} + +inline void read_eventfd(int eventfd) +{ + // We should always read the value 1, whether the eventfd was signaled with + // signal_eventfd_single_thread() or signal_eventfd_multiple_threads(). 
+ uint64_t val = consume_eventfd(eventfd); ASSERT_POSTCONDITION(val == 1, "Unexpected value!"); } diff --git a/production/inc/gaia_internal/common/futex_helpers.hpp b/production/inc/gaia_internal/common/futex_helpers.hpp new file mode 100644 index 000000000000..b199ac18f9d0 --- /dev/null +++ b/production/inc/gaia_internal/common/futex_helpers.hpp @@ -0,0 +1,70 @@ +///////////////////////////////////////////// +// Copyright (c) Gaia Platform LLC +// All rights reserved. +///////////////////////////////////////////// + +#include /* Definition of FUTEX_* constants */ +#include /* Definition of SYS_* constants */ +#include + +namespace gaia +{ +/** + * \addtogroup gaia + * @{ + */ +namespace common +{ +/** + * \addtogroup common + * @{ + */ + +/** + * Note: glibc provides no wrapper for futex(), necessitating the use of syscall(2). + * https://man7.org/linux/man-pages/man2/futex.2.html + * http://locklessinc.com/articles/futex_cheat_sheet/ + * + * TODO: Get rid of this once the minimum supported kernel version is 5.16, where the futex2 API + * is expected to allow 64-bit futexes (NOTE(review): confirm; futex2 may still be 32-bit only). + */ +inline int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) +{ + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +} + +/** + * This operation tests that the value at the futex word + * pointed to by the address addr still contains the + * expected value val, and if so, then sleeps waiting for a + * FUTEX_WAKE operation on the futex word. This set of operations is atomic. + * Until we move to Linux 5.16, only 32-bit futexes are supported. + * + * Since the most common case of using a Futex is to refer to a process-internal lock, + * there is an optimization available here. 
If the FUTEX_PRIVATE_FLAG is set on the multiplex + * function number passed to the syscall, then the kernel will assume that the Futex is + * private to the process (saving on some internal locking). + */ +inline int futex_wait(void *addr, int val) +{ + return syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr, nullptr, nullptr); +} + +/** + * Wakes at most val1 of the waiters blocked on the futex word at address addr1. + * To wake all the sleeping threads, pass INT_MAX as val1. + * + * Since the most common case of using a Futex is to refer to a process-internal lock, + * there is an optimization available here. If the FUTEX_PRIVATE_FLAG is set on the multiplex + * function number passed to the syscall, then the kernel will assume that the Futex is + * private to the process (saving on some internal locking). + */ +inline int futex_wake(void *addr1, int val1) +{ + return syscall(SYS_futex, addr1, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, val1, nullptr, nullptr, nullptr); +} + +/*@}*/ +} // namespace common +/*@}*/ +} // namespace gaia diff --git a/production/inc/gaia_internal/common/topological_sort.hpp b/production/inc/gaia_internal/common/topological_sort.hpp new file mode 100644 index 000000000000..598b5d194551 --- /dev/null +++ b/production/inc/gaia_internal/common/topological_sort.hpp @@ -0,0 +1,244 @@ +///////////////////////////////////////////// +// Copyright (c) Gaia Platform LLC +// All rights reserved. +///////////////////////////////////////////// + +/** + * topological_sort.hpp + * + * Implements topological sorting (ordering) with arbitrary data. 
+ * + * Licensed under the MIT License: + * + * Copyright (c) 2021 Cemalettin Dervis + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + */ + +#pragma once + +#include +#include +#include + +namespace gaia +{ +namespace common +{ +namespace toposort +{ + +/** + * Represents a graph that should be sorted. + * + * @param T The type of data a node inside the graph should store. + */ +template class graph +{ +public: + class node; + template using shared_ptr_t = std::shared_ptr; + template using vector_t = std::vector; + + using node_ptr = shared_ptr_t; + using node_list = vector_t; + + /** + * Represents a node inside a graph. + * A node stores arbitrary data and a list of its dependencies. 
+ */ + class node final + { + friend class graph; + + public: + node(T&& data) + : m_Data(std::forward(data)) + { + } + + node(const graph&) = delete; + + void operator=(const graph&) = delete; + + node(graph&&) = delete; + + void operator=(graph&&) = delete; + + ~node() noexcept = default; + + /** + * Declares that this node depends on another node. + * + * @param node The node that this node depends on. + */ + void depends_on(node_ptr node) + { + m_Children.push_back(std::move(node)); + } + + /** + * Gets the node's stored data. + */ + const T& data() const + { + return m_Data; + } + + private: + T m_Data{}; + node_list m_Children; + }; + + /** + * Defines the result of a sorting operation. + */ + class result + { + public: + explicit operator bool() const + { + return !sorted_nodes.empty(); + } + + node_list sorted_nodes; + + // If the graph contains a cyclic dependency, this is the first node in + // question. + node_ptr cyclic_a; + + // If the graph contains a cyclic dependency, this is the second node in + // question. + node_ptr cyclic_b; + }; + + /** + * Clears the graph of all created nodes. + */ + void clear() + { + m_Nodes.clear(); + } + + /** + * Creates a node that stores a specific value and does not have any + * dependencies. Dependencies can be declared using the depends_on() method on + * the returned node. + * + * @param value The value that should be stored by the node. + */ + node_ptr create_node(T&& value) + { + m_Nodes.push_back(std::make_shared(std::forward(value))); + return m_Nodes.back(); + } + + /** + * Sorts the graph and returns the results. + */ + result sort() + { + result res{}; + + if (!m_Nodes.empty()) + { + node_ref_list dead; + dead.reserve(m_Nodes.size()); + + node_ref_list pending; + pending.reserve(2u); + + if (!visit(m_Nodes, dead, pending, res)) + res = {}; + + clear(); + } + + return res; + } + + private: + // For temporary lists, we can use raw pointers instead of copying + // around shared pointers. 
+ using node_ref_list = vector_t; + + static bool visit(node_list& graph, node_ref_list& dead, + node_ref_list& pending, result& res) + { + for (const auto& n : graph) + { + const auto getNIterator = [&n](const node_ref_list& list) + { + return std::find_if(list.cbegin(), list.cend(), + [&n](const node* e) { return e == n.get(); }); + }; + + const auto containsN = [&getNIterator](const node_ref_list& list) + { + return getNIterator(list) != list.cend(); + }; + + if (containsN(dead)) + { + continue; + } + + if (containsN(pending)) + { + const auto itPtr = std::find_if(graph.cbegin(), graph.cend(), + [&pending](const node_ptr& e) + { + return e.get() == pending.back(); + }); + + res.cyclic_a = n; + res.cyclic_b = *itPtr; + + return false; + } + else + { + pending.push_back(n.get()); + } + + if (!visit(n->m_Children, dead, pending, res)) + { + return false; + } + + const auto it = getNIterator(pending); + + if (it != pending.end()) + { + pending.erase(it); + } + + dead.push_back(n.get()); + res.sorted_nodes.push_back(n); + } + + return true; + } + node_list m_Nodes; +}; + +} // namespace toposort +} // namespace common +} // namespace gaia diff --git a/production/inc/gaia_internal/exceptions.hpp b/production/inc/gaia_internal/exceptions.hpp index fe6d18f5b4ea..4df92d917fd1 100644 --- a/production/inc/gaia_internal/exceptions.hpp +++ b/production/inc/gaia_internal/exceptions.hpp @@ -221,6 +221,12 @@ class out_of_memory_internal : public out_of_memory out_of_memory_internal(); }; +class transaction_memory_limit_exceeded_internal : public transaction_memory_limit_exceeded +{ +public: + transaction_memory_limit_exceeded_internal(); +}; + class system_object_limit_exceeded_internal : public system_object_limit_exceeded { public: