
Introduce log writer APIs (log_io.cpp, log_io.hpp) #1144

Merged · 13 commits · May 17, 2022
5 changes: 3 additions & 2 deletions production/db/core/CMakeLists.txt
@@ -125,9 +125,10 @@ else()
endif()

 add_library(gaia_db_persistence STATIC
-    src/log_file.cpp
     src/async_disk_writer.cpp
-    src/async_write_batch.cpp)
+    src/async_write_batch.cpp
+    src/log_file.cpp
+    src/log_io.cpp)
# Add GAIA_DB_SERVER preprocessor definition for conditional includes.
target_compile_definitions(gaia_db_persistence PUBLIC GAIA_DB_SERVER=1)
configure_gaia_target(gaia_db_persistence)
35 changes: 15 additions & 20 deletions production/db/core/src/async_disk_writer.cpp
@@ -37,12 +37,12 @@ async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_check
ASSERT_PRECONDITION(validate_flush_efd >= 0, "Invalid validate flush eventfd");
ASSERT_PRECONDITION(signal_checkpoint_efd >= 0, "Invalid signal checkpoint eventfd");

-    m_validate_flush_efd = validate_flush_efd;
-    m_signal_checkpoint_efd = signal_checkpoint_efd;
+    m_validate_flush_eventfd = validate_flush_efd;
Contributor:
nit: should we put an _ before fd? (i.e. m_validate_flush_event_fd)

Contributor:
no, eventfd is the name of a kernel object type and a syscall

+    m_signal_checkpoint_eventfd = signal_checkpoint_efd;

// Used to block new writes to disk when a batch is already getting flushed.
-    s_flush_efd = eventfd(1, 0);
-    if (s_flush_efd == -1)
+    s_flush_eventfd = eventfd(1, 0);
+    if (s_flush_eventfd == -1)
{
const char* reason = ::explain_eventfd(1, 0);
throw_system_error(reason);
@@ -62,7 +62,7 @@ async_disk_writer_t::~async_disk_writer_t()

void async_disk_writer_t::teardown()
{
-    close_fd(s_flush_efd);
+    close_fd(s_flush_eventfd);
}

void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t user_data)
@@ -76,12 +76,7 @@ void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t use
throw_system_error(ss.str(), err);
}

