Skip to content

Commit

Permalink
Using the sync mrecordlog version. (#4653)
Browse files Browse the repository at this point in the history
This PR introduces a wrapper around mrecordlog to use the new
sync mrecordlog in an async context.

Operations are executed in spawn_blocking.

If the future is cancelled, then the mrecordlog object will be poisoned.
  • Loading branch information
fulmicoton authored Mar 1, 2024
1 parent 4054ec5 commit 05311f1
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 171 deletions.
4 changes: 1 addition & 3 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "bc6a998" }
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "2c593d3" }
new_string_template = "1.4.0"
nom = "7.1.3"
num_cpus = "1"
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-indexing/src/source/ingest_api_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,16 @@ impl Source for IngestApiSource {
return Ok(INGEST_API_POLLING_COOL_DOWN);
};

let batch_num_docs = doc_batch.num_docs();
// TODO: use a timestamp (in the raw doc batch) assigned at ingest time to be more accurate.
let mut raw_doc_batch = RawDocBatch::with_capacity(doc_batch.num_docs());
for doc in doc_batch.iter() {
for doc in doc_batch.into_iter() {
match doc {
DocCommand::Ingest { payload } => raw_doc_batch.docs.push(payload),
DocCommand::Commit => raw_doc_batch.force_commit = true,
}
}
let current_offset = first_position + doc_batch.num_docs() as u64 - 1;
let current_offset = first_position + batch_num_docs as u64 - 1;
let partition_id = self.partition_id.clone();
raw_doc_batch
.checkpoint_delta
Expand Down
27 changes: 16 additions & 11 deletions quickwit/quickwit-ingest/src/doc_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,25 @@ impl JsonDocBatchBuilder {

impl DocBatch {
/// Returns an iterator over the doc commands within a doc_batch.
pub fn iter(&self) -> impl Iterator<Item = DocCommand<Bytes>> + '_ {
self.iter_raw().map(DocCommand::read)
#[allow(clippy::should_implement_trait)]
pub fn into_iter(self) -> impl Iterator<Item = DocCommand<Bytes>> {
self.into_iter_raw().map(DocCommand::read)
}

/// Returns an iterator over the raw (undecoded) command buffers within a doc_batch.
pub fn iter_raw(&self) -> impl Iterator<Item = Bytes> + '_ {
self.doc_lengths
.iter()
.cloned()
.scan(0, |current_offset, doc_num_bytes| {
pub fn into_iter_raw(self) -> impl Iterator<Item = Bytes> {
let DocBatch {
doc_buffer,
doc_lengths,
..
} = self;
doc_lengths
.into_iter()
.scan(0, move |current_offset, doc_num_bytes| {
let start = *current_offset;
let end = start + doc_num_bytes as usize;
*current_offset = end;
Some(self.doc_buffer.slice(start..end))
Some(doc_buffer.slice(start..end))
})
}

Expand Down Expand Up @@ -341,7 +346,7 @@ mod tests {
assert_eq!(batch.num_docs(), 4);
assert_eq!(batch.num_bytes(), 5 + 1 + 5 + 4);

let mut iter = batch.iter();
let mut iter = batch.clone().into_iter();
assert!(commands_eq(
iter.next().unwrap(),
DocCommand::Ingest {
Expand All @@ -367,7 +372,7 @@ mod tests {
assert!(iter.next().is_none());

let mut copied_batch = DocBatchBuilder::new("test".to_string());
for raw_buf in batch.iter_raw() {
for raw_buf in batch.clone().into_iter_raw() {
copied_batch.command_from_buf(raw_buf);
}
let copied_batch = copied_batch.build();
Expand All @@ -389,7 +394,7 @@ mod tests {
assert_eq!(batch.num_docs(), 3);
assert_eq!(batch.num_bytes(), 12 + 12 + 3);

let mut iter = batch.iter();
let mut iter = batch.into_iter();
assert!(commands_eq(
iter.next().unwrap(),
DocCommand::Ingest {
Expand Down
23 changes: 11 additions & 12 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,32 +174,31 @@ impl IngestApiService {
}
let mut num_docs = 0usize;
let mut notifications = Vec::new();
for doc_batch in &request.doc_batches {
let commit = request.commit();
for doc_batch in request.doc_batches {
// TODO better error handling.
// If there is an error, we probably want transactional behavior.
let records_it = doc_batch.iter_raw();
let max_position = self
.queues
.append_batch(&doc_batch.index_id, records_it, ctx)
.await?;
let commit = request.commit();

let batch_num_docs = doc_batch.num_docs();
let batch_num_bytes = doc_batch.num_bytes();
let index_id = doc_batch.index_id.clone();
let records_it = doc_batch.into_iter_raw();
let max_position = self.queues.append_batch(&index_id, records_it, ctx).await?;
if let Some(max_position) = max_position {
if commit != CommitType::Auto {
if commit == CommitType::Force {
self.queues
.append_batch(
&doc_batch.index_id,
&index_id,
iter::once(DocCommand::Commit::<Bytes>.into_buf()),
ctx,
)
.await?;
}
notifications.push((doc_batch.index_id.clone(), max_position));
notifications.push((index_id.clone(), max_position));
}
}

let batch_num_docs = doc_batch.num_docs();
let batch_num_bytes = doc_batch.num_bytes();
num_docs += batch_num_docs;
INGEST_METRICS
.ingested_num_bytes
Expand Down Expand Up @@ -475,7 +474,7 @@ mod tests {
let position = doc_batch.num_docs() as u64;
assert_eq!(doc_batch.num_docs(), 5);
assert!(matches!(
doc_batch.iter().nth(4),
doc_batch.into_iter().nth(4),
Some(DocCommand::Commit::<Bytes>)
));
ingest_api_service
Expand Down
50 changes: 33 additions & 17 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

use bytes::{BufMut, BytesMut};
use futures::StreamExt;
use mrecordlog::MultiRecordLog;
use mrecordlog::Record;
use quickwit_common::retry::RetryParams;
use quickwit_common::{spawn_named_task, ServiceStream};
use quickwit_proto::ingest::ingester::{
Expand All @@ -38,6 +38,7 @@ use tokio::task::JoinHandle;
use tracing::{debug, error, warn};

use super::models::ShardStatus;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{with_lock_metrics, ClientId, IngesterPool};

/// A fetch stream task is responsible for waiting and pushing new records written to a shard's
Expand All @@ -51,7 +52,7 @@ pub(super) struct FetchStreamTask {
queue_id: QueueId,
/// The position of the next record fetched.
from_position_inclusive: u64,
mrecordlog: Arc<RwLock<Option<MultiRecordLog>>>,
mrecordlog: Arc<RwLock<Option<MultiRecordLogAsync>>>,
fetch_message_tx: mpsc::Sender<IngestV2Result<FetchMessage>>,
/// This channel notifies the fetch task when new records are available. This way the fetch
/// task does not need to grab the lock and poll the mrecordlog queue unnecessarily.
Expand All @@ -75,7 +76,7 @@ impl FetchStreamTask {

pub fn spawn(
open_fetch_stream_request: OpenFetchStreamRequest,
mrecordlog: Arc<RwLock<Option<MultiRecordLog>>>,
mrecordlog: Arc<RwLock<Option<MultiRecordLogAsync>>>,
shard_status_rx: watch::Receiver<ShardStatus>,
batch_num_bytes: usize,
) -> (ServiceStream<IngestV2Result<FetchMessage>>, JoinHandle<()>) {
Expand Down Expand Up @@ -137,13 +138,13 @@ impl FetchStreamTask {
// The queue was dropped.
break;
};
for (_position, mrecord) in mrecords {
if mrecord_buffer.len() + mrecord.len() > mrecord_buffer.capacity() {
for Record { payload, .. } in mrecords {
if mrecord_buffer.len() + payload.len() > mrecord_buffer.capacity() {
has_drained_queue = false;
break;
}
mrecord_buffer.put(mrecord.borrow());
mrecord_lengths.push(mrecord.len() as u32);
mrecord_buffer.put(payload.borrow());
mrecord_lengths.push(payload.len() as u32);
}
// Drop the lock while we send the message.
drop(mrecordlog_guard);
Expand Down Expand Up @@ -594,7 +595,6 @@ pub(super) mod tests {
use std::time::Duration;

use bytes::Bytes;
use mrecordlog::MultiRecordLog;
use quickwit_proto::ingest::ingester::IngesterServiceClient;
use quickwit_proto::ingest::ShardState;
use quickwit_proto::types::queue_id;
Expand All @@ -621,7 +621,7 @@ pub(super) mod tests {
async fn test_fetch_task_happy_path() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLog::open(tempdir.path()).await.unwrap(),
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
Expand Down Expand Up @@ -653,7 +653,11 @@ pub(super) mod tests {
mrecordlog_guard
.as_mut()
.unwrap()
.append_record(&queue_id, None, MRecord::new_doc("test-doc-foo").encode())
.append_records(
&queue_id,
None,
std::iter::once(MRecord::new_doc("test-doc-foo").encode()),
)
.await
.unwrap();
drop(mrecordlog_guard);
Expand Down Expand Up @@ -703,7 +707,11 @@ pub(super) mod tests {
mrecordlog_guard
.as_mut()
.unwrap()
.append_record(&queue_id, None, MRecord::new_doc("test-doc-bar").encode())
.append_records(
&queue_id,
None,
std::iter::once(MRecord::new_doc("test-doc-bar").encode()),
)
.await
.unwrap();
drop(mrecordlog_guard);
Expand Down Expand Up @@ -808,7 +816,7 @@ pub(super) mod tests {
async fn test_fetch_task_eof_at_beginning() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLog::open(tempdir.path()).await.unwrap(),
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
Expand Down Expand Up @@ -865,7 +873,7 @@ pub(super) mod tests {
async fn test_fetch_task_from_position_exclusive() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLog::open(tempdir.path()).await.unwrap(),
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
Expand Down Expand Up @@ -905,7 +913,11 @@ pub(super) mod tests {
mrecordlog_guard
.as_mut()
.unwrap()
.append_record(&queue_id, None, MRecord::new_doc("test-doc-foo").encode())
.append_records(
&queue_id,
None,
std::iter::once(MRecord::new_doc("test-doc-foo").encode()),
)
.await
.unwrap();
drop(mrecordlog_guard);
Expand All @@ -922,7 +934,11 @@ pub(super) mod tests {
mrecordlog_guard
.as_mut()
.unwrap()
.append_record(&queue_id, None, MRecord::new_doc("test-doc-bar").encode())
.append_records(
&queue_id,
None,
std::iter::once(MRecord::new_doc("test-doc-bar").encode()),
)
.await
.unwrap();
drop(mrecordlog_guard);
Expand Down Expand Up @@ -966,7 +982,7 @@ pub(super) mod tests {
async fn test_fetch_task_error() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLog::open(tempdir.path()).await.unwrap(),
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
Expand Down Expand Up @@ -999,7 +1015,7 @@ pub(super) mod tests {
async fn test_fetch_task_batch_num_bytes() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLog::open(tempdir.path()).await.unwrap(),
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
Expand Down
12 changes: 6 additions & 6 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use fnv::FnvHashMap;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mrecordlog::error::CreateQueueError;
use mrecordlog::MultiRecordLog;
use quickwit_cluster::Cluster;
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
Expand Down Expand Up @@ -79,6 +78,7 @@ use super::replication::{
use super::state::{IngesterState, InnerIngesterState, WeakIngesterState};
use super::IngesterPool;
use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{estimate_size, with_lock_metrics, FollowerId};

/// Minimum interval between two reset shards operations.
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Ingester {
async fn init_primary_shard(
&self,
state: &mut InnerIngesterState,
mrecordlog: &mut MultiRecordLog,
mrecordlog: &mut MultiRecordLogAsync,
shard: Shard,
now: Instant,
) -> IngestV2Result<()> {
Expand Down Expand Up @@ -675,10 +675,13 @@ impl Ingester {
for subrequest in local_persist_subrequests {
let queue_id = subrequest.queue_id;

let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
let batch_num_docs = subrequest.doc_batch.num_docs() as u64;

let append_result = append_non_empty_doc_batch(
&mut state_guard.mrecordlog,
&queue_id,
&subrequest.doc_batch,
subrequest.doc_batch,
force_commit,
)
.await;
Expand Down Expand Up @@ -729,8 +732,6 @@ impl Ingester {
.expect("primary shard should exist")
.set_replication_position_inclusive(current_position_inclusive.clone(), now);

let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
let batch_num_docs = subrequest.doc_batch.num_docs() as u64;
INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);

Expand Down Expand Up @@ -1213,7 +1214,6 @@ mod tests {
use super::*;
use crate::ingest_v2::broadcast::ShardInfos;
use crate::ingest_v2::fetch::tests::{into_fetch_eof, into_fetch_payload};
use crate::ingest_v2::test_utils::MultiRecordLogTestExt;
use crate::ingest_v2::DEFAULT_IDLE_SHARD_TIMEOUT;
use crate::MRecord;

Expand Down
2 changes: 0 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ mod replication;
mod router;
mod routing_table;
mod state;
#[cfg(test)]
mod test_utils;
mod workbench;

use std::ops::{Add, AddAssign};
Expand Down
Loading

0 comments on commit 05311f1

Please sign in to comment.