diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index c2950ff274b..7eed30ef197 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -254,6 +254,8 @@ impl Default for MemoryMetrics { pub struct InFlightDataGauges { pub rest_server: IntGauge, pub ingest_router: IntGauge, + pub ingester_persist: IntGauge, + pub ingester_replicate: IntGauge, pub wal: IntGauge, pub fetch_stream: IntGauge, pub multi_fetch_stream: IntGauge, @@ -275,6 +277,8 @@ impl Default for InFlightDataGauges { Self { rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]), ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]), + ingester_persist: in_flight_gauge_vec.with_label_values(["ingester_persist"]), + ingester_replicate: in_flight_gauge_vec.with_label_values(["ingester_replicate"]), wal: in_flight_gauge_vec.with_label_values(["wal"]), fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]), multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]), diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 09fd0fa7f94..1caccc98e4e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -31,6 +31,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use mrecordlog::error::CreateQueueError; use quickwit_cluster::Cluster; +use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; @@ -968,6 +969,16 @@ impl IngesterService for Ingester { &mut self, persist_request: PersistRequest, ) -> IngestV2Result { + // If the request is local, the amount of memory it occupies is already + // accounted for in the router. + let request_size_bytes = if persist_request.is_local { + 0 + } else { + persist_request.num_bytes() + }; + let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_persist); + gauge_guard.add(request_size_bytes as i64); + self.persist_inner(persist_request).await } @@ -1500,6 +1511,7 @@ mod tests { doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])), }, ], + is_local: false, }; let persist_response = ingester.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-ingester"); @@ -1567,6 +1579,7 @@ mod tests { leader_id: ingester_ctx.node_id.to_string(), commit_type: CommitTypeV2::Force as i32, subrequests: Vec::new(), + is_local: false, }; let persist_response = ingester.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-ingester"); @@ -1583,6 +1596,7 @@ mod tests { shard_id: Some(ShardId::from(1)), doc_batch: None, }], + is_local: false, }; let init_shards_request = InitShardsRequest { @@ -1661,6 +1675,7 @@ mod tests { shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), }], + is_local: false, }; let persist_response = ingester.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-ingester"); @@ -1707,6 +1722,7 @@ mod tests { let persist_request = PersistRequest { leader_id: "test-ingester".to_string(), commit_type: CommitTypeV2::Force as i32, + is_local: false, subrequests: vec![PersistSubrequest { subrequest_id: 0, index_uid: Some(index_uid.clone()), @@ -1801,6 +1817,7 @@ mod tests { doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])), }, ], + is_local: false, }; let persist_response = leader.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-leader"); @@ -1987,6 +2004,7 @@ mod tests { doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])), }, ], + is_local: false, }; let persist_response = leader.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-leader"); @@ -2095,6 +2113,7 @@ mod tests { shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }], + is_local: false, }; let persist_response = ingester.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-ingester"); @@ -2161,6 +2180,7 @@ mod tests { shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }], + is_local: false, }; let persist_response = ingester.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-ingester"); @@ -2229,6 +2249,7 @@ mod tests { shard_id: Some(ShardId::from(1)), doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), }], + is_local: false, }; let persist_response = ingester.persist(persist_request).await.unwrap(); assert_eq!(persist_response.leader_id, "test-ingester"); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 0f57f1e8c31..227ff5c4ab6 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -236,7 +236,7 @@ mod tests { let doc_batch = doc_batch_builder.build().unwrap(); assert_eq!(doc_batch.num_docs(), 2); - assert_eq!(doc_batch.num_bytes(), 13); + assert_eq!(doc_batch.num_bytes(), 21); assert_eq!(doc_batch.doc_lengths, [7, 6]); assert_eq!(doc_batch.doc_buffer, Bytes::from(&b"Hello, World!"[..])); } @@ -278,7 +278,7 @@ mod tests { .as_ref() .unwrap() .num_bytes(), - 13 + 21 ); assert_eq!( ingest_request.subrequests[0] @@ -313,7 +313,7 @@ mod tests { .as_ref() .unwrap() .num_bytes(), - 12 + 20 ); assert_eq!( ingest_request.subrequests[1] @@ -345,6 +345,6 @@ mod tests { doc_buffer: vec![0u8; 100].into(), doc_lengths: vec![10, 20, 30], }; - assert_eq!(estimate_size(&doc_batch), ByteSize(106)); + assert_eq!(estimate_size(&doc_batch), ByteSize(118)); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index e8265e1d4f4..9a8c64813f7 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -23,6 +23,7 @@ use std::time::{Duration, Instant}; use bytesize::ByteSize; use futures::{Future, StreamExt}; use mrecordlog::error::CreateQueueError; +use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use quickwit_common::{rate_limited_warn, ServiceStream}; use quickwit_proto::ingest::ingester::{ ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus, @@ -507,6 +508,10 @@ impl ReplicationTask { self.current_replication_seqno, replicate_request.replication_seqno ))); } + let request_size_bytes = replicate_request.num_bytes(); + let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_replicate); + gauge_guard.add(request_size_bytes as i64); + self.current_replication_seqno += 1; let commit_type = replicate_request.commit_type(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index fd0a0029a25..fd0ce42f33e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -357,6 +357,7 @@ impl IngestRouter { for (leader_id, subrequests) in per_leader_persist_subrequests { let leader_id: NodeId = leader_id.clone(); + let is_local = leader_id == self.self_node_id; let subrequest_ids: Vec = subrequests .iter() .map(|subrequest| subrequest.subrequest_id) @@ -373,6 +374,7 @@ impl IngestRouter { leader_id: leader_id.into(), subrequests, commit_type: commit_type as i32, + is_local, }; let persist_future = async move { let persist_result = tokio::time::timeout( @@ -444,6 +446,7 @@ impl IngestRouterService for IngestRouter { let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router); gauge_guard.add(request_size_bytes as i64); + let _permit = self .ingest_semaphore .clone() diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 21a407e3f5a..66c5530f0f0 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -72,6 +72,7 @@ message PersistRequest { string leader_id = 1; quickwit.ingest.CommitTypeV2 commit_type = 3; repeated PersistSubrequest subrequests = 4; + bool is_local = 6; } message PersistSubrequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 84f31e56367..aaf18868967 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -30,6 +30,8 @@ pub struct PersistRequest { pub commit_type: i32, #[prost(message, repeated, tag = "4")] pub subrequests: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "6")] + pub is_local: bool, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs index fa817b93829..b2397fa7ba6 100644 --- a/quickwit/quickwit-proto/src/ingest/ingester.rs +++ b/quickwit/quickwit-proto/src/ingest/ingester.rs @@ -123,6 +123,16 @@ impl OpenFetchStreamRequest { } } +impl PersistRequest { + pub fn num_bytes(&self) -> usize { + self.subrequests + .iter() + .flat_map(|subrequest| &subrequest.doc_batch) + .map(|doc_batch| doc_batch.num_bytes()) + .sum() + } +} + impl PersistSubrequest { pub fn shard_id(&self) -> &ShardId { self.shard_id @@ -221,6 +231,16 @@ impl AckReplicationMessage { } } +impl ReplicateRequest { + pub fn num_bytes(&self) -> usize { + self.subrequests + .iter() + .flat_map(|subrequest| &subrequest.doc_batch) + .map(|doc_batch| doc_batch.num_bytes()) + .sum() + } +} + impl ReplicateSubrequest { pub fn shard_id(&self) -> &ShardId { self.shard_id diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index 7d6c3c92551..5c9829467b7 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -106,7 +106,7 @@ impl DocBatchV2 { } pub fn num_bytes(&self) -> usize { - self.doc_buffer.len() + self.doc_buffer.len() + self.doc_lengths.len() * 4 } pub fn num_docs(&self) -> usize { diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index d1fccfa33cf..bcfa433694f 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -156,12 +156,12 @@ mod tests { assert_eq!(subrequests[0].index_id, "my-index-1"); assert_eq!(subrequests[0].source_id, INGEST_V2_SOURCE_ID); assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_docs(), 2); - assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 96); + assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 104); assert_eq!(subrequests[1].index_id, "my-index-2"); assert_eq!(subrequests[1].source_id, INGEST_V2_SOURCE_ID); assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_docs(), 1); - assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 48); + assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 52); Ok(IngestResponseV2 { successes: vec![ @@ -238,7 +238,7 @@ mod tests { assert_eq!(subrequest_0.index_id, "my-index-1"); assert_eq!(subrequest_0.source_id, INGEST_V2_SOURCE_ID); assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_docs(), 1); - assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 48); + assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 52); Ok(IngestResponseV2 { successes: vec![IngestSuccess {