-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce log writer APIs (log_io.cpp, log_io.hpp) #1144
Changes from 1 commit
e4d426b
e9407d9
b72466a
0508152
cbdad1d
5fe5efa
f0d039c
baa1012
1c6fd3b
734b736
1b5b83a
8c7e7c5
1a96167
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,7 @@ namespace persistence | |
class async_disk_writer_t | ||
{ | ||
public: | ||
async_disk_writer_t(int validate_flushed_batch_efd, int signal_checkpoint_efd); | ||
async_disk_writer_t(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); | ||
|
||
~async_disk_writer_t(); | ||
|
||
|
@@ -98,43 +98,43 @@ class async_disk_writer_t | |
/** | ||
* Copy any temporary writes (which don't exist in gaia shared memory) into the metadata buffer. | ||
*/ | ||
unsigned char* copy_into_metadata_buffer(void* source, size_t size, int file_fd); | ||
unsigned char* copy_into_metadata_buffer(const void* source, size_t size, int file_fd); | ||
|
||
/** | ||
* Perform maintenance actions on in_flight batch after all of its IO entries have been processed. | ||
*/ | ||
void perform_post_completion_maintenance(); | ||
|
||
void add_decisions_to_batch(decision_list_t& decisions); | ||
void add_decisions_to_batch(const decision_list_t& decisions); | ||
|
||
/** | ||
* For each commit ts, keep track of the eventfd which the session thread blocks on. Once the txn | ||
* has been made durable, this eventfd is written to so that the session thread can make progress and | ||
* return commit decision to the client. | ||
*/ | ||
void map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_efd); | ||
void map_commit_ts_to_session_decision_eventfd(gaia_txn_id_t commit_ts, int session_decision_eventfd); | ||
|
||
private: | ||
// Reserve slots in the in_progress batch to be able to append additional operations to it (before it gets submitted to the kernel) | ||
static constexpr size_t c_submit_batch_sqe_count = 3; | ||
static constexpr size_t c_single_submission_entry_count = 1; | ||
static constexpr size_t c_async_batch_size = 32; | ||
static constexpr size_t c_max_iovec_array_size_bytes = IOV_MAX * sizeof(iovec); | ||
static inline eventfd_t c_default_flush_efd_value = 1; | ||
static inline iovec c_default_iov = {static_cast<void*>(&c_default_flush_efd_value), sizeof(eventfd_t)}; | ||
static inline eventfd_t c_default_flush_eventfd_value = 1; | ||
static inline iovec c_default_iov = {static_cast<void*>(&c_default_flush_eventfd_value), sizeof(eventfd_t)}; | ||
|
||
// eventfd to signal that a batch flush has completed. | ||
// Used to block new writes to disk when a batch is already getting flushed. | ||
static inline int s_flush_efd = -1; | ||
static inline int s_flush_eventfd = -1; | ||
|
||
// eventfd to signal that the IO results belonging to a batch are ready to be validated. | ||
int m_validate_flush_efd = -1; | ||
int m_validate_flush_eventfd = -1; | ||
|
||
// eventfd to signal that a file is ready to be checkpointed. | ||
int m_signal_checkpoint_efd = -1; | ||
int m_signal_checkpoint_eventfd = -1; | ||
|
||
// Keep track of session threads to unblock. | ||
std::unordered_map<gaia_txn_id_t, int> m_ts_to_session_decision_fd_map; | ||
std::unordered_map<gaia_txn_id_t, int> m_ts_to_session_decision_eventfd_map; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this data structure only accessed by a single thread? (Obviously it's not threadsafe.) |
||
|
||
// Writes are batched and we maintain two buffers so that writes to a buffer | ||
// can still proceed when the other buffer is getting flushed to disk. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,8 +83,13 @@ constexpr size_t c_max_locators = (1ULL << 32) - 1; | |
// similarly optimized by substituting locators for gaia_ids. | ||
constexpr size_t c_hash_buckets = 1ULL << 20; | ||
|
||
// This is arbitrary, but we need to keep txn logs to a reasonable size. | ||
constexpr size_t c_max_log_records = 1ULL << 20; | ||
// Track maximum number of new chunks (apart from the one that the txn is already using) | ||
// that can be allocated per transaction. | ||
// This sets an upper bound on txn size: 32MB < txn_size < 36MB | ||
constexpr size_t c_max_chunks_per_txn = 8; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comments elsewhere: based on existing limits (including those in my pending changes for txn log storage), I think we need to allow up to 2^10 chunks/txn. Also, the max txn log size in bytes is now 1MB (2^16 16-byte records). |
||
|
||
// 8 chunks can hold up to 8 * (2^16 - 2^8) = 522240 64B objects, | ||
constexpr size_t c_max_log_records = 522240; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you express this value in terms of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, this will conflict with my current changes (which limit txn logs to 2^16 records), so I think we can postpone discussion until after those are merged. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the meantime, here are some numbers to consider: we can address up to 2^16 txn logs in metadata (16 bits to embed an fd/offset), and we can have at most 2^32 live object versions (we have 2^38 bytes in the data segment and objects are at least 2^6 bytes). I think it makes sense to limit the total number of txn records to the max number of live versions (otherwise some records must be addressing versions that don't exist), so we also can have at most 2^32 txn log records, so to ensure this limit is respected we can have at most 2^16 records/txn log. That's a bit smaller than I'd like, but I think the consistency arguments above are compelling. We can discuss if this limit is actually adequate for customer use cases (ditto our 64KB object size limit). Given the above, we can address at most 2^32 bytes of new object versions in a txn log: max object size is 2^16 bytes and at most 2^16 new versions can be addressed in a txn log. Chunk size is 2^22 bytes, so that means that a txn log can address at most 2^10 chunks. Given 16-bit chunk IDs, that's 2KB for chunk ID storage, or half a 4KB page. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, I think my current changes to redesign in-memory txn log storage unfortunately need to be merged before these changes, even though these changes are further along, because the scalability issues with mmap() revealed in benchmarking seem to have much more perf impact than any deficiencies of the current persistence implementation. |
||
|
||
// This is an array of offsets in the data segment corresponding to object | ||
// versions, where each array index is referred to as a "locator." | ||
|
@@ -108,6 +113,8 @@ struct txn_log_t | |
// convenient place for shared state between the client and server. | ||
memory_manager::chunk_offset_t current_chunk; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI I'm removing this in my current changes (it never worked anyway for its intended purpose as shared state). |
||
size_t record_count; | ||
int session_decision_eventfd; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it makes sense to put a datum that is valid for only one process (i.e. an fd) in shared-memory state. We should discuss alternatives, e.g. registering this eventfd as part of txn submission, or registering some general index into an array of session shared state (which I think we will need anyway for reliable recovery from crashed sessions). |
||
size_t chunk_count; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you know, we can always infer the set of chunks used in a txn by scanning the offsets in the txn log (which we need to do anyway during conflict detection), so we should discuss if this is necessary. |
||
|
||
struct log_record_t | ||
{ | ||
|
@@ -135,6 +142,7 @@ struct txn_log_t | |
}; | ||
|
||
log_record_t log_records[c_max_log_records]; | ||
gaia_offset_t chunks[c_max_chunks_per_txn]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I mentioned, the set of chunks is already implicit in the offsets, so we need to discuss whether explicitly storing it is necessary or if it can be reconstructed on the fly from a scan. |
||
|
||
friend std::ostream& operator<<(std::ostream& os, const txn_log_t& l) | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,12 +34,12 @@ class log_file_t | |
/** | ||
* Obtain offset to write the next log record at. | ||
*/ | ||
size_t get_current_offset(); | ||
file_offset_t get_current_offset(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be |
||
|
||
/** | ||
* Get remaining space in persistent log file. | ||
*/ | ||
size_t get_remaining_bytes_count(size_t record_size); | ||
size_t get_bytes_remaining_after_append(size_t record_size); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be |
||
|
||
/** | ||
* Allocate space in persistent log file. | ||
|
@@ -48,6 +48,11 @@ class log_file_t | |
|
||
int get_file_fd(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be |
||
|
||
/** | ||
* Obtain sequence number for the file. | ||
*/ | ||
file_sequence_t get_file_sequence(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should be |
||
|
||
private: | ||
size_t m_file_size; | ||
file_sequence_t m_file_seq; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
///////////////////////////////////////////// | ||
// Copyright (c) Gaia Platform LLC | ||
// All rights reserved. | ||
///////////////////////////////////////////// | ||
|
||
#pragma once | ||
|
||
#include <fcntl.h> | ||
#include <unistd.h> | ||
|
||
#include <cstddef> | ||
|
||
#include <atomic> | ||
#include <functional> | ||
#include <memory> | ||
#include <string> | ||
#include <unordered_map> | ||
|
||
#include "gaia/common.hpp" | ||
|
||
#include "gaia_internal/db/db_object.hpp" | ||
|
||
#include "async_disk_writer.hpp" | ||
#include "db_internal_types.hpp" | ||
#include "log_file.hpp" | ||
#include "memory_manager.hpp" | ||
|
||
namespace gaia | ||
{ | ||
namespace db | ||
{ | ||
namespace persistence | ||
{ | ||
|
||
/* | ||
* Fill the record_header.crc field with CRC_INITIAL_VALUE when | ||
* computing the checksum: crc32c is vulnerable to 0-prefixing, | ||
* so we make sure the initial bytes are non-zero. | ||
*/ | ||
static constexpr crc32_t c_crc_initial_value = ((uint32_t)-1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me paste a note that maybe you could paraphrase as a comment in case it's unclear whether CRC-32 is sufficient for the log size:
|
||
|
||
class log_handler_t | ||
{ | ||
public: | ||
explicit log_handler_t(const std::string& directory_path); | ||
~log_handler_t(); | ||
void open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); | ||
|
||
/** | ||
* Allocate space in the log file and return starting offset of allocation. | ||
*/ | ||
file_offset_t allocate_log_space(size_t payload_size); | ||
|
||
/** | ||
* Create a log record which stores txn information. | ||
*/ | ||
void create_txn_record( | ||
gaia_txn_id_t commit_ts, | ||
record_type_t type, | ||
std::vector<gaia_offset_t>& object_offsets, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is |
||
std::vector<gaia::common::gaia_id_t>& deleted_ids); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this usage will conflict with #1164. I think you'll have to infer the deleted IDs by querying the deleted offsets in the txn log. |
||
|
||
/** | ||
* Process the in memory txn_log and submit the processed writes (generated log records) to the async_disk_writer. | ||
*/ | ||
void process_txn_log_and_write(int txn_log_fd, gaia_txn_id_t commit_ts); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another reason that these changes will need rework after my current perf fixes are merged: there will no longer be any txn log fds. |
||
|
||
/** | ||
* Create a log record which stores decisions for one or more txns. | ||
*/ | ||
void create_decision_record(const decision_list_t& txn_decisions); | ||
|
||
/** | ||
* Submit async_disk_writer's internal I/O request queue to the kernel for processing. | ||
*/ | ||
void submit_writes(bool sync); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the same parameter name as |
||
|
||
/** | ||
* Validate the result of I/O calls submitted to the kernel for processing. | ||
*/ | ||
void validate_flushed_batch(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what the meaning of "validate" is here when the return type is |
||
|
||
/** | ||
* Track the session_decision_eventfd for each commit_ts; txn_commit() will only return once | ||
* session_decision_eventfd is written to by the log_writer thread - signifying that the txn decision | ||
* has been persisted. | ||
*/ | ||
void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is the sort of API I had in mind above. I don't see why you need to stash the eventfd anywhere else when it's passed to the log_writer thread (I assume it will be maintained in session TLS)? |
||
|
||
private: | ||
// TODO: Make log file size configurable. | ||
static constexpr uint64_t c_file_size = 4 * 1024 * 1024; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So with a 4MB log file size (which does seem reasonable), and a max txn log size of (2^16 object versions * 2^16 bytes max object size) = 4GB, we can have up to 2^10 log files/txn? Can our metadata accommodate that? |
||
static constexpr std::string_view c_gaia_wal_dir_name = "/wal_dir"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All the advice I've seen on the web says to avoid |
||
static constexpr int c_gaia_wal_dir_permissions = 0755; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should either add a comment breaking down the permissions or (better) OR them from the corresponding constants in |
||
static inline std::string s_wal_dir_path{}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you use |
||
static inline int s_dir_fd = -1; | ||
|
||
// Log file sequence starts from 1. | ||
static inline std::atomic<file_sequence_t::value_type> s_file_num = 1; | ||
|
||
// Keep track of the current log file. | ||
std::unique_ptr<log_file_t> m_current_file; | ||
|
||
std::unique_ptr<async_disk_writer_t> m_async_disk_writer; | ||
}; | ||
|
||
} // namespace persistence | ||
} // namespace db | ||
} // namespace gaia |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,13 @@ namespace db | |
namespace persistence | ||
{ | ||
|
||
enum class record_type_t : uint8_t | ||
{ | ||
not_set = 0x0, | ||
txn = 0x1, | ||
decision = 0x2, | ||
}; | ||
|
||
enum class decision_type_t : uint8_t | ||
{ | ||
undecided = 0, | ||
|
@@ -60,6 +67,17 @@ static_assert( | |
|
||
constexpr file_sequence_t c_invalid_file_sequence_number; | ||
|
||
typedef size_t file_offset_t; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you use our typesafe |
||
|
||
// The record size is constrained by the size of the log file. | ||
// We'd never need more than 32 bits for the record size. | ||
typedef uint32_t record_size_t; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should |
||
|
||
// https://stackoverflow.com/questions/2321676/data-length-vs-crc-length | ||
// "From the wikipedia article: "maximal total blocklength is equal to 2**r − 1". That's in bits. | ||
// So CRC-32 would have max message size 2^33-1 bits or about 2^30 bytes = 1GB | ||
typedef uint32_t crc32_t; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I see you included the comment I suggested elsewhere here. |
||
|
||
// This assertion ensures that the default type initialization | ||
// matches the value of the invalid constant. | ||
static_assert( | ||
|
@@ -72,6 +90,25 @@ struct log_file_info_t | |
int file_fd; | ||
}; | ||
|
||
struct record_header_t | ||
{ | ||
crc32_t crc; | ||
record_size_t payload_size; | ||
record_type_t record_type; | ||
gaia_txn_id_t txn_commit_ts; | ||
|
||
// Stores a count value depending on the record type. | ||
// For a txn record, this represents the count of deleted objects. | ||
// For a decision record, this represents the number of decisions in the record's payload. | ||
union | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious why we need to store deleted object count but not counts for other object updates? |
||
{ | ||
uint32_t deleted_object_count; | ||
uint32_t decision_count; | ||
}; | ||
|
||
char padding[3]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a comment to explain what this is used for / why it is necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, it would help if the 3 would be a constant so its name would be more descriptive of why 3 is an appropriate value. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The length of |
||
}; | ||
|
||
// The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log | ||
// apart from the shared memory objects. | ||
// Custom information includes | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't the names be
event_fd
instead ofeventfd
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, because
eventfd
is a kernel object type (likememfd
), and has its ownopen(2)
-like system calleventfd()
(which really should have beeneventfd_create()
).