Skip to content

Commit

Permalink
Rebalance shards
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Apr 9, 2024
1 parent 0b2721e commit 35e8cad
Show file tree
Hide file tree
Showing 32 changed files with 1,901 additions and 338 deletions.
2 changes: 1 addition & 1 deletion config/quickwit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ indexer:
# max_queue_disk_usage: 4GiB
#
# -------------------------------- Searcher settings --------------------------------
#
#
# searcher:
# fast_field_cache_capacity: 1G
# split_footer_cache_capacity: 500M
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {
let service_name = &context.service_name;
let client_name = &context.client_name;
let mailbox_name = &context.mailbox_name;
let mock_name = &context.mock_name;
let tower_svc_stack_name = &context.tower_svc_stack_name;
let tower_layer_stack_name = &context.tower_layer_stack_name;
let error_type = &context.error_type;
Expand Down Expand Up @@ -947,6 +948,11 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {
self.build_from_boxed(Box::new(#mailbox_name::new(mailbox)))
}

#[cfg(any(test, feature = "testsuite"))]
pub fn build_from_mock(self, mock: #mock_name) -> #client_name {
self.build_from_boxed(Box::new(#client_name::from_mock(mock)))
}

fn build_from_boxed(self, boxed_instance: Box<dyn #service_name>) -> #client_name
{
#svc_statements
Expand Down
42 changes: 27 additions & 15 deletions quickwit/quickwit-common/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,19 @@ impl RateLimiter {
}
}

/// Acquires some permits from the rate limiter.
/// If the permits are not available, returns the duration to wait before trying again.
pub fn acquire_with_duration(&mut self, num_permits: u64) -> Result<(), Duration> {
if self.acquire_inner(num_permits) {
return Ok(());
}
self.refill(Instant::now());
if self.acquire_inner(num_permits) {
return Ok(());
}
let missing = num_permits - self.available_permits;
let wait = Duration::from_micros(missing * self.refill_period_micros / self.refill_amount);
Err(wait)
}

/// Acquires some permits expressed in bytes from the rate limiter. Returns whether the permits
/// were acquired.
pub fn acquire_bytes(&mut self, bytes: ByteSize) -> bool {
self.acquire(bytes.as_u64())
}

/// Drains all the permits from the rate limiter, effectively disabling all the operations
/// guarded by the rate limiter for one refill period.
pub fn drain(&mut self) {
self.available_permits = 0;
self.refill_at = Instant::now() + self.refill_period;
}

/// Gives back some unused permits to the rate limiter.
pub fn release(&mut self, num_permits: u64) {
self.available_permits = self.max_capacity.min(self.available_permits + num_permits);
Expand Down Expand Up @@ -179,6 +173,24 @@ mod tests {
assert!(!rate_limiter.acquire_bytes(ByteSize::kb(20)));
}

#[test]
fn test_rate_limiter_drain() {
let settings = RateLimiterSettings {
burst_limit: ByteSize::mb(2).as_u64(),
rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)),
refill_period: Duration::from_millis(100),
};
let mut rate_limiter = RateLimiter::from_settings(settings);
rate_limiter.drain();
assert_eq!(rate_limiter.available_permits, 0);

rate_limiter.refill(Instant::now() + Duration::from_millis(50));
assert_eq!(rate_limiter.available_permits, 0);

rate_limiter.refill(Instant::now() + Duration::from_millis(100));
assert!(rate_limiter.available_permits >= ByteSize::kb(100).as_u64());
}

#[test]
fn test_rate_limiter_release() {
let settings = RateLimiterSettings {
Expand Down
130 changes: 130 additions & 0 deletions quickwit/quickwit-common/src/tower/delay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use pin_project::pin_project;
use tokio::time::Sleep;
use tower::{Layer, Service};

/// Delays a request by `delay` seconds.
#[derive(Debug, Clone)]
pub struct Delay<S> {
inner: S,
delay: Duration,
}

impl<S, R> Service<R> for Delay<S>
where S: Service<R>
{
type Response = S::Response;
type Error = S::Error;
type Future = DelayFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: R) -> Self::Future {
DelayFuture {
inner: self.inner.call(request),
sleep: tokio::time::sleep(self.delay),
slept: false,
}
}
}

#[pin_project]
#[derive(Debug)]
pub struct DelayFuture<F> {
#[pin]
inner: F,
#[pin]
sleep: Sleep,
slept: bool,
}

impl<F, T, E> Future for DelayFuture<F>
where F: Future<Output = Result<T, E>>
{
type Output = Result<T, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

if !*this.slept {
match this.sleep.poll(cx) {
Poll::Ready(_) => *this.slept = true,
Poll::Pending => return Poll::Pending,
}
}
this.inner.poll(cx)
}
}

/// Applies a delay to requests via the supplied inner service.
#[derive(Debug, Clone)]
pub struct DelayLayer {
delay: Duration,
}

impl DelayLayer {
/// Creates a new `DelayLayer` with the specified delay.
pub fn new(delay: Duration) -> Self {
Self { delay }
}
}

impl<S> Layer<S> for DelayLayer {
type Service = Delay<S>;

fn layer(&self, service: S) -> Self::Service {
Delay {
inner: service,
delay: self.delay,
}
}
}

#[cfg(test)]
mod tests {
use std::time::Instant;

use tokio::time::Duration;
use tower::ServiceBuilder;

use super::*;

#[tokio::test]
async fn test_delay() {
let delay = Duration::from_millis(100);
let mut service = ServiceBuilder::new()
.layer(DelayLayer::new(delay))
.service_fn(|_| async { Ok::<_, ()>(()) });

let start = Instant::now();
service.call(()).await.unwrap();

let elapsed = start.elapsed();
assert!(elapsed >= delay);
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod box_layer;
mod box_service;
mod buffer;
mod change;
mod delay;
mod estimate_rate;
mod event_listener;
mod metrics;
Expand All @@ -39,6 +40,7 @@ pub use box_layer::BoxLayer;
pub use box_service::BoxService;
pub use buffer::{Buffer, BufferError, BufferLayer};
pub use change::Change;
pub use delay::{Delay, DelayLayer};
pub use estimate_rate::{EstimateRate, EstimateRateLayer};
pub use event_listener::{EventListener, EventListenerLayer};
use futures::Future;
Expand Down
Loading

0 comments on commit 35e8cad

Please sign in to comment.