Skip to content

Commit

Permalink
Version records stored in MRecordLog (#3919)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Oct 10, 2023
1 parent 71afc78 commit 3963148
Show file tree
Hide file tree
Showing 18 changed files with 409 additions and 237 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ mod tests {
#[tokio::test]
async fn test_create_index() {
let metastore = metastore_for_test();
let storage_resolver = StorageResolver::ram_and_file_for_test();
let storage_resolver = StorageResolver::for_test();
let index_service = IndexService::new(metastore.clone(), storage_resolver);
let index_id = "test-index";
let index_uri = "ram://indexes/test-index";
Expand Down Expand Up @@ -404,7 +404,7 @@ mod tests {
#[tokio::test]
async fn test_delete_index() {
let metastore = metastore_for_test();
let storage_resolver = StorageResolver::ram_and_file_for_test();
let storage_resolver = StorageResolver::for_test();
let storage = storage_resolver
.resolve(&Uri::for_test("ram://indexes/test-index"))
.await
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(default_doc_mapper_for_test()),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
source_storage_resolver: StorageResolver::for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -781,7 +781,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(default_doc_mapper_for_test()),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
source_storage_resolver: StorageResolver::for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -860,7 +860,7 @@ mod tests {
pipeline_id,
doc_mapper,
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
source_storage_resolver: StorageResolver::for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down Expand Up @@ -985,7 +985,7 @@ mod tests {
pipeline_id,
doc_mapper: Arc::new(broken_mapper),
source_config,
source_storage_resolver: StorageResolver::ram_and_file_for_test(),
source_storage_resolver: StorageResolver::for_test(),
indexing_directory: TempDirectory::for_test(),
indexing_settings: IndexingSettings::for_test(),
ingester_pool: IngesterPool::default(),
Expand Down
66 changes: 45 additions & 21 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use anyhow::{bail, Context};
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_ingest::{IngesterPool, MultiFetchStream};
use quickwit_ingest::{decoded_mrecords, IngesterPool, MRecord, MultiFetchStream};
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use quickwit_metastore::Metastore;
use quickwit_proto::ingest::ingester::{
Expand Down Expand Up @@ -154,14 +154,27 @@ impl IngestSource {
batch_builder: &mut BatchBuilder,
fetch_response: FetchResponseV2,
) -> anyhow::Result<()> {
let Some(mrecord_batch) = &fetch_response.mrecord_batch else {
return Ok(());
};
let assigned_shard = self
.assigned_shards
.get_mut(&fetch_response.shard_id)
.expect("shard should be assigned");
let partition_id = assigned_shard.partition_id.clone();

for doc in fetch_response.docs() {
batch_builder.add_doc(doc);
for mrecord in decoded_mrecords(mrecord_batch) {
match mrecord {
MRecord::Doc(doc) => {
batch_builder.add_doc(doc);
}
MRecord::Commit => {
batch_builder.force_commit();
}
MRecord::Unknown => {
bail!("source cannot decode mrecord");
}
}
}
let from_position_exclusive = assigned_shard.current_position_inclusive.clone();
let to_position_inclusive = fetch_response
Expand Down Expand Up @@ -430,7 +443,7 @@ mod tests {
use quickwit_metastore::MockMetastore;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateResponse};
use quickwit_proto::ingest::{DocBatchV2, Shard};
use quickwit_proto::ingest::{MRecordBatch, Shard};
use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse};
use quickwit_storage::StorageResolver;
use tokio::sync::watch;
Expand Down Expand Up @@ -535,7 +548,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
storage_resolver: StorageResolver::for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down Expand Up @@ -614,7 +627,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
storage_resolver: StorageResolver::for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand Down Expand Up @@ -656,9 +669,9 @@ mod tests {
source_id: "test-source".into(),
shard_id: 1,
from_position_inclusive: 12,
doc_batch: Some(DocBatchV2 {
doc_buffer: Bytes::from_static(b"test-doc-112test-doc-113"),
doc_lengths: vec![12, 12],
mrecord_batch: Some(MRecordBatch {
mrecord_buffer: Bytes::from_static(b"\0\0test-doc-112\0\0test-doc-113\0\x01"),
mrecord_lengths: vec![14, 14, 2],
}),
}))
.await
Expand All @@ -670,9 +683,9 @@ mod tests {
source_id: "test-source".into(),
shard_id: 2,
from_position_inclusive: 23,
doc_batch: Some(DocBatchV2 {
doc_buffer: Bytes::from_static(b"test-doc-223"),
doc_lengths: vec![12],
mrecord_batch: Some(MRecordBatch {
mrecord_buffer: Bytes::from_static(b"\0\0test-doc-223"),
mrecord_lengths: vec![14],
}),
}))
.await
Expand All @@ -690,6 +703,7 @@ mod tests {
assert_eq!(doc_batch.docs[0], "test-doc-112");
assert_eq!(doc_batch.docs[1], "test-doc-113");
assert_eq!(doc_batch.docs[2], "test-doc-223");
assert!(doc_batch.force_commit);

let partition_deltas = doc_batch
.checkpoint_delta
Expand All @@ -700,7 +714,7 @@ mod tests {
assert_eq!(partition_deltas.len(), 2);
assert_eq!(partition_deltas[0].0, 1u64.into());
assert_eq!(partition_deltas[0].1.from, 11u64.into());
assert_eq!(partition_deltas[0].1.to, 13u64.into());
assert_eq!(partition_deltas[0].1.to, 14u64.into());

assert_eq!(partition_deltas[1].0, 2u64.into());
assert_eq!(partition_deltas[1].1.from, 22u64.into());
Expand Down Expand Up @@ -755,12 +769,12 @@ mod tests {
assert_eq!(request.subrequests.len(), 2);

let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 1);
assert_eq!(subrequest_0.to_position_inclusive, 11);
assert_eq!(subrequest_0.shard_id, 2);
assert_eq!(subrequest_0.to_position_inclusive, 22);

let subrequest_1 = &request.subrequests[1];
assert_eq!(subrequest_1.shard_id, 2);
assert_eq!(subrequest_1.to_position_inclusive, 22);
assert_eq!(subrequest_1.shard_id, 3);
assert_eq!(subrequest_1.to_position_inclusive, 33);

Ok(TruncateResponse {})
});
Expand Down Expand Up @@ -790,7 +804,7 @@ mod tests {
metastore,
ingester_pool: ingester_pool.clone(),
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::ram_and_file_for_test(),
storage_resolver: StorageResolver::for_test(),
});
let checkpoint = SourceCheckpoint::default();
let mut source = IngestSource::try_new(runtime_args, checkpoint)
Expand All @@ -803,7 +817,7 @@ mod tests {
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

// In this scenario, the ingester 2 is not available and the shard 5 is no longer assigned.
// In this scenario, the ingester 2 is not available and the shard 6 is no longer assigned.
source.assigned_shards.insert(
1,
AssignedShard {
Expand Down Expand Up @@ -840,12 +854,22 @@ mod tests {
current_position_inclusive: Position::from(44u64),
},
);
source.assigned_shards.insert(
5,
AssignedShard {
leader_id: "test-ingester-2".into(),
follower_id_opt: Some("test-ingester-3".into()),
partition_id: 4u64.into(),
current_position_inclusive: Position::Beginning,
},
);
let checkpoint = SourceCheckpoint::from_iter(vec![
(1u64.into(), 11u64.into()),
(2u64.into(), 22u64.into()),
(3u64.into(), 33u64.into()),
(4u64.into(), 44u64.into()),
(5u64.into(), 55u64.into()),
(5u64.into(), Position::Beginning),
(6u64.into(), 66u64.into()),
]);
source.suggest_truncate(checkpoint, &ctx).await.unwrap();

Expand Down
40 changes: 21 additions & 19 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl SourceRuntimeArgs {
ingester_pool: IngesterPool::default(),
queues_dir_path,
source_config,
storage_resolver: StorageResolver::ram_and_file_for_test(),
storage_resolver: StorageResolver::for_test(),
})
}
}
Expand Down Expand Up @@ -455,10 +455,11 @@ impl Handler<SuggestTruncate> for SourceActor {
}

#[derive(Debug, Default)]
pub struct BatchBuilder {
pub(crate) struct BatchBuilder {
docs: Vec<Bytes>,
num_bytes: u64,
checkpoint_delta: SourceCheckpointDelta,
force_commit: bool,
}

impl BatchBuilder {
Expand All @@ -467,14 +468,19 @@ impl BatchBuilder {
self.docs.push(doc);
}

pub fn force_commit(&mut self) {
self.force_commit = true;
}

pub fn build(self) -> RawDocBatch {
RawDocBatch {
docs: self.docs,
checkpoint_delta: self.checkpoint_delta,
force_commit: false,
force_commit: self.force_commit,
}
}

#[cfg(feature = "kafka")]
pub fn clear(&mut self) {
self.docs.clear();
self.num_bytes = 0;
Expand Down Expand Up @@ -503,8 +509,7 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
check_source_connectivity(&StorageResolver::ram_and_file_for_test(), &source_config)
.await?;
check_source_connectivity(&StorageResolver::for_test(), &source_config).await?;
}
{
let source_config = SourceConfig {
Expand All @@ -516,8 +521,7 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
check_source_connectivity(&StorageResolver::ram_and_file_for_test(), &source_config)
.await?;
check_source_connectivity(&StorageResolver::for_test(), &source_config).await?;
}
{
let source_config = SourceConfig {
Expand All @@ -529,12 +533,11 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
assert!(check_source_connectivity(
&StorageResolver::ram_and_file_for_test(),
&source_config
)
.await
.is_err());
assert!(
check_source_connectivity(&StorageResolver::for_test(), &source_config)
.await
.is_err()
);
}
{
let source_config = SourceConfig {
Expand All @@ -546,12 +549,11 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
assert!(check_source_connectivity(
&StorageResolver::ram_and_file_for_test(),
&source_config
)
.await
.is_ok());
assert!(
check_source_connectivity(&StorageResolver::for_test(), &source_config)
.await
.is_ok()
);
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl TestSandbox {
let temp_dir = tempfile::tempdir()?;
let indexer_config = IndexerConfig::for_test()?;
let num_blocking_threads = 1;
let storage_resolver = StorageResolver::ram_and_file_for_test();
let storage_resolver = StorageResolver::for_test();
let metastore_resolver =
MetastoreResolver::configured(storage_resolver.clone(), &MetastoreConfigs::default());
let metastore = metastore_resolver
Expand Down
Loading

0 comments on commit 3963148

Please sign in to comment.