-void async_disk_writer_t::map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_fd)
-{
-    m_ts_to_session_decision_fd_map.insert(std::pair(commit_ts, session_decision_fd));
-}
-
-void async_disk_writer_t::add_decisions_to_batch(decision_list_t& decisions)
+void async_disk_writer_t::add_decisions_to_batch(const decision_list_t& decisions)
{
for (const auto& decision : decisions)
{
@@ -106,7 +101,7 @@ void async_disk_writer_t::perform_post_completion_maintenance()
// Signal to checkpointing thread the upper bound of files that it can process.
if (max_file_seq_to_close > 0)
{
-        signal_eventfd(m_signal_checkpoint_efd, max_file_seq_to_close);
+        signal_eventfd(m_signal_checkpoint_eventfd, max_file_seq_to_close);
}

const decision_list_t& decisions = m_in_flight_batch->get_decision_batch_entries();
@@ -116,10 +111,10 @@ void async_disk_writer_t::perform_post_completion_maintenance()
transactions::txn_metadata_t::set_txn_durable(decision.commit_ts);
Contributor:
Setting the TXN_DURABLE flag can only be correct if both the committed data and the decision have been persisted. How do you know at this point that the data is also durable? Isn't the log batch committed asynchronously with the decision batch?

Contributor:
A bit of a higher-level design digression: generally speaking, if we have dedicated txn metadata flags for a property, we don't need a watermark for it, and vice versa. The key difference is that metadata flags can be updated out-of-order, while a watermark is in-order by construction (it can only represent a "prefix property", i.e. a predicate which is true for every timestamp <= the watermark).

If txns were made durable strictly in commit_ts order (or if there were no latency penalty from requiring a full prefix of txns to be durable before advancing the watermark), then I think a durability watermark could replace the TXN_DURABLE flag. This also seems reasonable given the current batch approach, since we can't signal a session thread that its committing txn is durable until fdatasync() has completed for that txn's log and decision batches. If we required batches to observe the prefix property as well (i.e. we couldn't finalize a batch as long as there were any "holes" in it), then I think this would be a natural approach.

However, in view of the longer-term design goal to make durability fully asynchronous using O_DIRECT | O_DATASYNC (so we can signal a session thread as soon as its log/decision write has completed, without having to wait for fdatasync() to complete for the whole batch), I think it's probably better to stick with the metadata flags approach, since that would allow committing session threads to be signaled out-of-order by the logging thread (when we move to O_DIRECT), which might be important for txns with large variance in update size (to prevent large txns from blocking commit notification of small txns). Anyway, curious to hear your thoughts on this.

// Unblock session thread.
-        auto itr = m_ts_to_session_decision_fd_map.find(decision.commit_ts);
-        ASSERT_INVARIANT(itr != m_ts_to_session_decision_fd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts");
+        auto itr = m_ts_to_session_decision_eventfd_map.find(decision.commit_ts);
+        ASSERT_INVARIANT(itr != m_ts_to_session_decision_eventfd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts");
         signal_eventfd_single_thread(itr->second);
-        m_ts_to_session_decision_fd_map.erase(itr);
+        m_ts_to_session_decision_eventfd_map.erase(itr);
}

m_in_flight_batch->clear_decision_batch();
@@ -130,7 +125,7 @@ void async_disk_writer_t::submit_and_swap_in_progress_batch(int file_fd, bool sh
eventfd_t event_counter;

// Block on any pending disk flushes.
-    eventfd_read(s_flush_efd, &event_counter);
+    eventfd_read(s_flush_eventfd, &event_counter);

// Perform any maintenance on the in_flight batch.
perform_post_completion_maintenance();
@@ -147,7 +142,7 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_
swap_batches();

// Nothing to submit; reset the flush efd that got burnt in submit_and_swap_in_progress_batch() function.
-    signal_eventfd_single_thread(s_flush_efd);
+    signal_eventfd_single_thread(s_flush_eventfd);

// Reset metadata buffer.
m_metadata_buffer.clear();
@@ -158,8 +153,8 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_
m_in_progress_batch->add_fdatasync_op_to_batch(file_fd, get_enum_value(uring_op_t::fdatasync), IOSQE_IO_LINK);

// Signal eventfd's as part of batch.
-    m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK);
-    m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN);
+    m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK);
+    m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN);

swap_batches();
auto flushed_batch_size = m_in_flight_batch->get_unsubmitted_entries_count();
@@ -187,7 +182,7 @@ void async_disk_writer_t::perform_file_close_operations(int file_fd, file_sequen
m_in_progress_batch->append_file_to_batch(file_fd, log_seq);
}

-unsigned char* async_disk_writer_t::copy_into_metadata_buffer(void* source, size_t size, int file_fd)
+unsigned char* async_disk_writer_t::copy_into_metadata_buffer(const void* source, size_t size, int file_fd)
{
auto current_ptr = m_metadata_buffer.get_current_ptr();
ASSERT_PRECONDITION(current_ptr, "Invalid metadata buffer pointer.");
16 changes: 1 addition & 15 deletions production/db/core/src/chunk_manager.cpp
@@ -93,21 +93,7 @@ gaia_offset_t chunk_manager_t::allocate(
get_state() == chunk_state_t::in_use,
"Objects can only be allocated from a chunk in the IN_USE state!");

-    ASSERT_PRECONDITION(allocation_size_in_bytes > 0, "Requested allocation size cannot be 0!");
-
-    // Check before converting to slot units to avoid overflow.
-    ASSERT_PRECONDITION(
-        allocation_size_in_bytes <= (c_max_allocation_size_in_slots * c_slot_size_in_bytes),
-        "Requested allocation size exceeds maximum allocation size of 64KB!");
-
-    // Calculate allocation size in slot units.
-#ifdef DEBUG
-    // Round up allocation to a page so we can mprotect() it.
-    size_t allocation_size_in_pages = (allocation_size_in_bytes + c_page_size_in_bytes - 1) / c_page_size_in_bytes;
-    size_t allocation_size_in_slots = allocation_size_in_pages * (c_page_size_in_bytes / c_slot_size_in_bytes);
-#else
-    size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes;
-#endif
+    size_t allocation_size_in_slots = calculate_allocation_size_in_slots(allocation_size_in_bytes);

// Ensure that the new allocation doesn't overflow the chunk.
if (m_metadata->min_unallocated_slot_offset() + allocation_size_in_slots > c_last_slot_offset)
20 changes: 10 additions & 10 deletions production/db/core/src/log_file.cpp
@@ -25,14 +25,9 @@ namespace persistence
// TODO (Mihir): Use io_uring for fsync, close & fallocate operations in this file.
// open() operation will remain synchronous, since we need the file fd to perform other async
// operations on the file.
-log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_seq, size_t size)
+log_file_t::log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size)
+    : m_dir_name(dir_name), m_dir_fd(dir_fd), m_file_seq(file_seq), m_file_size(file_size)
 {
-    m_dir_fd = dir_fd;
-    m_dir_name = dir;
-    m_file_seq = file_seq;
-    m_file_size = size;
-    m_current_offset = 0;
-
// open and fallocate depending on size.
std::stringstream file_name;
file_name << m_dir_name << "/" << m_file_seq;
@@ -66,12 +61,12 @@ log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_
}
}

-size_t log_file_t::get_current_offset()
+file_offset_t log_file_t::get_current_offset() const
{
return m_current_offset;
}

-int log_file_t::get_file_fd()
+int log_file_t::get_file_fd() const
{
return m_file_fd;
}
@@ -81,7 +76,12 @@ void log_file_t::allocate(size_t size)
m_current_offset += size;
}

-size_t log_file_t::get_remaining_bytes_count(size_t record_size)
+file_sequence_t log_file_t::get_file_sequence() const
+{
+    return m_file_seq;
+}
+
+size_t log_file_t::get_bytes_remaining_after_append(size_t record_size) const
{
ASSERT_INVARIANT(m_file_size > 0, "Preallocated file size should be greater than 0.");
if (m_file_size < (m_current_offset + record_size))