Skip to content

Commit

Permalink
Adjustments after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 26, 2024
1 parent 9f6f029 commit 12c2d39
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 9 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ mod test_setup_helper {
use crate::new_split_id;

pub async fn setup_index(
mut metastore: MetastoreServiceClient,
metastore: MetastoreServiceClient,
index_id: &str,
source_config: &SourceConfig,
partition_deltas: &[(PartitionId, Position, Position)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Queue for MemoryQueue {
Ok(vec![])
}

async fn acknowledge(&self, ack_ids: &Vec<String>) -> anyhow::Result<()> {
async fn acknowledge(&self, ack_ids: &[String]) -> anyhow::Result<()> {
let mut inner_state = self.inner_state.lock().unwrap();
for ack_id in ack_ids {
if let Some(msg) = inner_state.in_flight.remove(ack_id) {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/queue_sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub trait Queue: fmt::Debug + Send + Sync + 'static {
/// The call returns `Ok(())` if:
/// - the acknowledgement of some of the messages failed due to a transient failure
/// - the message was already acknowledged
async fn acknowledge(&self, ack_ids: &Vec<String>) -> anyhow::Result<()>;
async fn acknowledge(&self, ack_ids: &[String]) -> anyhow::Result<()>;

/// Modify the visibility deadline of the messages.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl QueueProcessor {
message.metadata.initial_deadline,
),
content: message,
position: position,
position,
})
}
}
Expand All @@ -172,7 +172,7 @@ impl QueueProcessor {
let ack_ids = already_completed
.iter()
.map(|msg| msg.metadata.ack_id.clone())
.collect();
.collect::<Vec<_>>();
self.queue.acknowledge(&ack_ids).await?;

Ok(())
Expand Down Expand Up @@ -264,7 +264,7 @@ mod tests {
node_id: NodeId::from_str("test-node").unwrap(),
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".to_string(),
pipeline_uid: PipelineUid::new(),
pipeline_uid: PipelineUid::random(),
};
let queue_params = QueueParams {
message_type: QueueMessageType::RawUri,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::metastore::{
MetastoreService, MetastoreServiceClient, OpenShardSubrequest, OpenShardsRequest,
};
use quickwit_proto::types::{IndexUid, Position, ShardId};
use quickwit_proto::types::{DocMappingUid, IndexUid, Position, ShardId};

use super::message::PreProcessedMessage;

Expand Down Expand Up @@ -68,6 +68,7 @@ impl QueueSharedStateImpl {
leader_id: String::new(),
follower_id: None,
shard_id: Some(ShardId::from(partition_id.as_str())),
doc_mapping_uid: Some(DocMappingUid::default()),
publish_token: Some(publish_token.to_string()),
})
.collect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Queue for SqsQueue {
.collect()
}

async fn acknowledge(&self, ack_ids: &Vec<String>) -> anyhow::Result<()> {
async fn acknowledge(&self, ack_ids: &[String]) -> anyhow::Result<()> {
let entry_batches: Vec<_> = ack_ids
.iter()
.enumerate()
Expand Down Expand Up @@ -305,7 +305,7 @@ mod localstack_tests {
.await
.unwrap();
queue
.acknowledge(&vec![messages[0].metadata.ack_id.clone()])
.acknowledge(&[messages[0].metadata.ack_id.clone()])
.await
.unwrap();
}
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-metastore/src/tests/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub async fn test_metastore_open_shards<
shard_id: Some(ShardId::from(2)),
leader_id: "test-ingester-foo".to_string(),
follower_id: None,
doc_mapping_uid: Some(DocMappingUid::default()),
publish_token: Some("publish-token-open".to_string()),
}],
};
Expand Down

0 comments on commit 12c2d39

Please sign in to comment.