Skip to content

Commit

Permalink
Apply the one task per call layer to the ingester on the local (non-g…
Browse files Browse the repository at this point in the history
…rpc) path as well. (#4874)

* bug poison

* Apply the one task per call layer to the ingester on the local (non-grpc) path as well.

Closes #4872
  • Loading branch information
fulmicoton authored Apr 23, 2024
1 parent ffbf6cf commit cfa2191
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 32 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/mrecordlog_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ pub struct MultiRecordLogAsync {
impl MultiRecordLogAsync {
fn take(&mut self) -> MultiRecordLog {
let Some(mrecordlog) = self.mrecordlog_opt.take() else {
error!("wal is poisoned, aborting process");
error!("wal is poisoned (on write), aborting process");
std::process::abort();
};
mrecordlog
}

fn mrecordlog_ref(&self) -> &MultiRecordLog {
let Some(mrecordlog) = &self.mrecordlog_opt else {
error!("the mrecordlog is corrupted, aborting process");
error!("wal is poisoned (on read), aborting process");
std::process::abort();
};
mrecordlog
Expand Down
26 changes: 5 additions & 21 deletions quickwit/quickwit-serve/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use quickwit_common::tower::BoxFutureInfaillible;
use quickwit_config::service::QuickwitService;
use quickwit_proto::developer::DeveloperServiceClient;
use quickwit_proto::indexing::IndexingServiceClient;
use quickwit_proto::ingest::ingester::IngesterServiceClient;
use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPluginServer;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsServiceServer;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer;
Expand All @@ -38,9 +37,7 @@ use tracing::*;

use crate::developer_api::DeveloperApiServer;
use crate::search_api::GrpcSearchAdapter;
use crate::{
QuickwitServices, INDEXING_GRPC_SERVER_METRICS_LAYER, INGEST_GRPC_SERVER_METRICS_LAYER,
};
use crate::{QuickwitServices, INDEXING_GRPC_SERVER_METRICS_LAYER};

/// Starts and binds gRPC services to `grpc_listen_addr`.
pub(crate) async fn start_grpc_server(
Expand Down Expand Up @@ -102,27 +99,14 @@ pub(crate) async fn start_grpc_server(
} else {
None
};
let ingester_grpc_service = if services
.node_config
.is_service_enabled(QuickwitService::Indexer)
{

let ingester_grpc_service = if let Some(ingester_service) = services.ingester_service() {
enabled_grpc_services.insert("ingester");
services.ingester_opt.clone().map(|ingester| {
IngesterServiceClient::tower()
.stack_persist_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_open_replication_stream_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_init_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_retain_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_truncate_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_close_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_decommission_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone())
.build(ingester)
.as_grpc_service(max_message_size)
})
Some(ingester_service.as_grpc_service(max_message_size))
} else {
None
};

// Mount gRPC control plane service if `QuickwitService::ControlPlane` is enabled on node.
let control_plane_grpc_service = if services
.node_config
Expand Down
45 changes: 36 additions & 9 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ use quickwit_metastore::{
use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService};
use quickwit_proto::control_plane::ControlPlaneServiceClient;
use quickwit_proto::indexing::{IndexingServiceClient, ShardPositionsUpdate};
use quickwit_proto::ingest::ingester::{IngesterService, IngesterServiceClient, IngesterStatus};
use quickwit_proto::ingest::ingester::{
IngesterService, IngesterServiceClient, IngesterServiceTowerLayerStack, IngesterStatus,
};
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::{
EntityKind, ListIndexesMetadataRequest, MetastoreError, MetastoreService,
Expand Down Expand Up @@ -187,7 +189,8 @@ struct QuickwitServices {
pub ingest_service: IngestServiceClient,
// Ingest v2
pub ingest_router_service: IngestRouterServiceClient,
pub ingester_opt: Option<Ingester>,
ingester_opt: Option<Ingester>,

pub janitor_service_opt: Option<Mailbox<JanitorService>>,
pub jaeger_service_opt: Option<JaegerService>,
pub otlp_logs_service_opt: Option<OtlpGrpcLogsService>,
Expand All @@ -206,6 +209,17 @@ struct QuickwitServices {
_report_splits_subscription_handle_opt: Option<EventSubscriptionHandle>,
}

impl QuickwitServices {
/// Client in the type is a bit misleading here.
///
/// The object returned is the implementation of the local ingester service,
/// with all of the appropriate tower layers.
pub fn ingester_service(&self) -> Option<IngesterServiceClient> {
let ingester = self.ingester_opt.clone()?;
Some(ingester_service_layer_stack(IngesterServiceClient::tower()).build(ingester))
}
}

async fn balance_channel_for_service(
cluster: &Cluster,
service: QuickwitService,
Expand Down Expand Up @@ -789,6 +803,21 @@ pub async fn serve_quickwit(
Ok(actor_exit_statuses)
}

/// Stack of layers to use on the server side of the ingester service.
fn ingester_service_layer_stack(
layer_stack: IngesterServiceTowerLayerStack,
) -> IngesterServiceTowerLayerStack {
layer_stack
.stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone())
.stack_persist_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_open_replication_stream_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_init_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_retain_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_truncate_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_close_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
.stack_decommission_layer(quickwit_common::tower::OneTaskPerCallLayer)
}

async fn setup_ingest_v2(
node_config: &NodeConfig,
cluster: &Cluster,
Expand Down Expand Up @@ -878,13 +907,11 @@ async fn setup_ingest_v2(
// metrics, so we use both metrics layers.
let ingester = ingester_opt_clone_clone
.expect("ingester service should be initialized");
let shared_layers = ServiceBuilder::new()
.layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone())
.layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone())
.into_inner();
let ingester_service = IngesterServiceClient::tower()
.stack_layer(shared_layers)
.build(ingester);
let ingester_service = ingester_service_layer_stack(
IngesterServiceClient::tower()
.stack_layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone()),
)
.build(ingester);
Some(Change::Insert(node_id, ingester_service))
} else {
let ingester_service = IngesterServiceClient::tower()
Expand Down

0 comments on commit cfa2191

Please sign in to comment.