From 756d37cf8582ffa4264a676986f42453071c641c Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Wed, 7 Feb 2024 11:48:48 -0500 Subject: [PATCH] WIP --- quickwit/quickwit-indexing/failpoints/mod.rs | 2 +- quickwit/quickwit-ingest/Cargo.toml | 3 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 77 +++-- .../src/ingest_v2/mrecordlog_utils.rs | 30 +- .../src/ingest_v2/replication.md | 14 +- .../src/ingest_v2/replication.rs | 317 +++++++++++++++--- 6 files changed, 336 insertions(+), 107 deletions(-) diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs index fcca994712f..8ea6b81b2c2 100644 --- a/quickwit/quickwit-indexing/failpoints/mod.rs +++ b/quickwit/quickwit-indexing/failpoints/mod.rs @@ -211,7 +211,7 @@ async fn aux_test_failpoints() -> anyhow::Result<()> { Ok(()) } -const TEST_TEXT: &'static str = r#"His sole child, my lord, and bequeathed to my +const TEST_TEXT: &str = r#"His sole child, my lord, and bequeathed to my overlooking. I have those hopes of her good that her education promises; her dispositions she inherits, which makes fair gifts fairer; for where diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index b1a236d34e4..ecec67187e3 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -11,7 +11,7 @@ async-trait = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } dyn-clone = { workspace = true } -fail = { workspace = true } +fail = { workspace = true, optional = true } flume = { workspace = true } futures = { workspace = true } http = { workspace = true } @@ -53,4 +53,5 @@ quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-codegen = { workspace = true } [features] +failpoints = ["fail/failpoints"] testsuite = ["mockall"] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 2330aaa4d3b..d3ff9e0b4b1 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -152,9 +152,9 @@ impl Ingester { ) -> IngestV2Result<()> { let queue_id = shard.queue_id(); info!( - index_uid = shard.index_uid, - source = shard.source_id, - shard = %shard.shard_id(), + index_uid=shard.index_uid, + source_id=shard.source_id, + shard_id=%shard.shard_id(), "init primary shard" ); let Entry::Vacant(entry) = state.shards.entry(queue_id.clone()) else { @@ -162,14 +162,15 @@ impl Ingester { }; match mrecordlog.create_queue(&queue_id).await { Ok(_) => {} - Err(CreateQueueError::AlreadyExists) => panic!("queue should not exist"), + Err(CreateQueueError::AlreadyExists) => { + error!("WAL queue `{queue_id}` already exists"); + let message = format!("WAL queue `{queue_id}` already exists"); + return Err(IngestV2Error::Internal(message)); + } Err(CreateQueueError::IoError(io_error)) => { - // TODO: Close all shards and set readiness to false. - error!( - "failed to create mrecordlog queue `{}`: {}", - queue_id, io_error - ); - return Err(IngestV2Error::Internal(format!("Io Error: {io_error}"))); + error!("failed to create WAL queue `{queue_id}`: {io_error}",); + let message = format!("failed to create WAL queue `{queue_id}`: {io_error}"); + return Err(IngestV2Error::Internal(message)); } }; let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings); @@ -330,7 +331,8 @@ impl Ingester { // first verify if we would locally accept each subrequest { - let mut sum_of_requested_capacity = bytesize::ByteSize::b(0); + let mut total_requested_capacity = bytesize::ByteSize::b(0); + for subrequest in persist_request.subrequests { let queue_id = subrequest.queue_id(); @@ -380,29 +382,26 @@ impl Ingester { }; let requested_capacity = estimate_size(&doc_batch); - match check_enough_capacity( + if let Err(error) = check_enough_capacity( &state_guard.mrecordlog, self.disk_capacity, self.memory_capacity, - requested_capacity + sum_of_requested_capacity, + requested_capacity + total_requested_capacity, ) { - Ok(_usage) => (), - Err(error) => { - rate_limited_warn!( - limit_per_min = 10, - "failed to persist records to ingester `{}`: {error}", - self.self_node_id - ); - let persist_failure = PersistFailure { - subrequest_id: subrequest.subrequest_id, - index_uid: subrequest.index_uid, - source_id: subrequest.source_id, - shard_id: subrequest.shard_id, - reason: PersistFailureReason::ResourceExhausted as i32, - }; - persist_failures.push(persist_failure); - continue; - } + rate_limited_warn!( + limit_per_min = 10, + "failed to persist records to ingester `{}`: {error}", + self.self_node_id + ); + let persist_failure = PersistFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: PersistFailureReason::ResourceExhausted as i32, + }; + persist_failures.push(persist_failure); + continue; }; let (rate_limiter, rate_meter) = state_guard @@ -426,7 +425,7 @@ impl Ingester { let batch_num_bytes = doc_batch.num_bytes() as u64; rate_meter.update(batch_num_bytes); - sum_of_requested_capacity += requested_capacity; + total_requested_capacity += requested_capacity; if let Some(follower_id) = follower_id_opt { let replicate_subrequest = ReplicateSubrequest { @@ -618,18 +617,15 @@ impl Ingester { shard.shard_state = ShardState::Closed; shard.notify_shard_status(); + warn!("closed shard `{queue_id}` following IO error"); } - info!( - "closed {} shard(s) following IO error(s)", - shards_to_close.len() - ); } if !shards_to_delete.is_empty() { for queue_id in &shards_to_delete { state_guard.shards.remove(queue_id); state_guard.rate_trackers.remove(queue_id); + warn!("deleted dangling shard `{queue_id}`"); } - info!("deleted {} dangling shard(s)", shards_to_delete.len()); } INGEST_V2_METRICS @@ -1535,10 +1531,13 @@ mod tests { ); } - // This test should be run manually and independently of other tests with the `fail/failpoints` - // feature enabled. + // This test should be run manually and independently of other tests with the `failpoints` + // feature enabled: + // ```sh + // cargo test -p quickwit-ingest --features failpoints -- test_ingester_persist_closes_shard_on_io_error + // ``` + #[cfg(feature = "failpoints")] #[tokio::test] - #[ignore] async fn test_ingester_persist_closes_shard_on_io_error() { let scenario = fail::FailScenario::setup(); fail::cfg("ingester:append_records", "return").unwrap(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs index eaf17f0e469..5dd5a4104aa 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -22,6 +22,7 @@ use std::iter::once; use std::ops::RangeInclusive; use bytesize::ByteSize; +#[cfg(feature = "failpoints")] use fail::fail_point; use mrecordlog::error::{AppendError, DeleteQueueError}; use mrecordlog::MultiRecordLog; @@ -54,19 +55,25 @@ pub(super) async fn append_non_empty_doc_batch( .docs() .map(|doc| MRecord::Doc(doc).encode()) .chain(once(MRecord::Commit.encode())); + + #[cfg(feature = "failpoints")] fail_point!("ingester:append_records", |_| { let io_error = io::Error::from(io::ErrorKind::PermissionDenied); Err(AppendDocBatchError::Io(io_error)) }); + mrecordlog .append_records(queue_id, None, encoded_mrecords) .await } else { let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode()); + + #[cfg(feature = "failpoints")] fail_point!("ingester:append_records", |_| { let io_error = io::Error::from(io::ErrorKind::PermissionDenied); Err(AppendDocBatchError::Io(io_error)) }); + mrecordlog .append_records(queue_id, None, encoded_mrecords) .await @@ -84,12 +91,6 @@ pub(super) async fn append_non_empty_doc_batch( } } -#[derive(Debug, Clone, Copy)] -pub(super) struct MRecordLogUsage { - pub disk: ByteSize, - pub memory: ByteSize, -} - /// Error returned when the mrecordlog does not have enough capacity to store some records. #[derive(Debug, Clone, Copy, thiserror::Error)] pub(super) enum NotEnoughCapacityError { @@ -118,7 +119,7 @@ pub(super) fn check_enough_capacity( disk_capacity: ByteSize, memory_capacity: ByteSize, requested_capacity: ByteSize, -) -> Result { +) -> Result<(), NotEnoughCapacityError> { let disk_usage = ByteSize(mrecordlog.disk_usage() as u64); if disk_usage + requested_capacity > disk_capacity { @@ -137,11 +138,7 @@ pub(super) fn check_enough_capacity( requested: requested_capacity, }); } - let usage = MRecordLogUsage { - disk: disk_usage, - memory: memory_usage, - }; - Ok(usage) + Ok(()) } /// Deletes a queue from the WAL. Returns without error if the queue does not exist. @@ -210,10 +207,13 @@ mod tests { assert_eq!(position, Position::offset(2u64)); } - // This test should be run manually and independently of other tests with the `fail/failpoints` - // feature enabled. + // This test should be run manually and independently of other tests with the `failpoints` + // feature enabled: + // ```sh + // cargo test -p quickwit-ingest --features failpoints -- test_append_non_empty_doc_batch_io_error + // ``` + #[cfg(feature = "failpoints")] #[tokio::test] - #[ignore] async fn test_append_non_empty_doc_batch_io_error() { let scenario = fail::FailScenario::setup(); fail::cfg("ingester:append_records", "return").unwrap(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.md b/quickwit/quickwit-ingest/src/ingest_v2/replication.md index fdc4ab03adf..c95fa7d7851 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.md +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.md @@ -9,14 +9,20 @@ Two gRPC streams back the independent streams of requests and responses between ### Life of a happy persist request 1. Leader receives a persist request pre-assigned to a shard from a router. -1. Leader writes the data to the corresponding mrecordlog queue and records the new position of the queue called `primary_position`. - -1. Leader sends replicate request to follower of the shard via the SYN replication stream. +1. Leader forwards replicate request to follower of the shard via the SYN replication stream. 1. Follower receives the replicate request, writes the data to its replica queue, and records the new position of the queue called `replica_position`. 1. Follower returns replicate response to leader via the ACK replication stream. -1. Leader records the new position of the replica queue. It should match the `primary_position`. +1. Leader records the new position of the replica queue. + +1. Leader writes the data to its local mrecordlog queue and records the new position of the queue called `primary_position`. It should match the `replica_position`. 1. Leader return success persist response to router. + +### Replication stream errors + +- When a replication request fails, the leader and follower close the shard(s) targetted by the request. + +- When a replication stream fails (transport error, timeout), the leader and follower close the shard(s) targetted by the stream. Then, the leader reopens a new stream if necessary. diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index b4437b18fd8..a8748e7e601 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -17,12 +17,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::iter::once; +use std::collections::HashSet; use std::time::Duration; use bytesize::ByteSize; use futures::{Future, StreamExt}; -use quickwit_common::ServiceStream; +use mrecordlog::error::CreateQueueError; +use quickwit_common::{rate_limited_warn, ServiceStream}; use quickwit_proto::ingest::ingester::{ ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus, InitReplicaRequest, InitReplicaResponse, ReplicateFailure, ReplicateFailureReason, @@ -30,17 +31,17 @@ use quickwit_proto::ingest::ingester::{ SynReplicationMessage, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState}; -use quickwit_proto::types::{NodeId, Position}; +use quickwit_proto::types::{NodeId, Position, QueueId}; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tracing::{error, warn}; use super::models::IngesterShard; -use super::mrecord::MRecord; use super::mrecordlog_utils::check_enough_capacity; use super::state::IngesterState; use crate::ingest_v2::metrics::INGEST_V2_METRICS; +use crate::ingest_v2::mrecordlog_utils::{append_non_empty_doc_batch, AppendDocBatchError}; use crate::metrics::INGEST_METRICS; use crate::{estimate_size, with_lock_metrics, with_request_metrics}; @@ -471,12 +472,19 @@ impl ReplicationTask { let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "init_replica", "write").await?; - state_guard - .mrecordlog - .create_queue(&queue_id) - .await - .expect("TODO: Handle IO error"); - + match state_guard.mrecordlog.create_queue(&queue_id).await { + Ok(_) => {} + Err(CreateQueueError::AlreadyExists) => { + error!("WAL queue `{queue_id}` already exists"); + let message = format!("WAL queue `{queue_id}` already exists"); + return Err(IngestV2Error::Internal(message)); + } + Err(CreateQueueError::IoError(io_error)) => { + error!("failed to create WAL queue `{queue_id}`: {io_error}",); + let message = format!("failed to create WAL queue `{queue_id}`: {io_error}"); + return Err(IngestV2Error::Internal(message)); + } + }; let replica_shard = IngesterShard::new_replica( replica_shard.leader_id.into(), ShardState::Open, @@ -522,6 +530,13 @@ impl ReplicationTask { let mut replicate_successes = Vec::with_capacity(replicate_request.subrequests.len()); let mut replicate_failures = Vec::new(); + // Keep track of the shards that need to be closed following an IO error. + let mut shards_to_close: HashSet = HashSet::new(); + + // Keep track of dangling shards, i.e., shards for which there is no longer a corresponding + // queue in the WAL and should be deleted. + let mut shards_to_delete: HashSet = HashSet::new(); + let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "replicate", "write").await?; @@ -597,58 +612,64 @@ impl ReplicationTask { }; let requested_capacity = estimate_size(&doc_batch); - let current_usage = match check_enough_capacity( + if let Err(error) = check_enough_capacity( &state_guard.mrecordlog, self.disk_capacity, self.memory_capacity, requested_capacity, ) { - Ok(usage) => usage, - Err(error) => { - warn!("failed to replicate records: {error}"); - + rate_limited_warn!( + limit_per_min = 10, + "failed to replicate records to ingester `{}`: {error}", + self.follower_id + ); + let replicate_failure = ReplicateFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: ReplicateFailureReason::ResourceExhausted as i32, + }; + replicate_failures.push(replicate_failure); + continue; + }; + let append_result = append_non_empty_doc_batch( + &mut state_guard.mrecordlog, + &queue_id, + &doc_batch, + force_commit, + ) + .await; + + let current_position_inclusive = match append_result { + Ok(current_position_inclusive) => current_position_inclusive, + Err(append_error) => { + let reason = match &append_error { + AppendDocBatchError::Io(io_error) => { + error!("failed to replicate records to shard `{queue_id}`: {io_error}"); + shards_to_close.insert(queue_id); + ReplicateFailureReason::ShardClosed + } + AppendDocBatchError::QueueNotFound(_) => { + error!( + "failed to replicate records to shard `{queue_id}`: WAL queue not \ + found" + ); + shards_to_delete.insert(queue_id); + ReplicateFailureReason::ShardNotFound + } + }; let replicate_failure = ReplicateFailure { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, - reason: ReplicateFailureReason::ResourceExhausted as i32, + reason: reason as i32, }; replicate_failures.push(replicate_failure); continue; } }; - let current_position_inclusive: Position = if force_commit { - let encoded_mrecords = doc_batch - .docs() - .map(|doc| MRecord::Doc(doc).encode()) - .chain(once(MRecord::Commit.encode())); - state_guard - .mrecordlog - .append_records(&queue_id, None, encoded_mrecords) - .await - .expect("TODO") - } else { - let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode()); - state_guard - .mrecordlog - .append_records(&queue_id, None, encoded_mrecords) - .await - .expect("TODO") - } - .map(Position::offset) - .expect("records should not be empty"); - - let new_disk_usage = current_usage.disk + requested_capacity; - let new_memory_usage = current_usage.memory + requested_capacity; - - INGEST_V2_METRICS - .wal_disk_usage_bytes - .set(new_disk_usage.as_u64() as i64); - INGEST_V2_METRICS - .wal_memory_usage_bytes - .set(new_memory_usage.as_u64() as i64); - let batch_num_bytes = doc_batch.num_bytes() as u64; let batch_num_docs = doc_batch.num_docs() as u64; @@ -674,6 +695,35 @@ impl ReplicationTask { }; replicate_successes.push(replicate_success); } + if !shards_to_close.is_empty() { + for queue_id in &shards_to_close { + let shard = state_guard + .shards + .get_mut(queue_id) + .expect("shard should exist"); + + shard.shard_state = ShardState::Closed; + shard.notify_shard_status(); + warn!("closed shard `{queue_id}` following IO error"); + } + } + if !shards_to_delete.is_empty() { + for queue_id in &shards_to_delete { + state_guard.shards.remove(queue_id); + state_guard.rate_trackers.remove(queue_id); + warn!("deleted dangling shard `{queue_id}`"); + } + } + + INGEST_V2_METRICS + .wal_disk_usage_bytes + .set(state_guard.mrecordlog.disk_usage() as i64); + INGEST_V2_METRICS + .wal_memory_usage_bytes + .set(state_guard.mrecordlog.memory_usage() as i64); + + drop(state_guard); + let follower_id = self.follower_id.clone().into(); let replicate_response = ReplicateResponse { @@ -1340,6 +1390,179 @@ mod tests { ); } + #[tokio::test] + async fn test_replication_task_deletes_dangling_shard() { + let leader_id: NodeId = "test-leader".into(); + let follower_id: NodeId = "test-follower".into(); + let (_temp_dir, state, _status_rx) = IngesterState::for_test().await; + let (syn_replication_stream_tx, syn_replication_stream) = + ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); + let (ack_replication_stream_tx, mut ack_replication_stream) = + ServiceStream::new_unbounded(); + + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); + + let _replication_task_handle = ReplicationTask::spawn( + leader_id.clone(), + follower_id, + state.clone(), + syn_replication_stream, + ack_replication_stream_tx, + disk_capacity, + memory_capacity, + ); + + let queue_id_01 = queue_id("test-index:0", "test-source", &ShardId::from(1)); + let replica_shard = IngesterShard::new_replica( + leader_id, + ShardState::Open, + Position::Beginning, + Position::Beginning, + ); + state + .lock_fully() + .await + .unwrap() + .shards + .insert(queue_id_01.clone(), replica_shard); + + let replicate_request = ReplicateRequest { + leader_id: "test-leader".to_string(), + follower_id: "test-follower".to_string(), + commit_type: CommitTypeV2::Auto as i32, + subrequests: vec![ReplicateSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), + from_position_exclusive: Position::offset(0u64).into(), + }], + replication_seqno: 0, + }; + let syn_replication_message = + SynReplicationMessage::new_replicate_request(replicate_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap(); + let replicate_response = into_replicate_response(ack_replication_message); + + assert_eq!(replicate_response.follower_id, "test-follower"); + assert_eq!(replicate_response.successes.len(), 0); + assert_eq!(replicate_response.failures.len(), 1); + + let replicate_failure = &replicate_response.failures[0]; + assert_eq!(replicate_failure.index_uid, "test-index:0"); + assert_eq!(replicate_failure.source_id, "test-source"); + assert_eq!(replicate_failure.shard_id(), ShardId::from(1)); + assert_eq!( + replicate_failure.reason(), + ReplicateFailureReason::ShardNotFound + ); + + let state_guard = state.lock_partially().await; + assert!(!state_guard.shards.contains_key(&queue_id_01)); + } + + // This test should be run manually and independently of other tests with the `failpoints` + // feature enabled: + // ```sh + // cargo test --manifest-path quickwit/Cargo.toml -p quickwit-ingest --features failpoints -- test_replication_task_closes_shard_on_io_error + // ``` + #[cfg(feature = "failpoints")] + #[tokio::test] + async fn test_replication_task_closes_shard_on_io_error() { + let scenario = fail::FailScenario::setup(); + fail::cfg("ingester:append_records", "return").unwrap(); + + let leader_id: NodeId = "test-leader".into(); + let follower_id: NodeId = "test-follower".into(); + let (_temp_dir, state, _status_rx) = IngesterState::for_test().await; + let (syn_replication_stream_tx, syn_replication_stream) = + ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); + let (ack_replication_stream_tx, mut ack_replication_stream) = + ServiceStream::new_unbounded(); + + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); + + let _replication_task_handle = ReplicationTask::spawn( + leader_id.clone(), + follower_id, + state.clone(), + syn_replication_stream, + ack_replication_stream_tx, + disk_capacity, + memory_capacity, + ); + + let queue_id_01 = queue_id("test-index:0", "test-source", &ShardId::from(1)); + let replica_shard = IngesterShard::new_replica( + leader_id, + ShardState::Open, + Position::Beginning, + Position::Beginning, + ); + let mut state_guard = state.lock_fully().await.unwrap(); + + state_guard + .shards + .insert(queue_id_01.clone(), replica_shard); + + state_guard + .mrecordlog + .create_queue(&queue_id_01) + .await + .unwrap(); + + drop(state_guard); + + let replicate_request = ReplicateRequest { + leader_id: "test-leader".to_string(), + follower_id: "test-follower".to_string(), + commit_type: CommitTypeV2::Auto as i32, + subrequests: vec![ReplicateSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), + from_position_exclusive: Position::offset(0u64).into(), + }], + replication_seqno: 0, + }; + let syn_replication_message = + SynReplicationMessage::new_replicate_request(replicate_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap(); + let replicate_response = into_replicate_response(ack_replication_message); + + assert_eq!(replicate_response.follower_id, "test-follower"); + assert_eq!(replicate_response.successes.len(), 0); + assert_eq!(replicate_response.failures.len(), 1); + + let replicate_failure = &replicate_response.failures[0]; + assert_eq!(replicate_failure.index_uid, "test-index:0"); + assert_eq!(replicate_failure.source_id, "test-source"); + assert_eq!(replicate_failure.shard_id(), ShardId::from(1)); + assert_eq!( + replicate_failure.reason(), + ReplicateFailureReason::ShardClosed + ); + + let state_guard = state.lock_partially().await; + let replica_shard = state_guard.shards.get(&queue_id_01).unwrap(); + replica_shard.assert_is_closed(); + + scenario.teardown(); + } + #[tokio::test] async fn test_replication_task_resource_exhausted() { let leader_id: NodeId = "test-leader".into();