From 2cc57618009318dd8c4cff6c09e30adff20bd33b Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Wed, 17 Jan 2024 19:00:21 +0100
Subject: [PATCH] change order of remote vs local write

---
 .../quickwit-ingest/src/ingest_v2/ingester.rs | 463 ++++++++++--------
 1 file changed, 259 insertions(+), 204 deletions(-)

diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 1512f1d6ffb..629a5dca118 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -403,7 +403,9 @@ impl Ingester {
         }
         let mut persist_successes = Vec::with_capacity(persist_request.subrequests.len());
         let mut persist_failures = Vec::new();
-        let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();
+        let mut replicate_subrequests: HashMap<NodeId, Vec<(ReplicateSubrequest, QueueId)>> =
+            HashMap::new();
+        let mut local_persist_subrequests: Vec<LocalPersistSubrequest> = Vec::new();
         // Keep track of the shards that need to be closed following an IO error.
         let mut shards_to_close: HashSet<QueueId> = HashSet::new();
 
@@ -438,173 +440,276 @@ impl Ingester {
             };
             return Ok(persist_response);
         }
-        for subrequest in persist_request.subrequests {
-            let queue_id = subrequest.queue_id();
-            let Some(shard) = state_guard.shards.get_mut(&queue_id) else {
-                let persist_failure = PersistFailure {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::ShardNotFound as i32,
-                };
-                persist_failures.push(persist_failure);
-                continue;
-            };
-            if shard.shard_state.is_closed() {
-                let persist_failure = PersistFailure {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::ShardClosed as i32,
-                };
-                persist_failures.push(persist_failure);
-                continue;
-            }
-            let follower_id_opt = shard.follower_id_opt().cloned();
-            let from_position_exclusive = shard.replication_position_inclusive.clone();
-
-            let doc_batch = match subrequest.doc_batch {
-                Some(doc_batch) if !doc_batch.is_empty() => doc_batch,
-                _ => {
-                    warn!("received empty persist request");
+        // first verify if we would locally accept each subrequest
+        {
+            let mut sum_of_requested_capacity = bytesize::ByteSize::b(0);
+            for subrequest in persist_request.subrequests {
+                let queue_id = subrequest.queue_id();
+
-                    let persist_success = PersistSuccess {
+                let Some(shard) = state_guard.shards.get_mut(&queue_id) else {
+                    let persist_failure = PersistFailure {
                         subrequest_id: subrequest.subrequest_id,
                         index_uid: subrequest.index_uid,
                         source_id: subrequest.source_id,
                         shard_id: subrequest.shard_id,
-                        replication_position_inclusive: Some(
-                            shard.replication_position_inclusive.clone(),
-                        ),
+                        reason: PersistFailureReason::ShardNotFound as i32,
                     };
-                    persist_successes.push(persist_success);
+                    persist_failures.push(persist_failure);
                     continue;
-                }
-            };
-            let requested_capacity = estimate_size(&doc_batch);
-
-            let current_usage = match check_enough_capacity(
-                &state_guard.mrecordlog,
-                self.disk_capacity,
-                self.memory_capacity,
-                requested_capacity,
-            ) {
-                Ok(usage) => usage,
-                Err(error) => {
-                    warn!(
-                        "failed to persist records to ingester `{}`: {error}",
-                        self.self_node_id
-                    );
+                };
+                if shard.shard_state.is_closed() {
                     let persist_failure = PersistFailure {
                         subrequest_id: subrequest.subrequest_id,
                         index_uid: subrequest.index_uid,
                         source_id: subrequest.source_id,
                         shard_id: subrequest.shard_id,
-                        reason: PersistFailureReason::ResourceExhausted as i32,
+                        reason: PersistFailureReason::ShardClosed as i32,
                     };
                     persist_failures.push(persist_failure);
                     continue;
                 }
-            };
-            let (rate_limiter, rate_meter) = state_guard
-                .rate_trackers
-                .get_mut(&queue_id)
-                .expect("rate limiter should be initialized");
-
-            if !rate_limiter.acquire_bytes(requested_capacity) {
-                debug!("failed to persist records to shard `{queue_id}`: rate limited");
-                let persist_failure = PersistFailure {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::RateLimited as i32,
+                let follower_id_opt = shard.follower_id_opt().cloned();
+                let from_position_exclusive = shard.replication_position_inclusive.clone();
+
+                let doc_batch = match subrequest.doc_batch {
+                    Some(doc_batch) if !doc_batch.is_empty() => doc_batch,
+                    _ => {
+                        warn!("received empty persist request");
+
+                        let persist_success = PersistSuccess {
+                            subrequest_id: subrequest.subrequest_id,
+                            index_uid: subrequest.index_uid,
+                            source_id: subrequest.source_id,
+                            shard_id: subrequest.shard_id,
+                            replication_position_inclusive: Some(
+                                shard.replication_position_inclusive.clone(),
+                            ),
+                        };
+                        persist_successes.push(persist_success);
+                        continue;
+                    }
+                };
+                let requested_capacity = estimate_size(&doc_batch);
+
+                match check_enough_capacity(
+                    &state_guard.mrecordlog,
+                    self.disk_capacity,
+                    self.memory_capacity,
+                    requested_capacity + sum_of_requested_capacity,
+                ) {
+                    Ok(_usage) => (),
+                    Err(error) => {
+                        warn!(
+                            "failed to persist records to ingester `{}`: {error}",
+                            self.self_node_id
+                        );
+                        let persist_failure = PersistFailure {
+                            subrequest_id: subrequest.subrequest_id,
+                            index_uid: subrequest.index_uid,
+                            source_id: subrequest.source_id,
+                            shard_id: subrequest.shard_id,
+                            reason: PersistFailureReason::ResourceExhausted as i32,
+                        };
+                        persist_failures.push(persist_failure);
+                        continue;
+                    }
                 };
-                persist_failures.push(persist_failure);
-                continue;
-            }
-            let batch_num_bytes = doc_batch.num_bytes() as u64;
-            let batch_num_docs = doc_batch.num_docs() as u64;
-            rate_meter.update(batch_num_bytes);
+
+                let (rate_limiter, rate_meter) = state_guard
+                    .rate_trackers
+                    .get_mut(&queue_id)
+                    .expect("rate limiter should be initialized");
 
-            let append_result = append_non_empty_doc_batch(
-                &mut state_guard.mrecordlog,
-                &queue_id,
-                &doc_batch,
-                force_commit,
-            )
-            .await;
+                if !rate_limiter.acquire_bytes(requested_capacity) {
+                    debug!("failed to persist records to shard `{queue_id}`: rate limited");
+
-            let current_position_inclusive = match append_result {
-                Ok(current_position_inclusive) => current_position_inclusive,
-                Err(append_error) => {
-                    let reason = match &append_error {
-                        AppendDocBatchError::Io(io_error) => {
-                            error!("failed to persist records to shard `{queue_id}`: {io_error}");
-                            shards_to_close.insert(queue_id);
-                            PersistFailureReason::ShardClosed
-                        }
-                        AppendDocBatchError::QueueNotFound(_) => {
-                            error!(
-                                "failed to persist records to shard `{queue_id}`: WAL queue not \
-                                 found"
-                            );
-                            shards_to_delete.insert(queue_id);
-                            PersistFailureReason::ShardNotFound
-                        }
-                    };
                     let persist_failure = PersistFailure {
                         subrequest_id: subrequest.subrequest_id,
                         index_uid: subrequest.index_uid,
                         source_id: subrequest.source_id,
                         shard_id: subrequest.shard_id,
-                        reason: reason as i32,
+                        reason: PersistFailureReason::RateLimited as i32,
                     };
                     persist_failures.push(persist_failure);
                     continue;
                 }
-            };
-            // It's more precise the compute the new usage from the current usage + the requested
-            // capacity than from continuously summing up the requested capacities, which are
-            // approximations.
-            let new_disk_usage = current_usage.disk + requested_capacity;
-            let new_memory_usage = current_usage.memory + requested_capacity;
-
-            INGEST_V2_METRICS
-                .wal_disk_usage_bytes
-                .set(new_disk_usage.as_u64() as i64);
-            INGEST_V2_METRICS
-                .wal_memory_usage_bytes
-                .set(new_memory_usage.as_u64() as i64);
-
-            INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
-            INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);
-
-            state_guard
-                .shards
-                .get_mut(&queue_id)
-                .expect("primary shard should exist")
-                .set_replication_position_inclusive(current_position_inclusive.clone());
-
-            if let Some(follower_id) = follower_id_opt {
-                let replicate_subrequest = ReplicateSubrequest {
-                    subrequest_id: subrequest.subrequest_id,
-                    index_uid: subrequest.index_uid,
-                    source_id: subrequest.source_id,
-                    shard_id: subrequest.shard_id,
-                    from_position_exclusive: Some(from_position_exclusive),
-                    doc_batch: Some(doc_batch),
-                };
-                replicate_subrequests
-                    .entry(follower_id)
-                    .or_default()
-                    .push(replicate_subrequest);
-            } else {
+
+                let batch_num_bytes = doc_batch.num_bytes() as u64;
+                rate_meter.update(batch_num_bytes);
+                sum_of_requested_capacity += requested_capacity;
+
+                if let Some(follower_id) = follower_id_opt {
+                    let replicate_subrequest = ReplicateSubrequest {
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        from_position_exclusive: Some(from_position_exclusive),
+                        doc_batch: Some(doc_batch),
+                    };
+                    replicate_subrequests
+                        .entry(follower_id)
+                        .or_default()
+                        .push((replicate_subrequest, queue_id));
+                } else {
+                    local_persist_subrequests.push(LocalPersistSubrequest {
+                        queue_id,
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        doc_batch,
+                        expected_position_inclusive: None,
+                    })
+                }
+            }
+        }
+
+        // replicate to the follower
+        {
+            let mut replicate_futures = FuturesUnordered::new();
+            let mut doc_batch_map = HashMap::new();
+
+            for (follower_id, subrequests_with_queue_id) in replicate_subrequests {
+                let replication_client = state_guard
+                    .replication_streams
+                    .get(&follower_id)
+                    .expect("replication stream should be initialized")
+                    .replication_client();
+                let leader_id = self.self_node_id.clone();
+                let mut subrequests = Vec::with_capacity(subrequests_with_queue_id.len());
+                for (subrequest, queue_id) in subrequests_with_queue_id {
+                    let doc_batch = subrequest
+                        .doc_batch
+                        .clone()
+                        .expect("we already verified doc is present and not empty");
+                    doc_batch_map.insert(subrequest.subrequest_id, (doc_batch, queue_id));
+                    subrequests.push(subrequest);
+                }
+                let replicate_future =
+                    replication_client.replicate(leader_id, follower_id, subrequests, commit_type);
+                replicate_futures.push(replicate_future);
+            }
+
+            while let Some(replication_result) = replicate_futures.next().await {
+                let replicate_response = match replication_result {
+                    Ok(replicate_response) => replicate_response,
+                    Err(_) => {
+                        // TODO: Handle replication error:
+                        // 1. Close and evict all the shards hosted by the follower.
+                        // 2. Close and evict the replication client.
+                        // 3. Return `PersistFailureReason::ShardClosed` to router.
+                        continue;
+                    }
+                };
+                for replicate_success in replicate_response.successes {
+                    let (doc_batch, queue_id) = doc_batch_map
+                        .remove(&replicate_success.subrequest_id)
+                        .expect("expected known subrequest id");
+                    let local_persist_subrequest = LocalPersistSubrequest {
+                        queue_id,
+                        subrequest_id: replicate_success.subrequest_id,
+                        index_uid: replicate_success.index_uid,
+                        source_id: replicate_success.source_id,
+                        shard_id: replicate_success.shard_id,
+                        doc_batch,
+                        expected_position_inclusive: replicate_success
+                            .replication_position_inclusive,
+                    };
+                    local_persist_subrequests.push(local_persist_subrequest);
+                }
+                for replicate_failure in replicate_response.failures {
+                    // TODO: If the replica shard is closed, close the primary shard if it is not
+                    // already.
+                    let persist_failure_reason = match replicate_failure.reason() {
+                        ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
+                        ReplicateFailureReason::ShardNotFound => {
+                            PersistFailureReason::ShardNotFound
+                        }
+                        ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
+                        ReplicateFailureReason::ResourceExhausted => {
+                            PersistFailureReason::ResourceExhausted
+                        }
+                    };
+                    let persist_failure = PersistFailure {
+                        subrequest_id: replicate_failure.subrequest_id,
+                        index_uid: replicate_failure.index_uid,
+                        source_id: replicate_failure.source_id,
+                        shard_id: replicate_failure.shard_id,
+                        reason: persist_failure_reason as i32,
+                    };
+                    persist_failures.push(persist_failure);
+                }
+            }
+        }
+
+        // finally write locally
+        {
+            for subrequest in local_persist_subrequests {
+                let queue_id = subrequest.queue_id;
+
+                let append_result = append_non_empty_doc_batch(
+                    &mut state_guard.mrecordlog,
+                    &queue_id,
+                    &subrequest.doc_batch,
+                    force_commit,
+                )
+                .await;
+
+                let current_position_inclusive = match append_result {
+                    Ok(current_position_inclusive) => current_position_inclusive,
+                    Err(append_error) => {
+                        let reason = match &append_error {
+                            AppendDocBatchError::Io(io_error) => {
+                                error!(
+                                    "failed to persist records to shard `{queue_id}`: {io_error}"
+                                );
+                                shards_to_close.insert(queue_id);
+                                PersistFailureReason::ShardClosed
+                            }
+                            AppendDocBatchError::QueueNotFound(_) => {
+                                error!(
+                                    "failed to persist records to shard `{queue_id}`: WAL queue \
+                                     not found"
+                                );
+                                shards_to_delete.insert(queue_id);
+                                PersistFailureReason::ShardNotFound
+                            }
+                        };
+                        let persist_failure = PersistFailure {
+                            subrequest_id: subrequest.subrequest_id,
+                            index_uid: subrequest.index_uid,
+                            source_id: subrequest.source_id,
+                            shard_id: subrequest.shard_id,
+                            reason: reason as i32,
+                        };
+                        persist_failures.push(persist_failure);
+                        continue;
+                    }
+                };
+
+                if let Some(expected_position_inclusive) = subrequest.expected_position_inclusive {
+                    if expected_position_inclusive != current_position_inclusive {
+                        return Err(IngestV2Error::Internal(format!(
+                            "bad replica position: expected {expected_position_inclusive:?}, got \
+                             {current_position_inclusive:?}"
+                        )));
+                    }
+                }
+
+                state_guard
+                    .shards
+                    .get_mut(&queue_id)
+                    .expect("primary shard should exist")
+                    .set_replication_position_inclusive(current_position_inclusive.clone());
+
+                let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
+                let batch_num_docs = subrequest.doc_batch.num_docs() as u64;
+                INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
+                INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);
+
                 let persist_success = PersistSuccess {
                     subrequest_id: subrequest.subrequest_id,
                     index_uid: subrequest.index_uid,
@@ -615,6 +720,7 @@ impl Ingester {
             persist_successes.push(persist_success);
         }
     }
+
     if !shards_to_close.is_empty() {
         for queue_id in &shards_to_close {
             let shard = state_guard
@@ -637,77 +743,16 @@ impl Ingester {
             }
             info!("deleted {} dangling shard(s)", shards_to_delete.len());
         }
-        if replicate_subrequests.is_empty() {
-            let leader_id = self.self_node_id.to_string();
-            let persist_response = PersistResponse {
-                leader_id,
-                successes: persist_successes,
-                failures: persist_failures,
-            };
-            return Ok(persist_response);
-        }
-        let mut replicate_futures = FuturesUnordered::new();
-        for (follower_id, subrequests) in replicate_subrequests {
-            let replication_client = state_guard
-                .replication_streams
-                .get(&follower_id)
-                .expect("replication stream should be initialized")
-                .replication_client();
-            let leader_id = self.self_node_id.clone();
-            let replicate_future =
-                replication_client.replicate(leader_id, follower_id, subrequests, commit_type);
-            replicate_futures.push(replicate_future);
-        }
-        // Drop the write lock AFTER pushing the replicate request into the replication client
-        // channel to ensure that sequential writes in mrecordlog turn into sequential replicate
-        // requests in the same order.
+        INGEST_V2_METRICS
+            .wal_disk_usage_bytes
+            .set(state_guard.mrecordlog.disk_usage() as i64);
+        INGEST_V2_METRICS
+            .wal_memory_usage_bytes
+            .set(state_guard.mrecordlog.memory_usage() as i64);
+
         drop(state_guard);
 
-        while let Some(replication_result) = replicate_futures.next().await {
-            let replicate_response = match replication_result {
-                Ok(replicate_response) => replicate_response,
-                Err(_) => {
-                    // TODO: Handle replication error:
-                    // 1. Close and evict all the shards hosted by the follower.
-                    // 2. Close and evict the replication client.
-                    // 3. Return `PersistFailureReason::ShardClosed` to router.
-                    continue;
-                }
-            };
-            for replicate_success in replicate_response.successes {
-                // TODO verify replication_position_inclusive matches what's expected locally
-                let persist_success = PersistSuccess {
-                    subrequest_id: replicate_success.subrequest_id,
-                    index_uid: replicate_success.index_uid,
-                    source_id: replicate_success.source_id,
-                    shard_id: replicate_success.shard_id,
-                    replication_position_inclusive: replicate_success
-                        .replication_position_inclusive,
-                };
-                persist_successes.push(persist_success);
-            }
-            for replicate_failure in replicate_response.failures {
-                // TODO: If the replica shard is closed, close the primary shard if it is not
-                // already.
-                let persist_failure_reason = match replicate_failure.reason() {
-                    ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
-                    ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound,
-                    ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
-                    ReplicateFailureReason::ResourceExhausted => {
-                        PersistFailureReason::ResourceExhausted
-                    }
-                };
-                let persist_failure = PersistFailure {
-                    subrequest_id: replicate_failure.subrequest_id,
-                    index_uid: replicate_failure.index_uid,
-                    source_id: replicate_failure.source_id,
-                    shard_id: replicate_failure.shard_id,
-                    reason: persist_failure_reason as i32,
-                };
-                persist_failures.push(persist_failure);
-            }
-        }
         let leader_id = self.self_node_id.to_string();
         let persist_response = PersistResponse {
             leader_id,
@@ -1181,6 +1226,16 @@ pub async fn wait_for_ingester_decommission(ingester_opt: Option<IngesterServiceClient>
     }
 }
 
+struct LocalPersistSubrequest {
+    queue_id: QueueId,
+    subrequest_id: u32,
+    index_uid: String,
+    source_id: String,
+    shard_id: Option<quickwit_proto::types::ShardId>,
+    doc_batch: quickwit_proto::ingest::DocBatchV2,
+    expected_position_inclusive: Option<Position>,
+}
+
 #[cfg(test)]
 mod tests {
     #![allow(clippy::mutable_key_type)]
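
The behavioral change in this patch is easy to lose in the re-indentation noise: persist() now runs in three phases. It first admission-checks every subrequest (shard state, cumulative capacity via sum_of_requested_capacity, rate limit), then dispatches the replicate requests to the follower, and only appends to the leader's own mrecordlog last, verifying that the position reported by the follower matches the local append position. The self-contained sketch below illustrates that ordering in miniature; Wal, Subrequest, Position, replicate_to_follower, and this persist function are simplified stand-ins for illustration, not Quickwit's actual types or API.

use std::collections::HashMap;

// Toy stand-in for a write-ahead log: one monotonically increasing
// position per queue.
#[derive(Debug, Clone, PartialEq)]
struct Position(u64);

struct Wal {
    positions: HashMap<String, u64>,
}

impl Wal {
    fn append(&mut self, queue_id: &str, _docs: &[String]) -> Position {
        let pos = self.positions.entry(queue_id.to_string()).or_insert(0);
        *pos += 1;
        Position(*pos)
    }
}

struct Subrequest {
    queue_id: String,
    docs: Vec<String>,
    has_follower: bool,
}

// Stand-in for the remote replication call: returns the position at which
// the follower persisted the batch.
fn replicate_to_follower(follower: &mut Wal, queue_id: &str, docs: &[String]) -> Position {
    follower.append(queue_id, docs)
}

fn persist(
    leader: &mut Wal,
    follower: &mut Wal,
    subrequests: Vec<Subrequest>,
) -> Result<(), String> {
    // Phases 1 and 2: validate and replicate to the follower first, remembering
    // the position it reports so the local write can be checked against it.
    let mut local_writes: Vec<(Subrequest, Option<Position>)> = Vec::new();
    for subrequest in subrequests {
        let expected_position = if subrequest.has_follower {
            Some(replicate_to_follower(
                follower,
                &subrequest.queue_id,
                &subrequest.docs,
            ))
        } else {
            None
        };
        local_writes.push((subrequest, expected_position));
    }
    // Phase 3: the local append happens last, so any record present in the
    // leader's WAL is guaranteed to also exist on the follower.
    for (subrequest, expected_position) in local_writes {
        let local_position = leader.append(&subrequest.queue_id, &subrequest.docs);
        if let Some(expected) = expected_position {
            if expected != local_position {
                return Err(format!(
                    "bad replica position: expected {expected:?}, got {local_position:?}"
                ));
            }
        }
    }
    Ok(())
}

fn main() {
    let mut leader = Wal { positions: HashMap::new() };
    let mut follower = Wal { positions: HashMap::new() };
    let subrequests = vec![Subrequest {
        queue_id: "my-index:my-source:shard-01".to_string(),
        docs: vec!["doc-1".to_string(), "doc-2".to_string()],
        has_follower: true,
    }];
    persist(&mut leader, &mut follower, subrequests).expect("positions should agree");
    println!("replicated first, then wrote locally; positions agree");
}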