Skip to content

Commit

Permalink
Minor refactoring in the rate limiter. (#5201)
Browse files Browse the repository at this point in the history
The point is to be able to remove the needless private function, and
test public functions.
  • Loading branch information
fulmicoton authored and trinity-1686a committed Jul 15, 2024
1 parent bfe191e commit 706ba95
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 17 deletions.
23 changes: 19 additions & 4 deletions quickwit/quickwit-common/src/tower/rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ impl ConstantRate {
///
/// # Panics
///
/// This function panics if `period` is 0.
/// This function panics if `period` is 0 while work is != 0.
pub const fn new(work: u64, period: Duration) -> Self {
assert!(!period.is_zero());

assert!(!period.is_zero() || work == 0u64);
Self { work, period }
}

Expand All @@ -69,8 +68,10 @@ impl ConstantRate {
///
/// This function panics if `new_period` is 0.
pub fn rescale(&self, new_period: Duration) -> Self {
if self.work == 0u64 {
return Self::new(0u64, new_period);
}
assert!(!new_period.is_zero());

let new_work = self.work() as u128 * new_period.as_nanos() / self.period().as_nanos();
Self::new(new_work as u64, new_period)
}
Expand All @@ -90,6 +91,20 @@ impl Rate for ConstantRate {
mod tests {
use super::*;

#[test]
#[should_panic]
fn test_rescale_zero_duration_panics() {
ConstantRate::bytes_per_period(ByteSize::b(1), Duration::default());
}

#[test]
fn test_rescale_zero_duration_accepted_if_no_work() {
let rate = ConstantRate::bytes_per_period(ByteSize::b(0), Duration::default());
let rescaled_rate = rate.rescale(Duration::from_secs(1));
assert_eq!(rescaled_rate.work_bytes(), ByteSize::b(0));
assert_eq!(rescaled_rate.period(), Duration::from_secs(1));
}

#[test]
fn test_rescale() {
let rate = ConstantRate::bytes_per_period(ByteSize::mib(5), Duration::from_secs(5));
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mockall = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util"]}

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true, features = ["testsuite"] }
Expand Down
27 changes: 14 additions & 13 deletions quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::time::Instant;

use quickwit_common::tower::ConstantRate;
use tokio::time::Instant;

/// A naive rate meter that tracks how much work was performed during a period of time defined by
/// two successive calls to `harvest`.
Expand Down Expand Up @@ -47,10 +46,7 @@ impl RateMeter {
/// Returns the average work rate since the last call to this method and resets the internal
/// state.
pub fn harvest(&mut self) -> ConstantRate {
self.harvest_inner(Instant::now())
}

fn harvest_inner(&mut self, now: Instant) -> ConstantRate {
let now = Instant::now();
let elapsed = now.duration_since(self.harvested_at);
let rate = ConstantRate::new(self.total_work, elapsed);
self.total_work = 0;
Expand All @@ -67,20 +63,25 @@ mod tests {

use super::*;

#[test]
fn test_rate_meter() {
#[tokio::test]
async fn test_rate_meter() {
tokio::time::pause();
let mut rate_meter = RateMeter::default();
assert_eq!(rate_meter.total_work, 0);

let now = Instant::now();
rate_meter.harvested_at = now;
let rate = rate_meter.harvest();
assert_eq!(rate.work(), 0);
assert!(rate.period().is_zero());

let rate = rate_meter.harvest_inner(now + Duration::from_millis(100));
tokio::time::advance(Duration::from_millis(100)).await;

let rate = rate_meter.harvest();
assert_eq!(rate.work(), 0);
assert_eq!(rate.period(), Duration::from_millis(100));

rate_meter.update(1);
let rate = rate_meter.harvest_inner(now + Duration::from_millis(200));
tokio::time::advance(Duration::from_millis(100)).await;

let rate = rate_meter.harvest();
assert_eq!(rate.work(), 1);
assert_eq!(rate.period(), Duration::from_millis(100));
}
Expand Down

0 comments on commit 706ba95

Please sign in to comment.