Skip to content

Commit

Permalink
Merge branch 'main' into lambda-searcher-get-indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
tyrwzl authored Jun 4, 2024
2 parents 4f5e54d + 6fc5a64 commit ecbd6a9
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 45 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ tower = { version = "0.4.13", features = [
"retry",
"util",
] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tower-http = { version = "0.4.0", features = [
"compression-zstd",
"compression-gzip",
"cors",
] }
tracing = "0.1.37"
tracing-opentelemetry = "0.20.0"
tracing-subscriber = { version = "0.3.16", features = [
Expand Down
38 changes: 30 additions & 8 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl BroadcastLocalShardsTask {
.shards
.iter()
.filter_map(|(queue_id, shard)| {
if !shard.is_replica() {
if shard.is_advertisable && !shard.is_replica() {
Some((queue_id.clone(), shard.shard_state))
} else {
None
Expand Down Expand Up @@ -479,22 +479,44 @@ mod tests {
let mut state_guard = state.lock_partially().await.unwrap();

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let queue_id_00 = queue_id(&index_uid, "test-source", &ShardId::from(0));
let shard_00 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
state_guard.shards.insert(queue_id_00.clone(), shard_00);

let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
let shard = IngesterShard::new_solo(
let mut shard_01 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
state_guard.shards.insert(queue_id_01.clone(), shard);
shard_01.is_advertisable = true;
state_guard.shards.insert(queue_id_01.clone(), shard_01);

let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2));
let mut shard_02 = IngesterShard::new_replica(
NodeId::from("test-leader"),
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
shard_02.is_advertisable = true;
state_guard.shards.insert(queue_id_02.clone(), shard_02);

state_guard
.rate_trackers
.insert(queue_id_01.clone(), (rate_limiter, rate_meter));
for queue_id in [queue_id_00, queue_id_01, queue_id_02] {
let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();

state_guard
.rate_trackers
.insert(queue_id, (rate_limiter, rate_meter));
}
drop(state_guard);

let new_snapshot = task.snapshot_local_shards().await.unwrap();
Expand Down
73 changes: 45 additions & 28 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ impl Ingester {
};
return Ok(persist_response);
}

// first verify if we would locally accept each subrequest
{
let mut total_requested_capacity = bytesize::ByteSize::b(0);
Expand All @@ -491,6 +490,11 @@ impl Ingester {
persist_failures.push(persist_failure);
continue;
};
// A router can only know about a newly opened shard if it has been informed by the
// control plane, which confirms that the shard was correctly opened in the
// metastore.
shard.is_advertisable = true;

if shard.is_closed() {
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
Expand Down Expand Up @@ -597,7 +601,6 @@ impl Ingester {
}
}
}

// replicate to the follower
{
let mut replicate_futures = FuturesUnordered::new();
Expand Down Expand Up @@ -846,17 +849,22 @@ impl Ingester {
open_fetch_stream_request: OpenFetchStreamRequest,
) -> IngestV2Result<ServiceStream<IngestV2Result<FetchMessage>>> {
let queue_id = open_fetch_stream_request.queue_id();
let shard_status_rx = self
.state
.lock_partially()
.await?
.shards
.get(&queue_id)
.ok_or_else(|| IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
})?
.shard_status_rx
.clone();

let mut state_guard = self.state.lock_partially().await?;

let shard =
state_guard
.shards
.get_mut(&queue_id)
.ok_or_else(|| IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
})?;
// An indexer can only know about a newly opened shard if it has been scheduled by the
// control plane, which confirms that the shard was correctly opened in the
// metastore.
shard.is_advertisable = true;

let shard_status_rx = shard.shard_status_rx.clone();
let mrecordlog = self.state.mrecordlog();
let (service_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
Expand Down Expand Up @@ -1478,6 +1486,7 @@ mod tests {
solo_shard_02.assert_is_closed();
solo_shard_02.assert_replication_position(Position::offset(1u64));
solo_shard_02.assert_truncation_position(Position::offset(0u64));
assert!(solo_shard_02.is_advertisable);

state_guard
.mrecordlog
Expand All @@ -1495,21 +1504,32 @@ mod tests {
let mut state_guard = ingester.state.lock_fully().await.unwrap();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);

let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
let shard = IngesterShard::new_solo(
let queue_id_00 = queue_id(&index_uid, "test-source", &ShardId::from(0));
let shard_00 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
state_guard.shards.insert(queue_id_01.clone(), shard);

let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
state_guard
.rate_trackers
.insert(queue_id_01.clone(), (rate_limiter, rate_meter));
state_guard.shards.insert(queue_id_00.clone(), shard_00);

let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
let mut shard_01 = IngesterShard::new_solo(
ShardState::Open,
Position::Beginning,
Position::Beginning,
Instant::now(),
);
shard_01.is_advertisable = true;
state_guard.shards.insert(queue_id_01.clone(), shard_01);

for queue_id in [&queue_id_00, &queue_id_01] {
let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
state_guard
.rate_trackers
.insert(queue_id.clone(), (rate_limiter, rate_meter));
}
drop(state_guard);

tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -2548,12 +2568,9 @@ mod tests {
.await
.unwrap();

state_guard
.shards
.get(&queue_id)
.unwrap()
.notify_shard_status();

let shard = state_guard.shards.get(&queue_id).unwrap();
assert!(shard.is_advertisable);
shard.notify_shard_status();
drop(state_guard);

let fetch_response = fetch_stream.next().await.unwrap().unwrap();
Expand Down
15 changes: 15 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ pub(super) struct IngesterShard {
pub replication_position_inclusive: Position,
/// Position up to which the shard has been truncated.
pub truncation_position_inclusive: Position,
/// Whether the shard should be advertised to other nodes (routers) via gossip.
///
/// Because shards are created in multiple steps, (e.g., init shard on leader, create shard in
/// metastore), we must receive a "signal" from the control plane confirming that a shard
/// was successfully opened before advertising it. Currently, this confirmation comes in the
/// form of `PersistRequest` or `FetchRequest`.
pub is_advertisable: bool,
pub shard_status_tx: watch::Sender<ShardStatus>,
pub shard_status_rx: watch::Receiver<ShardStatus>,
/// Instant at which the shard was last written to.
Expand All @@ -65,6 +72,7 @@ impl IngesterShard {
shard_state,
replication_position_inclusive,
truncation_position_inclusive,
is_advertisable: false,
shard_status_tx,
shard_status_rx,
last_write_instant: now,
Expand All @@ -85,6 +93,9 @@ impl IngesterShard {
shard_state,
replication_position_inclusive,
truncation_position_inclusive,
// This is irrelevant for replica shards since they are not advertised via gossip
// anyway.
is_advertisable: false,
shard_status_tx,
shard_status_rx,
last_write_instant: now,
Expand All @@ -104,6 +115,7 @@ impl IngesterShard {
shard_state,
replication_position_inclusive,
truncation_position_inclusive,
is_advertisable: false,
shard_status_tx,
shard_status_rx,
last_write_instant: now,
Expand Down Expand Up @@ -240,6 +252,7 @@ mod tests {
primary_shard.truncation_position_inclusive,
Position::Beginning
);
assert!(!primary_shard.is_advertisable);
}

#[test]
Expand All @@ -265,6 +278,7 @@ mod tests {
replica_shard.truncation_position_inclusive,
Position::Beginning
);
assert!(!replica_shard.is_advertisable);
}

#[test]
Expand All @@ -286,5 +300,6 @@ mod tests {
solo_shard.truncation_position_inclusive,
Position::Beginning
);
assert!(!solo_shard.is_advertisable);
}
}
4 changes: 3 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,14 @@ impl IngesterState {
} else {
Position::offset(*position_range.start() - 1)
};
let solo_shard = IngesterShard::new_solo(
let mut solo_shard = IngesterShard::new_solo(
ShardState::Closed,
replication_position_inclusive,
truncation_position_inclusive,
now,
);
// We want to advertise the shard as read-only right away.
solo_shard.is_advertisable = true;
inner_guard.shards.insert(queue_id.clone(), solo_shard);

let rate_limiter = RateLimiter::from_settings(rate_limiter_settings);
Expand Down
44 changes: 37 additions & 7 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use hyper::{http, Method, StatusCode};
use quickwit_common::tower::BoxFutureInfaillible;
use tower::make::Shared;
use tower::ServiceBuilder;
use tower_http::compression::predicate::{DefaultPredicate, Predicate, SizeAbove};
use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove};
use tower_http::compression::CompressionLayer;
use tower_http::cors::CorsLayer;
use tracing::{error, info};
Expand All @@ -52,10 +52,6 @@ use crate::template_api::index_template_api_handlers;
use crate::ui_handler::ui_handler;
use crate::{BodyFormat, BuildInfo, QuickwitServices, RuntimeInfo};

/// The minimum size a response body must be in order to
/// be automatically compressed with gzip.
const MINIMUM_RESPONSE_COMPRESSION_SIZE: u16 = 10 << 10;

#[derive(Debug)]
pub(crate) struct InvalidJsonRequest(pub serde_json::Error);

Expand Down Expand Up @@ -88,6 +84,39 @@ impl std::fmt::Display for InternalError {
}
}

/// Env variable key to define the minimum size above which a response should be compressed.
/// If unset, no compression is applied.
const QW_MINIMUM_COMPRESSION_SIZE_KEY: &str = "QW_MINIMUM_COMPRESSION_SIZE";

#[derive(Clone, Copy)]
struct CompressionPredicate {
size_above_opt: Option<SizeAbove>,
}

impl CompressionPredicate {
fn from_env() -> CompressionPredicate {
let minimum_compression_size_opt: Option<u16> = quickwit_common::get_from_env_opt::<usize>(
QW_MINIMUM_COMPRESSION_SIZE_KEY,
)
.map(|minimum_compression_size: usize| {
u16::try_from(minimum_compression_size).unwrap_or(u16::MAX)
});
let size_above_opt = minimum_compression_size_opt.map(SizeAbove::new);
CompressionPredicate { size_above_opt }
}
}

impl Predicate for CompressionPredicate {
fn should_compress<B>(&self, response: &http::Response<B>) -> bool
where B: hyper::body::HttpBody {
if let Some(size_above) = self.size_above_opt {
size_above.should_compress(response)
} else {
false
}
}
}

/// Starts REST services.
pub(crate) async fn start_rest_server(
rest_listen_addr: SocketAddr,
Expand Down Expand Up @@ -158,14 +187,15 @@ pub(crate) async fn start_rest_server(
.boxed();

let warp_service = warp::service(rest_routes);
let compression_predicate =
DefaultPredicate::new().and(SizeAbove::new(MINIMUM_RESPONSE_COMPRESSION_SIZE));
let compression_predicate = CompressionPredicate::from_env().and(NotForContentType::IMAGES);
let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins);

let service = ServiceBuilder::new()
.layer(
CompressionLayer::new()
.zstd(true)
.gzip(true)
.quality(tower_http::CompressionLevel::Fastest)
.compress_when(compression_predicate),
)
.layer(cors)
Expand Down

0 comments on commit ecbd6a9

Please sign in to comment.