Custom recovery/checkpointing impl #1232

Draft
wants to merge 9 commits into
base: logwriter_thread
Changes from 2 commits
6 changes: 5 additions & 1 deletion production/db/core/inc/db_server.hpp
@@ -152,7 +152,7 @@ class server_t
static inline mapped_data_t<data_t> s_shared_data{};
static inline mapped_data_t<id_index_t> s_shared_id_index{};
static inline index::indexes_t s_global_indexes{};
- static inline std::unique_ptr<persistent_store_manager> s_persistent_store{};
+ static inline std::unique_ptr<persistence::persistent_store_manager> s_persistent_store{};
static inline std::unique_ptr<persistence::log_handler_t> s_log_handler{};

// These fields have transaction lifetime.
@@ -267,6 +267,8 @@ class server_t
// Keep track of the last txn that has been submitted to the async_disk_writer.
static inline std::atomic<gaia_txn_id_t> s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id;

static inline gaia_txn_id_t s_last_checkpointed_commit_ts_lower_bound = c_invalid_gaia_txn_id;
Contributor

Why isn't this atomic like s_last_queued_commit_ts_upper_bound?
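
If the lower bound is ever read outside the checkpoint thread, one option is to make it atomic and advance it monotonically. The sketch below is only an illustration of that idea: `example_txn_id_t`, `s_example_checkpointed_ts`, and `advance_checkpointed_ts` are hypothetical names, not identifiers from this PR.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for gaia_txn_id_t (assumed here to be a 64-bit integer).
using example_txn_id_t = uint64_t;

// Hypothetical atomic counterpart of the checkpoint lower bound.
inline std::atomic<example_txn_id_t> s_example_checkpointed_ts{0};

// Advance the bound to new_ts unless it is already at or past new_ts.
// Returns true if this call moved the bound forward.
inline bool advance_checkpointed_ts(example_txn_id_t new_ts)
{
    example_txn_id_t current = s_example_checkpointed_ts.load();
    while (current < new_ts)
    {
        // On failure, compare_exchange_weak reloads `current`, so the loop
        // re-checks the condition against the latest stored value.
        if (s_example_checkpointed_ts.compare_exchange_weak(current, new_ts))
        {
            return true;
        }
    }
    return false;
}
```

If only the checkpoint thread ever touches the field, a plain variable is sufficient and a comment saying so would answer the question.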


// Keep a track of undecided txns submitted to the async_disk_writer.
static inline std::set<gaia_txn_id_t> s_seen_and_undecided_txn_set{};

@@ -443,6 +445,8 @@ class server_t

static void recover_persistent_log();

static void checkpoint_handler();

static void flush_all_pending_writes();

static void session_handler(int session_socket);
53 changes: 53 additions & 0 deletions production/db/core/inc/log_io.hpp
@@ -87,6 +87,33 @@ class log_handler_t
*/
void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd);

/**
Contributor

I guess we should move these comments to the header file, but you can wait until we move the others as well.

* Entry point to start recovery procedure from gaia log files. Checkpointing reuses the same function.
*/
void recover_from_persistent_log(
gaia_txn_id_t& last_checkpointed_commit_ts,
uint64_t& last_processed_log_seq,
uint64_t max_log_seq_to_process,
recovery_mode_t mode);

/**
* Destroy all log files with sequence number less than or equal to max_log_seq_to_delete.
*/
void destroy_persistent_log(uint64_t max_log_seq_to_delete);
Contributor

Why isn't max_log_seq_to_delete a log_sequence_t?


/**
* Register persistent store create/delete APIs. Rework to call persistent store APIs directly?
*/
void register_write_to_persistent_store_fn(std::function<void(db_recovered_object_t&)> write_obj_fn);
Contributor

What's the motivation for making these functions pluggable? Testing?

void register_remove_from_persistent_store_fn(std::function<void(gaia::common::gaia_id_t)> remove_obj_fn);

/**
* Set the log sequence counter.
*/
void set_persistent_log_sequence(uint64_t log_seq);
Contributor

Can log_seq be a log_sequence_t?


size_t get_remaining_txns_to_checkpoint_count();

private:
// TODO: Make log file size configurable.
static constexpr uint64_t c_file_size = 4 * 1024 * 1024;
@@ -102,6 +129,32 @@ class log_handler_t
std::unique_ptr<log_file_t> m_current_file;

std::unique_ptr<async_disk_writer_t> m_async_disk_writer;

// Map txn commit_ts to location of log record header during recovery.
// This index is maintained on a per log file basis. Before moving to the next file
// we assert that this index is empty as all txns have been processed.
// Note that the recovery implementation proceeds in increasing log file order.
std::map<gaia_txn_id_t, unsigned char*> txn_index;
Contributor

I think this should have a more descriptive name, say log_records_by_commit_ts or commit_ts_to_log_record_map.


// This map contains the current set of txns that are being processed (by either recovery or checkpointing)
Contributor

Missing period.

// Txns are processed one decision record at a time; a single decision record may contain
// multiple txns.
std::map<gaia_txn_id_t, decision_type_t> decision_index;
Contributor

So this just stores the commit/abort decision for each commit_ts in the log file? Why can't you just look that up in txn metadata? Or are you trying to decouple this impl from txn metadata?


gaia_txn_id_t m_max_decided_commit_ts;

std::function<void(db_recovered_object_t&)> write_to_persistent_store_fn;
Contributor

Again I'm dubious that this needs to be pluggable, unless there's a layering issue that this solves.

std::function<void(gaia::common::gaia_id_t)> remove_from_persistent_store_fn;

// Recovery & Checkpointing APIs
size_t update_cursor(struct record_iterator_t* it);
Contributor

Why are we mixing the terms "cursor" and "iterator"? They mean roughly the same thing, so we should pick one and stick with it.

size_t validate_recovered_record_crc(struct record_iterator_t* it);
Contributor

I would use "checksum" rather than "CRC" in generic contexts like this that don't refer to the implementation.

void map_log_file(struct record_iterator_t* it, int file_fd, recovery_mode_t recovery_mode);
Contributor

Again, heads-up that txn logs will no longer have their own fds after my pending changes are merged (the equivalent is called txn_log_offset_t and it's also a 16-bit unsigned int, but no mapping is required to access the txn_log_t header, just adding the offset to the mapped base address of the new txn log shmem segment).

void unmap_file(void* start, size_t size);
bool is_remaining_file_empty(unsigned char* start, unsigned char* end);
void write_log_record_to_persistent_store(read_record_t* record);
void write_records(record_iterator_t* it, gaia_txn_id_t& last_checkpointed_commit_ts);
bool write_log_file_to_persistent_store(gaia_txn_id_t& last_checkpointed_commit_ts, record_iterator_t& it);
Contributor

Why is the order of arguments reversed in the last two functions? Also, shouldn't the record_iterator_t& argument to write_log_file_to_persistent_store() be const? Finally, what's the rationale for pointer vs. reference types in these signatures?

};

} // namespace persistence
39 changes: 39 additions & 0 deletions production/db/core/inc/persistence_types.hpp
@@ -23,6 +23,26 @@ namespace db
namespace persistence
{

enum class recovery_mode_t : uint8_t
{
not_set = 0x0,

// Does not tolerate any IO failure when reading a log file; any
// IO error is treated as unrecoverable.
// This mode is used when checkpointing log writes to RocksDB.
checkpoint = 0x1,
Contributor

If the behavior of checkpoint is identical to kill_on_first_error, then why do we need both options?


// Stop recovery on first IO error. Database will always start and will try to recover as much
// committed data from the log as possible.
// Updates are logged one batch at a time; Persistent batch IO is validated
// first before marking any txn in the batch as durable (and returning a commit decision to the user);
// Thus ignore any txn after the last seen decision timestamp before encountering IO error.
finish_on_first_error = 0x2,

// TODO: Already supported by 'checkpoint' option, but make this option visible to customer along with 'finish_on_first_error'
kill_on_first_error = 0x3,
Contributor

Not sure this is a good name; it seems to imply that we actually kill the process or something like that. Maybe fail_on_first_error?

};

enum class record_type_t : uint8_t
{
not_set = 0x0,
@@ -109,6 +129,25 @@ struct record_header_t
char padding[3];
};

Contributor

Re: the record_header_t definition: if you insert padding fields by hand then you should verify your assumptions on those padding fields via static_assert().
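
As a rough illustration of this suggestion, the sketch below pins down a hand-padded layout with `static_assert`. The struct and its fields are hypothetical (the full `record_header_t` definition is not visible in this hunk); only the trailing `char padding[3]` mirrors the diff.

```cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>

// Hypothetical header layout, for illustration only.
struct example_record_header_t
{
    uint32_t checksum;      // offset 0
    uint32_t payload_size;  // offset 4
    uint64_t commit_ts;     // offset 8
    uint32_t entry_count;   // offset 16
    uint8_t record_type;    // offset 20
    char padding[3];        // offset 21, manual padding up to 24 bytes
};

// offsetof is only well-defined for standard-layout types.
static_assert(std::is_standard_layout_v<example_record_header_t>);

// The manual padding should account for every byte, so the compiler
// must not have inserted any implicit padding of its own.
static_assert(
    sizeof(example_record_header_t)
        == sizeof(uint32_t) * 3 + sizeof(uint64_t) + sizeof(uint8_t) + 3,
    "Unexpected implicit padding in example_record_header_t.");

// The padding field should start exactly where the last real field ends.
static_assert(
    offsetof(example_record_header_t, padding)
        == offsetof(example_record_header_t, record_type) + sizeof(uint8_t),
    "Padding does not immediately follow record_type.");
```

With checks like these, adding or reordering a field without adjusting the padding fails at compile time instead of silently changing the on-disk format.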

struct read_record_t
{
struct record_header_t header;
unsigned char payload[];
};

struct record_iterator_t
{
unsigned char* cursor;
Contributor

What exactly is a "cursor" in this context? A pointer to the current txn log record?

Contributor

Add some comments to describe the use of these fields.

unsigned char* end;
unsigned char* stop_at;
Contributor

What does this mean?

unsigned char* begin;
Contributor

What's the motivation for the order of these fields? It seems odd that begin is declared before end.

void* mapped;
Contributor

What does this mean? Presumably a memory-mapped view of the txn log? In any case, as I said, txn logs will no longer be individually memory-mapped.

Contributor

OK, I think this is the mapping for file_fd? In that case, the fields file_fd, mapped, map_size really belong together in a dedicated data structure, I think. Preferably, such a data structure would have a destructor that released the mapping and closed the fd. (You can look at production/db/core/inc/mapped_data.hpp and decide if you want to use or adapt any of those classes.)
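
A minimal sketch of such a dedicated structure, assuming a read-only mapping of a whole log file on a POSIX system. `mapped_log_file_t` is a hypothetical name; it is not one of the classes in `mapped_data.hpp`, and it is not the PR's implementation.

```cpp
#include <cstddef>
#include <stdexcept>

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical RAII owner of a file descriptor and its read-only mapping.
class mapped_log_file_t
{
public:
    explicit mapped_log_file_t(const char* path)
    {
        m_fd = ::open(path, O_RDONLY);
        if (m_fd == -1)
        {
            throw std::runtime_error("open() failed");
        }

        struct stat st{};
        if (::fstat(m_fd, &st) == -1 || st.st_size == 0)
        {
            // The destructor does not run if the constructor throws,
            // so release the fd by hand.
            ::close(m_fd);
            throw std::runtime_error("fstat() failed or file is empty");
        }
        m_size = static_cast<size_t>(st.st_size);

        m_data = ::mmap(nullptr, m_size, PROT_READ, MAP_PRIVATE, m_fd, 0);
        if (m_data == MAP_FAILED)
        {
            ::close(m_fd);
            throw std::runtime_error("mmap() failed");
        }
    }

    // Release the mapping first, then close the fd.
    ~mapped_log_file_t()
    {
        ::munmap(m_data, m_size);
        ::close(m_fd);
    }

    // Owning a mapping and an fd makes copying unsafe.
    mapped_log_file_t(const mapped_log_file_t&) = delete;
    mapped_log_file_t& operator=(const mapped_log_file_t&) = delete;

    const unsigned char* begin() const { return static_cast<const unsigned char*>(m_data); }
    const unsigned char* end() const { return begin() + m_size; }
    size_t size() const { return m_size; }

private:
    int m_fd{-1};
    void* m_data{nullptr};
    size_t m_size{0};
};
```

With an owner like this, `record_iterator_t` could hold only `cursor`, `end`, and `stop_at`, and `unmap_file()` would no longer need to be called explicitly on each exit path.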

Contributor

mapped_data perhaps? With just mapped, the question is: "what is mapped?".

size_t map_size;
int file_fd;
recovery_mode_t recovery_mode;
bool halt_recovery;
Contributor

Halt recovery on what condition? Or is this just a flag we set when we want to halt recovery?

};

// The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log
Contributor

Maybe "This buffer is used to stage non-object data to be written to the log."?

// apart from the shared memory objects.
// Custom information includes
41 changes: 40 additions & 1 deletion production/db/core/inc/persistent_store_manager.hpp
@@ -24,6 +24,8 @@ namespace gaia
{
namespace db
{
namespace persistence
{

class persistent_store_manager
{
@@ -87,11 +89,48 @@ class persistent_store_manager

void reset_log();

/**
* This API is only used during checkpointing & recovery.
*/
void put(gaia::db::db_recovered_object_t& object);

/**
* This API is only used during checkpointing & recovery.
*/
void remove(gaia::common::gaia_id_t id_to_remove);

/**
* Flush rocksdb memory buffer to disk as an SST file.
* The rocksdb memtable is used to hold writes before writing them to an SST file.
* The alternative is the RocksDB SSTFileWriter. Both options require reserving memory to stage writes in
Contributor

If we can flush writes in commit_ts order to an SSTFileWriter, then I don't see what advantage the memtable gives us. Are you thinking that we need to use an ordered structure to buffer writes because data may appear in the WAL out of commit_ts order, so we may as well use the memtable?

* the SST format before writing to disk.
* Additionally, in RocksDB, each column family has an individual set of
Contributor

I don't understand, surely we're only using a single column family anyway, so how does using the memtable help us in this respect?

* SST files and the memtable. Thus, writing to multiple column families will require maintaining multiple SSTFileWriter objects.
* All of this is handled by the memtable already.
* Note that the memtable is only written to during log checkpointing.
*/
void flush();

/**
* Get value of a custom key. Used to retain a gaia counter across restarts.
*/
uint64_t get_value(const std::string& key);
Contributor

I think the method name should reflect the return type. Maybe get_word_value()?


/**
* Update custom key's value. Used to retain gaia counter across restarts.
Contributor

Update custom key's value.

Something seems to be missing. Should this be "Update a custom key's value."?

*/
void update_value(const std::string& key, uint64_t value_to_write);
Contributor

I think the method name should reflect the argument type. Maybe set_word_value()? (I think "set" is more symmetric with "get" and also more conventional.)


static constexpr char c_data_dir_command_flag[] = "--data-dir";
static constexpr char c_persistent_store_dir_name[] = "/data";
Contributor

It's kind of odd to use a leading slash for a relative path. I assume you're doing this so you can append it to a path with no trailing slash, but you may as well just use std::filesystem and forget about directory separators.
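
A small sketch of the `std::filesystem` approach, assuming the directory name is stored without a leading slash. `c_example_store_dir_name` and `make_store_path` are hypothetical names used only for illustration.

```cpp
#include <filesystem>

// Hypothetical relative directory name, stored without a separator.
inline constexpr char c_example_store_dir_name[] = "data";

// Join the configured data directory with the store subdirectory.
// operator/ inserts a separator only when one is needed, so it does not
// matter whether data_dir ends with a trailing slash.
inline std::filesystem::path make_store_path(const std::filesystem::path& data_dir)
{
    return data_dir / c_example_store_dir_name;
}
```

One caveat worth knowing: on POSIX systems, appending an absolute path with `operator/` replaces the left-hand side, so keeping the `"/data"` spelling while switching to `std::filesystem` would yield `/data` rather than a subdirectory of the data directory.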

static const std::string c_last_processed_log_num_key;
Contributor

Please add a comment describing how this is used. Also, I think you should be able to initialize this in the header with static inline.
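
A sketch of the in-header initialization, which needs C++17 inline variables. The class name and the key's string value are placeholders (the real value is defined outside this diff); the comment reflects how the key is used in `db_server.cpp` in this PR.

```cpp
#include <string>

// Hypothetical class standing in for persistent_store_manager.
class example_store_manager_t
{
public:
    // Key under which the sequence number of the last processed log file is
    // stored, so that recovery and checkpointing can resume from it after a
    // restart. The string value below is a placeholder.
    static inline const std::string c_last_processed_log_num_key{"example_last_processed_log_num"};
};
```

This removes the need for a separate out-of-class definition in a `.cpp` file.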


private:
gaia::db::counters_t* m_counters = nullptr;
- std::unique_ptr<gaia::db::rdb_wrapper_t> m_rdb_wrapper;
+ std::unique_ptr<gaia::db::persistence::rdb_wrapper_t> m_rdb_wrapper;
std::string m_data_dir_path;
};

} // namespace persistence
} // namespace db
} // namespace gaia
5 changes: 5 additions & 0 deletions production/db/core/inc/rdb_object_converter.hpp
@@ -169,6 +169,11 @@ class string_reader_t
}
};

void encode_checkpointed_object(
const db_recovered_object_t* gaia_object,
string_writer_t& key,
string_writer_t& value);

void encode_object(
const db_object_t* gaia_object,
string_writer_t& key,
12 changes: 11 additions & 1 deletion production/db/core/inc/rdb_wrapper.hpp
@@ -16,7 +16,8 @@ namespace gaia
{
namespace db
{

namespace persistence
{
class rdb_wrapper_t
Contributor

Leave an empty line after the set of namespace declarations.

{
public:
@@ -52,12 +53,21 @@ class rdb_wrapper_t

void handle_rdb_error(rocksdb::Status status);

void flush();

void put(const rocksdb::Slice& key, const rocksdb::Slice& value);

void remove(const rocksdb::Slice& key);

void get(const rocksdb::Slice& key, std::string& value);

private:
std::unique_ptr<rocksdb::TransactionDB> m_txn_db;
std::string m_data_dir;
rocksdb::WriteOptions m_write_options;
Contributor

If this is intended to be initialized to its default value then you should be explicit:

rocksdb::WriteOptions m_write_options{};

rocksdb::TransactionDBOptions m_txn_options;
};

} // namespace persistence
} // namespace db
} // namespace gaia
78 changes: 77 additions & 1 deletion production/db/core/src/db_server.cpp
@@ -270,6 +270,41 @@ void server_t::recover_persistent_log()
s_log_handler = std::make_unique<persistence::log_handler_t>(s_server_conf.data_dir());

s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd);

auto put_obj = [&](db_recovered_object_t& obj) {
Contributor

Again, wondering why you need this indirection either for decoupling or extensibility. I would think the server is already sufficiently decoupled from RocksDB by going through the persistent_store_manager.

s_persistent_store->put(obj);
};
auto remove_obj = [=](gaia_id_t id) {
s_persistent_store->remove(id);
};

s_log_handler->register_write_to_persistent_store_fn(put_obj);
s_log_handler->register_remove_from_persistent_store_fn(remove_obj);

if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup)
{
s_log_handler->destroy_persistent_log(INT64_MAX);
Contributor

Should the argument be INT64_MAX when the type is uint64_t? Or should this be UINT64_MAX?

Contributor

@LaurentiuCristofor, Jan 21, 2022

Same issue for the call to recover_from_persistent_log and the second call to destroy_persistent_log below.

Contributor

A few things: 1) this is the wrong constant for the type uint64_t (should be UINT64_MAX), 2) in C++, you should be using constants from std::numeric_limits, 3) you shouldn't just treat this as a magic number but should define a new constant in the same class/namespace as destroy_persistent_log(), explain the semantics of this constant in a comment with its definition, and assert wherever necessary that no valid log sequence number can equal this constant.
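
The three points could be combined roughly as follows. This is a sketch under assumptions: `example_log_seq_t`, `c_example_max_log_seq`, and `is_valid_log_seq` are hypothetical names, and the alias is assumed to be a 64-bit unsigned integer.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical alias for the log sequence type.
using example_log_seq_t = uint64_t;

// Sentinel meaning "no upper bound": passing it as the maximum sequence
// number to process or delete selects every existing log file.
// No valid log sequence number may equal this value.
inline constexpr example_log_seq_t c_example_max_log_seq
    = std::numeric_limits<example_log_seq_t>::max();

// Check to call (or assert on) wherever a real sequence number is produced.
inline constexpr bool is_valid_log_seq(example_log_seq_t seq)
{
    return seq != c_example_max_log_seq;
}

// The sentinel is the unsigned maximum, not the signed one used in the diff.
static_assert(c_example_max_log_seq == UINT64_MAX);
static_assert(c_example_max_log_seq != static_cast<uint64_t>(INT64_MAX));
```

Call sites would then pass the named constant instead of a bare `INT64_MAX`, which also documents the intent at each call.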

}

// Get last processed log.
auto last_processed_log_seq = s_persistent_store->get_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key);
Contributor

This line and line 300 are too long.


// Recover only the first time this method gets called.
gaia_txn_id_t last_checkpointed_commit_ts = 0;
s_log_handler->recover_from_persistent_log(
last_checkpointed_commit_ts,
last_processed_log_seq,
INT64_MAX,
Contributor

See comment above.

gaia::db::persistence::recovery_mode_t::finish_on_first_error);

s_persistent_store->update_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key, last_processed_log_seq);

s_log_handler->set_persistent_log_sequence(last_processed_log_seq);

s_log_handler->destroy_persistent_log(INT64_MAX);

std::cout << "s_validate_persistence_batch_eventfd = " << s_validate_persistence_batch_eventfd << std::endl;
s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd);
}
}

@@ -899,14 +934,19 @@ void server_t::recover_db()
auto cleanup = make_scope_guard([]() { end_startup_txn(); });
begin_startup_txn();

- s_persistent_store = std::make_unique<gaia::db::persistent_store_manager>(
+ s_persistent_store = std::make_unique<gaia::db::persistence::persistent_store_manager>(
get_counters(), s_server_conf.data_dir());
if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup)
{
s_persistent_store->destroy_persistent_store();
}
s_persistent_store->open();

recover_persistent_log();

// Flush persistent store buffer to disk.
s_persistent_store->flush();

s_persistent_store->recover();
}
}
@@ -1043,6 +1083,36 @@ void server_t::log_writer_handler()
}
}

void server_t::checkpoint_handler()
{
// Wait for a persistent log file to be closed before checkpointing it.
Contributor

Shouldn't this comment be better placed below, before the call to eventfd_read?

// This can be achieved via blocking on an eventfd read.
uint64_t last_deleted_log_seq = 0;
while (true)
{
// Log sequence number of file ready to be checkpointed.
eventfd_t max_log_seq_to_checkpoint;
eventfd_read(s_signal_checkpoint_log_eventfd, &max_log_seq_to_checkpoint);

// Process all existing log files.
uint64_t last_processed_log_seq = 0;
Contributor

Why isn't this log_sequence_t?

s_log_handler->recover_from_persistent_log(
s_last_checkpointed_commit_ts_lower_bound,
last_processed_log_seq,
max_log_seq_to_checkpoint,
gaia::db::persistence::recovery_mode_t::checkpoint);

s_persistent_store->update_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key, last_processed_log_seq);

// Flush persistent store buffer to disk.
s_persistent_store->flush();

ASSERT_INVARIANT(max_log_seq_to_checkpoint > last_deleted_log_seq, "Log files cannot be deleted out of order");
s_log_handler->destroy_persistent_log(last_processed_log_seq);
last_deleted_log_seq = max_log_seq_to_checkpoint;
}
}

gaia_txn_id_t server_t::begin_startup_txn()
{
// Reserve an index in the safe_ts array, so the main thread can execute
@@ -3521,10 +3591,13 @@ void server_t::run(server_config_t server_conf)

s_signal_decision_eventfd = make_nonblocking_eventfd();

s_signal_checkpoint_log_eventfd = make_blocking_eventfd();

auto cleanup_persistence_eventfds = make_scope_guard([]() {
close_fd(s_signal_log_write_eventfd);
close_fd(s_signal_decision_eventfd);
close_fd(s_validate_persistence_batch_eventfd);
close_fd(s_signal_checkpoint_log_eventfd);
});

// Block handled signals in this thread and subsequently spawned threads.
@@ -3537,10 +3610,12 @@ void server_t::run(server_config_t server_conf)
init_shared_memory();

std::thread log_writer_thread;
std::thread checkpoint_thread;
if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled)
{
// Launch persistence thread.
log_writer_thread = std::thread(&log_writer_handler);
checkpoint_thread = std::thread(&checkpoint_handler);
}

// Launch thread to listen for client connections and create session threads.
@@ -3559,6 +3634,7 @@ void server_t::run(server_config_t server_conf)
if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled)
{
log_writer_thread.join();
checkpoint_thread.join();
}

// We special-case SIGHUP to force reinitialization of the server.