Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce log writer APIs (log_io.cpp, log_io.hpp) #1144

Merged
merged 13 commits into from
May 17, 2022
Merged

Conversation

mihirj1993
Copy link

@mihirj1993 mihirj1993 commented Dec 16, 2021

Introduce log writer APIs (log_io.cpp, log_io.hpp)

  • The log_writer APIs will process the txn log and generate pwrite() requests for txn writes (to be written to log files)
  • The log_writer APIs will be used by a single server side thread (log_writer thread : Introduce log writer thread #1146)
  • The in-memory log record format is augmented with a list of chunks; which represents the order in which chunks were used in the txn. This is necessary since a txn may receive chunks in arbitrary order from the server (given chunk reuse) and the in-memory txn log does not track object offsets in the order in which they were allocated for the txn (since the client sorts the log by offset for the purposes of validation)
  • The in-memory log record format is further augmented with an eventfd. The session_decision_eventfd is specific to a session and is used by the log_writer thread to signal to the session thread that txn writes have been persisted. The session thread can then return commit result back to the client.
  • For simplicity, this session_decision_eventfd is stashed in the txn log. The alternative implementation requires keeping a threadsafe structure on the server which is accessed by both the session threads and the log_writer thread (to obtain the relevant session eventfd's to write to).

@@ -37,7 +37,7 @@ namespace persistence
class async_disk_writer_t
{
public:
async_disk_writer_t(int validate_flushed_batch_efd, int signal_checkpoint_efd);
async_disk_writer_t(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the names be event_fd instead of eventfd?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because eventfd is a kernel object type (like memfd), and has its own open(2)-like system call eventfd() (which really should have been eventfd_create()).

constexpr size_t c_max_chunks_per_txn = 8;

// 8 chunks can hold up to 8 * (2^16 - 2^8) = 522240 64B objects,
constexpr size_t c_max_log_records = 522240;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you express this value in terms of c_max_chunks_per_txn, so that changing that value doesn't require recalculating this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this will conflict with my current changes (which limit txn logs to 2^16 records), so I think we can postpone discussion until after those are merged.

Copy link
Contributor

@senderista senderista Jan 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the meantime, here are some numbers to consider: we can address up to 2^16 txn logs in metadata (16 bits to embed an fd/offset), and we can have at most 2^32 live object versions (we have 2^38 bytes in the data segment and objects are at least 2^6 bytes). I think it makes sense to limit the total number of txn records to the max number of live versions (otherwise some records must be addressing versions that don't exist), so we also can have at most 2^32 txn log records, so to ensure this limit is respected we can have at most 2^16 records/txn log. That's a bit smaller than I'd like, but I think the consistency arguments above are compelling. We can discuss if this limit is actually adequate for customer use cases (ditto our 64KB object size limit).

Given the above, we can address at most 2^32 bytes of new object versions in a txn log: max object size is 2^16 bytes and at most 2^16 new versions can be addressed in a txn log. Chunk size is 2^22 bytes, so that means that a txn log can address at most 2^10 chunks. Given 16-bit chunk IDs, that's 2KB for chunk ID storage, or half a 4KB page.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think my current changes to redesign in-memory txn log storage unfortunately need to be merged before these changes, even though these changes are further along, because the scalability issues with mmap() revealed in benchmarking seem to have much more perf impact than any deficiencies of the current persistence implementation.

uint32_t decision_count;
};

char padding[3];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment to explain what this is used for / why it is necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it would help if the 3 would be a constant so its name would be more descriptive of why 3 is an appropriate value.

Copy link
Contributor

@senderista senderista left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really looking good, just a few things could be improved. Also a heads-up that my pending changes to replace txn log fds will require some rework (but not really that much).


// Keep track of session threads to unblock.
std::unordered_map<gaia_txn_id_t, int> m_ts_to_session_decision_fd_map;
std::unordered_map<gaia_txn_id_t, int> m_ts_to_session_decision_eventfd_map;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this data structure only accessed by a single thread? (Obviously it's not threadsafe.)

// Track maximum number of new chunks (apart from the one that the txn is already using)
// that can be allocated per transaction.
// This sets an upper bound on txn size: 32MB < txn_size < 36MB
constexpr size_t c_max_chunks_per_txn = 8;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments elsewhere: based on existing limits (including those in my pending changes for txn log storage), I think we need to allow up to 2^10 chunks/txn.

Also, the max txn log size in bytes is now 1MB (2^16 16-byte records).

@@ -108,6 +113,8 @@ struct txn_log_t
// convenient place for shared state between the client and server.
memory_manager::chunk_offset_t current_chunk;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I'm removing this in my current changes (it never worked anyway for its intended purpose as shared state).

@@ -108,6 +113,8 @@ struct txn_log_t
// convenient place for shared state between the client and server.
memory_manager::chunk_offset_t current_chunk;
size_t record_count;
int session_decision_eventfd;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes sense to put a datum that is valid for only one process (i.e. an fd) in shared-memory state. We should discuss alternatives, e.g. registering this eventfd as part of txn submission, or registering some general index into an array of session shared state (which I think we will need anyway for reliable recovery from crashed sessions).

@@ -108,6 +113,8 @@ struct txn_log_t
// convenient place for shared state between the client and server.
memory_manager::chunk_offset_t current_chunk;
size_t record_count;
int session_decision_eventfd;
size_t chunk_count;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you know, we can always infer the set of chunks used in a txn by scanning the offsets in the txn log (which we need to do anyway during conflict detection), so we should discuss if this is necessary.

// The txn_log is sorted on the client for the correct validation impl, thus this map is used to track order of writes.
// Note that writes beloning to a txn can be assigned in arbitrary chunk order (due to chunk reuse) which is another reason to
// track chunk ids in the log.
std::map<chunk_offset_t, std::set<gaia_offset_t>> chunk_to_offsets_map;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see no reason to track the full set of offsets in a chunk at all, when all you care about is the smallest and largest ones. Why not just track min and max offsets per chunk?

std::map<chunk_offset_t, std::set<gaia_offset_t>> chunk_to_offsets_map;
for (size_t i = 0; i < log.data()->chunk_count; i++)
{
auto chunk = log.data()->chunks + i;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be clearer to use array indexing rather than pointer arithmetic.

/**
* Validate the result of I/O calls submitted to the kernel for processing.
*/
void validate_flushed_batch();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the meaning of "validate" is here when the return type is void rather than bool.

#else
size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes;
#endif
size_t allocation_size_in_slots = calculate_allocation_size_in_slots(allocation_size_in_bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this refactoring!


// Create iovec entries.
size_t payload_size = 0;
for (size_t i = 0; i < contiguous_address_offsets.size(); i += 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note how the logic in this loop would be simplified by just storing start/end offset pairs in contiguous_address_offsets.

m_async_disk_writer->perform_post_completion_maintenance();
}

void log_handler_t::submit_writes(bool sync)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the same parameter name as async_disk_writer_t::submit_and_swap_in_progress_batch(): should_wait_for_completion.

/**
* Submit async_disk_writer's internal I/O request queue to the kernel for processing.
*/
void submit_writes(bool sync);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the same parameter name as async_disk_writer_t::submit_and_swap_in_progress_batch(): should_wait_for_completion.

@@ -116,10 +116,10 @@ void async_disk_writer_t::perform_post_completion_maintenance()
transactions::txn_metadata_t::set_txn_durable(decision.commit_ts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the TXN_DURABLE flag can only be correct if both the committed data and the decision have been persisted. How do you know at this point that the data is also durable? Isn't the log batch committed asynchronously with the decision batch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of a higher-level design digression: generally speaking, if we have dedicated txn metadata flags for a property, we don't need a watermark for it, and vice versa. The key difference is that metadata flags can be updated out-of-order, while a watermark is in-order by construction (it can only represent a "prefix property", i.e. a predicate which is true for every timestamp <= the watermark). If txns were made durable strictly in commit_ts order (or if there were no latency penalty from requiring a full prefix of txns to be durable before advancing the watermark), then I think a durability watermark could replace the TXN_DURABLE flag. This also seems reasonable given the current batch approach, since we can't signal a session thread that its committing txn is durable until fdatasync() has completed for that txn's log and decision batches. If we required batches to observe the prefix property as well (i.e. we couldn't finalize a batch as long as there were any "holes" in it), then I think this would be a natural approach. However, in view of the longer-term design goal to make durability fully asynchronous using O_DIRECT | O_DATASYNC (so we can signal a session thread as soon as its log/decision write has completed, without having to wait for fdatasync() to complete for the whole batch), I think it's probably better to stick with the metadata flags approach, since that would allow committing session threads to be signaled out-of-order by the logging thread (when we move to O_DIRECT), which might be important for txns with large variance in update size (to prevent large txns from blocking commit notification of small txns). Anyway, curious to hear your thoughts on this.

@LaurentiuCristofor LaurentiuCristofor marked this pull request as draft March 23, 2022 22:10
@senderista senderista marked this pull request as ready for review May 11, 2022 00:46
@senderista
Copy link
Contributor

I think all of my and others' feedback has been addressed, and I also rewrote some clearly inefficient code.

@senderista senderista dismissed their stale review May 11, 2022 00:50

I now own this PR

Copy link
Contributor

@simone-gaia simone-gaia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but honestly don't understand much of what is going on...

Is this code hooked up anywhere or at least tested?

@@ -37,12 +37,12 @@ async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_chec
ASSERT_PRECONDITION(validate_flush_efd >= 0, "Invalid validate flush eventfd");
ASSERT_PRECONDITION(signal_checkpoint_efd >= 0, "Invalid signal checkpoint eventfd");

m_validate_flush_efd = validate_flush_efd;
m_signal_checkpoint_efd = signal_checkpoint_efd;
m_validate_flush_eventfd = validate_flush_efd;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we put an _ before fd? (i.e m_validate_flush_event_fd).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, eventfd is the name of a kernel object type and a syscall

void perform_flushed_batch_maintenance();

private:
static constexpr char c_gaia_wal_dir_name[] = "/wal_dir";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is a good idea to prepend / to a directory name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, will fix (I certainly haven't found all the issues in the original code)

{
auto dirpath = wal_dir_path;
ASSERT_PRECONDITION(!dirpath.empty(), "Gaia persistent directory path shouldn't be empty.");
s_wal_dir_path = dirpath.append(c_gaia_wal_dir_name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the operator is just /.

@senderista
Copy link
Contributor

LGTM, but honestly don't understand much of what is going on...

Is this code hooked up anywhere or at least tested?

Neither, but there's no time for that. I'll do lots of testing for the next 2 PRs.

@@ -25,14 +25,9 @@ namespace persistence
// TODO (Mihir): Use io_uring for fsync, close & fallocate operations in this file.
// open() operation will remain synchronous, since we need the file fd to perform other async
// operations on the file.
log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_seq, size_t size)
log_file_t::log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size)
: m_file_size(file_size), m_file_seq(file_seq), m_dir_name(dir_name), m_dir_fd(dir_fd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we also need m_current_offset(0)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, could we perform the initializations in the same order in which the arguments are passed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_current_offset is already initialized to a default value (see its declaration). We get a warning if initialization order doesn't match declaration order.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change declaration order or does it matter to optimize storage?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


log_handler_t::log_handler_t(const std::string& wal_dir_path)
{
auto dirpath = wal_dir_path;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dirpath -> dir_path

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

log_handler_t::log_handler_t(const std::string& wal_dir_path)
{
auto dirpath = wal_dir_path;
ASSERT_PRECONDITION(!dirpath.empty(), "Gaia persistent directory path shouldn't be empty.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should "persistent" be "persistence". It's not a directory that is persistent, but a directory used for persistence. Perhaps we should also say something like "data", i.e. "Gaia data persistence directory" instead of "Gaia persistent directory".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or "Gaia persistent data directory" - this goes well with "persistent log directory" that is used further down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


void log_handler_t::open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd)
{
ASSERT_PRECONDITION(validate_flushed_batch_eventfd != -1, "Eventfd to signal post flush maintenance operations invalid!");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I got this right:

Eventfd to signal post flush maintenance operations invalid!
->
Eventfd to signal post-flush maintenance operations are invalid!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

writes_to_submit.push_back({header_ptr, sizeof(record_header_t)});
writes_to_submit.push_back({txn_decisions_ptr, txn_decision_size});

m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), m_current_file->get_current_offset(), uring_op_t::pwritev_decision);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you break the arguments of this call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

std::vector<chunk_data_t> chunk_data;

// Obtain deleted IDs and min/max offsets per chunk.
for (size_t i = 0; i < txn_log->record_count; i++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i++ -> ++i

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@LaurentiuCristofor LaurentiuCristofor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments on minor issues, nothing blocking.

@senderista senderista merged commit c1a4d78 into master May 17, 2022
@senderista senderista deleted the log_writer_apis branch May 17, 2022 02:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants