From 3963148a14b6186a77e66c162dc74d3c4cf2d3a7 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Tue, 10 Oct 2023 10:41:37 -0400 Subject: [PATCH] Version records stored in MRecordLog (#3919) --- .../quickwit-index-management/src/index.rs | 4 +- .../src/actors/indexing_pipeline.rs | 8 +- .../src/source/ingest/mod.rs | 66 +++++--- quickwit/quickwit-indexing/src/source/mod.rs | 40 ++--- quickwit/quickwit-indexing/src/test_utils.rs | 2 +- .../quickwit-ingest/src/ingest_v2/fetch.rs | 126 +++++++++++---- .../quickwit-ingest/src/ingest_v2/ingester.rs | 152 ++++++++---------- quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 40 +---- .../quickwit-ingest/src/ingest_v2/mrecord.rs | 98 +++++++++++ .../src/ingest_v2/replication.rs | 20 ++- quickwit/quickwit-proto/build.rs | 4 +- .../protos/quickwit/ingest.proto | 7 + .../protos/quickwit/ingester.proto | 2 +- .../quickwit/quickwit.ingest.ingester.rs | 2 +- .../src/codegen/quickwit/quickwit.ingest.rs | 11 ++ .../quickwit-proto/src/ingest/ingester.rs | 36 ++--- quickwit/quickwit-proto/src/ingest/mod.rs | 25 +++ .../quickwit-storage/src/storage_resolver.rs | 3 +- 18 files changed, 409 insertions(+), 237 deletions(-) create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 3a6c777bcca..067b47715b3 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -368,7 +368,7 @@ mod tests { #[tokio::test] async fn test_create_index() { let metastore = metastore_for_test(); - let storage_resolver = StorageResolver::ram_and_file_for_test(); + let storage_resolver = StorageResolver::for_test(); let index_service = IndexService::new(metastore.clone(), storage_resolver); let index_id = "test-index"; let index_uri = "ram://indexes/test-index"; @@ -404,7 +404,7 @@ mod tests { #[tokio::test] async fn test_delete_index() { let metastore = metastore_for_test(); - let storage_resolver = StorageResolver::ram_and_file_for_test(); + let storage_resolver = StorageResolver::for_test(); let storage = storage_resolver .resolve(&Uri::for_test("ram://indexes/test-index")) .await diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 1ac7177dece..730b62e87e1 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -680,7 +680,7 @@ mod tests { pipeline_id, doc_mapper: Arc::new(default_doc_mapper_for_test()), source_config, - source_storage_resolver: StorageResolver::ram_and_file_for_test(), + source_storage_resolver: StorageResolver::for_test(), indexing_directory: TempDirectory::for_test(), indexing_settings: IndexingSettings::for_test(), ingester_pool: IngesterPool::default(), @@ -781,7 +781,7 @@ mod tests { pipeline_id, doc_mapper: Arc::new(default_doc_mapper_for_test()), source_config, - source_storage_resolver: StorageResolver::ram_and_file_for_test(), + source_storage_resolver: StorageResolver::for_test(), indexing_directory: TempDirectory::for_test(), indexing_settings: IndexingSettings::for_test(), ingester_pool: IngesterPool::default(), @@ -860,7 +860,7 @@ mod tests { pipeline_id, doc_mapper, source_config, - source_storage_resolver: StorageResolver::ram_and_file_for_test(), + source_storage_resolver: StorageResolver::for_test(), indexing_directory: TempDirectory::for_test(), indexing_settings: IndexingSettings::for_test(), ingester_pool: IngesterPool::default(), @@ -985,7 +985,7 @@ mod tests { pipeline_id, doc_mapper: Arc::new(broken_mapper), source_config, - source_storage_resolver: StorageResolver::ram_and_file_for_test(), + source_storage_resolver: StorageResolver::for_test(), indexing_directory: TempDirectory::for_test(), indexing_settings: IndexingSettings::for_test(), ingester_pool: IngesterPool::default(), diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index bb669b6e81e..b333f75b916 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -22,11 +22,11 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use anyhow::Context; +use anyhow::{bail, Context}; use async_trait::async_trait; use itertools::Itertools; use quickwit_actors::{ActorExitStatus, Mailbox}; -use quickwit_ingest::{IngesterPool, MultiFetchStream}; +use quickwit_ingest::{decoded_mrecords, IngesterPool, MRecord, MultiFetchStream}; use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint}; use quickwit_metastore::Metastore; use quickwit_proto::ingest::ingester::{ @@ -154,14 +154,27 @@ impl IngestSource { batch_builder: &mut BatchBuilder, fetch_response: FetchResponseV2, ) -> anyhow::Result<()> { + let Some(mrecord_batch) = &fetch_response.mrecord_batch else { + return Ok(()); + }; let assigned_shard = self .assigned_shards .get_mut(&fetch_response.shard_id) .expect("shard should be assigned"); let partition_id = assigned_shard.partition_id.clone(); - for doc in fetch_response.docs() { - batch_builder.add_doc(doc); + for mrecord in decoded_mrecords(mrecord_batch) { + match mrecord { + MRecord::Doc(doc) => { + batch_builder.add_doc(doc); + } + MRecord::Commit => { + batch_builder.force_commit(); + } + MRecord::Unknown => { + bail!("source cannot decode mrecord"); + } + } } let from_position_exclusive = assigned_shard.current_position_inclusive.clone(); let to_position_inclusive = fetch_response @@ -430,7 +443,7 @@ mod tests { use quickwit_metastore::MockMetastore; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateResponse}; - use quickwit_proto::ingest::{DocBatchV2, Shard}; + use quickwit_proto::ingest::{MRecordBatch, Shard}; use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse}; use quickwit_storage::StorageResolver; use tokio::sync::watch; @@ -535,7 +548,7 @@ mod tests { metastore, ingester_pool: ingester_pool.clone(), queues_dir_path: PathBuf::from("./queues"), - storage_resolver: StorageResolver::ram_and_file_for_test(), + storage_resolver: StorageResolver::for_test(), }); let checkpoint = SourceCheckpoint::default(); let mut source = IngestSource::try_new(runtime_args, checkpoint) @@ -614,7 +627,7 @@ mod tests { metastore, ingester_pool: ingester_pool.clone(), queues_dir_path: PathBuf::from("./queues"), - storage_resolver: StorageResolver::ram_and_file_for_test(), + storage_resolver: StorageResolver::for_test(), }); let checkpoint = SourceCheckpoint::default(); let mut source = IngestSource::try_new(runtime_args, checkpoint) @@ -656,9 +669,9 @@ mod tests { source_id: "test-source".into(), shard_id: 1, from_position_inclusive: 12, - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-112test-doc-113"), - doc_lengths: vec![12, 12], + mrecord_batch: Some(MRecordBatch { + mrecord_buffer: Bytes::from_static(b"\0\0test-doc-112\0\0test-doc-113\0\x01"), + mrecord_lengths: vec![14, 14, 2], }), })) .await @@ -670,9 +683,9 @@ mod tests { source_id: "test-source".into(), shard_id: 2, from_position_inclusive: 23, - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-223"), - doc_lengths: vec![12], + mrecord_batch: Some(MRecordBatch { + mrecord_buffer: Bytes::from_static(b"\0\0test-doc-223"), + mrecord_lengths: vec![14], }), })) .await @@ -690,6 +703,7 @@ mod tests { assert_eq!(doc_batch.docs[0], "test-doc-112"); assert_eq!(doc_batch.docs[1], "test-doc-113"); assert_eq!(doc_batch.docs[2], "test-doc-223"); + assert!(doc_batch.force_commit); let partition_deltas = doc_batch .checkpoint_delta @@ -700,7 +714,7 @@ mod tests { assert_eq!(partition_deltas.len(), 2); assert_eq!(partition_deltas[0].0, 1u64.into()); assert_eq!(partition_deltas[0].1.from, 11u64.into()); - assert_eq!(partition_deltas[0].1.to, 13u64.into()); + assert_eq!(partition_deltas[0].1.to, 14u64.into()); assert_eq!(partition_deltas[1].0, 2u64.into()); assert_eq!(partition_deltas[1].1.from, 22u64.into()); @@ -755,12 +769,12 @@ mod tests { assert_eq!(request.subrequests.len(), 2); let subrequest_0 = &request.subrequests[0]; - assert_eq!(subrequest_0.shard_id, 1); - assert_eq!(subrequest_0.to_position_inclusive, 11); + assert_eq!(subrequest_0.shard_id, 2); + assert_eq!(subrequest_0.to_position_inclusive, 22); let subrequest_1 = &request.subrequests[1]; - assert_eq!(subrequest_1.shard_id, 2); - assert_eq!(subrequest_1.to_position_inclusive, 22); + assert_eq!(subrequest_1.shard_id, 3); + assert_eq!(subrequest_1.to_position_inclusive, 33); Ok(TruncateResponse {}) }); @@ -790,7 +804,7 @@ mod tests { metastore, ingester_pool: ingester_pool.clone(), queues_dir_path: PathBuf::from("./queues"), - storage_resolver: StorageResolver::ram_and_file_for_test(), + storage_resolver: StorageResolver::for_test(), }); let checkpoint = SourceCheckpoint::default(); let mut source = IngestSource::try_new(runtime_args, checkpoint) @@ -803,7 +817,7 @@ mod tests { let ctx: SourceContext = ActorContext::for_test(&universe, source_mailbox, observable_state_tx); - // In this scenario, the ingester 2 is not available and the shard 5 is no longer assigned. + // In this scenario, the ingester 2 is not available and the shard 6 is no longer assigned. source.assigned_shards.insert( 1, AssignedShard { @@ -840,12 +854,22 @@ mod tests { current_position_inclusive: Position::from(44u64), }, ); + source.assigned_shards.insert( + 5, + AssignedShard { + leader_id: "test-ingester-2".into(), + follower_id_opt: Some("test-ingester-3".into()), + partition_id: 4u64.into(), + current_position_inclusive: Position::Beginning, + }, + ); let checkpoint = SourceCheckpoint::from_iter(vec![ (1u64.into(), 11u64.into()), (2u64.into(), 22u64.into()), (3u64.into(), 33u64.into()), (4u64.into(), 44u64.into()), - (5u64.into(), 55u64.into()), + (5u64.into(), Position::Beginning), + (6u64.into(), 66u64.into()), ]); source.suggest_truncate(checkpoint, &ctx).await.unwrap(); diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 8f3ac8f371a..9129193ff7c 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -163,7 +163,7 @@ impl SourceRuntimeArgs { ingester_pool: IngesterPool::default(), queues_dir_path, source_config, - storage_resolver: StorageResolver::ram_and_file_for_test(), + storage_resolver: StorageResolver::for_test(), }) } } @@ -455,10 +455,11 @@ impl Handler for SourceActor { } #[derive(Debug, Default)] -pub struct BatchBuilder { +pub(crate) struct BatchBuilder { docs: Vec, num_bytes: u64, checkpoint_delta: SourceCheckpointDelta, + force_commit: bool, } impl BatchBuilder { @@ -467,14 +468,19 @@ impl BatchBuilder { self.docs.push(doc); } + pub fn force_commit(&mut self) { + self.force_commit = true; + } + pub fn build(self) -> RawDocBatch { RawDocBatch { docs: self.docs, checkpoint_delta: self.checkpoint_delta, - force_commit: false, + force_commit: self.force_commit, } } + #[cfg(feature = "kafka")] pub fn clear(&mut self) { self.docs.clear(); self.num_bytes = 0; @@ -503,8 +509,7 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - check_source_connectivity(&StorageResolver::ram_and_file_for_test(), &source_config) - .await?; + check_source_connectivity(&StorageResolver::for_test(), &source_config).await?; } { let source_config = SourceConfig { @@ -516,8 +521,7 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - check_source_connectivity(&StorageResolver::ram_and_file_for_test(), &source_config) - .await?; + check_source_connectivity(&StorageResolver::for_test(), &source_config).await?; } { let source_config = SourceConfig { @@ -529,12 +533,11 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - assert!(check_source_connectivity( - &StorageResolver::ram_and_file_for_test(), - &source_config - ) - .await - .is_err()); + assert!( + check_source_connectivity(&StorageResolver::for_test(), &source_config) + .await + .is_err() + ); } { let source_config = SourceConfig { @@ -546,12 +549,11 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - assert!(check_source_connectivity( - &StorageResolver::ram_and_file_for_test(), - &source_config - ) - .await - .is_ok()); + assert!( + check_source_connectivity(&StorageResolver::for_test(), &source_config) + .await + .is_ok() + ); } Ok(()) } diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 0f9ef8591e9..77ffd30429f 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -92,7 +92,7 @@ impl TestSandbox { let temp_dir = tempfile::tempdir()?; let indexer_config = IndexerConfig::for_test()?; let num_blocking_threads = 1; - let storage_resolver = StorageResolver::ram_and_file_for_test(); + let storage_resolver = StorageResolver::for_test(); let metastore_resolver = MetastoreResolver::configured(storage_resolver.clone(), &MetastoreConfigs::default()); let metastore = metastore_resolver diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 04f7000c0ae..dcae2050cdd 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -24,10 +24,11 @@ use std::fmt; use std::ops::RangeInclusive; use std::sync::Arc; +use bytes::{BufMut, BytesMut}; use futures::StreamExt; use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{FetchResponseV2, IngesterService, OpenFetchStreamRequest}; -use quickwit_proto::ingest::{IngestV2Error, IngestV2Result}; +use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, MRecordBatch}; use quickwit_proto::types::{queue_id, NodeId, QueueId, ShardId, SourceId}; use quickwit_proto::IndexUid; use tokio::sync::{mpsc, watch, RwLock}; @@ -36,7 +37,7 @@ use tracing::{debug, error, warn}; use super::ingester::IngesterState; use super::models::ShardStatus; -use crate::{ClientId, DocBatchBuilderV2, IngesterPool}; +use crate::{ClientId, IngesterPool}; /// A fetch task is responsible for waiting and pushing new records written to a shard's record log /// into a channel named `fetch_response_tx`. @@ -146,9 +147,9 @@ impl FetchTask { source_id=%self.source_id, shard_id=%self.shard_id, fetch_range=?self.fetch_range, - "Spawning fetch task." + "spawning fetch task" ); - let mut total_num_docs = 0; + let mut total_num_records = 0; while !self.fetch_range.is_empty() { if !self.wait_for_new_records().await { @@ -157,39 +158,44 @@ impl FetchTask { let fetch_range = self.fetch_range.clone(); let state_guard = self.state.read().await; - let Ok(docs) = state_guard.mrecordlog.range(&self.queue_id, fetch_range) else { + let Ok(mrecords) = state_guard.mrecordlog.range(&self.queue_id, fetch_range) else { warn!( client_id=%self.client_id, index_uid=%self.index_uid, source_id=%self.source_id, shard_id=%self.shard_id, - "Failed to read from record log because it was dropped." + "failed to read from record log because it was dropped." ); break; }; - let mut doc_batch_builder = DocBatchBuilderV2::with_capacity(self.batch_num_bytes); + let mut mrecord_buffer = BytesMut::with_capacity(self.batch_num_bytes); + let mut mrecord_lengths = Vec::new(); - for (_position, doc) in docs { - if doc_batch_builder.num_bytes() + doc.len() > doc_batch_builder.capacity() { + for (_position, mrecord) in mrecords { + if mrecord_buffer.len() + mrecord.len() > mrecord_buffer.capacity() { break; } - doc_batch_builder.add_doc(doc.borrow()); + mrecord_buffer.put(mrecord.borrow()); + mrecord_lengths.push(mrecord.len() as u32); } // Drop the lock while we send the message. drop(state_guard); - let doc_batch = doc_batch_builder.build(); - let num_docs = doc_batch.num_docs() as u64; - total_num_docs += num_docs; + let mrecord_batch = MRecordBatch { + mrecord_buffer: mrecord_buffer.freeze(), + mrecord_lengths, + }; + let num_records = mrecord_batch.num_mrecords() as u64; + total_num_records += num_records; let fetch_response = FetchResponseV2 { index_uid: self.index_uid.clone().into(), source_id: self.source_id.clone(), shard_id: self.shard_id, - doc_batch: Some(doc_batch), + mrecord_batch: Some(mrecord_batch), from_position_inclusive: *self.fetch_range.start(), }; - advance_by(&mut self.fetch_range, num_docs); + advance_by(&mut self.fetch_range, num_records); if self .fetch_response_tx @@ -206,12 +212,12 @@ impl FetchTask { index_uid=%self.index_uid, source_id=%self.source_id, shard_id=%self.shard_id, - "Fetch task completed." + "fetch task completed" ); - if total_num_docs == 0 { + if total_num_records == 0 { (0, None) } else { - (total_num_docs, Some(*self.fetch_range.start() - 1)) + (total_num_records, Some(*self.fetch_range.start() - 1)) } } } @@ -550,9 +556,20 @@ mod tests { assert_eq!(fetch_response.source_id, "test-source"); assert_eq!(fetch_response.shard_id, 1); assert_eq!(fetch_response.from_position_inclusive, 0); - assert_eq!(fetch_response.doc_batch.as_ref().unwrap().doc_lengths, [12]); assert_eq!( - fetch_response.doc_batch.as_ref().unwrap().doc_buffer, + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_lengths, + [12] + ); + assert_eq!( + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_buffer, "test-doc-000" ); @@ -581,9 +598,20 @@ mod tests { .unwrap() .unwrap(); assert_eq!(fetch_response.from_position_inclusive, 1); - assert_eq!(fetch_response.doc_batch.as_ref().unwrap().doc_lengths, [12]); assert_eq!( - fetch_response.doc_batch.as_ref().unwrap().doc_buffer, + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_lengths, + [12] + ); + assert_eq!( + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_buffer, "test-doc-001" ); @@ -594,8 +622,8 @@ mod tests { }; shard_status_tx.send(shard_status).unwrap(); - let (num_docs, last_position) = fetch_task_handle.await.unwrap(); - assert_eq!(num_docs, 2); + let (num_records, last_position) = fetch_task_handle.await.unwrap(); + assert_eq!(num_records, 2); assert_eq!(last_position, Some(1)); } @@ -659,14 +687,25 @@ mod tests { assert_eq!(fetch_response.source_id, "test-source"); assert_eq!(fetch_response.shard_id, 1); assert_eq!(fetch_response.from_position_inclusive, 0); - assert_eq!(fetch_response.doc_batch.as_ref().unwrap().doc_lengths, [12]); assert_eq!( - fetch_response.doc_batch.as_ref().unwrap().doc_buffer, + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_lengths, + [12] + ); + assert_eq!( + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_buffer, "test-doc-000" ); - let (num_docs, last_position) = fetch_task_handle.await.unwrap(); - assert_eq!(num_docs, 1); + let (num_records, last_position) = fetch_task_handle.await.unwrap(); + assert_eq!(num_records, 1); assert_eq!(last_position, Some(0)); } @@ -741,11 +780,19 @@ mod tests { assert_eq!(fetch_response.shard_id, 1); assert_eq!(fetch_response.from_position_inclusive, 0); assert_eq!( - fetch_response.doc_batch.as_ref().unwrap().doc_lengths, + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_lengths, [12, 12] ); assert_eq!( - fetch_response.doc_batch.as_ref().unwrap().doc_buffer, + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_buffer, "test-doc-000test-doc-001" ); let fetch_response = timeout(Duration::from_millis(100), fetch_stream.next()) @@ -757,14 +804,25 @@ mod tests { assert_eq!(fetch_response.source_id, "test-source"); assert_eq!(fetch_response.shard_id, 1); assert_eq!(fetch_response.from_position_inclusive, 2); - assert_eq!(fetch_response.doc_batch.as_ref().unwrap().doc_lengths, [12]); assert_eq!( - fetch_response.doc_batch.as_ref().unwrap().doc_buffer, + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_lengths, + [12] + ); + assert_eq!( + fetch_response + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_buffer, "test-doc-002" ); - let (num_docs, last_position) = fetch_task_handle.await.unwrap(); - assert_eq!(num_docs, 3); + let (num_records, last_position) = fetch_task_handle.await.unwrap(); + assert_eq!(num_records, 3); assert_eq!(last_position, Some(2)); } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 559c5a3a529..5e5affe0f3b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use bytes::{Bytes, BytesMut}; use futures::stream::FuturesUnordered; use futures::StreamExt; use mrecordlog::error::{DeleteQueueError, TruncateError}; @@ -53,13 +52,13 @@ use super::fetch::FetchTask; use super::gc::remove_shards_after; use super::ingest_metastore::IngestMetastore; use super::models::{Position, PrimaryShard, ReplicaShard, ShardStatus}; +use super::mrecord::MRecord; use super::replication::{ ReplicationClient, ReplicationClientTask, ReplicationTask, ReplicationTaskHandle, }; use super::IngesterPool; use crate::ingest_v2::gc::REMOVAL_GRACE_PERIOD; use crate::metrics::INGEST_METRICS; -use crate::DocCommand; #[derive(Clone)] pub struct Ingester { @@ -381,17 +380,20 @@ impl IngesterService for Ingester { continue; }; let primary_position_inclusive = if force_commit { - let docs = doc_batch.docs().chain(once(commit_doc())); + let encoded_mrecords = doc_batch + .docs() + .map(|doc| MRecord::Doc(doc).encode()) + .chain(once(MRecord::Commit.encode())); state_guard .mrecordlog - .append_records(&queue_id, None, docs) + .append_records(&queue_id, None, encoded_mrecords) .await .expect("TODO") // TODO: Io error, close shard? } else { - let docs = doc_batch.docs(); + let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode()); state_guard .mrecordlog - .append_records(&queue_id, None, docs) + .append_records(&queue_id, None, encoded_mrecords) .await .expect("TODO") // TODO: Io error, close shard? }; @@ -632,19 +634,12 @@ impl IngesterService for Ingester { } } -// TODO -pub(super) fn commit_doc() -> Bytes { - let mut buffer = BytesMut::with_capacity(1); - let command = DocCommand::::Commit; - command.write(&mut buffer); - Bytes::from(buffer) -} - #[cfg(test)] mod tests { use std::net::SocketAddr; use std::time::Duration; + use bytes::Bytes; use quickwit_proto::ingest::ingester::{ IngesterServiceGrpcServer, IngesterServiceGrpcServerAdapter, PersistSubrequest, TruncateSubrequest, @@ -765,8 +760,8 @@ mod tests { state_guard.mrecordlog.create_queue(queue_id).await.unwrap(); } let records = [ - Bytes::from_static(b"test-doc-200"), - Bytes::from_static(b"test-doc-201"), + MRecord::new_doc("test-doc-200").encode(), + MRecord::new_doc("test-doc-201").encode(), ]; state_guard .mrecordlog @@ -774,8 +769,8 @@ mod tests { .await .unwrap(); let records = [ - Bytes::from_static(b"test-doc-300"), - Bytes::from_static(b"test-doc-301"), + MRecord::new_doc("test-doc-300").encode(), + MRecord::new_doc("test-doc-301").encode(), ]; state_guard .mrecordlog @@ -783,8 +778,8 @@ mod tests { .await .unwrap(); let records = [ - Bytes::from_static(b"test-doc-400"), - Bytes::from_static(b"test-doc-401"), + MRecord::new_doc("test-doc-400").encode(), + MRecord::new_doc("test-doc-401").encode(), ]; state_guard .mrecordlog @@ -861,7 +856,7 @@ mod tests { let persist_request = PersistRequest { leader_id: self_node_id.to_string(), - commit_type: CommitTypeV2::Auto as i32, + commit_type: CommitTypeV2::Force as i32, subrequests: vec![ PersistSubrequest { index_uid: "test-index:0".to_string(), @@ -875,20 +870,14 @@ mod tests { source_id: "test-source".to_string(), shard_id: 1, follower_id: None, - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-010"), - doc_lengths: vec![12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }, PersistSubrequest { index_uid: "test-index:1".to_string(), source_id: "test-source".to_string(), shard_id: 0, follower_id: None, - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-100test-doc-101"), - doc_lengths: vec![12, 12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-100", "test-doc-101"])), }, ], }; @@ -908,22 +897,28 @@ mod tests { let queue_id_01 = queue_id("test-index:0", "test-source", 1); let primary_shard_01 = state_guard.primary_shards.get(&queue_id_01).unwrap(); - primary_shard_01.assert_positions(0, NONE_REPLICA_POSITION); - primary_shard_01.assert_is_open(0); + primary_shard_01.assert_positions(1, NONE_REPLICA_POSITION); + primary_shard_01.assert_is_open(1); - state_guard - .mrecordlog - .assert_records_eq(&queue_id_01, .., &[(0, "test-doc-010")]); + state_guard.mrecordlog.assert_records_eq( + &queue_id_01, + .., + &[(0, "\0\0test-doc-010"), (1, "\0\u{1}")], + ); let queue_id_10 = queue_id("test-index:1", "test-source", 0); let primary_shard_10 = state_guard.primary_shards.get(&queue_id_10).unwrap(); - primary_shard_10.assert_positions(1, NONE_REPLICA_POSITION); - primary_shard_10.assert_is_open(1); + primary_shard_10.assert_positions(2, NONE_REPLICA_POSITION); + primary_shard_10.assert_is_open(2); state_guard.mrecordlog.assert_records_eq( &queue_id_10, .., - &[(0, "test-doc-100"), (1, "test-doc-101")], + &[ + (0, "\0\0test-doc-100"), + (1, "\0\0test-doc-101"), + (2, "\0\u{1}"), + ], ); } @@ -1023,20 +1018,14 @@ mod tests { source_id: "test-source".to_string(), shard_id: 1, follower_id: Some(follower_id.to_string()), - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-010"), - doc_lengths: vec![12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }, PersistSubrequest { index_uid: "test-index:1".to_string(), source_id: "test-source".to_string(), shard_id: 0, follower_id: Some(follower_id.to_string()), - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-100test-doc-101"), - doc_lengths: vec![12, 12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-100", "test-doc-101"])), }, ], }; @@ -1062,9 +1051,11 @@ mod tests { primary_shard_01.assert_positions(0, Some(0)); primary_shard_01.assert_is_open(0); - leader_state_guard - .mrecordlog - .assert_records_eq(&queue_id_01, .., &[(0, "test-doc-010")]); + leader_state_guard.mrecordlog.assert_records_eq( + &queue_id_01, + .., + &[(0, "\0\0test-doc-010")], + ); let queue_id_10 = queue_id("test-index:1", "test-source", 0); let primary_shard_10 = leader_state_guard.primary_shards.get(&queue_id_10).unwrap(); @@ -1074,7 +1065,7 @@ mod tests { leader_state_guard.mrecordlog.assert_records_eq( &queue_id_10, .., - &[(0, "test-doc-100"), (1, "test-doc-101")], + &[(0, "\0\0test-doc-100"), (1, "\0\0test-doc-101")], ); } @@ -1161,20 +1152,14 @@ mod tests { source_id: "test-source".to_string(), shard_id: 1, follower_id: Some(follower_id.to_string()), - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-010"), - doc_lengths: vec![12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }, PersistSubrequest { index_uid: "test-index:1".to_string(), source_id: "test-source".to_string(), shard_id: 0, follower_id: Some(follower_id.to_string()), - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-100test-doc-101"), - doc_lengths: vec![12, 12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-100", "test-doc-101"])), }, ], }; @@ -1204,17 +1189,21 @@ mod tests { let queue_id_01 = queue_id("test-index:0", "test-source", 1); let leader_state_guard = leader.state.read().await; - leader_state_guard - .mrecordlog - .assert_records_eq(&queue_id_01, .., &[(0, "test-doc-010")]); + leader_state_guard.mrecordlog.assert_records_eq( + &queue_id_01, + .., + &[(0, "\0\0test-doc-010")], + ); let primary_shard = leader_state_guard.primary_shards.get(&queue_id_01).unwrap(); primary_shard.assert_positions(0, Some(0)); primary_shard.assert_is_open(0); - follower_state_guard - .mrecordlog - .assert_records_eq(&queue_id_01, .., &[(0, "test-doc-010")]); + follower_state_guard.mrecordlog.assert_records_eq( + &queue_id_01, + .., + &[(0, "\0\0test-doc-010")], + ); let replica_shard = follower_state_guard .replica_shards @@ -1228,7 +1217,7 @@ mod tests { leader_state_guard.mrecordlog.assert_records_eq( &queue_id_10, .., - &[(0, "test-doc-100"), (1, "test-doc-101")], + &[(0, "\0\0test-doc-100"), (1, "\0\0test-doc-101")], ); let primary_shard = leader_state_guard.primary_shards.get(&queue_id_10).unwrap(); @@ -1238,7 +1227,7 @@ mod tests { follower_state_guard.mrecordlog.assert_records_eq( &queue_id_10, .., - &[(0, "test-doc-100"), (1, "test-doc-101")], + &[(0, "\0\0test-doc-100"), (1, "\0\0test-doc-101")], ); let replica_shard = follower_state_guard @@ -1276,20 +1265,14 @@ mod tests { source_id: "test-source".to_string(), shard_id: 0, follower_id: None, - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-000"), - doc_lengths: vec![12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-000"])), }, PersistSubrequest { index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_id: 1, follower_id: None, - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-010"), - doc_lengths: vec![12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }, ], }; @@ -1309,9 +1292,12 @@ mod tests { .await .unwrap(); let fetch_response = fetch_stream.next().await.unwrap().unwrap(); - let doc_batch = fetch_response.doc_batch.unwrap(); - assert_eq!(doc_batch.doc_buffer, Bytes::from_static(b"test-doc-000")); - assert_eq!(doc_batch.doc_lengths, [12]); + let mrecord_batch = fetch_response.mrecord_batch.unwrap(); + assert_eq!( + mrecord_batch.mrecord_buffer, + Bytes::from_static(b"\0\0test-doc-000") + ); + assert_eq!(mrecord_batch.mrecord_lengths, [14]); assert_eq!(fetch_response.from_position_inclusive, 0); let persist_request = PersistRequest { @@ -1322,21 +1308,18 @@ mod tests { source_id: "test-source".to_string(), shard_id: 0, follower_id: None, - doc_batch: Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc-001test-doc-002"), - doc_lengths: vec![12, 12], - }), + doc_batch: Some(DocBatchV2::for_test(["test-doc-001", "test-doc-002"])), }], }; ingester.persist(persist_request).await.unwrap(); let fetch_response = fetch_stream.next().await.unwrap().unwrap(); - let doc_batch = fetch_response.doc_batch.unwrap(); + let mrecord_batch = fetch_response.mrecord_batch.unwrap(); assert_eq!( - doc_batch.doc_buffer, - Bytes::from_static(b"test-doc-001test-doc-002") + mrecord_batch.mrecord_buffer, + Bytes::from_static(b"\0\0test-doc-001\0\0test-doc-002") ); - assert_eq!(doc_batch.doc_lengths, [12, 12]); + assert_eq!(mrecord_batch.mrecord_lengths, [14, 14]); assert_eq!(fetch_response.from_position_inclusive, 1); } @@ -1385,7 +1368,6 @@ mod tests { .init_primary_shard(&mut state_guard, &queue_id_02, &self_node_id, None) .await .unwrap(); - state_guard .primary_shards .get_mut(&queue_id_02) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index d8e593e257f..f60f2f99bfe 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -22,60 +22,24 @@ mod gc; mod ingest_metastore; mod ingester; mod models; +mod mrecord; mod replication; mod router; mod shard_table; #[cfg(test)] mod test_utils; -use bytes::{BufMut, BytesMut}; use quickwit_common::tower::Pool; use quickwit_proto::ingest::ingester::IngesterServiceClient; -use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::types::NodeId; pub use self::fetch::MultiFetchStream; pub use self::ingest_metastore::IngestMetastore; pub use self::ingester::Ingester; +pub use self::mrecord::{decoded_mrecords, MRecord}; pub use self::router::IngestRouter; pub type IngesterPool = Pool; /// Identifies an ingester client, typically a source, for logging and debugging purposes. pub type ClientId = String; - -#[derive(Default)] -pub(crate) struct DocBatchBuilderV2 { - doc_buffer: BytesMut, - doc_lengths: Vec, -} - -impl DocBatchBuilderV2 { - pub fn with_capacity(capacity: usize) -> Self { - Self { - doc_buffer: BytesMut::with_capacity(capacity), - doc_lengths: Vec::new(), - } - } - - pub fn add_doc(&mut self, doc: &[u8]) { - self.doc_lengths.push(doc.len() as u32); - self.doc_buffer.put(doc); - } - - pub fn build(self) -> DocBatchV2 { - DocBatchV2 { - doc_buffer: self.doc_buffer.freeze(), - doc_lengths: self.doc_lengths, - } - } - - /// Returns the capacity of the underlying buffer, expressed in bytes. - pub fn capacity(&self) -> usize { - self.doc_buffer.capacity() - } - - fn num_bytes(&self) -> usize { - self.doc_buffer.len() - } -} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs new file mode 100644 index 00000000000..9f4822380b0 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs @@ -0,0 +1,98 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use bytes::{Buf, Bytes}; +use quickwit_proto::ingest::MRecordBatch; + +/// The first byte of a [`MRecord`] is the version of the record header. +#[derive(Debug)] +#[repr(u8)] +pub enum HeaderVersion { + /// Version 0, introduced in Quickwit 0.7.0, it uses one byte to encode the record type. + V0 = 0, +} + +/// MRecord header v0 for a document composed of the header version and the `Doc = 0` record type. +const MRECORD_DOC_HEADER_V0: &[u8; 2] = &[HeaderVersion::V0 as u8, 0]; + +/// MRecord header v0 for a commit composed of the header version and the `Commit = 1` record type. +const MRECORD_COMMIT_HEADER_V0: &[u8; 2] = &[HeaderVersion::V0 as u8, 1]; + +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum MRecord { + Doc(Bytes), + Commit, + Unknown, +} + +impl MRecord { + pub fn encode(&self) -> impl Buf { + match &self { + Self::Doc(doc) => MRECORD_DOC_HEADER_V0.chain(doc.clone()), + Self::Commit => MRECORD_COMMIT_HEADER_V0.chain(Bytes::new()), + Self::Unknown => panic!("unknown mrecord type should not be encoded"), + } + } + + pub fn decode(mut buf: impl Buf) -> Self { + let header_version = buf.get_u8(); + + if header_version != HeaderVersion::V0 as u8 { + return Self::Unknown; + } + match buf.get_u8() { + 0 => { + let doc = buf.copy_to_bytes(buf.remaining()); + Self::Doc(doc) + } + 1 => Self::Commit, + _ => Self::Unknown, + } + } + + #[cfg(any(test, feature = "testsuite"))] + pub fn new_doc(doc: impl Into) -> Self { + Self::Doc(doc.into()) + } +} + +pub fn decoded_mrecords(mrecord_batch: &MRecordBatch) -> impl Iterator + '_ { + mrecord_batch.encoded_mrecords().map(MRecord::decode) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mrecord_doc_roundtrip() { + let record = MRecord::new_doc("hello"); + let encoded_record = record.encode(); + let decoded_record = MRecord::decode(encoded_record); + assert_eq!(record, decoded_record); + } + + #[test] + fn test_mrecord_commit_roundtrip() { + let record = MRecord::Commit; + let encoded_record = record.encode(); + let decoded_record = MRecord::decode(encoded_record); + assert_eq!(record, decoded_record); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index ea033d7b437..a89c213a996 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -31,8 +31,9 @@ use quickwit_proto::types::NodeId; use tokio::sync::{mpsc, oneshot, watch, RwLock}; use tokio::task::JoinHandle; -use super::ingester::{commit_doc, IngesterState}; +use super::ingester::IngesterState; use super::models::{Position, ReplicaShard, ShardStatus}; +use super::mrecord::MRecord; use crate::metrics::INGEST_METRICS; /// A replication request is sent by the leader to its follower to update the state of a replica @@ -261,17 +262,20 @@ impl ReplicationTask { continue; }; let replica_position_inclusive = if force_commit { - let docs = doc_batch.docs().chain(once(commit_doc())); + let encoded_mrecords = doc_batch + .docs() + .map(|doc| MRecord::Doc(doc).encode()) + .chain(once(MRecord::Commit.encode())); state_guard .mrecordlog - .append_records(&queue_id, None, docs) + .append_records(&queue_id, None, encoded_mrecords) .await .expect("TODO") } else { - let docs = doc_batch.docs(); + let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode()); state_guard .mrecordlog - .append_records(&queue_id, None, docs) + .append_records(&queue_id, None, encoded_mrecords) .await .expect("TODO") }; @@ -565,7 +569,7 @@ mod tests { state_guard .mrecordlog - .assert_records_eq(&queue_id_01, .., &[(0, "test-doc-010")]); + .assert_records_eq(&queue_id_01, .., &[(0, "\0\0test-doc-010")]); let queue_id_11 = queue_id("test-index:1", "test-source", 1); let replica_shard_11 = state_guard.replica_shards.get(&queue_id_11).unwrap(); @@ -574,7 +578,7 @@ mod tests { state_guard.mrecordlog.assert_records_eq( &queue_id_11, .., - &[(0, "test-doc-110"), (1, "test-doc-111")], + &[(0, "\0\0test-doc-110"), (1, "\0\0test-doc-111")], ); drop(state_guard); @@ -620,7 +624,7 @@ mod tests { state_guard.mrecordlog.assert_records_eq( &queue_id_01, .., - &[(0, "test-doc-010"), (1, "test-doc-011")], + &[(0, "\0\0test-doc-010"), (1, "\0\0test-doc-011")], ); let replica_shard_01 = state_guard.replica_shards.get(&queue_id_01).unwrap(); replica_shard_01.assert_is_open(1); diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index 6fac85acd2a..5933d6751cb 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -47,7 +47,7 @@ fn main() -> Result<(), Box> { // Ingest service let mut prost_config = prost_build::Config::default(); - prost_config.bytes(["DocBatchV2.doc_buffer"]); + prost_config.bytes(["DocBatchV2.doc_buffer", "MRecordBatch.mrecord_buffer"]); Codegen::run_with_config( &[ @@ -65,7 +65,7 @@ fn main() -> Result<(), Box> { // "Classic" prost + tonic codegen for metastore and search services. let mut prost_config = prost_build::Config::default(); prost_config - .bytes(["DocBatchV2.doc_buffer"]) + .bytes(["DocBatchV2.doc_buffer", "MRecordBatch.mrecord_buffer"]) .protoc_arg("--experimental_allow_proto3_optional"); tonic_build::configure() diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto index 653370633bb..655fd034037 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto @@ -33,6 +33,13 @@ message DocBatchV2 { repeated uint32 doc_lengths = 2; } +message MRecordBatch { + // Buffer of encoded and then concatenated mrecords. + bytes mrecord_buffer = 1; + // Lengths of the mrecords in the buffer. + repeated uint32 mrecord_lengths = 2; +} + enum ShardState { // The shard is open and accepts write requests. OPEN = 0; diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 6ce06cf4ef5..a9fc691925e 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -169,7 +169,7 @@ message FetchResponseV2 { string source_id = 2; uint64 shard_id = 3; uint64 from_position_inclusive = 4; - ingest.DocBatchV2 doc_batch = 5; + quickwit.ingest.MRecordBatch mrecord_batch = 5; } message PingRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index f763c115041..f0ec6f22ab9 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -236,7 +236,7 @@ pub struct FetchResponseV2 { #[prost(uint64, tag = "4")] pub from_position_inclusive: u64, #[prost(message, optional, tag = "5")] - pub doc_batch: ::core::option::Option, + pub mrecord_batch: ::core::option::Option, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs index 2dcefc893a9..9b86903e36c 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs @@ -8,6 +8,17 @@ pub struct DocBatchV2 { pub doc_lengths: ::prost::alloc::vec::Vec, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MRecordBatch { + /// Buffer of encoded and then concatenated mrecords. + #[prost(bytes = "bytes", tag = "1")] + pub mrecord_buffer: ::prost::bytes::Bytes, + /// Lengths of the mrecords in the buffer. + #[prost(uint32, repeated, tag = "2")] + pub mrecord_lengths: ::prost::alloc::vec::Vec, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs index 3b4deb176d8..03a2c89a732 100644 --- a/quickwit/quickwit-proto/src/ingest/ingester.rs +++ b/quickwit/quickwit-proto/src/ingest/ingester.rs @@ -17,8 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use bytes::Bytes; - use crate::types::{queue_id, QueueId}; include!("../codegen/quickwit/quickwit.ingest.ingester.rs"); @@ -30,24 +28,20 @@ impl FetchResponseV2 { queue_id(&self.index_uid, &self.source_id, self.shard_id) } - pub fn docs(&self) -> impl Iterator + '_ { - self.doc_batch.iter().flat_map(|doc_batch| doc_batch.docs()) - } - - pub fn num_docs(&self) -> usize { - if let Some(doc_batch) = &self.doc_batch { - doc_batch.doc_lengths.len() + pub fn num_mrecords(&self) -> usize { + if let Some(mrecord_batch) = &self.mrecord_batch { + mrecord_batch.mrecord_lengths.len() } else { 0 } } pub fn to_position_inclusive(&self) -> Option { - let Some(doc_batch) = &self.doc_batch else { + let Some(mrecord_batch) = &self.mrecord_batch else { return None; }; - let num_docs = doc_batch.num_docs() as u64; - Some(self.from_position_inclusive + num_docs - 1) + let num_mrecords = mrecord_batch.num_mrecords() as u64; + Some(self.from_position_inclusive + num_mrecords - 1) } } @@ -154,8 +148,10 @@ impl TruncateSubrequest { #[cfg(test)] mod tests { + use bytes::Bytes; + use super::*; - use crate::ingest::DocBatchV2; + use crate::ingest::{DocBatchV2, MRecordBatch}; #[test] fn test_fetch_response_to_position_inclusive() { @@ -164,19 +160,19 @@ mod tests { source_id: "test-source".to_string(), shard_id: 0, from_position_inclusive: 0, - doc_batch: None, + mrecord_batch: None, }; assert_eq!(response.to_position_inclusive(), None); - response.doc_batch = Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doc"), - doc_lengths: vec![8], + response.mrecord_batch = Some(MRecordBatch { + mrecord_buffer: Bytes::from_static(b"\0\0test-doc"), + mrecord_lengths: vec![10], }); assert_eq!(response.to_position_inclusive(), Some(0)); - response.doc_batch = Some(DocBatchV2 { - doc_buffer: Bytes::from_static(b"test-doctest-doc"), - doc_lengths: vec![8, 8], + response.mrecord_batch = Some(MRecordBatch { + mrecord_buffer: Bytes::from_static(b"\0\0test-doc\0\0test-doc"), + mrecord_lengths: vec![10, 10], }); assert_eq!(response.to_position_inclusive(), Some(1)); } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index ef204f387d2..e606a8bbeb2 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -133,6 +133,31 @@ impl DocBatchV2 { } } +impl MRecordBatch { + pub fn encoded_mrecords(&self) -> impl Iterator + '_ { + self.mrecord_lengths + .iter() + .scan(0, |start_offset, mrecord_length| { + let start = *start_offset; + let end = start + *mrecord_length as usize; + *start_offset = end; + Some(self.mrecord_buffer.slice(start..end)) + }) + } + + pub fn is_empty(&self) -> bool { + self.mrecord_lengths.is_empty() + } + + pub fn num_bytes(&self) -> usize { + self.mrecord_buffer.len() + } + + pub fn num_mrecords(&self) -> usize { + self.mrecord_lengths.len() + } +} + impl Shard { pub fn is_open(&self) -> bool { self.shard_state() == ShardState::Open diff --git a/quickwit/quickwit-storage/src/storage_resolver.rs b/quickwit/quickwit-storage/src/storage_resolver.rs index 376a25fe337..9cd79c3d4b7 100644 --- a/quickwit/quickwit-storage/src/storage_resolver.rs +++ b/quickwit/quickwit-storage/src/storage_resolver.rs @@ -116,7 +116,8 @@ impl StorageResolver { /// Returns a [`StorageResolver`] for testing purposes. Unlike /// [`StorageResolver::unconfigured`], this resolver does not return a singleton. - pub fn ram_and_file_for_test() -> Self { + #[cfg(any(test, feature = "testsuite"))] + pub fn for_test() -> Self { StorageResolver::builder() .register(RamStorageFactory::default()) .register(LocalFileStorageFactory)