Custom recovery/checkpointing impl #1232
base: logwriter_thread
Changes from 2 commits
6fd85c2
59b5586
aee42d2
98615c6
6a002d6
a0712f4
9913c26
3cb4d95
35fe56f
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,33 @@ class log_handler_t | |
*/ | ||
void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd); | ||
|
||
/** | ||
Review comment: I guess we should move these comments to the header file, but you can wait until we move the others as well. |
||
* Entry point to start recovery procedure from gaia log files. Checkpointing reuses the same function. | ||
*/ | ||
void recover_from_persistent_log( | ||
gaia_txn_id_t& last_checkpointed_commit_ts, | ||
uint64_t& last_processed_log_seq, | ||
uint64_t max_log_seq_to_process, | ||
recovery_mode_t mode); | ||
|
||
/** | ||
* Destroy all log files with a sequence number less than or equal to max_log_seq_to_delete. | ||
*/ | ||
void destroy_persistent_log(uint64_t max_log_seq_to_delete); | ||
Review comment: Why isn't |
||
|
||
/** | ||
* Register persistent store create/delete APIs. Rework to call persistent store APIs directly? | ||
*/ | ||
void register_write_to_persistent_store_fn(std::function<void(db_recovered_object_t&)> write_obj_fn); | ||
Review comment: What's the motivation for making these functions pluggable? Testing? |
||
void register_remove_from_persistent_store_fn(std::function<void(gaia::common::gaia_id_t)> remove_obj_fn); | ||
|
||
/** | ||
* Set the log sequence counter. | ||
*/ | ||
void set_persistent_log_sequence(uint64_t log_seq); | ||
Review comment: Can |
||
|
||
size_t get_remaining_txns_to_checkpoint_count(); | ||
|
||
private: | ||
// TODO: Make log file size configurable. | ||
static constexpr uint64_t c_file_size = 4 * 1024 * 1024; | ||
|
@@ -102,6 +129,32 @@ class log_handler_t | |
std::unique_ptr<log_file_t> m_current_file; | ||
|
||
std::unique_ptr<async_disk_writer_t> m_async_disk_writer; | ||
|
||
// Map txn commit_ts to location of log record header during recovery. | ||
// This index is maintained on a per log file basis. Before moving to the next file | ||
// we assert that this index is empty as all txns have been processed. | ||
// Note that the recovery implementation proceeds in increasing log file order. | ||
std::map<gaia_txn_id_t, unsigned char*> txn_index; | ||
Review comment: I think this should have a more descriptive name, say |
||
|
||
// This map contains the current set of txns that are being processed (by either recovery or checkpointing) | ||
Review comment: Missing period. |
||
// Txns are processed one decision record at a time; a single decision record may contain | ||
// multiple txns. | ||
std::map<gaia_txn_id_t, decision_type_t> decision_index; | ||
Review comment: So this just stores the commit/abort decision for each |
||
|
||
gaia_txn_id_t m_max_decided_commit_ts; | ||
|
||
std::function<void(db_recovered_object_t&)> write_to_persistent_store_fn; | ||
Review comment: Again I'm dubious that this needs to be pluggable, unless there's a layering issue that this solves. |
||
std::function<void(gaia::common::gaia_id_t)> remove_from_persistent_store_fn; | ||
|
||
// Recovery & Checkpointing APIs | ||
size_t update_cursor(struct record_iterator_t* it); | ||
Review comment: Why are we mixing the terms "cursor" and "iterator"? They mean roughly the same thing, so we should pick one and stick with it. |
||
size_t validate_recovered_record_crc(struct record_iterator_t* it); | ||
Review comment: I would use "checksum" rather than "CRC" in generic contexts like this that don't refer to the implementation. |
||
void map_log_file(struct record_iterator_t* it, int file_fd, recovery_mode_t recovery_mode); | ||
Review comment: Again, heads-up that txn logs will no longer have their own fds after my pending changes are merged (the equivalent is called |
||
void unmap_file(void* start, size_t size); | ||
bool is_remaining_file_empty(unsigned char* start, unsigned char* end); | ||
void write_log_record_to_persistent_store(read_record_t* record); | ||
void write_records(record_iterator_t* it, gaia_txn_id_t& last_checkpointed_commit_ts); | ||
bool write_log_file_to_persistent_store(gaia_txn_id_t& last_checkpointed_commit_ts, record_iterator_t& it); | ||
Review comment: Why is the order of arguments reversed in the last two functions? Also, shouldn't the |
||
}; | ||
|
||
} // namespace persistence | ||
|
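The comments on txn_index and decision_index above describe a two-map scheme: txns are indexed per log file by commit_ts, then drained one decision record at a time until the index is empty. The following is only a minimal sketch of that idea, not the PR's implementation. The names `txn_id_t`, `decision_t`, and `apply_decisions` are hypothetical stand-ins, and "writing to the persistent store" is modeled by collecting the committed timestamps.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical stand-ins for gaia_txn_id_t and decision_type_t.
using txn_id_t = uint64_t;
enum class decision_t : uint8_t { commit, abort };

// Applies one decision record. Every decided txn is looked up in the
// per-file txn index; committed txns are "written" (collected here) and
// each entry is erased, so the index drains as the file is processed.
std::vector<txn_id_t> apply_decisions(
    std::map<txn_id_t, const unsigned char*>& txn_index,
    const std::map<txn_id_t, decision_t>& decision_index)
{
    std::vector<txn_id_t> written;
    for (const auto& [commit_ts, decision] : decision_index)
    {
        auto it = txn_index.find(commit_ts);
        // Assumption: a decision only refers to a txn that is already indexed.
        assert(it != txn_index.end());
        if (decision == decision_t::commit)
        {
            written.push_back(commit_ts);
        }
        txn_index.erase(it);
    }
    return written;
}
```

Because `std::map` iterates in key order, committed txns come out in increasing commit_ts order, and an empty `txn_index` after the last decision record corresponds to the "assert that this index is empty" condition in the comment.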
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,26 @@ namespace db | |
namespace persistence | ||
{ | ||
|
||
enum class recovery_mode_t : uint8_t | ||
{ | ||
not_set = 0x0, | ||
|
||
// Does not tolerate any IO failure when reading a log file; any | ||
// IO error is treated as unrecoverable. | ||
// This mode is used when checkpointing log writes to RocksDB. | ||
checkpoint = 0x1, | ||
Review comment: If the behavior of |
||
|
||
// Stop recovery on first IO error. Database will always start and will try to recover as much | ||
// committed data from the log as possible. | ||
// Updates are logged one batch at a time; persistent batch IO is validated | ||
// first before marking any txn in the batch as durable (and returning a commit decision to the user); | ||
// Thus ignore any txn after the last seen decision timestamp before encountering IO error. | ||
finish_on_first_error = 0x2, | ||
|
||
// TODO: Already supported by 'checkpoint' option, but make this option visible to customer along with 'finish_on_first_error' | ||
kill_on_first_error = 0x3, | ||
Review comment: Not sure this is a good name; it seems to imply that we actually kill the process or something like that. Maybe |
||
}; | ||
|
||
enum class record_type_t : uint8_t | ||
{ | ||
not_set = 0x0, | ||
|
@@ -109,6 +129,25 @@ struct record_header_t | |
char padding[3]; | ||
}; | ||
|
||
Review comment: Re: the |
||
struct read_record_t | ||
{ | ||
struct record_header_t header; | ||
unsigned char payload[]; | ||
}; | ||
|
||
struct record_iterator_t | ||
{ | ||
unsigned char* cursor; | ||
Review comment: What exactly is a "cursor" in this context? A pointer to the current txn log record?
Review comment: Add some comments to describe the use of these fields. |
||
unsigned char* end; | ||
unsigned char* stop_at; | ||
Review comment: What does this mean? |
||
unsigned char* begin; | ||
Review comment: What's the motivation for the order of these fields? It seems odd that |
||
void* mapped; | ||
Review comment: What does this mean? Presumably a memory-mapped view of the txn log? In any case, as I said, txn logs will no longer be individually memory-mapped.
Review comment: OK, I think this is the mapping for
|
||
size_t map_size; | ||
int file_fd; | ||
recovery_mode_t recovery_mode; | ||
bool halt_recovery; | ||
Review comment: Halt recovery on what condition? Or is this just a flag we set when we want to halt recovery? |
||
}; | ||
|
||
// The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log | ||
Review comment: Maybe "This buffer is used to stage non-object data to be written to the log."? |
||
// apart from the shared memory objects. | ||
// Custom information includes | ||
|
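The recovery_mode_t comments and the record_iterator_t fields above describe a cursor that walks a mapped log file and reacts to a bad record according to the mode. The following is a rough sketch of that behavior under stated assumptions, not the PR's code. The record layout (`sketch_header_t`), the byte-sum checksum, and the names `sketch_mode_t`, `append_record`, and `scan_records` are all hypothetical. The rule of discarding txns after the last seen decision timestamp is left out for brevity.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Hypothetical, simplified versions of the PR's mode and record header.
enum class sketch_mode_t : uint8_t { checkpoint, finish_on_first_error };

struct sketch_header_t
{
    uint32_t payload_size;
    uint32_t checksum;
};

// Stand-in checksum (a plain byte sum, not a real CRC).
uint32_t byte_sum(const unsigned char* data, size_t size)
{
    uint32_t sum = 0;
    for (size_t i = 0; i < size; ++i)
    {
        sum += data[i];
    }
    return sum;
}

// Appends one header + payload record to an in-memory "log".
void append_record(std::vector<unsigned char>& log, const std::vector<unsigned char>& payload)
{
    sketch_header_t header{static_cast<uint32_t>(payload.size()), byte_sum(payload.data(), payload.size())};
    const auto* raw = reinterpret_cast<const unsigned char*>(&header);
    log.insert(log.end(), raw, raw + sizeof(header));
    log.insert(log.end(), payload.begin(), payload.end());
}

// Walks records from begin to end and returns the number of valid records.
// On a bad record: checkpoint mode throws, finish_on_first_error stops early.
size_t scan_records(const unsigned char* begin, const unsigned char* end, sketch_mode_t mode)
{
    size_t valid = 0;
    const unsigned char* cursor = begin;
    while (static_cast<size_t>(end - cursor) >= sizeof(sketch_header_t))
    {
        sketch_header_t header;
        std::memcpy(&header, cursor, sizeof(header));
        const unsigned char* payload = cursor + sizeof(header);
        bool ok = header.payload_size <= static_cast<size_t>(end - payload)
            && byte_sum(payload, header.payload_size) == header.checksum;
        if (!ok)
        {
            if (mode == sketch_mode_t::checkpoint)
            {
                throw std::runtime_error("corrupt log record");
            }
            break;
        }
        ++valid;
        cursor = payload + header.payload_size;
    }
    return valid;
}
```

The only difference between the two modes in this sketch is what happens on the first bad record: an exception in checkpoint mode, a clean early stop otherwise.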
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,8 @@ namespace gaia | |
{ | ||
namespace db | ||
{ | ||
namespace persistence | ||
{ | ||
|
||
class persistent_store_manager | ||
{ | ||
|
@@ -87,11 +89,48 @@ class persistent_store_manager | |
|
||
void reset_log(); | ||
|
||
/** | ||
* This API is only used during checkpointing & recovery. | ||
*/ | ||
void put(gaia::db::db_recovered_object_t& object); | ||
|
||
/** | ||
* This API is only used during checkpointing & recovery. | ||
*/ | ||
void remove(gaia::common::gaia_id_t id_to_remove); | ||
|
||
/** | ||
* Flush rocksdb memory buffer to disk as an SST file. | ||
* The rocksdb memtable is used to hold writes before writing them to an SST file. | ||
* The alternative is the RocksDB SSTFileWriter. Both options require reserving memory to stage writes in | ||
Review comment: If we can flush writes in commit_ts order to an SSTFileWriter, then I don't see what advantage the memtable gives us. Are you thinking that we need to use an ordered structure to buffer writes because data may appear in the WAL out of commit_ts order, so we may as well use the memtable? |
||
* the SST format before writing to disk. | ||
* Additionally, in RocksDB, each column family has an individual set of | ||
Review comment: I don't understand, surely we're only using a single column family anyway, so how does using the memtable help us in this respect? |
||
* SST files and the memtable. Thus, writing to multiple column families will require maintaining multiple SSTFileWriter objects. | ||
* All of this is handled by the memtable already. | ||
* Note that the memtable is only written to during log checkpointing. | ||
*/ | ||
void flush(); | ||
|
||
/** | ||
* Get value of a custom key. Used to retain a gaia counter across restarts. | ||
*/ | ||
uint64_t get_value(const std::string& key); | ||
Review comment: I think the method name should reflect the return type. Maybe |
||
|
||
/** | ||
* Update custom key's value. Used to retain gaia counter across restarts. | ||
Review comment: Something seems to be missing. Should this be "Update a custom key's value."? |
||
*/ | ||
void update_value(const std::string& key, uint64_t value_to_write); | ||
Review comment: I think the method name should reflect the argument type. Maybe |
||
|
||
static constexpr char c_data_dir_command_flag[] = "--data-dir"; | ||
static constexpr char c_persistent_store_dir_name[] = "/data"; | ||
Review comment: It's kind of odd to use a leading slash for a relative path. I assume you're doing this so you can append it to a path with no trailing slash, but you may as well just use |
||
static const std::string c_last_processed_log_num_key; | ||
Review comment: Please add a comment describing how this is used. Also, I think you should be able to initialize this in the header with |
||
|
||
private: | ||
gaia::db::counters_t* m_counters = nullptr; | ||
std::unique_ptr<gaia::db::rdb_wrapper_t> m_rdb_wrapper; | ||
std::unique_ptr<gaia::db::persistence::rdb_wrapper_t> m_rdb_wrapper; | ||
std::string m_data_dir_path; | ||
}; | ||
|
||
} // namespace persistence | ||
} // namespace db | ||
} // namespace gaia |
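The get_value/update_value pair above keeps a uint64_t counter under a string key so that it survives restarts. Since a key-value store such as RocksDB holds byte strings, the counter has to be serialized somehow. The following sketch shows one possible encoding (eight raw bytes in native byte order), with a `std::map` standing in for the store. The class name `counter_store_sketch_t`, the encoding, and the "missing key reads as 0" rule are assumptions for illustration; the PR's actual encoding is not shown in this diff.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <map>
#include <string>

// Hypothetical in-memory stand-in for the persistent key-value store.
class counter_store_sketch_t
{
public:
    // Stores the counter as 8 raw bytes in native byte order.
    void update_value(const std::string& key, uint64_t value)
    {
        std::string bytes(sizeof(value), '\0');
        std::memcpy(&bytes[0], &value, sizeof(value));
        m_kv[key] = bytes;
    }

    // Assumption: a key that was never written reads back as 0.
    uint64_t get_value(const std::string& key) const
    {
        auto it = m_kv.find(key);
        if (it == m_kv.end())
        {
            return 0;
        }
        assert(it->second.size() == sizeof(uint64_t));
        uint64_t value = 0;
        std::memcpy(&value, it->second.data(), sizeof(value));
        return value;
    }

private:
    std::map<std::string, std::string> m_kv;
};
```

A native-order encoding is only readable on machines with the same endianness. A fixed byte order would be the safer choice if data directories can move between machines.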
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,8 @@ namespace gaia | |
{ | ||
namespace db | ||
{ | ||
|
||
namespace persistence | ||
{ | ||
class rdb_wrapper_t | ||
Review comment: Leave an empty line after the set of namespace declarations. |
||
{ | ||
public: | ||
|
@@ -52,12 +53,21 @@ class rdb_wrapper_t | |
|
||
void handle_rdb_error(rocksdb::Status status); | ||
|
||
void flush(); | ||
|
||
void put(const rocksdb::Slice& key, const rocksdb::Slice& value); | ||
|
||
void remove(const rocksdb::Slice& key); | ||
|
||
void get(const rocksdb::Slice& key, std::string& value); | ||
|
||
private: | ||
std::unique_ptr<rocksdb::TransactionDB> m_txn_db; | ||
std::string m_data_dir; | ||
rocksdb::WriteOptions m_write_options; | ||
Review comment: If this is intended to be initialized to its default value then you should be explicit:
|
||
rocksdb::TransactionDBOptions m_txn_options; | ||
}; | ||
|
||
} // namespace persistence | ||
} // namespace db | ||
} // namespace gaia |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -270,6 +270,41 @@ void server_t::recover_persistent_log() | |
s_log_handler = std::make_unique<persistence::log_handler_t>(s_server_conf.data_dir()); | ||
|
||
s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd); | ||
|
||
auto put_obj = [&](db_recovered_object_t& obj) { | ||
Review comment: Again, wondering why you need this indirection either for decoupling or extensibility. I would think the server is already sufficiently decoupled from RocksDB by going through the |
||
s_persistent_store->put(obj); | ||
}; | ||
auto remove_obj = [=](gaia_id_t id) { | ||
s_persistent_store->remove(id); | ||
}; | ||
|
||
s_log_handler->register_write_to_persistent_store_fn(put_obj); | ||
s_log_handler->register_remove_from_persistent_store_fn(remove_obj); | ||
|
||
if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup) | ||
{ | ||
s_log_handler->destroy_persistent_log(INT64_MAX); | ||
Review comment: Should the argument be
Review comment: Same issue for the call to
Review comment: A few things: 1) this is the wrong constant for the type |
||
} | ||
|
||
// Get last processed log. | ||
auto last_processed_log_seq = s_persistent_store->get_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key); | ||
Review comment: This line and line 300 are too long. |
||
|
||
// Recover only the first time this method gets called. | ||
gaia_txn_id_t last_checkpointed_commit_ts = 0; | ||
s_log_handler->recover_from_persistent_log( | ||
last_checkpointed_commit_ts, | ||
last_processed_log_seq, | ||
INT64_MAX, | ||
Review comment: See comment above. |
||
gaia::db::persistence::recovery_mode_t::finish_on_first_error); | ||
|
||
s_persistent_store->update_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key, last_processed_log_seq); | ||
|
||
s_log_handler->set_persistent_log_sequence(last_processed_log_seq); | ||
|
||
s_log_handler->destroy_persistent_log(INT64_MAX); | ||
|
||
std::cout << "s_validate_persistence_batch_eventfd = " << s_validate_persistence_batch_eventfd << std::endl; | ||
s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd); | ||
} | ||
} | ||
|
||
|
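In recover_persistent_log() above, INT64_MAX is passed to destroy_persistent_log() and recover_from_persistent_log(), whose sequence-number parameters are declared as uint64_t in the header diff. INT64_MAX is the maximum of the signed 64-bit type, so as a uint64_t it is only about half of the representable range. The following sketch illustrates the difference. The constant names and `should_process` are hypothetical, and whether the PR ended up using `std::numeric_limits` is not shown in this diff.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical sentinel meaning "no upper bound on the log sequence number".
constexpr uint64_t c_max_log_seq = std::numeric_limits<uint64_t>::max();

// The value a uint64_t parameter receives when the caller passes INT64_MAX.
constexpr uint64_t c_int64_max_as_u64 = static_cast<uint64_t>(INT64_MAX);

static_assert(c_int64_max_as_u64 == 0x7FFFFFFFFFFFFFFFULL, "INT64_MAX is 2^63 - 1");
static_assert(c_max_log_seq == UINT64_MAX, "numeric_limits matches UINT64_MAX");
static_assert(c_int64_max_as_u64 < c_max_log_seq, "INT64_MAX does not cover the full uint64_t range");

// Hypothetical bound check of the kind a log scan might perform.
bool should_process(uint64_t log_seq, uint64_t max_log_seq_to_process)
{
    return log_seq <= max_log_seq_to_process;
}
```

In practice a log sequence number is unlikely to exceed 2^63 - 1, so the mismatch is mostly a question of type hygiene, but a sentinel that matches the parameter type avoids the doubt.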
@@ -899,14 +934,19 @@ void server_t::recover_db() | |
auto cleanup = make_scope_guard([]() { end_startup_txn(); }); | ||
begin_startup_txn(); | ||
|
||
s_persistent_store = std::make_unique<gaia::db::persistent_store_manager>( | ||
s_persistent_store = std::make_unique<gaia::db::persistence::persistent_store_manager>( | ||
get_counters(), s_server_conf.data_dir()); | ||
if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup) | ||
{ | ||
s_persistent_store->destroy_persistent_store(); | ||
} | ||
s_persistent_store->open(); | ||
|
||
recover_persistent_log(); | ||
|
||
// Flush persistent store buffer to disk. | ||
s_persistent_store->flush(); | ||
|
||
s_persistent_store->recover(); | ||
} | ||
} | ||
|
@@ -1043,6 +1083,36 @@ void server_t::log_writer_handler() | |
} | ||
} | ||
|
||
void server_t::checkpoint_handler() | ||
{ | ||
// Wait for a persistent log file to be closed before checkpointing it. | ||
Review comment: Shouldn't this comment be better placed below, before the call to |
||
// This can be achieved via blocking on an eventfd read. | ||
uint64_t last_deleted_log_seq = 0; | ||
while (true) | ||
{ | ||
// Log sequence number of file ready to be checkpointed. | ||
eventfd_t max_log_seq_to_checkpoint; | ||
eventfd_read(s_signal_checkpoint_log_eventfd, &max_log_seq_to_checkpoint); | ||
|
||
// Process all existing log files. | ||
uint64_t last_processed_log_seq = 0; | ||
Review comment: Why isn't this |
||
s_log_handler->recover_from_persistent_log( | ||
s_last_checkpointed_commit_ts_lower_bound, | ||
last_processed_log_seq, | ||
max_log_seq_to_checkpoint, | ||
gaia::db::persistence::recovery_mode_t::checkpoint); | ||
|
||
s_persistent_store->update_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key, last_processed_log_seq); | ||
|
||
// Flush persistent store buffer to disk. | ||
s_persistent_store->flush(); | ||
|
||
ASSERT_INVARIANT(max_log_seq_to_checkpoint > last_deleted_log_seq, "Log files cannot be deleted out of order"); | ||
s_log_handler->destroy_persistent_log(last_processed_log_seq); | ||
last_deleted_log_seq = max_log_seq_to_checkpoint; | ||
} | ||
} | ||
|
||
gaia_txn_id_t server_t::begin_startup_txn() | ||
{ | ||
// Reserve an index in the safe_ts array, so the main thread can execute | ||
|
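checkpoint_handler() above blocks on eventfd_read() and treats the value read as the log sequence number to checkpoint. One property of Linux eventfd is worth keeping in mind when reading that loop: without EFD_SEMAPHORE, the descriptor holds a single 64-bit counter, each write adds to it, and a read returns the accumulated total and resets it to zero. The sketch below demonstrates this with the glibc helpers `eventfd_write` and `eventfd_read`. The function name `read_after_writes` is hypothetical, and how the PR's make_blocking_eventfd() configures the descriptor is not shown in this diff.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/eventfd.h>
#include <unistd.h>
#include <vector>

// Writes each value to a fresh blocking eventfd, then performs one read.
// Returns what the reader sees. The values must be non-empty and non-zero,
// otherwise the read would block forever on a zero counter.
uint64_t read_after_writes(const std::vector<uint64_t>& values)
{
    int efd = eventfd(0, 0); // initial counter 0, no flags: blocking, non-semaphore
    assert(efd != -1);
    for (uint64_t value : values)
    {
        int rc = eventfd_write(efd, value);
        assert(rc == 0);
        (void)rc;
    }
    eventfd_t total = 0;
    int rc = eventfd_read(efd, &total);
    assert(rc == 0);
    (void)rc;
    close(efd);
    return total;
}
```

If the writer signals sequence numbers 3 and 4 before the reader wakes up, the reader sees 7 rather than 4. So a design that passes sequence numbers through the counter relies on there being at most one pending write per read.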
@@ -3521,10 +3591,13 @@ void server_t::run(server_config_t server_conf) | |
|
||
s_signal_decision_eventfd = make_nonblocking_eventfd(); | ||
|
||
s_signal_checkpoint_log_eventfd = make_blocking_eventfd(); | ||
|
||
auto cleanup_persistence_eventfds = make_scope_guard([]() { | ||
close_fd(s_signal_log_write_eventfd); | ||
close_fd(s_signal_decision_eventfd); | ||
close_fd(s_validate_persistence_batch_eventfd); | ||
close_fd(s_signal_checkpoint_log_eventfd); | ||
}); | ||
|
||
// Block handled signals in this thread and subsequently spawned threads. | ||
|
@@ -3537,10 +3610,12 @@ void server_t::run(server_config_t server_conf) | |
init_shared_memory(); | ||
|
||
std::thread log_writer_thread; | ||
std::thread checkpoint_thread; | ||
if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled) | ||
{ | ||
// Launch persistence thread. | ||
log_writer_thread = std::thread(&log_writer_handler); | ||
checkpoint_thread = std::thread(&checkpoint_handler); | ||
} | ||
|
||
// Launch thread to listen for client connections and create session threads. | ||
|
@@ -3559,6 +3634,7 @@ void server_t::run(server_config_t server_conf) | |
if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled) | ||
{ | ||
log_writer_thread.join(); | ||
checkpoint_thread.join(); | ||
} | ||
|
||
// We special-case SIGHUP to force reinitialization of the server. | ||
|
Review comment: Why isn't this atomic like s_last_queued_commit_ts_upper_bound?