-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce log writer APIs (log_io.cpp, log_io.hpp) #1144
Changes from all commits
e4d426b
e9407d9
b72466a
0508152
cbdad1d
5fe5efa
f0d039c
baa1012
1c6fd3b
734b736
1b5b83a
8c7e7c5
1a96167
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,12 +37,12 @@ async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_chec | |
ASSERT_PRECONDITION(validate_flush_efd >= 0, "Invalid validate flush eventfd"); | ||
ASSERT_PRECONDITION(signal_checkpoint_efd >= 0, "Invalid signal checkpoint eventfd"); | ||
|
||
m_validate_flush_efd = validate_flush_efd; | ||
m_signal_checkpoint_efd = signal_checkpoint_efd; | ||
m_validate_flush_eventfd = validate_flush_efd; | ||
m_signal_checkpoint_eventfd = signal_checkpoint_efd; | ||
|
||
// Used to block new writes to disk when a batch is already getting flushed. | ||
s_flush_efd = eventfd(1, 0); | ||
if (s_flush_efd == -1) | ||
s_flush_eventfd = eventfd(1, 0); | ||
if (s_flush_eventfd == -1) | ||
{ | ||
const char* reason = ::explain_eventfd(1, 0); | ||
throw_system_error(reason); | ||
|
@@ -62,7 +62,7 @@ async_disk_writer_t::~async_disk_writer_t() | |
|
||
void async_disk_writer_t::teardown() | ||
{ | ||
close_fd(s_flush_efd); | ||
close_fd(s_flush_eventfd); | ||
} | ||
|
||
void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t user_data) | ||
|
@@ -76,12 +76,7 @@ void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t use | |
throw_system_error(ss.str(), err); | ||
} | ||
|
||
void async_disk_writer_t::map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_fd) | ||
{ | ||
m_ts_to_session_decision_fd_map.insert(std::pair(commit_ts, session_decision_fd)); | ||
} | ||
|
||
void async_disk_writer_t::add_decisions_to_batch(decision_list_t& decisions) | ||
void async_disk_writer_t::add_decisions_to_batch(const decision_list_t& decisions) | ||
{ | ||
for (const auto& decision : decisions) | ||
{ | ||
|
@@ -106,7 +101,7 @@ void async_disk_writer_t::perform_post_completion_maintenance() | |
// Signal to checkpointing thread the upper bound of files that it can process. | ||
if (max_file_seq_to_close > 0) | ||
{ | ||
signal_eventfd(m_signal_checkpoint_efd, max_file_seq_to_close); | ||
signal_eventfd(m_signal_checkpoint_eventfd, max_file_seq_to_close); | ||
} | ||
|
||
const decision_list_t& decisions = m_in_flight_batch->get_decision_batch_entries(); | ||
|
@@ -116,10 +111,10 @@ void async_disk_writer_t::perform_post_completion_maintenance() | |
transactions::txn_metadata_t::set_txn_durable(decision.commit_ts); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Setting the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit of a higher-level design digression: generally speaking, if we have dedicated txn metadata flags for a property, we don't need a watermark for it, and vice versa. The key difference is that metadata flags can be updated out-of-order, while a watermark is in-order by construction (it can only represent a "prefix property", i.e. a predicate which is true for every timestamp <= the watermark). If txns were made durable strictly in commit_ts order (or if there were no latency penalty from requiring a full prefix of txns to be durable before advancing the watermark), then I think a durability watermark could replace the |
||
|
||
// Unblock session thread. | ||
auto itr = m_ts_to_session_decision_fd_map.find(decision.commit_ts); | ||
ASSERT_INVARIANT(itr != m_ts_to_session_decision_fd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts"); | ||
auto itr = m_ts_to_session_decision_eventfd_map.find(decision.commit_ts); | ||
ASSERT_INVARIANT(itr != m_ts_to_session_decision_eventfd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts"); | ||
signal_eventfd_single_thread(itr->second); | ||
m_ts_to_session_decision_fd_map.erase(itr); | ||
m_ts_to_session_decision_eventfd_map.erase(itr); | ||
} | ||
|
||
m_in_flight_batch->clear_decision_batch(); | ||
|
@@ -130,7 +125,7 @@ void async_disk_writer_t::submit_and_swap_in_progress_batch(int file_fd, bool sh | |
eventfd_t event_counter; | ||
|
||
// Block on any pending disk flushes. | ||
eventfd_read(s_flush_efd, &event_counter); | ||
eventfd_read(s_flush_eventfd, &event_counter); | ||
|
||
// Perform any maintenance on the in_flight batch. | ||
perform_post_completion_maintenance(); | ||
|
@@ -147,7 +142,7 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ | |
swap_batches(); | ||
|
||
// Nothing to submit; reset the flush efd that got burnt in submit_and_swap_in_progress_batch() function. | ||
signal_eventfd_single_thread(s_flush_efd); | ||
signal_eventfd_single_thread(s_flush_eventfd); | ||
|
||
// Reset metadata buffer. | ||
m_metadata_buffer.clear(); | ||
|
@@ -158,8 +153,8 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ | |
m_in_progress_batch->add_fdatasync_op_to_batch(file_fd, get_enum_value(uring_op_t::fdatasync), IOSQE_IO_LINK); | ||
|
||
// Signal eventfd's as part of batch. | ||
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); | ||
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); | ||
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); | ||
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); | ||
|
||
swap_batches(); | ||
auto flushed_batch_size = m_in_flight_batch->get_unsubmitted_entries_count(); | ||
|
@@ -187,7 +182,7 @@ void async_disk_writer_t::perform_file_close_operations(int file_fd, file_sequen | |
m_in_progress_batch->append_file_to_batch(file_fd, log_seq); | ||
} | ||
|
||
unsigned char* async_disk_writer_t::copy_into_metadata_buffer(void* source, size_t size, int file_fd) | ||
unsigned char* async_disk_writer_t::copy_into_metadata_buffer(const void* source, size_t size, int file_fd) | ||
{ | ||
auto current_ptr = m_metadata_buffer.get_current_ptr(); | ||
ASSERT_PRECONDITION(current_ptr, "Invalid metadata buffer pointer."); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we put an
_
beforefd
? (i.em_validate_flush_event_fd
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no,
eventfd
is the name of a kernel object type and a syscall