Skip to content

Commit

Permalink
Track bytes in-flight in ingester persist and replicate methods
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Mar 22, 2024
1 parent dc5453e commit 621939c
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 8 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ impl Default for MemoryMetrics {
pub struct InFlightDataGauges {
pub rest_server: IntGauge,
pub ingest_router: IntGauge,
pub ingester_persist: IntGauge,
pub ingester_replicate: IntGauge,
pub wal: IntGauge,
pub fetch_stream: IntGauge,
pub multi_fetch_stream: IntGauge,
Expand All @@ -275,6 +277,8 @@ impl Default for InFlightDataGauges {
Self {
rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]),
ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]),
ingester_persist: in_flight_gauge_vec.with_label_values(["ingester_persist"]),
ingester_replicate: in_flight_gauge_vec.with_label_values(["ingester_replicate"]),
wal: in_flight_gauge_vec.with_label_values(["wal"]),
fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]),
multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]),
Expand Down
21 changes: 21 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mrecordlog::error::CreateQueueError;
use quickwit_cluster::Cluster;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
Expand Down Expand Up @@ -968,6 +969,16 @@ impl IngesterService for Ingester {
&mut self,
persist_request: PersistRequest,
) -> IngestV2Result<PersistResponse> {
// If the request is local, the amount of memory it occupies is already
// accounted for in the router.
let request_size_bytes = if persist_request.is_local {
0
} else {
persist_request.num_bytes()
};
let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_persist);
gauge_guard.add(request_size_bytes as i64);

self.persist_inner(persist_request).await
}

Expand Down Expand Up @@ -1500,6 +1511,7 @@ mod tests {
doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])),
},
],
is_local: false,
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
Expand Down Expand Up @@ -1567,6 +1579,7 @@ mod tests {
leader_id: ingester_ctx.node_id.to_string(),
commit_type: CommitTypeV2::Force as i32,
subrequests: Vec::new(),
is_local: false,
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
Expand All @@ -1583,6 +1596,7 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: None,
}],
is_local: false,
};

let init_shards_request = InitShardsRequest {
Expand Down Expand Up @@ -1661,6 +1675,7 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
}],
is_local: false,
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
Expand Down Expand Up @@ -1707,6 +1722,7 @@ mod tests {
let persist_request = PersistRequest {
leader_id: "test-ingester".to_string(),
commit_type: CommitTypeV2::Force as i32,
is_local: false,
subrequests: vec![PersistSubrequest {
subrequest_id: 0,
index_uid: Some(index_uid.clone()),
Expand Down Expand Up @@ -1801,6 +1817,7 @@ mod tests {
doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])),
},
],
is_local: false,
};
let persist_response = leader.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-leader");
Expand Down Expand Up @@ -1987,6 +2004,7 @@ mod tests {
doc_batch: Some(DocBatchV2::for_test(["test-doc-110", "test-doc-111"])),
},
],
is_local: false,
};
let persist_response = leader.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-leader");
Expand Down Expand Up @@ -2095,6 +2113,7 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])),
}],
is_local: false,
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
Expand Down Expand Up @@ -2161,6 +2180,7 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])),
}],
is_local: false,
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
Expand Down Expand Up @@ -2229,6 +2249,7 @@ mod tests {
shard_id: Some(ShardId::from(1)),
doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])),
}],
is_local: false,
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ mod tests {
let doc_batch = doc_batch_builder.build().unwrap();

assert_eq!(doc_batch.num_docs(), 2);
assert_eq!(doc_batch.num_bytes(), 13);
assert_eq!(doc_batch.num_bytes(), 21);
assert_eq!(doc_batch.doc_lengths, [7, 6]);
assert_eq!(doc_batch.doc_buffer, Bytes::from(&b"Hello, World!"[..]));
}
Expand Down Expand Up @@ -278,7 +278,7 @@ mod tests {
.as_ref()
.unwrap()
.num_bytes(),
13
21
);
assert_eq!(
ingest_request.subrequests[0]
Expand Down Expand Up @@ -313,7 +313,7 @@ mod tests {
.as_ref()
.unwrap()
.num_bytes(),
12
20
);
assert_eq!(
ingest_request.subrequests[1]
Expand Down Expand Up @@ -345,6 +345,6 @@ mod tests {
doc_buffer: vec![0u8; 100].into(),
doc_lengths: vec![10, 20, 30],
};
assert_eq!(estimate_size(&doc_batch), ByteSize(106));
assert_eq!(estimate_size(&doc_batch), ByteSize(118));
}
}
5 changes: 5 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
use bytesize::ByteSize;
use futures::{Future, StreamExt};
use mrecordlog::error::CreateQueueError;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use quickwit_common::{rate_limited_warn, ServiceStream};
use quickwit_proto::ingest::ingester::{
ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus,
Expand Down Expand Up @@ -507,6 +508,10 @@ impl ReplicationTask {
self.current_replication_seqno, replicate_request.replication_seqno
)));
}
let request_size_bytes = replicate_request.num_bytes();
let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_replicate);
gauge_guard.add(request_size_bytes as i64);

