diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 2f0410cc83b..e79ac5d1be2 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3975,15 +3975,13 @@ dependencies = [ [[package]] name = "mrecordlog" version = "0.4.0" -source = "git+https://github.com/quickwit-oss/mrecordlog?rev=bc6a998#bc6a998966603e11928ff23914d6da169fc8fce6" +source = "git+https://github.com/quickwit-oss/mrecordlog?rev=2c593d3#2c593d385b5d788a84f39aed712160547f3b3256" dependencies = [ - "async-trait", "bytes", "crc32fast", "serde", "serde_json", "thiserror", - "tokio", "tracing", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index c2d7d9288eb..b8644a761f2 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -154,7 +154,7 @@ matches = "0.1.9" md5 = "0.7" mime_guess = "2.0.4" mockall = "0.11" -mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "bc6a998" } +mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "2c593d3" } new_string_template = "1.4.0" nom = "7.1.3" num_cpus = "1" diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs index 41d7bb2a6ed..8a20d16fc55 100644 --- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs +++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs @@ -166,15 +166,16 @@ impl Source for IngestApiSource { return Ok(INGEST_API_POLLING_COOL_DOWN); }; + let batch_num_docs = doc_batch.num_docs(); // TODO use a timestamp (in the raw doc batch) given by at ingest time to be more accurate. let mut raw_doc_batch = RawDocBatch::with_capacity(doc_batch.num_docs()); - for doc in doc_batch.iter() { + for doc in doc_batch.into_iter() { match doc { DocCommand::Ingest { payload } => raw_doc_batch.docs.push(payload), DocCommand::Commit => raw_doc_batch.force_commit = true, } } - let current_offset = first_position + doc_batch.num_docs() as u64 - 1; + let current_offset = first_position + batch_num_docs as u64 - 1; let partition_id = self.partition_id.clone(); raw_doc_batch .checkpoint_delta diff --git a/quickwit/quickwit-ingest/src/doc_batch.rs b/quickwit/quickwit-ingest/src/doc_batch.rs index 3d058dd8d76..b912cf31c97 100644 --- a/quickwit/quickwit-ingest/src/doc_batch.rs +++ b/quickwit/quickwit-ingest/src/doc_batch.rs @@ -211,20 +211,25 @@ impl JsonDocBatchBuilder { impl DocBatch { /// Returns an iterator over the document payloads within a doc_batch. - pub fn iter(&self) -> impl Iterator> + '_ { - self.iter_raw().map(DocCommand::read) + #[allow(clippy::should_implement_trait)] + pub fn into_iter(self) -> impl Iterator> { + self.into_iter_raw().map(DocCommand::read) } /// Returns an iterator over the document payloads within a doc_batch. - pub fn iter_raw(&self) -> impl Iterator + '_ { - self.doc_lengths - .iter() - .cloned() - .scan(0, |current_offset, doc_num_bytes| { + pub fn into_iter_raw(self) -> impl Iterator { + let DocBatch { + doc_buffer, + doc_lengths, + .. + } = self; + doc_lengths + .into_iter() + .scan(0, move |current_offset, doc_num_bytes| { let start = *current_offset; let end = start + doc_num_bytes as usize; *current_offset = end; - Some(self.doc_buffer.slice(start..end)) + Some(doc_buffer.slice(start..end)) }) } @@ -341,7 +346,7 @@ mod tests { assert_eq!(batch.num_docs(), 4); assert_eq!(batch.num_bytes(), 5 + 1 + 5 + 4); - let mut iter = batch.iter(); + let mut iter = batch.clone().into_iter(); assert!(commands_eq( iter.next().unwrap(), DocCommand::Ingest { @@ -367,7 +372,7 @@ mod tests { assert!(iter.next().is_none()); let mut copied_batch = DocBatchBuilder::new("test".to_string()); - for raw_buf in batch.iter_raw() { + for raw_buf in batch.clone().into_iter_raw() { copied_batch.command_from_buf(raw_buf); } let copied_batch = copied_batch.build(); @@ -389,7 +394,7 @@ mod tests { assert_eq!(batch.num_docs(), 3); assert_eq!(batch.num_bytes(), 12 + 12 + 3); - let mut iter = batch.iter(); + let mut iter = batch.into_iter(); assert!(commands_eq( iter.next().unwrap(), DocCommand::Ingest { diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs index d908d604bb0..33df248475f 100644 --- a/quickwit/quickwit-ingest/src/ingest_api_service.rs +++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs @@ -174,32 +174,31 @@ impl IngestApiService { } let mut num_docs = 0usize; let mut notifications = Vec::new(); - for doc_batch in &request.doc_batches { + let commit = request.commit(); + for doc_batch in request.doc_batches { // TODO better error handling. // If there is an error, we probably want a transactional behavior. - let records_it = doc_batch.iter_raw(); - let max_position = self - .queues - .append_batch(&doc_batch.index_id, records_it, ctx) - .await?; - let commit = request.commit(); + + let batch_num_docs = doc_batch.num_docs(); + let batch_num_bytes = doc_batch.num_bytes(); + let index_id = doc_batch.index_id.clone(); + let records_it = doc_batch.into_iter_raw(); + let max_position = self.queues.append_batch(&index_id, records_it, ctx).await?; if let Some(max_position) = max_position { if commit != CommitType::Auto { if commit == CommitType::Force { self.queues .append_batch( - &doc_batch.index_id, + &index_id, iter::once(DocCommand::Commit::.into_buf()), ctx, ) .await?; } - notifications.push((doc_batch.index_id.clone(), max_position)); + notifications.push((index_id.clone(), max_position)); } } - let batch_num_docs = doc_batch.num_docs(); - let batch_num_bytes = doc_batch.num_bytes(); num_docs += batch_num_docs; INGEST_METRICS .ingested_num_bytes @@ -475,7 +474,7 @@ mod tests { let position = doc_batch.num_docs() as u64; assert_eq!(doc_batch.num_docs(), 5); assert!(matches!( - doc_batch.iter().nth(4), + doc_batch.into_iter().nth(4), Some(DocCommand::Commit::) )); ingest_api_service diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 04f01ab3264..e43dcfe27e2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use bytes::{BufMut, BytesMut}; use futures::StreamExt; -use mrecordlog::MultiRecordLog; +use mrecordlog::Record; use quickwit_common::retry::RetryParams; use quickwit_common::{spawn_named_task, ServiceStream}; use quickwit_proto::ingest::ingester::{ @@ -38,6 +38,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, warn}; use super::models::ShardStatus; +use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{with_lock_metrics, ClientId, IngesterPool}; /// A fetch stream task is responsible for waiting and pushing new records written to a shard's @@ -51,7 +52,7 @@ pub(super) struct FetchStreamTask { queue_id: QueueId, /// The position of the next record fetched. from_position_inclusive: u64, - mrecordlog: Arc>>, + mrecordlog: Arc>>, fetch_message_tx: mpsc::Sender>, /// This channel notifies the fetch task when new records are available. This way the fetch /// task does not need to grab the lock and poll the mrecordlog queue unnecessarily. @@ -75,7 +76,7 @@ impl FetchStreamTask { pub fn spawn( open_fetch_stream_request: OpenFetchStreamRequest, - mrecordlog: Arc>>, + mrecordlog: Arc>>, shard_status_rx: watch::Receiver, batch_num_bytes: usize, ) -> (ServiceStream>, JoinHandle<()>) { @@ -137,13 +138,13 @@ impl FetchStreamTask { // The queue was dropped. break; }; - for (_position, mrecord) in mrecords { - if mrecord_buffer.len() + mrecord.len() > mrecord_buffer.capacity() { + for Record { payload, .. } in mrecords { + if mrecord_buffer.len() + payload.len() > mrecord_buffer.capacity() { has_drained_queue = false; break; } - mrecord_buffer.put(mrecord.borrow()); - mrecord_lengths.push(mrecord.len() as u32); + mrecord_buffer.put(payload.borrow()); + mrecord_lengths.push(payload.len() as u32); } // Drop the lock while we send the message. drop(mrecordlog_guard); @@ -594,7 +595,6 @@ pub(super) mod tests { use std::time::Duration; use bytes::Bytes; - use mrecordlog::MultiRecordLog; use quickwit_proto::ingest::ingester::IngesterServiceClient; use quickwit_proto::ingest::ShardState; use quickwit_proto::types::queue_id; @@ -621,7 +621,7 @@ pub(super) mod tests { async fn test_fetch_task_happy_path() { let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = Arc::new(RwLock::new(Some( - MultiRecordLog::open(tempdir.path()).await.unwrap(), + MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), ))); let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); @@ -653,7 +653,11 @@ pub(super) mod tests { mrecordlog_guard .as_mut() .unwrap() - .append_record(&queue_id, None, MRecord::new_doc("test-doc-foo").encode()) + .append_records( + &queue_id, + None, + std::iter::once(MRecord::new_doc("test-doc-foo").encode()), + ) .await .unwrap(); drop(mrecordlog_guard); @@ -703,7 +707,11 @@ pub(super) mod tests { mrecordlog_guard .as_mut() .unwrap() - .append_record(&queue_id, None, MRecord::new_doc("test-doc-bar").encode()) + .append_records( + &queue_id, + None, + std::iter::once(MRecord::new_doc("test-doc-bar").encode()), + ) .await .unwrap(); drop(mrecordlog_guard); @@ -808,7 +816,7 @@ pub(super) mod tests { async fn test_fetch_task_eof_at_beginning() { let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = Arc::new(RwLock::new(Some( - MultiRecordLog::open(tempdir.path()).await.unwrap(), + MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), ))); let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); @@ -865,7 +873,7 @@ pub(super) mod tests { async fn test_fetch_task_from_position_exclusive() { let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = Arc::new(RwLock::new(Some( - MultiRecordLog::open(tempdir.path()).await.unwrap(), + MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), ))); let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); @@ -905,7 +913,11 @@ pub(super) mod tests { mrecordlog_guard .as_mut() .unwrap() - .append_record(&queue_id, None, MRecord::new_doc("test-doc-foo").encode()) + .append_records( + &queue_id, + None, + std::iter::once(MRecord::new_doc("test-doc-foo").encode()), + ) .await .unwrap(); drop(mrecordlog_guard); @@ -922,7 +934,11 @@ pub(super) mod tests { mrecordlog_guard .as_mut() .unwrap() - .append_record(&queue_id, None, MRecord::new_doc("test-doc-bar").encode()) + .append_records( + &queue_id, + None, + std::iter::once(MRecord::new_doc("test-doc-bar").encode()), + ) .await .unwrap(); drop(mrecordlog_guard); @@ -966,7 +982,7 @@ pub(super) mod tests { async fn test_fetch_task_error() { let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = Arc::new(RwLock::new(Some( - MultiRecordLog::open(tempdir.path()).await.unwrap(), + MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), ))); let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); @@ -999,7 +1015,7 @@ pub(super) mod tests { async fn test_fetch_task_batch_num_bytes() { let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = Arc::new(RwLock::new(Some( - MultiRecordLog::open(tempdir.path()).await.unwrap(), + MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), ))); let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 5caf2205179..635a45c7f45 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -31,7 +31,6 @@ use fnv::FnvHashMap; use futures::stream::FuturesUnordered; use futures::StreamExt; use mrecordlog::error::CreateQueueError; -use mrecordlog::MultiRecordLog; use quickwit_cluster::Cluster; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; @@ -79,6 +78,7 @@ use super::replication::{ use super::state::{IngesterState, InnerIngesterState, WeakIngesterState}; use super::IngesterPool; use crate::metrics::INGEST_METRICS; +use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{estimate_size, with_lock_metrics, FollowerId}; /// Minimum interval between two reset shards operations. @@ -174,7 +174,7 @@ impl Ingester { async fn init_primary_shard( &self, state: &mut InnerIngesterState, - mrecordlog: &mut MultiRecordLog, + mrecordlog: &mut MultiRecordLogAsync, shard: Shard, now: Instant, ) -> IngestV2Result<()> { @@ -675,10 +675,13 @@ impl Ingester { for subrequest in local_persist_subrequests { let queue_id = subrequest.queue_id; + let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64; + let batch_num_docs = subrequest.doc_batch.num_docs() as u64; + let append_result = append_non_empty_doc_batch( &mut state_guard.mrecordlog, &queue_id, - &subrequest.doc_batch, + subrequest.doc_batch, force_commit, ) .await; @@ -729,8 +732,6 @@ impl Ingester { .expect("primary shard should exist") .set_replication_position_inclusive(current_position_inclusive.clone(), now); - let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64; - let batch_num_docs = subrequest.doc_batch.num_docs() as u64; INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes); INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs); @@ -1213,7 +1214,6 @@ mod tests { use super::*; use crate::ingest_v2::broadcast::ShardInfos; use crate::ingest_v2::fetch::tests::{into_fetch_eof, into_fetch_payload}; - use crate::ingest_v2::test_utils::MultiRecordLogTestExt; use crate::ingest_v2::DEFAULT_IDLE_SHARD_TIMEOUT; use crate::MRecord; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index c2e6ffd2459..14f505a3f71 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -30,8 +30,6 @@ mod replication; mod router; mod routing_table; mod state; -#[cfg(test)] -mod test_utils; mod workbench; use std::ops::{Add, AddAssign}; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs index eaf17f0e469..73cf70d4219 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -24,10 +24,10 @@ use std::ops::RangeInclusive; use bytesize::ByteSize; use fail::fail_point; use mrecordlog::error::{AppendError, DeleteQueueError}; -use mrecordlog::MultiRecordLog; use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::types::{Position, QueueId}; +use crate::mrecordlog_async::MultiRecordLogAsync; use crate::MRecord; #[derive(Debug, thiserror::Error)] @@ -44,9 +44,9 @@ pub(super) enum AppendDocBatchError { /// /// Panics if `doc_batch` is empty. pub(super) async fn append_non_empty_doc_batch( - mrecordlog: &mut MultiRecordLog, + mrecordlog: &mut MultiRecordLogAsync, queue_id: &QueueId, - doc_batch: &DocBatchV2, + doc_batch: DocBatchV2, force_commit: bool, ) -> Result { let append_result = if force_commit { @@ -114,7 +114,7 @@ pub(super) enum NotEnoughCapacityError { /// Checks whether the log has enough capacity to store some records. pub(super) fn check_enough_capacity( - mrecordlog: &MultiRecordLog, + mrecordlog: &MultiRecordLogAsync, disk_capacity: ByteSize, memory_capacity: ByteSize, requested_capacity: ByteSize, @@ -146,7 +146,7 @@ pub(super) fn check_enough_capacity( /// Deletes a queue from the WAL. Returns without error if the queue does not exist. pub async fn force_delete_queue( - mrecordlog: &mut MultiRecordLog, + mrecordlog: &mut MultiRecordLogAsync, queue_id: &QueueId, ) -> io::Result<()> { match mrecordlog.delete_queue(queue_id).await { @@ -158,19 +158,19 @@ pub async fn force_delete_queue( /// Returns the first and last position of the records currently stored in the queue. Returns `None` /// if the queue does not exist or is empty. pub(super) fn queue_position_range( - mrecordlog: &MultiRecordLog, + mrecordlog: &MultiRecordLogAsync, queue_id: &QueueId, ) -> Option> { let first_position = mrecordlog .range(queue_id, ..) .ok()? .next() - .map(|(position, _)| position)?; + .map(|record| record.position)?; let last_position = mrecordlog .last_record(queue_id) .ok()? - .map(|(position, _)| position)?; + .map(|record| record.position)?; Some(first_position..=last_position) } @@ -182,13 +182,13 @@ mod tests { #[tokio::test] async fn test_append_non_empty_doc_batch() { let tempdir = tempfile::tempdir().unwrap(); - let mut mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let mut mrecordlog = MultiRecordLogAsync::open(tempdir.path()).await.unwrap(); let queue_id = "test-queue".to_string(); let doc_batch = DocBatchV2::for_test(["test-doc-foo"]); let append_error = - append_non_empty_doc_batch(&mut mrecordlog, &queue_id, &doc_batch, false) + append_non_empty_doc_batch(&mut mrecordlog, &queue_id, doc_batch.clone(), false) .await .unwrap_err(); @@ -199,14 +199,16 @@ mod tests { mrecordlog.create_queue(&queue_id).await.unwrap(); - let position = append_non_empty_doc_batch(&mut mrecordlog, &queue_id, &doc_batch, false) - .await - .unwrap(); + let position = + append_non_empty_doc_batch(&mut mrecordlog, &queue_id, doc_batch.clone(), false) + .await + .unwrap(); assert_eq!(position, Position::offset(0u64)); - let position = append_non_empty_doc_batch(&mut mrecordlog, &queue_id, &doc_batch, true) - .await - .unwrap(); + let position = + append_non_empty_doc_batch(&mut mrecordlog, &queue_id, doc_batch.clone(), true) + .await + .unwrap(); assert_eq!(position, Position::offset(2u64)); } @@ -219,16 +221,15 @@ mod tests { fail::cfg("ingester:append_records", "return").unwrap(); let tempdir = tempfile::tempdir().unwrap(); - let mut mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let mut mrecordlog = MultiRecordLogAsync::open(tempdir.path()).await.unwrap(); let queue_id = "test-queue".to_string(); mrecordlog.create_queue(&queue_id).await.unwrap(); let doc_batch = DocBatchV2::for_test(["test-doc-foo"]); - let append_error = - append_non_empty_doc_batch(&mut mrecordlog, &queue_id, &doc_batch, false) - .await - .unwrap_err(); + let append_error = append_non_empty_doc_batch(&mut mrecordlog, &queue_id, doc_batch, false) + .await + .unwrap_err(); assert!(matches!(append_error, AppendDocBatchError::Io(..))); @@ -238,7 +239,7 @@ mod tests { #[tokio::test] async fn test_check_enough_capacity() { let tempdir = tempfile::tempdir().unwrap(); - let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let mrecordlog = MultiRecordLogAsync::open(tempdir.path()).await.unwrap(); let disk_error = check_enough_capacity(&mrecordlog, ByteSize(0), ByteSize(0), ByteSize(12)).unwrap_err(); @@ -260,7 +261,7 @@ mod tests { #[tokio::test] async fn test_append_queue_position_range() { let tempdir = tempfile::tempdir().unwrap(); - let mut mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let mut mrecordlog = MultiRecordLogAsync::open(tempdir.path()).await.unwrap(); assert!(queue_position_range(&mrecordlog, &"queue-not-found".to_string()).is_none()); @@ -268,14 +269,14 @@ mod tests { assert!(queue_position_range(&mrecordlog, &"test-queue".to_string()).is_none()); mrecordlog - .append_record("test-queue", None, &b"test-doc-foo"[..]) + .append_records("test-queue", None, std::iter::once(&b"test-doc-foo"[..])) .await .unwrap(); let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap(); assert_eq!(position_range, 0..=0); mrecordlog - .append_record("test-queue", None, &b"test-doc-bar"[..]) + .append_records("test-queue", None, std::iter::once(&b"test-doc-bar"[..])) .await .unwrap(); let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 89ee05b358f..e4f690c7c06 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -582,6 +582,10 @@ impl ReplicationTask { continue; } }; + + let batch_num_bytes = doc_batch.num_bytes() as u64; + let batch_num_docs = doc_batch.num_docs() as u64; + let requested_capacity = estimate_size(&doc_batch); let current_usage = match check_enough_capacity( @@ -636,9 +640,6 @@ impl ReplicationTask { .wal_memory_usage_bytes .set(new_memory_usage.as_u64() as i64); - let batch_num_bytes = doc_batch.num_bytes() as u64; - let batch_num_docs = doc_batch.num_docs() as u64; - INGEST_METRICS .replicated_num_bytes_total .inc_by(batch_num_bytes); @@ -722,7 +723,6 @@ mod tests { use quickwit_proto::types::{queue_id, IndexUid, ShardId}; use super::*; - use crate::ingest_v2::test_utils::MultiRecordLogTestExt; fn into_init_replica_request( syn_replication_message: SynReplicationMessage, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 805a934e131..3055f756221 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -25,7 +25,6 @@ use std::time::{Duration, Instant}; use fnv::FnvHashMap; use mrecordlog::error::{DeleteQueueError, TruncateError}; -use mrecordlog::MultiRecordLog; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; use quickwit_proto::control_plane::AdviseResetShardsResponse; @@ -39,6 +38,7 @@ use super::models::IngesterShard; use super::rate_meter::RateMeter; use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle}; use crate::ingest_v2::mrecordlog_utils::{force_delete_queue, queue_position_range}; +use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{FollowerId, LeaderId}; /// Stores the state of the ingester and attempts to prevent deadlocks by exposing an API that @@ -51,7 +51,7 @@ use crate::{FollowerId, LeaderId}; pub(super) struct IngesterState { // `inner` is a mutex because it's almost always accessed mutably. inner: Arc>, - mrecordlog: Arc>>, + mrecordlog: Arc>>, pub status_rx: watch::Receiver, } @@ -136,7 +136,7 @@ impl IngesterState { let now = Instant::now(); info!("opening WAL located at `{}`", wal_dir_path.display()); - let open_result = MultiRecordLog::open_with_prefs( + let open_result = MultiRecordLogAsync::open_with_prefs( wal_dir_path, mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)), ) @@ -265,7 +265,7 @@ impl IngesterState { // Leaks the mrecordlog lock for use in fetch tasks. It's safe to do so because fetch tasks // never attempt to lock the inner state. - pub fn mrecordlog(&self) -> Arc>> { + pub fn mrecordlog(&self) -> Arc>> { self.mrecordlog.clone() } @@ -304,7 +304,7 @@ impl DerefMut for PartiallyLockedIngesterState<'_> { pub(super) struct FullyLockedIngesterState<'a> { pub inner: MutexGuard<'a, InnerIngesterState>, - pub mrecordlog: RwLockMappedWriteGuard<'a, MultiRecordLog>, + pub mrecordlog: RwLockMappedWriteGuard<'a, MultiRecordLogAsync>, } impl fmt::Debug for FullyLockedIngesterState<'_> { @@ -403,7 +403,7 @@ impl FullyLockedIngesterState<'_> { #[derive(Clone)] pub(super) struct WeakIngesterState { inner: Weak>, - mrecordlog: Weak>>, + mrecordlog: Weak>>, status_rx: watch::Receiver, } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/test_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/test_utils.rs deleted file mode 100644 index 4960d04c3b3..00000000000 --- a/quickwit/quickwit-ingest/src/ingest_v2/test_utils.rs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::ops::RangeBounds; - -use mrecordlog::MultiRecordLog; - -pub(super) trait MultiRecordLogTestExt { - fn assert_records_eq(&self, queue_id: &str, range: R, expected_records: &[(u64, &str)]) - where R: RangeBounds + 'static; -} - -impl MultiRecordLogTestExt for MultiRecordLog { - #[track_caller] - fn assert_records_eq(&self, queue_id: &str, range: R, expected_records: &[(u64, &str)]) - where R: RangeBounds + 'static { - let records = self - .range(queue_id, range) - .unwrap() - .map(|(position, record)| (position, String::from_utf8(record.into_owned()).unwrap())) - .collect::>(); - assert_eq!( - records.len(), - expected_records.len(), - "expected {} records, got {}", - expected_records.len(), - records.len() - ); - for ((position, record), (expected_position, expected_record)) in - records.iter().zip(expected_records.iter()) - { - assert_eq!( - position, expected_position, - "expected record at position `{expected_position}`, got `{position}`", - ); - assert_eq!( - record, expected_record, - "expected record `{expected_record}`, got `{record}`", - ); - } - } -} diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs index 8766717c3b0..82935c3f2a0 100644 --- a/quickwit/quickwit-ingest/src/lib.rs +++ b/quickwit/quickwit-ingest/src/lib.rs @@ -27,6 +27,7 @@ mod ingest_service; mod ingest_v2; mod memory_capacity; mod metrics; +mod mrecordlog_async; mod notifications; mod position; mod queue; diff --git a/quickwit/quickwit-ingest/src/mrecordlog_async.rs b/quickwit/quickwit-ingest/src/mrecordlog_async.rs new file mode 100644 index 00000000000..51dd252bb34 --- /dev/null +++ b/quickwit/quickwit-ingest/src/mrecordlog_async.rs @@ -0,0 +1,195 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::io; +use std::ops::RangeBounds; +use std::path::Path; + +use bytes::Buf; +use mrecordlog::error::*; +use mrecordlog::{MultiRecordLog, Record, SyncPolicy}; +use tokio::task::JoinError; +use tracing::error; + +/// A light wrapper to allow async operation in mrecordlog. +pub struct MultiRecordLogAsync { + mrecordlog_opt: Option, +} + +impl MultiRecordLogAsync { + fn take(&mut self) -> MultiRecordLog { + let Some(mrecordlog) = self.mrecordlog_opt.take() else { + error!("wal is poisoned, aborting process"); + std::process::abort(); + }; + mrecordlog + } + + fn mrecordlog_ref(&self) -> &MultiRecordLog { + let Some(mrecordlog) = &self.mrecordlog_opt else { + error!("the mrecordlog is corrupted, aborting process"); + std::process::abort(); + }; + mrecordlog + } + + pub async fn open(directory_path: &Path) -> Result { + Self::open_with_prefs(directory_path, SyncPolicy::OnAppend).await + } + + pub async fn open_with_prefs( + directory_path: &Path, + sync_policy: SyncPolicy, + ) -> Result { + let directory_path = directory_path.to_path_buf(); + let mrecordlog = tokio::task::spawn(async move { + MultiRecordLog::open_with_prefs(&directory_path, sync_policy) + }) + .await + .map_err(|join_err| { + error!(error=?join_err, "failed to load WAL"); + ReadRecordError::IoError(io::Error::new( + io::ErrorKind::Other, + "loading wal from directory failed", + )) + })??; + Ok(Self { + mrecordlog_opt: Some(mrecordlog), + }) + } + + async fn run_operation(&mut self, operation: F) -> T + where + F: FnOnce(&mut MultiRecordLog) -> T + Send + 'static, + T: Send + 'static, + { + let mut mrecordlog = self.take(); + let join_res: Result<(T, MultiRecordLog), JoinError> = + tokio::task::spawn_blocking(move || { + let res = operation(&mut mrecordlog); + (res, mrecordlog) + }) + .await; + match join_res { + Ok((operation_result, mrecordlog)) => { + self.mrecordlog_opt = Some(mrecordlog); + operation_result + } + Err(join_error) => { + // This could be caused by a panic + error!(error=?join_error, "failed to run mrecordlog operation"); + panic!("failed to run mrecordlog operation"); + } + } + } + + pub async fn create_queue(&mut self, queue: &str) -> Result<(), CreateQueueError> { + let queue = queue.to_string(); + self.run_operation(move |mrecordlog| mrecordlog.create_queue(&queue)) + .await + } + + pub async fn delete_queue(&mut self, queue: &str) -> Result<(), DeleteQueueError> { + let queue = queue.to_string(); + self.run_operation(move |mrecordlog| mrecordlog.delete_queue(&queue)) + .await + } + + pub async fn append_records + Send + 'static>( + &mut self, + queue: &str, + position_opt: Option, + payloads: T, + ) -> Result, AppendError> { + let queue = queue.to_string(); + self.run_operation(move |mrecordlog| { + mrecordlog.append_records(&queue, position_opt, payloads) + }) + .await + } + + #[track_caller] + #[cfg(test)] + pub fn assert_records_eq(&self, queue_id: &str, range: R, expected_records: &[(u64, &str)]) + where R: RangeBounds + 'static { + let records = self + .range(queue_id, range) + .unwrap() + .map(|Record { position, payload }| { + (position, String::from_utf8(payload.into_owned()).unwrap()) + }) + .collect::>(); + assert_eq!( + records.len(), + expected_records.len(), + "expected {} records, got {}", + expected_records.len(), + records.len() + ); + for ((position, record), (expected_position, expected_record)) in + records.iter().zip(expected_records.iter()) + { + assert_eq!( + position, expected_position, + "expected record at position `{expected_position}`, got `{position}`", + ); + assert_eq!( + record, expected_record, + "expected record `{expected_record}`, got `{record}`", + ); + } + } + + pub async fn truncate(&mut self, queue: &str, position: u64) -> Result { + let queue = queue.to_string(); + self.run_operation(move |mrecordlog| mrecordlog.truncate(&queue, position)) + .await + } + + pub fn range( + &self, + queue: &str, + range: R, + ) -> Result> + '_, MissingQueue> + where + R: RangeBounds + 'static, + { + self.mrecordlog_ref().range(queue, range) + } + + pub fn queue_exists(&self, queue: &str) -> bool { + self.mrecordlog_ref().queue_exists(queue) + } + + pub fn list_queues(&self) -> impl Iterator { + self.mrecordlog_ref().list_queues() + } + + pub fn last_record(&self, queue: &str) -> Result>, MissingQueue> { + self.mrecordlog_ref().last_record(queue) + } + + pub fn memory_usage(&self) -> usize { + self.mrecordlog_ref().memory_usage() + } + + pub fn disk_usage(&self) -> usize { + self.mrecordlog_ref().disk_usage() + } +} diff --git a/quickwit/quickwit-ingest/src/queue.rs b/quickwit/quickwit-ingest/src/queue.rs index e96c9cbb6d8..50f35a1a127 100644 --- a/quickwit/quickwit-ingest/src/queue.rs +++ b/quickwit/quickwit-ingest/src/queue.rs @@ -22,9 +22,10 @@ use std::path::Path; use bytes::Buf; use mrecordlog::error::CreateQueueError; -use mrecordlog::MultiRecordLog; +use mrecordlog::Record; use quickwit_actors::ActorContext; +use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{ DocBatchBuilder, FetchResponse, IngestApiService, IngestServiceError, ListQueuesResponse, }; @@ -35,13 +36,13 @@ const FETCH_PAYLOAD_LIMIT: usize = 2_000_000; // 2MB const QUICKWIT_CF_PREFIX: &str = ".queue_"; pub struct Queues { - record_log: MultiRecordLog, + record_log: MultiRecordLogAsync, } impl Queues { pub async fn open(queues_dir_path: &Path) -> crate::Result { tokio::fs::create_dir_all(queues_dir_path).await.unwrap(); - let record_log = MultiRecordLog::open(queues_dir_path).await?; + let record_log = MultiRecordLogAsync::open(queues_dir_path).await?; Ok(Queues { record_log }) } @@ -121,7 +122,9 @@ impl Queues { record: &[u8], ctx: &ActorContext, ) -> crate::Result> { - self.append_batch(queue_id, std::iter::once(record), ctx) + use bytes::Bytes; + + self.append_batch(queue_id, std::iter::once(Bytes::from(record.to_vec())), ctx) .await } @@ -131,7 +134,7 @@ impl Queues { pub async fn append_batch<'a>( &mut self, queue_id: &str, - records_it: impl Iterator, + records_it: impl Iterator + Send + 'static, ctx: &ActorContext, ) -> crate::Result> { let real_queue_id = format!("{QUICKWIT_CF_PREFIX}{queue_id}"); @@ -176,11 +179,11 @@ impl Queues { let mut num_bytes = 0; let mut first_key_opt = None; - for (pos, record) in records { + for Record { position, payload } in records { if first_key_opt.is_none() { - first_key_opt = Some(pos); + first_key_opt = Some(position); } - num_bytes += doc_batch.command_from_buf(record.as_ref()); + num_bytes += doc_batch.command_from_buf(payload.as_ref()); if num_bytes > size_limit { break; } @@ -272,7 +275,7 @@ mod tests { let fetch_resp = self.fetch(queue_id, start_after, None).unwrap(); assert_eq!(fetch_resp.first_position, expected_first_pos_opt); let doc_batch = fetch_resp.doc_batch.unwrap(); - let records: Vec = doc_batch.iter_raw().collect(); + let records: Vec = doc_batch.clone().into_iter_raw().collect(); assert_eq!(&records, expected); } } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index 2372d8662fd..4ba02ac4d14 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -120,13 +120,19 @@ impl Shard { } impl DocBatchV2 { - pub fn docs(&self) -> impl Iterator + '_ { - self.doc_lengths.iter().scan(0, |start_offset, doc_length| { - let start = *start_offset; - let end = start + *doc_length as usize; - *start_offset = end; - Some(self.doc_buffer.slice(start..end)) - }) + pub fn docs(self) -> impl Iterator { + let DocBatchV2 { + doc_buffer, + doc_lengths, + } = self; + doc_lengths + .into_iter() + .scan(0, move |start_offset, doc_length| { + let start = *start_offset; + let end = start + doc_length as usize; + *start_offset = end; + Some(doc_buffer.slice(start..end)) + }) } pub fn is_empty(&self) -> bool { diff --git a/quickwit/quickwit-proto/src/ingest/router.rs b/quickwit/quickwit-proto/src/ingest/router.rs index 24b1e146012..8d208ed1a8d 100644 --- a/quickwit/quickwit-proto/src/ingest/router.rs +++ b/quickwit/quickwit-proto/src/ingest/router.rs @@ -17,12 +17,4 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use super::Bytes; - include!("../codegen/quickwit/quickwit.ingest.router.rs"); - -impl IngestSubrequest { - pub fn docs(&self) -> impl Iterator + '_ { - self.doc_batch.iter().flat_map(|doc_batch| doc_batch.docs()) - } -}