Handle shard not found error in fetch stream and source (#4247)
guilload authored Dec 11, 2023
1 parent 2f8ead0 commit 5e823a7
Showing 5 changed files with 288 additions and 39 deletions.
182 changes: 161 additions & 21 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -37,6 +37,7 @@ use quickwit_proto::ingest::ingester::{
fetch_message, FetchEof, FetchPayload, IngesterService, TruncateShardsRequest,
TruncateShardsSubrequest,
};
use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsSubrequest, AcquireShardsSubresponse, MetastoreService,
MetastoreServiceClient,
@@ -119,13 +120,16 @@ impl ClientId {
#[serde(rename_all = "snake_case")]
enum IndexingStatus {
#[default]
// Indexing is in progress.
Active,
// We have received all documents from the stream. Note that they
// are not necessarily published yet.
ReachedEof,
// All documents have been indexed AND published.
Complete,
Error,
// The shard no longer exists.
NotFound,
// We have received all documents from the stream. Note that they
// are not necessarily published yet.
ReachedEof,
}

#[derive(Debug, Eq, PartialEq)]
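
For readability, here is the IndexingStatus enum as it reads after this commit, consolidated from the interleaved hunk above. The variant names and comments come straight from the diff; the derive list is an assumption (the real derives sit outside the hunk) and the sketch assumes the serde dependency.

use serde::Serialize;

// Consolidated sketch of IndexingStatus after this change. The derives are
// assumed; the actual ones are declared outside the hunk shown above.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
enum IndexingStatus {
    // Indexing is in progress.
    #[default]
    Active,
    // All documents have been indexed AND published.
    Complete,
    Error,
    // The shard no longer exists.
    NotFound,
    // We have received all documents from the stream. Note that they
    // are not necessarily published yet.
    ReachedEof,
}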
@@ -269,15 +273,30 @@ impl IngestSource {
Ok(())
}

fn process_fetch_stream_error(&mut self, fetch_stream_error: FetchStreamError) {
if let Some(assigned_shard) = self.assigned_shards.get_mut(&fetch_stream_error.shard_id) {
if !matches!(
assigned_shard.status,
IndexingStatus::ReachedEof | IndexingStatus::Complete
) {
assigned_shard.status = IndexingStatus::Error;
}
fn process_fetch_stream_error(
&mut self,
batch_builder: &mut BatchBuilder,
fetch_stream_error: FetchStreamError,
) -> anyhow::Result<()> {
let Some(assigned_shard) = self.assigned_shards.get_mut(&fetch_stream_error.shard_id)
else {
return Ok(());
};
if assigned_shard.status == IndexingStatus::Complete {
return Ok(());
}
if let IngestV2Error::ShardNotFound { .. } = fetch_stream_error.ingest_error {
batch_builder.checkpoint_delta.record_partition_delta(
assigned_shard.partition_id.clone(),
assigned_shard.current_position_inclusive.clone(),
assigned_shard.current_position_inclusive.as_eof(),
)?;
assigned_shard.current_position_inclusive.to_eof();
assigned_shard.status = IndexingStatus::NotFound;
} else if assigned_shard.status != IndexingStatus::ReachedEof {
assigned_shard.status = IndexingStatus::Error;
}
Ok(())
}

async fn truncate(&mut self, truncate_up_to_positions: Vec<(ShardId, Position)>) {
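
The behavioral change in process_fetch_stream_error above is that a ShardNotFound error no longer merely flags the shard: the source records one final checkpoint delta from the current position up to its EOF counterpart, so the vanished shard ends up looking fully consumed, and its status becomes NotFound. A self-contained toy of that bookkeeping, using stand-in types rather than Quickwit's:

// Toy stand-ins (not Quickwit's types) for a shard position and its status.
#[derive(Clone, Debug, PartialEq)]
struct Pos {
    offset: u64,
    eof: bool,
}

impl Pos {
    fn as_eof(&self) -> Pos {
        Pos { offset: self.offset, eof: true }
    }
}

#[derive(Debug, PartialEq)]
enum Status {
    Active,
    NotFound,
}

struct Shard {
    position: Pos,
    status: Status,
}

// Records one final (from, to) delta ending at EOF and marks the shard as not
// found, mirroring the ShardNotFound branch of process_fetch_stream_error.
fn handle_shard_not_found(shard: &mut Shard, checkpoint_delta: &mut Vec<(Pos, Pos)>) {
    let from = shard.position.clone();
    let to = from.as_eof();
    checkpoint_delta.push((from, to.clone()));
    shard.position = to;
    shard.status = Status::NotFound;
}

fn main() {
    let mut shard = Shard {
        position: Pos { offset: 42, eof: false },
        status: Status::Active,
    };
    let mut checkpoint_delta = Vec::new();
    handle_shard_not_found(&mut shard, &mut checkpoint_delta);
    assert!(shard.position.eof);
    assert_eq!(shard.status, Status::NotFound);
    assert_eq!(checkpoint_delta.len(), 1);
}

The new test_ingest_source_emit_batches_shard_not_found test further down exercises exactly this path: the doc processor still receives a batch whose checkpoint delta runs from the current position to its EOF counterpart.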
@@ -479,7 +498,7 @@ impl Source for IngestSource {
}
},
Ok(Err(fetch_stream_error)) => {
self.process_fetch_stream_error(fetch_stream_error);
self.process_fetch_stream_error(&mut batch_builder, fetch_stream_error)?;
}
Err(_) => {
// The deadline has elapsed.
@@ -549,7 +568,7 @@ impl Source for IngestSource {
.find(|subresponse| self.contains_publish_token(subresponse))
.context("acquire shards response is empty")?;

let mut truncate_positions =
let mut truncate_up_to_positions =
Vec::with_capacity(acquire_shards_subresponse.acquired_shards.len());

for acquired_shard in acquire_shards_subresponse.acquired_shards {
@@ -559,10 +578,11 @@
let source_id: SourceId = acquired_shard.source_id;
let shard_id = acquired_shard.shard_id;
let partition_id = PartitionId::from(shard_id);
let current_position_inclusive = acquired_shard
let mut current_position_inclusive = acquired_shard
.publish_position_inclusive
.unwrap_or_default();
let from_position_exclusive = current_position_inclusive.clone();

let status = if from_position_exclusive.is_eof() {
IndexingStatus::Complete
} else if let Err(error) = ctx
@@ -576,12 +596,19 @@
))
.await
{
error!(error=%error, "failed to subscribe to shard");
IndexingStatus::Error
if let IngestV2Error::ShardNotFound { .. } = error {
error!("failed to subscribe to shard `{shard_id}`: shard not found");
current_position_inclusive.to_eof();
IndexingStatus::NotFound
} else {
error!(%error, "failed to subscribe to shard `{shard_id}`");
IndexingStatus::Error
}
} else {
IndexingStatus::Active
};
truncate_positions.push((shard_id, current_position_inclusive.clone()));
truncate_up_to_positions.push((shard_id, current_position_inclusive.clone()));

let assigned_shard = AssignedShard {
leader_id,
follower_id_opt,
@@ -591,7 +618,7 @@
};
self.assigned_shards.insert(shard_id, assigned_shard);
}
self.truncate(truncate_positions).await;
self.truncate(truncate_up_to_positions).await;

Ok(())
}
@@ -601,13 +628,14 @@
checkpoint: SourceCheckpoint,
_ctx: &SourceContext,
) -> anyhow::Result<()> {
let mut truncate_positions: Vec<(ShardId, Position)> =
let mut truncate_up_to_positions: Vec<(ShardId, Position)> =
Vec::with_capacity(checkpoint.num_partitions());

for (partition_id, position) in checkpoint.iter() {
let shard_id = partition_id.as_u64().expect("shard ID should be a u64");
truncate_positions.push((shard_id, position));
truncate_up_to_positions.push((shard_id, position));
}
self.truncate(truncate_positions).await;
self.truncate(truncate_up_to_positions).await;
Ok(())
}

@@ -1509,6 +1537,118 @@ mod tests {
assert_eq!(shard.status, IndexingStatus::Active);
}

#[tokio::test]
async fn test_ingest_source_emit_batches_shard_not_found() {
let pipeline_id = IndexingPipelineId {
node_id: "test-node".to_string(),
index_uid: "test-index:0".into(),
source_id: "test-source".to_string(),
pipeline_uid: PipelineUid::default(),
};
let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest);
let publish_token = "indexer/test-node/test-index:0/test-source/\
00000000000000000000000000/00000000000000000000000000";

let mut mock_metastore = MetastoreServiceClient::mock();
mock_metastore
.expect_acquire_shards()
.once()
.returning(|request| {
assert_eq!(request.subrequests.len(), 1);

let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.shard_ids, vec![1]);

let response = AcquireShardsResponse {
subresponses: vec![AcquireShardsSubresponse {
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
acquired_shards: vec![Shard {
leader_id: "test-ingester-0".to_string(),
follower_id: None,
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
shard_id: 1,
shard_state: ShardState::Open as i32,
publish_position_inclusive: Some(Position::Beginning),
publish_token: Some(publish_token.to_string()),
}],
}],
};
Ok(response)
});
let ingester_pool = IngesterPool::default();

let mut ingester_mock_0 = IngesterServiceClient::mock();
ingester_mock_0
.expect_open_fetch_stream()
.once()
.returning(|request| {
assert_eq!(request.index_uid, "test-index:0");
assert_eq!(request.source_id, "test-source");
assert_eq!(request.shard_id, 1);
assert_eq!(request.from_position_exclusive(), Position::Beginning);

Err(IngestV2Error::ShardNotFound { shard_id: 1 })
});

let ingester_0: IngesterServiceClient = ingester_mock_0.into();
ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());

let event_broker = EventBroker::default();
let runtime_args = Arc::new(SourceRuntimeArgs {
pipeline_id,
source_config,
metastore: MetastoreServiceClient::from(mock_metastore),
ingester_pool,
queues_dir_path: PathBuf::from("./queues"),
storage_resolver: StorageResolver::for_test(),
event_broker,
});
let retry_params = RetryParams::for_test();
let mut source = IngestSource::try_new(runtime_args, retry_params)
.await
.unwrap();

let universe = Universe::with_accelerated_time();
let (source_mailbox, _source_inbox) = universe.create_test_mailbox::<SourceActor>();
let (doc_processor_mailbox, doc_processor_inbox) =
universe.create_test_mailbox::<DocProcessor>();
let (observable_state_tx, _observable_state_rx) = watch::channel(serde_json::Value::Null);
let ctx: SourceContext =
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

let shard_ids: BTreeSet<ShardId> = BTreeSet::from_iter([1]);

source
.assign_shards(shard_ids, &doc_processor_mailbox, &ctx)
.await
.unwrap();

source
.emit_batches(&doc_processor_mailbox, &ctx)
.await
.unwrap();

let shard = source.assigned_shards.get(&1).unwrap();
assert_eq!(shard.status, IndexingStatus::NotFound);
assert_eq!(
shard.current_position_inclusive,
Position::Beginning.as_eof()
);
let raw_doc_batch = doc_processor_inbox
.recv_typed_message::<RawDocBatch>()
.await
.unwrap();

let (partition_id, position) = raw_doc_batch.checkpoint_delta.iter().next().unwrap();
assert_eq!(partition_id, PartitionId::from(1u64));
assert_eq!(position.from, Position::Beginning);
assert_eq!(position.to, Position::Beginning.as_eof());
}

#[tokio::test]
async fn test_ingest_source_suggest_truncate() {
let pipeline_id = IndexingPipelineId {
79 changes: 75 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -475,14 +475,32 @@ async fn fault_tolerant_fetch_stream(
};
let mut fetch_stream = match ingester.open_fetch_stream(open_fetch_stream_request).await {
Ok(fetch_stream) => fetch_stream,
Err(ingest_error) => {
Err(shard_not_found_error @ IngestV2Error::ShardNotFound { .. }) => {
error!(
client_id=%client_id,
index_uid=%index_uid,
source_id=%source_id,
shard_id=%shard_id,
"failed to open fetch stream from ingester `{ingester_id}`: shard not found"
);
let fetch_stream_error = FetchStreamError {
index_uid,
source_id,
shard_id,
ingest_error: shard_not_found_error,
};
let _ = fetch_message_tx.send(Err(fetch_stream_error)).await;
from_position_exclusive.to_eof();
return;
}
Err(other_ingest_error) => {
if let Some(failover_ingester_id) = failover_ingester_id_opt {
warn!(
client_id=%client_id,
index_uid=%index_uid,
source_id=%source_id,
shard_id=%shard_id,
error=%ingest_error,
error=%other_ingest_error,
"failed to open fetch stream from ingester `{ingester_id}`: failing over to ingester `{failover_ingester_id}`"
);
} else {
@@ -491,14 +509,14 @@
index_uid=%index_uid,
source_id=%source_id,
shard_id=%shard_id,
error=%ingest_error,
error=%other_ingest_error,
"failed to open fetch stream from ingester `{ingester_id}`: closing fetch stream"
);
let fetch_stream_error = FetchStreamError {
index_uid,
source_id,
shard_id,
ingest_error,
ingest_error: other_ingest_error,
};
let _ = fetch_message_tx.send(Err(fetch_stream_error)).await;
return;
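
The hunk above makes the failover policy explicit: a ShardNotFound error is terminal, so the error is forwarded to the consumer, the position is stamped as EOF, and the function returns without trying the failover ingester, whereas any other error still fails over when a failover ingester is available. A self-contained illustration of that decision with toy types (the real error type is IngestV2Error):

// Toy error type; the real one is IngestV2Error in quickwit-proto.
#[derive(Debug)]
enum FetchError {
    ShardNotFound { shard_id: u64 },
    Transport(String),
}

// Returns true when it is worth retrying the fetch stream on a failover
// ingester, mirroring the policy of fault_tolerant_fetch_stream above.
fn should_fail_over(error: &FetchError, failover_available: bool) -> bool {
    match error {
        // The shard no longer exists anywhere, so another ingester cannot help.
        FetchError::ShardNotFound { .. } => false,
        // Other errors are assumed transient and are retried when possible.
        FetchError::Transport(_) => failover_available,
    }
}

fn main() {
    assert!(!should_fail_over(&FetchError::ShardNotFound { shard_id: 1 }, true));
    assert!(should_fail_over(&FetchError::Transport("timeout".to_string()), true));
    assert!(!should_fail_over(&FetchError::Transport("timeout".to_string()), false));
}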
@@ -1440,6 +1458,59 @@ pub(super) mod tests {
.is_none());
}

#[tokio::test]
async fn test_fault_tolerant_fetch_stream_shard_not_found() {
let client_id = "test-client".to_string();
let index_uid: IndexUid = "test-index:0".into();
let source_id: SourceId = "test-source".into();
let shard_id: ShardId = 1;
let mut from_position_exclusive = Position::offset(0u64);

let ingester_ids: Vec<NodeId> = vec!["test-ingester-0".into(), "test-ingester-1".into()];
let ingester_pool = IngesterPool::default();

let (fetch_message_tx, mut fetch_stream) = ServiceStream::new_bounded(5);

let mut ingester_mock_0 = IngesterServiceClient::mock();
ingester_mock_0
.expect_open_fetch_stream()
.return_once(move |request| {
assert_eq!(request.client_id, "test-client");
assert_eq!(request.index_uid, "test-index:0");
assert_eq!(request.source_id, "test-source");
assert_eq!(request.shard_id, 1);
assert_eq!(request.from_position_exclusive(), Position::offset(0u64));

Err(IngestV2Error::ShardNotFound { shard_id: 1 })
});
let ingester_0: IngesterServiceClient = ingester_mock_0.into();
ingester_pool.insert("test-ingester-0".into(), ingester_0);

fault_tolerant_fetch_stream(
client_id,
index_uid,
source_id,
shard_id,
&mut from_position_exclusive,
&ingester_ids,
ingester_pool,
fetch_message_tx,
)
.await;

let fetch_stream_error = timeout(Duration::from_millis(100), fetch_stream.next())
.await
.unwrap()
.unwrap()
.unwrap_err();

assert!(matches!(
fetch_stream_error.ingest_error,
IngestV2Error::ShardNotFound { shard_id: 1 }
));
assert!(from_position_exclusive.is_eof());
}

#[tokio::test]
async fn test_retrying_fetch_stream() {
let client_id = "test-client".to_string();
(The remaining 3 changed files are not shown here.)
