Skip to content

Commit

Permalink
Init rate limiter and meter on ingester init (#4195)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Nov 25, 2023
1 parent 5afd086 commit 969a961
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,13 @@ impl Ingester {
.unwrap_or(Position::Eof);
let solo_shard =
IngesterShard::new_solo(ShardState::Closed, Position::Eof, truncation_position);
state_guard.shards.insert(queue_id, solo_shard);
state_guard.shards.insert(queue_id.clone(), solo_shard);

let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings);
let rate_meter = RateMeter::default();
state_guard
.rate_trackers
.insert(queue_id, (rate_limiter, rate_meter));
}
Ok(())
}
Expand Down Expand Up @@ -1009,6 +1015,8 @@ mod tests {
solo_shard_01.assert_replication_position(Position::Eof);
solo_shard_01.assert_truncation_position(Position::Eof);

assert!(state_guard.rate_trackers.contains_key(&queue_id_01));

state_guard
.mrecordlog
.assert_records_eq(&queue_id_01, .., &[(1, "\0\x02")]);
Expand All @@ -1019,6 +1027,8 @@ mod tests {
solo_shard_02.assert_replication_position(Position::Eof);
solo_shard_02.assert_truncation_position(0u64);

assert!(state_guard.rate_trackers.contains_key(&queue_id_02));

state_guard.mrecordlog.assert_records_eq(
&queue_id_02,
..,
Expand All @@ -1031,6 +1041,8 @@ mod tests {
solo_shard_03.assert_replication_position(Position::Eof);
solo_shard_03.assert_truncation_position(Position::Eof);

assert!(state_guard.rate_trackers.contains_key(&queue_id_02));

state_guard
.mrecordlog
.assert_records_eq(&queue_id_03, .., &[(0, "\0\x02")]);
Expand Down

0 comments on commit 969a961

Please sign in to comment.