self.current_replication_seqno += 1;

let commit_type = replicate_request.commit_type();
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ impl IngestRouter {

for (leader_id, subrequests) in per_leader_persist_subrequests {
let leader_id: NodeId = leader_id.clone();
let is_local = leader_id == self.self_node_id;
let subrequest_ids: Vec<SubrequestId> = subrequests
.iter()
.map(|subrequest| subrequest.subrequest_id)
Expand All @@ -373,6 +374,7 @@ impl IngestRouter {
leader_id: leader_id.into(),
subrequests,
commit_type: commit_type as i32,
is_local,
};
let persist_future = async move {
let persist_result = tokio::time::timeout(
Expand Down Expand Up @@ -444,6 +446,7 @@ impl IngestRouterService for IngestRouter {

let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router);
gauge_guard.add(request_size_bytes as i64);

let _permit = self
.ingest_semaphore
.clone()
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-proto/protos/quickwit/ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ message PersistRequest {
string leader_id = 1;
quickwit.ingest.CommitTypeV2 commit_type = 3;
repeated PersistSubrequest subrequests = 4;
bool is_local = 6;
}

message PersistSubrequest {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions quickwit/quickwit-proto/src/ingest/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ impl OpenFetchStreamRequest {
}
}

impl PersistRequest {
pub fn num_bytes(&self) -> usize {
self.subrequests
.iter()
.flat_map(|subrequest| &subrequest.doc_batch)
.map(|doc_batch| doc_batch.num_bytes())
.sum()
}
}

impl PersistSubrequest {
pub fn shard_id(&self) -> &ShardId {
self.shard_id
Expand Down Expand Up @@ -221,6 +231,16 @@ impl AckReplicationMessage {
}
}

impl ReplicateRequest {
pub fn num_bytes(&self) -> usize {
self.subrequests
.iter()
.flat_map(|subrequest| &subrequest.doc_batch)
.map(|doc_batch| doc_batch.num_bytes())
.sum()
}
}

impl ReplicateSubrequest {
pub fn shard_id(&self) -> &ShardId {
self.shard_id
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl DocBatchV2 {
}

pub fn num_bytes(&self) -> usize {
self.doc_buffer.len()
self.doc_buffer.len() + self.doc_lengths.len() * 4
}

pub fn num_docs(&self) -> usize {
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ mod tests {
assert_eq!(subrequests[0].index_id, "my-index-1");
assert_eq!(subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_docs(), 2);
assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 96);
assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 104);

assert_eq!(subrequests[1].index_id, "my-index-2");
assert_eq!(subrequests[1].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 48);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 52);

Ok(IngestResponseV2 {
successes: vec![
Expand Down Expand Up @@ -238,7 +238,7 @@ mod tests {
assert_eq!(subrequest_0.index_id, "my-index-1");
assert_eq!(subrequest_0.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 48);
assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 52);

Ok(IngestResponseV2 {
successes: vec![IngestSuccess {
Expand Down

0 comments on commit 621939c

Please sign in to comment.