From 12c2d39b9be5f0331b0c8ba0ec0e63e27d39160c Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 26 Jun 2024 10:37:23 +0200 Subject: [PATCH] Adjustments after rebase --- quickwit/quickwit-indexing/src/source/mod.rs | 2 +- .../src/source/queue_sources/memory_queue.rs | 2 +- quickwit/quickwit-indexing/src/source/queue_sources/mod.rs | 2 +- .../quickwit-indexing/src/source/queue_sources/processor.rs | 6 +++--- .../src/source/queue_sources/shared_state.rs | 3 ++- .../quickwit-indexing/src/source/queue_sources/sqs_queue.rs | 4 ++-- quickwit/quickwit-metastore/src/tests/shard.rs | 1 + 7 files changed, 11 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index b896aad8898..28c516f8aab 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -739,7 +739,7 @@ mod test_setup_helper { use crate::new_split_id; pub async fn setup_index( - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, index_id: &str, source_config: &SourceConfig, partition_deltas: &[(PartitionId, Position, Position)], diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/memory_queue.rs b/quickwit/quickwit-indexing/src/source/queue_sources/memory_queue.rs index cdc58630486..a3ec306be88 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/memory_queue.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/memory_queue.rs @@ -85,7 +85,7 @@ impl Queue for MemoryQueue { Ok(vec![]) } - async fn acknowledge(&self, ack_ids: &Vec) -> anyhow::Result<()> { + async fn acknowledge(&self, ack_ids: &[String]) -> anyhow::Result<()> { let mut inner_state = self.inner_state.lock().unwrap(); for ack_id in ack_ids { if let Some(msg) = inner_state.in_flight.remove(ack_id) { diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs b/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs index 98f22b42d8d..955f2f6510c 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/mod.rs @@ -56,7 +56,7 @@ pub trait Queue: fmt::Debug + Send + Sync + 'static { /// The call returns `Ok(())` if: /// - the acknowledgement of some of the messages failed due to a transient failure /// - the message was already acknowledged - async fn acknowledge(&self, ack_ids: &Vec) -> anyhow::Result<()>; + async fn acknowledge(&self, ack_ids: &[String]) -> anyhow::Result<()>; /// Modify the visibility deadline of the messages. /// diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs b/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs index 1f02d87401a..bfe05294d6f 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/processor.rs @@ -161,7 +161,7 @@ impl QueueProcessor { message.metadata.initial_deadline, ), content: message, - position: position, + position, }) } } @@ -172,7 +172,7 @@ impl QueueProcessor { let ack_ids = already_completed .iter() .map(|msg| msg.metadata.ack_id.clone()) - .collect(); + .collect::>(); self.queue.acknowledge(&ack_ids).await?; Ok(()) @@ -264,7 +264,7 @@ mod tests { node_id: NodeId::from_str("test-node").unwrap(), index_uid: IndexUid::for_test("test-index", 0), source_id: "test-source".to_string(), - pipeline_uid: PipelineUid::new(), + pipeline_uid: PipelineUid::random(), }; let queue_params = QueueParams { message_type: QueueMessageType::RawUri, diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs index 885fcfb3ec3..3d0d46f3e82 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs @@ -25,7 +25,7 @@ use quickwit_metastore::checkpoint::PartitionId; use quickwit_proto::metastore::{ MetastoreService, MetastoreServiceClient, OpenShardSubrequest, OpenShardsRequest, }; -use quickwit_proto::types::{IndexUid, Position, ShardId}; +use quickwit_proto::types::{DocMappingUid, IndexUid, Position, ShardId}; use super::message::PreProcessedMessage; @@ -68,6 +68,7 @@ impl QueueSharedStateImpl { leader_id: String::new(), follower_id: None, shard_id: Some(ShardId::from(partition_id.as_str())), + doc_mapping_uid: Some(DocMappingUid::default()), publish_token: Some(publish_token.to_string()), }) .collect(); diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs b/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs index 190f839b999..0091de14685 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs @@ -104,7 +104,7 @@ impl Queue for SqsQueue { .collect() } - async fn acknowledge(&self, ack_ids: &Vec) -> anyhow::Result<()> { + async fn acknowledge(&self, ack_ids: &[String]) -> anyhow::Result<()> { let entry_batches: Vec<_> = ack_ids .iter() .enumerate() @@ -305,7 +305,7 @@ mod localstack_tests { .await .unwrap(); queue - .acknowledge(&vec![messages[0].metadata.ack_id.clone()]) + .acknowledge(&[messages[0].metadata.ack_id.clone()]) .await .unwrap(); } diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index a35a9668feb..ed3327774ec 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -192,6 +192,7 @@ pub async fn test_metastore_open_shards< shard_id: Some(ShardId::from(2)), leader_id: "test-ingester-foo".to_string(), follower_id: None, + doc_mapping_uid: Some(DocMappingUid::default()), publish_token: Some("publish-token-open".to_string()), }], };