From 25690652d5ecdf0f7d698b0276c9f06f00651b8a Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Tue, 2 Jan 2024 15:50:15 +0100 Subject: [PATCH] Stack layers instead of overriding them (#4273) --- .../example/src/codegen/hello.rs | 227 ++- quickwit/quickwit-codegen/example/src/lib.rs | 49 +- quickwit/quickwit-codegen/src/codegen.rs | 181 +- .../src/codegen/ingest_service.rs | 229 ++- .../quickwit-metastore/src/metastore/mod.rs | 2 +- .../quickwit/quickwit.control_plane.rs | 463 +++-- .../src/codegen/quickwit/quickwit.indexing.rs | 102 +- .../quickwit/quickwit.ingest.ingester.rs | 751 +++++--- .../quickwit/quickwit.ingest.router.rs | 93 +- .../codegen/quickwit/quickwit.metastore.rs | 1645 +++++++++++------ quickwit/quickwit-serve/src/lib.rs | 14 +- 11 files changed, 2491 insertions(+), 1265 deletions(-) diff --git a/quickwit/quickwit-codegen/example/src/codegen/hello.rs b/quickwit/quickwit-codegen/example/src/codegen/hello.rs index 8d4079c63c1..1321daf1326 100644 --- a/quickwit/quickwit-codegen/example/src/codegen/hello.rs +++ b/quickwit/quickwit-codegen/example/src/codegen/hello.rs @@ -138,8 +138,8 @@ impl HelloClient { { HelloClient::new(HelloMailbox::new(mailbox)) } - pub fn tower() -> HelloTowerBlockBuilder { - HelloTowerBlockBuilder::default() + pub fn tower() -> HelloTowerLayerStack { + HelloTowerLayerStack::default() } #[cfg(any(test, feature = "testsuite"))] pub fn mock() -> MockHello { @@ -270,9 +270,9 @@ impl tower::Service> for Box, hello_svc: quickwit_common::tower::BoxService< HelloRequest, @@ -290,7 +290,7 @@ struct HelloTowerBlock { crate::HelloError, >, } -impl Clone for HelloTowerBlock { +impl Clone for HelloTowerServiceStack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -301,7 +301,7 @@ impl Clone for HelloTowerBlock { } } #[async_trait::async_trait] -impl Hello for HelloTowerBlock { +impl Hello for HelloTowerServiceStack { async fn hello( &mut self, request: HelloRequest, @@ -327,69 +327,133 @@ impl Hello for HelloTowerBlock { self.inner.endpoints() } } -#[derive(Debug, Default)] -pub struct HelloTowerBlockBuilder { - #[allow(clippy::type_complexity)] - hello_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - HelloRequest, - HelloResponse, - crate::HelloError, - >, - >, - #[allow(clippy::type_complexity)] - goodbye_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - GoodbyeRequest, - GoodbyeResponse, - crate::HelloError, - >, +type HelloLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService, + HelloRequest, + HelloResponse, + crate::HelloError, +>; +type GoodbyeLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + GoodbyeRequest, + GoodbyeResponse, + crate::HelloError, >, - #[allow(clippy::type_complexity)] - ping_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - quickwit_common::ServiceStream, - HelloStream, - crate::HelloError, - >, + GoodbyeRequest, + GoodbyeResponse, + crate::HelloError, +>; +type PingLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + quickwit_common::ServiceStream, + HelloStream, + crate::HelloError, >, + quickwit_common::ServiceStream, + HelloStream, + crate::HelloError, +>; +#[derive(Debug, Default)] +pub struct HelloTowerLayerStack { + hello_layers: Vec, + goodbye_layers: Vec, + ping_layers: Vec, } -impl HelloTowerBlockBuilder { - pub fn shared_layer(mut self, layer: L) -> Self +impl HelloTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + HelloRequest, + HelloResponse, + crate::HelloError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< HelloRequest, Response = HelloResponse, Error = crate::HelloError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + GoodbyeRequest, + GoodbyeResponse, + crate::HelloError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< GoodbyeRequest, Response = GoodbyeResponse, Error = crate::HelloError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + quickwit_common::ServiceStream, + HelloStream, + crate::HelloError, + >, + > + Clone + Send + Sync + 'static, + , + HelloStream, + crate::HelloError, + >, + >>::Service: tower::Service< quickwit_common::ServiceStream, Response = HelloStream, Error = crate::HelloError, > + Clone + Send + Sync + 'static, - , + HelloStream, + crate::HelloError, + >, + >>::Service as tower::Service< quickwit_common::ServiceStream, >>::Future: Send + 'static, { - self.hello_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.goodbye_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.ping_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.hello_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.goodbye_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } - pub fn hello_layer(mut self, layer: L) -> Self + pub fn stack_hello_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + HelloRequest, + HelloResponse, + crate::HelloError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< HelloRequest, Response = HelloResponse, @@ -397,12 +461,18 @@ impl HelloTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.hello_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.hello_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn goodbye_layer(mut self, layer: L) -> Self + pub fn stack_goodbye_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + GoodbyeRequest, + GoodbyeResponse, + crate::HelloError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< GoodbyeRequest, Response = GoodbyeResponse, @@ -410,12 +480,18 @@ impl HelloTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.goodbye_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.goodbye_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn ping_layer(mut self, layer: L) -> Self + pub fn stack_ping_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + quickwit_common::ServiceStream, + HelloStream, + crate::HelloError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< quickwit_common::ServiceStream, Response = HelloStream, @@ -425,7 +501,7 @@ impl HelloTowerBlockBuilder { quickwit_common::ServiceStream, >>::Future: Send + 'static, { - self.ping_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> HelloClient @@ -460,28 +536,37 @@ impl HelloTowerBlockBuilder { self.build_from_boxed(Box::new(HelloMailbox::new(mailbox))) } fn build_from_boxed(self, boxed_instance: Box) -> HelloClient { - let hello_svc = if let Some(layer) = self.hello_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let goodbye_svc = if let Some(layer) = self.goodbye_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let ping_svc = if let Some(layer) = self.ping_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tower_block = HelloTowerBlock { + let hello_svc = self + .hello_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let goodbye_svc = self + .goodbye_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let ping_svc = self + .ping_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = HelloTowerServiceStack { inner: boxed_instance.clone(), hello_svc, goodbye_svc, ping_svc, }; - HelloClient::new(tower_block) + HelloClient::new(tower_svc_stack) } } #[derive(Debug, Clone)] diff --git a/quickwit/quickwit-codegen/example/src/lib.rs b/quickwit/quickwit-codegen/example/src/lib.rs index e2c3cfd20f7..37fcdcb3290 100644 --- a/quickwit/quickwit-codegen/example/src/lib.rs +++ b/quickwit/quickwit-codegen/example/src/lib.rs @@ -469,15 +469,17 @@ mod tests { } #[tokio::test] - async fn test_hello_codegen_tower_layers() { + async fn test_hello_codegen_tower_stack_layers() { + let layer = CounterLayer::default(); let hello_layer = CounterLayer::default(); let goodbye_layer = CounterLayer::default(); let ping_layer = CounterLayer::default(); let mut hello_tower = HelloClient::tower() - .hello_layer(hello_layer.clone()) - .goodbye_layer(goodbye_layer.clone()) - .ping_layer(ping_layer.clone()) + .stack_layer(layer.clone()) + .stack_hello_layer(hello_layer.clone()) + .stack_goodbye_layer(goodbye_layer.clone()) + .stack_ping_layer(ping_layer.clone()) .build(HelloImpl); hello_tower @@ -507,49 +509,12 @@ mod tests { "Pong, Tower!" ); + assert_eq!(layer.counter.load(Ordering::Relaxed), 3); assert_eq!(hello_layer.counter.load(Ordering::Relaxed), 1); assert_eq!(goodbye_layer.counter.load(Ordering::Relaxed), 1); assert_eq!(ping_layer.counter.load(Ordering::Relaxed), 1); } - #[tokio::test] - async fn test_hello_codegen_tower_shared_layer() { - let layer = CounterLayer::default(); - - let mut hello_tower = HelloClient::tower() - .shared_layer(layer.clone()) - .build(HelloImpl); - - hello_tower - .hello(HelloRequest { - name: "Tower".to_string(), - }) - .await - .unwrap(); - - hello_tower - .goodbye(GoodbyeRequest { - name: "Tower".to_string(), - }) - .await - .unwrap(); - - let (ping_stream_tx, ping_stream) = ServiceStream::new_bounded(1); - let mut pong_stream = hello_tower.ping(ping_stream).await.unwrap(); - - ping_stream_tx - .try_send(PingRequest { - name: "Tower".to_string(), - }) - .unwrap(); - assert_eq!( - pong_stream.next().await.unwrap().unwrap().message, - "Pong, Tower!" - ); - - assert_eq!(layer.counter.load(Ordering::Relaxed), 3); - } - #[tokio::test] async fn test_from_channel() { let balance_channed = BalanceChannel::from_channel( diff --git a/quickwit/quickwit-codegen/src/codegen.rs b/quickwit/quickwit-codegen/src/codegen.rs index ed413201eab..f93a30795ee 100644 --- a/quickwit/quickwit-codegen/src/codegen.rs +++ b/quickwit/quickwit-codegen/src/codegen.rs @@ -20,7 +20,7 @@ use std::collections::HashSet; use anyhow::ensure; -use heck::ToSnakeCase; +use heck::{ToSnakeCase, ToUpperCamelCase}; use proc_macro2::TokenStream; use prost_build::{Comments, Method, Service, ServiceGenerator}; use quote::{quote, ToTokens}; @@ -186,8 +186,8 @@ struct CodegenContext { stream_type_alias: TokenStream, methods: Vec, client_name: Ident, - tower_block_name: Ident, - tower_block_builder_name: Ident, + tower_svc_stack_name: Ident, + tower_layer_stack_name: Ident, mailbox_name: Ident, mock_mod_name: Ident, mock_name: Ident, @@ -228,8 +228,8 @@ impl CodegenContext { let methods = SynMethod::parse_prost_methods(&service.methods); let client_name = quote::format_ident!("{}Client", service.name); - let tower_block_name = quote::format_ident!("{}TowerBlock", service.name); - let tower_block_builder_name = quote::format_ident!("{}TowerBlockBuilder", service.name); + let tower_svc_stack_name = quote::format_ident!("{}TowerServiceStack", service.name); + let tower_layer_stack_name = quote::format_ident!("{}TowerLayerStack", service.name); let mailbox_name = quote::format_ident!("{}Mailbox", service.name); let grpc_client_name = quote::format_ident!("{}GrpcClient", service.name); @@ -254,8 +254,8 @@ impl CodegenContext { stream_type_alias, methods, client_name, - tower_block_name, - tower_block_builder_name, + tower_svc_stack_name, + tower_layer_stack_name, mailbox_name, mock_mod_name, mock_name, @@ -288,8 +288,8 @@ fn generate_all( let service_trait = generate_service_trait(&context); let client = generate_client(&context); let tower_services = generate_tower_services(&context); - let tower_block = generate_tower_block(&context); - let tower_block_builder = generate_tower_block_builder(&context); + let tower_svc_stack = generate_tower_svc_stack(&context); + let tower_layer_stack = generate_tower_layer_stack(&context); let tower_mailbox = generate_tower_mailbox(&context); let grpc_client_adapter = generate_grpc_client_adapter(&context); let grpc_server_adapter = generate_grpc_server_adapter(&context); @@ -317,9 +317,9 @@ fn generate_all( #tower_services - #tower_block + #tower_svc_stack - #tower_block_builder + #tower_layer_stack #tower_mailbox @@ -498,7 +498,7 @@ fn generate_service_trait_methods(context: &CodegenContext) -> TokenStream { stream } -fn generate_additional_methods_calling_inner() -> TokenStream { +fn generate_extra_methods_calling_inner() -> TokenStream { quote! { async fn check_connectivity(&mut self) -> anyhow::Result<()> { self.inner.check_connectivity().await @@ -526,19 +526,19 @@ fn generate_client(context: &CodegenContext) -> TokenStream { let mock_mod_name = &context.mock_mod_name; let mock_methods = generate_client_methods(context, true); let mailbox_name = &context.mailbox_name; - let tower_block_builder_name = &context.tower_block_builder_name; + let tower_layer_stack_name = &context.tower_layer_stack_name; let mock_name = &context.mock_name; let mock_wrapper_name = quote::format_ident!("{}Wrapper", mock_name); let error_mesage = format!( "`{}` must be wrapped in a `{}`. Use `{}::from(mock)` to instantiate the client.", mock_name, mock_wrapper_name, mock_name ); - let additional_client_methods = if context.generate_extra_service_methods { - generate_additional_methods_calling_inner() + let extra_client_methods = if context.generate_extra_service_methods { + generate_extra_methods_calling_inner() } else { TokenStream::new() }; - let additional_mock_methods = if context.generate_extra_service_methods { + let extra_mock_methods = if context.generate_extra_service_methods { quote! { async fn check_connectivity(&mut self) -> anyhow::Result<()> { self.inner.lock().await.check_connectivity().await @@ -602,8 +602,8 @@ fn generate_client(context: &CodegenContext) -> TokenStream { #client_name::new(#mailbox_name::new(mailbox)) } - pub fn tower() -> #tower_block_builder_name { - #tower_block_builder_name::default() + pub fn tower() -> #tower_layer_stack_name { + #tower_layer_stack_name::default() } #[cfg(any(test, feature = "testsuite"))] @@ -615,7 +615,7 @@ fn generate_client(context: &CodegenContext) -> TokenStream { #[async_trait::async_trait] impl #service_name for #client_name { #client_methods - #additional_client_methods + #extra_client_methods } #[cfg(any(test, feature = "testsuite"))] @@ -630,7 +630,7 @@ fn generate_client(context: &CodegenContext) -> TokenStream { #[async_trait::async_trait] impl #service_name for #mock_wrapper_name { #mock_methods - #additional_mock_methods + #extra_mock_methods } impl From<#mock_name> for #client_name { @@ -710,29 +710,29 @@ fn generate_tower_services(context: &CodegenContext) -> TokenStream { stream } -fn generate_tower_block(context: &CodegenContext) -> TokenStream { - let tower_block_name = &context.tower_block_name; +fn generate_tower_svc_stack(context: &CodegenContext) -> TokenStream { + let tower_svc_stack_name = &context.tower_svc_stack_name; let service_name = &context.service_name; - let tower_block_attributes = generate_tower_block_attributes(context); - let tower_block_clone_impl = generate_tower_block_clone_impl(context); - let tower_block_service_impl = generate_tower_block_service_impl(context); + let tower_svc_stack_attributes = generate_tower_svc_stack_attributes(context); + let tower_svc_stack_clone_impl = generate_tower_svc_stack_clone_impl(context); + let tower_svc_stack_service_impl = generate_tower_svc_stack_service_impl(context); quote! { - /// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. + /// A tower service stack is a set of tower services. #[derive(Debug)] - struct #tower_block_name { + struct #tower_svc_stack_name { inner: Box, - #tower_block_attributes + #tower_svc_stack_attributes } - #tower_block_clone_impl + #tower_svc_stack_clone_impl - #tower_block_service_impl + #tower_svc_stack_service_impl } } -fn generate_tower_block_attributes(context: &CodegenContext) -> TokenStream { +fn generate_tower_svc_stack_attributes(context: &CodegenContext) -> TokenStream { let error_type = &context.error_type; let mut stream = TokenStream::new(); @@ -750,8 +750,8 @@ fn generate_tower_block_attributes(context: &CodegenContext) -> TokenStream { stream } -fn generate_tower_block_clone_impl(context: &CodegenContext) -> TokenStream { - let tower_block_name = &context.tower_block_name; +fn generate_tower_svc_stack_clone_impl(context: &CodegenContext) -> TokenStream { + let tower_svc_stack_name = &context.tower_svc_stack_name; let mut cloned_attributes = TokenStream::new(); @@ -764,7 +764,7 @@ fn generate_tower_block_clone_impl(context: &CodegenContext) -> TokenStream { } quote! { - impl Clone for #tower_block_name { + impl Clone for #tower_svc_stack_name { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -775,12 +775,12 @@ fn generate_tower_block_clone_impl(context: &CodegenContext) -> TokenStream { } } -fn generate_tower_block_service_impl(context: &CodegenContext) -> TokenStream { +fn generate_tower_svc_stack_service_impl(context: &CodegenContext) -> TokenStream { let service_name = &context.service_name; - let tower_block_name = &context.tower_block_name; + let tower_svc_stack_name = &context.tower_svc_stack_name; let result_type = &context.result_type; - let additional_client_methods = if context.generate_extra_service_methods { - generate_additional_methods_calling_inner() + let extra_client_methods = if context.generate_extra_service_methods { + generate_extra_methods_calling_inner() } else { TokenStream::new() }; @@ -802,106 +802,112 @@ fn generate_tower_block_service_impl(context: &CodegenContext) -> TokenStream { quote! { #[async_trait::async_trait] - impl #service_name for #tower_block_name { + impl #service_name for #tower_svc_stack_name { #methods - #additional_client_methods + #extra_client_methods } } } -fn generate_tower_block_builder(context: &CodegenContext) -> TokenStream { - let tower_block_builder_name = &context.tower_block_builder_name; - let tower_block_builder_attributes = generate_tower_block_builder_attributes(context); - let tower_block_builder_impl = generate_tower_block_builder_impl(context); +fn generate_tower_layer_stack(context: &CodegenContext) -> TokenStream { + let tower_layer_stack_name = &context.tower_layer_stack_name; + let (tower_layer_stack_types, layer_stack_attributes) = + generate_layer_stack_types_and_attributes(context); + let layer_stack_impl = generate_layer_stack_impl(context); quote! { + #tower_layer_stack_types + #[derive(Debug, Default)] - pub struct #tower_block_builder_name { - #tower_block_builder_attributes + pub struct #tower_layer_stack_name { + #layer_stack_attributes } - #tower_block_builder_impl + #layer_stack_impl } } -fn generate_tower_block_builder_attributes(context: &CodegenContext) -> TokenStream { - let service_name = &context.service_name; +fn generate_layer_stack_types_and_attributes( + context: &CodegenContext, +) -> (TokenStream, TokenStream) { let error_type = &context.error_type; - let mut stream = TokenStream::new(); + let mut type_aliases = TokenStream::new(); + let mut attributes = TokenStream::new(); for syn_method in &context.methods { - let attribute_name = quote::format_ident!("{}_layer", syn_method.name); + let service_name_upper_camel_case = syn_method.name.to_string().to_upper_camel_case(); + let type_alias_name = quote::format_ident!("{service_name_upper_camel_case}Layer"); + let attribute_name = quote::format_ident!("{}_layers", syn_method.name); let request_type = syn_method.request_type(false); let response_type = syn_method.response_type(context, false); + let type_alias = quote! { + type #type_alias_name = quickwit_common::tower::BoxLayer, #request_type, #response_type, #error_type>; + }; let attribute = quote! { - #[allow(clippy::type_complexity)] - #attribute_name: Option, #request_type, #response_type, #error_type>>, + #attribute_name: Vec<#type_alias_name>, }; - stream.extend(attribute); + type_aliases.extend(type_alias); + attributes.extend(attribute); } - stream + (type_aliases, attributes) } -fn generate_tower_block_builder_impl(context: &CodegenContext) -> TokenStream { +fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream { let service_name = &context.service_name; let client_name = &context.client_name; let mailbox_name = &context.mailbox_name; - let tower_block_name = &context.tower_block_name; - let tower_block_builder_name = &context.tower_block_builder_name; + let tower_svc_stack_name = &context.tower_svc_stack_name; + let tower_layer_stack_name = &context.tower_layer_stack_name; let error_type = &context.error_type; + let mut shared_layer_method_bounds = TokenStream::new(); let mut layer_method_bounds = TokenStream::new(); let mut layer_method_statements = TokenStream::new(); let mut layer_methods = TokenStream::new(); let mut svc_statements = TokenStream::new(); let mut svc_attribute_idents = Vec::with_capacity(context.methods.len()); - for (i, syn_method) in context.methods.iter().enumerate() { - let layer_attribute_name = quote::format_ident!("{}_layer", syn_method.name); + for syn_method in &context.methods { + let layer_attribute_name = quote::format_ident!("{}_layers", syn_method.name); + let layer_method_name = quote::format_ident!("stack_{}_layer", syn_method.name); let svc_attribute_name = quote::format_ident!("{}_svc", syn_method.name); let request_type = syn_method.request_type(false); let response_type = syn_method.response_type(context, false); + let shared_layer_method_bound = quote! { + L: tower::Layer> + Clone + Send + Sync + 'static, + >>::Service: tower::Service<#request_type, Response = #response_type, Error = #error_type> + Clone + Send + Sync + 'static, + <>>::Service as tower::Service<#request_type>>::Future: Send + 'static, + }; let layer_method_bound = quote! { + L: tower::Layer> + Send + Sync + 'static, L::Service: tower::Service<#request_type, Response = #response_type, Error = #error_type> + Clone + Send + Sync + 'static, >::Future: Send + 'static, }; - - let layer_method_statement = if i == context.methods.len() - 1 { - quote! { - self.#layer_attribute_name = Some(quickwit_common::tower::BoxLayer::new(layer)); - } - } else { - quote! { - self.#layer_attribute_name = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - } + let layer_method_statement = quote! { + self.#layer_attribute_name.push(quickwit_common::tower::BoxLayer::new(layer.clone())); }; - let layer_method = quote! { - pub fn #layer_attribute_name( + pub fn #layer_method_name( mut self, layer: L ) -> Self where - L: tower::Layer> + Send + Sync + 'static, #layer_method_bound { - self.#layer_attribute_name = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.#layer_attribute_name.push(quickwit_common::tower::BoxLayer::new(layer)); self } }; + shared_layer_method_bounds.extend(shared_layer_method_bound); layer_method_bounds.extend(layer_method_bound); layer_method_statements.extend(layer_method_statement); layer_methods.extend(layer_method); let svc_statement = quote! { - let #svc_attribute_name = if let Some(layer) = self.#layer_attribute_name { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; + let #svc_attribute_name = self.#layer_attribute_name.into_iter().rev().fold(quickwit_common::tower::BoxService::new(boxed_instance.clone()), |svc, layer| layer.layer(svc)); }; svc_statements.extend(svc_statement); @@ -909,11 +915,10 @@ fn generate_tower_block_builder_impl(context: &CodegenContext) -> TokenStream { } quote! { - impl #tower_block_builder_name { - pub fn shared_layer(mut self, layer: L) -> Self + impl #tower_layer_stack_name { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - #layer_method_bounds + #shared_layer_method_bounds { #layer_method_statements self @@ -950,11 +955,11 @@ fn generate_tower_block_builder_impl(context: &CodegenContext) -> TokenStream { { #svc_statements - let tower_block = #tower_block_name { + let tower_svc_stack = #tower_svc_stack_name { inner: boxed_instance.clone(), #(#svc_attribute_idents),* }; - #client_name::new(tower_block) + #client_name::new(tower_svc_stack) } } } @@ -964,7 +969,7 @@ fn generate_tower_mailbox(context: &CodegenContext) -> TokenStream { let service_name = &context.service_name; let mailbox_name = &context.mailbox_name; let error_type = &context.error_type; - let additional_mailbox_methods = if context.generate_extra_service_methods { + let extra_mailbox_methods = if context.generate_extra_service_methods { quote! { async fn check_connectivity(&mut self) -> anyhow::Result<()> { if self.inner.is_disconnected() { @@ -1063,7 +1068,7 @@ fn generate_tower_mailbox(context: &CodegenContext) -> TokenStream { #mailbox_name: #(#mailbox_bounds)+*, { #mailbox_methods - #additional_mailbox_methods + #extra_mailbox_methods } } } @@ -1105,7 +1110,7 @@ fn generate_grpc_client_adapter(context: &CodegenContext) -> TokenStream { let grpc_client_name = &context.grpc_client_name; let grpc_client_adapter_name = &context.grpc_client_adapter_name; let grpc_server_adapter_methods = generate_grpc_client_adapter_methods(context); - let additional_grpc_server_adapter_methods = if context.generate_extra_service_methods { + let extra_grpc_server_adapter_methods = if context.generate_extra_service_methods { quote! { async fn check_connectivity(&mut self) -> anyhow::Result<()> { if self.connection_addrs_rx.borrow().len() == 0 { @@ -1153,7 +1158,7 @@ fn generate_grpc_client_adapter(context: &CodegenContext) -> TokenStream { T::Future: Send { #grpc_server_adapter_methods - #additional_grpc_server_adapter_methods + #extra_grpc_server_adapter_methods } } } diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index 280a6c112a8..f414937ba50 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -253,8 +253,8 @@ impl IngestServiceClient { { IngestServiceClient::new(IngestServiceMailbox::new(mailbox)) } - pub fn tower() -> IngestServiceTowerBlockBuilder { - IngestServiceTowerBlockBuilder::default() + pub fn tower() -> IngestServiceTowerLayerStack { + IngestServiceTowerLayerStack::default() } #[cfg(any(test, feature = "testsuite"))] pub fn mock() -> MockIngestService { @@ -361,9 +361,9 @@ impl tower::Service for Box { Box::pin(fut) } } -/// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. +/// A tower service stack is a set of tower services. #[derive(Debug)] -struct IngestServiceTowerBlock { +struct IngestServiceTowerServiceStack { inner: Box, ingest_svc: quickwit_common::tower::BoxService< IngestRequest, @@ -381,7 +381,7 @@ struct IngestServiceTowerBlock { crate::IngestServiceError, >, } -impl Clone for IngestServiceTowerBlock { +impl Clone for IngestServiceTowerServiceStack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -392,7 +392,7 @@ impl Clone for IngestServiceTowerBlock { } } #[async_trait::async_trait] -impl IngestService for IngestServiceTowerBlock { +impl IngestService for IngestServiceTowerServiceStack { async fn ingest(&mut self, request: IngestRequest) -> crate::Result { self.ingest_svc.ready().await?.call(request).await } @@ -403,67 +403,135 @@ impl IngestService for IngestServiceTowerBlock { self.tail_svc.ready().await?.call(request).await } } -#[derive(Debug, Default)] -pub struct IngestServiceTowerBlockBuilder { - #[allow(clippy::type_complexity)] - ingest_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - IngestRequest, - IngestResponse, - crate::IngestServiceError, - >, +type IngestLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + IngestRequest, + IngestResponse, + crate::IngestServiceError, >, - #[allow(clippy::type_complexity)] - fetch_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - FetchRequest, - FetchResponse, - crate::IngestServiceError, - >, + IngestRequest, + IngestResponse, + crate::IngestServiceError, +>; +type FetchLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + FetchRequest, + FetchResponse, + crate::IngestServiceError, >, - #[allow(clippy::type_complexity)] - tail_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - TailRequest, - FetchResponse, - crate::IngestServiceError, - >, + FetchRequest, + FetchResponse, + crate::IngestServiceError, +>; +type TailLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + TailRequest, + FetchResponse, + crate::IngestServiceError, >, + TailRequest, + FetchResponse, + crate::IngestServiceError, +>; +#[derive(Debug, Default)] +pub struct IngestServiceTowerLayerStack { + ingest_layers: Vec, + fetch_layers: Vec, + tail_layers: Vec, } -impl IngestServiceTowerBlockBuilder { - pub fn shared_layer(mut self, layer: L) -> Self +impl IngestServiceTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + IngestRequest, + IngestResponse, + crate::IngestServiceError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< IngestRequest, Response = IngestResponse, Error = crate::IngestServiceError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + FetchRequest, + FetchResponse, + crate::IngestServiceError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< FetchRequest, Response = FetchResponse, Error = crate::IngestServiceError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + TailRequest, + FetchResponse, + crate::IngestServiceError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< TailRequest, Response = FetchResponse, Error = crate::IngestServiceError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, { - self.ingest_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.fetch_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.tail_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.ingest_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.fetch_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.tail_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } - pub fn ingest_layer(mut self, layer: L) -> Self + pub fn stack_ingest_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + IngestRequest, + IngestResponse, + crate::IngestServiceError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< IngestRequest, Response = IngestResponse, @@ -471,12 +539,18 @@ impl IngestServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.ingest_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.ingest_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn fetch_layer(mut self, layer: L) -> Self + pub fn stack_fetch_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + FetchRequest, + FetchResponse, + crate::IngestServiceError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< FetchRequest, Response = FetchResponse, @@ -484,12 +558,18 @@ impl IngestServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.fetch_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.fetch_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn tail_layer(mut self, layer: L) -> Self + pub fn stack_tail_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + TailRequest, + FetchResponse, + crate::IngestServiceError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< TailRequest, Response = FetchResponse, @@ -497,7 +577,7 @@ impl IngestServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.tail_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.tail_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> IngestServiceClient @@ -535,28 +615,37 @@ impl IngestServiceTowerBlockBuilder { self, boxed_instance: Box, ) -> IngestServiceClient { - let ingest_svc = if let Some(layer) = self.ingest_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let fetch_svc = if let Some(layer) = self.fetch_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tail_svc = if let Some(layer) = self.tail_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tower_block = IngestServiceTowerBlock { + let ingest_svc = self + .ingest_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let fetch_svc = self + .fetch_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tail_svc = self + .tail_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = IngestServiceTowerServiceStack { inner: boxed_instance.clone(), ingest_svc, fetch_svc, tail_svc, }; - IngestServiceClient::new(tower_block) + IngestServiceClient::new(tower_svc_stack) } } #[derive(Debug, Clone)] diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 888162e0ed0..29b43407487 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -58,7 +58,7 @@ pub(crate) fn instrument_metastore( metastore_impl: impl MetastoreService, ) -> MetastoreServiceClient { MetastoreServiceClient::tower() - .shared_layer(METASTORE_METRICS_LAYER.clone()) + .stack_layer(METASTORE_METRICS_LAYER.clone()) .build(metastore_impl) } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 5587d11180e..1d79a93e8f9 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -208,8 +208,8 @@ impl ControlPlaneServiceClient { { ControlPlaneServiceClient::new(ControlPlaneServiceMailbox::new(mailbox)) } - pub fn tower() -> ControlPlaneServiceTowerBlockBuilder { - ControlPlaneServiceTowerBlockBuilder::default() + pub fn tower() -> ControlPlaneServiceTowerLayerStack { + ControlPlaneServiceTowerLayerStack::default() } #[cfg(any(test, feature = "testsuite"))] pub fn mock() -> MockControlPlaneService { @@ -428,9 +428,9 @@ impl tower::Service for Box, create_index_svc: quickwit_common::tower::BoxService< super::metastore::CreateIndexRequest, @@ -463,7 +463,7 @@ struct ControlPlaneServiceTowerBlock { crate::control_plane::ControlPlaneError, >, } -impl Clone for ControlPlaneServiceTowerBlock { +impl Clone for ControlPlaneServiceTowerServiceStack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -477,7 +477,7 @@ impl Clone for ControlPlaneServiceTowerBlock { } } #[async_trait::async_trait] -impl ControlPlaneService for ControlPlaneServiceTowerBlock { +impl ControlPlaneService for ControlPlaneServiceTowerServiceStack { async fn create_index( &mut self, request: super::metastore::CreateIndexRequest, @@ -517,145 +517,264 @@ impl ControlPlaneService for ControlPlaneServiceTowerBlock { self.get_or_create_open_shards_svc.ready().await?.call(request).await } } -#[derive(Debug, Default)] -pub struct ControlPlaneServiceTowerBlockBuilder { - #[allow(clippy::type_complexity)] - create_index_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - super::metastore::CreateIndexRequest, - super::metastore::CreateIndexResponse, - crate::control_plane::ControlPlaneError, - >, +type CreateIndexLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + super::metastore::CreateIndexRequest, + super::metastore::CreateIndexResponse, + crate::control_plane::ControlPlaneError, >, - #[allow(clippy::type_complexity)] - delete_index_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - super::metastore::DeleteIndexRequest, - super::metastore::EmptyResponse, - crate::control_plane::ControlPlaneError, - >, + super::metastore::CreateIndexRequest, + super::metastore::CreateIndexResponse, + crate::control_plane::ControlPlaneError, +>; +type DeleteIndexLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + super::metastore::DeleteIndexRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, >, - #[allow(clippy::type_complexity)] - add_source_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - super::metastore::AddSourceRequest, - super::metastore::EmptyResponse, - crate::control_plane::ControlPlaneError, - >, + super::metastore::DeleteIndexRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, +>; +type AddSourceLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + super::metastore::AddSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, >, - #[allow(clippy::type_complexity)] - toggle_source_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - super::metastore::ToggleSourceRequest, - super::metastore::EmptyResponse, - crate::control_plane::ControlPlaneError, - >, + super::metastore::AddSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, +>; +type ToggleSourceLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + super::metastore::ToggleSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, >, - #[allow(clippy::type_complexity)] - delete_source_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - super::metastore::DeleteSourceRequest, - super::metastore::EmptyResponse, - crate::control_plane::ControlPlaneError, - >, + super::metastore::ToggleSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, +>; +type DeleteSourceLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + super::metastore::DeleteSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, >, - #[allow(clippy::type_complexity)] - get_or_create_open_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - GetOrCreateOpenShardsRequest, - GetOrCreateOpenShardsResponse, - crate::control_plane::ControlPlaneError, - >, + super::metastore::DeleteSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, +>; +type GetOrCreateOpenShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + GetOrCreateOpenShardsRequest, + GetOrCreateOpenShardsResponse, + crate::control_plane::ControlPlaneError, >, + GetOrCreateOpenShardsRequest, + GetOrCreateOpenShardsResponse, + crate::control_plane::ControlPlaneError, +>; +#[derive(Debug, Default)] +pub struct ControlPlaneServiceTowerLayerStack { + create_index_layers: Vec, + delete_index_layers: Vec, + add_source_layers: Vec, + toggle_source_layers: Vec, + delete_source_layers: Vec, + get_or_create_open_shards_layers: Vec, } -impl ControlPlaneServiceTowerBlockBuilder { - pub fn shared_layer(mut self, layer: L) -> Self +impl ControlPlaneServiceTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::CreateIndexRequest, + super::metastore::CreateIndexResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< super::metastore::CreateIndexRequest, Response = super::metastore::CreateIndexResponse, Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< super::metastore::CreateIndexRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::DeleteIndexRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< super::metastore::DeleteIndexRequest, Response = super::metastore::EmptyResponse, Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< super::metastore::DeleteIndexRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::AddSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< super::metastore::AddSourceRequest, Response = super::metastore::EmptyResponse, Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< super::metastore::AddSourceRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::ToggleSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< super::metastore::ToggleSourceRequest, Response = super::metastore::EmptyResponse, Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< super::metastore::ToggleSourceRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::DeleteSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< super::metastore::DeleteSourceRequest, Response = super::metastore::EmptyResponse, Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< super::metastore::DeleteSourceRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + GetOrCreateOpenShardsRequest, + GetOrCreateOpenShardsResponse, + crate::control_plane::ControlPlaneError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< GetOrCreateOpenShardsRequest, Response = GetOrCreateOpenShardsResponse, Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< GetOrCreateOpenShardsRequest, >>::Future: Send + 'static, { - self - .create_index_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .delete_index_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .add_source_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .toggle_source_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .delete_source_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .get_or_create_open_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.create_index_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.delete_index_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.add_source_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.toggle_source_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.delete_source_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.get_or_create_open_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } - pub fn create_index_layer(mut self, layer: L) -> Self + pub fn stack_create_index_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::CreateIndexRequest, + super::metastore::CreateIndexResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< super::metastore::CreateIndexRequest, Response = super::metastore::CreateIndexResponse, @@ -665,12 +784,18 @@ impl ControlPlaneServiceTowerBlockBuilder { super::metastore::CreateIndexRequest, >>::Future: Send + 'static, { - self.create_index_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.create_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn delete_index_layer(mut self, layer: L) -> Self + pub fn stack_delete_index_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::DeleteIndexRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< super::metastore::DeleteIndexRequest, Response = super::metastore::EmptyResponse, @@ -680,12 +805,18 @@ impl ControlPlaneServiceTowerBlockBuilder { super::metastore::DeleteIndexRequest, >>::Future: Send + 'static, { - self.delete_index_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.delete_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn add_source_layer(mut self, layer: L) -> Self + pub fn stack_add_source_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::AddSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< super::metastore::AddSourceRequest, Response = super::metastore::EmptyResponse, @@ -695,12 +826,18 @@ impl ControlPlaneServiceTowerBlockBuilder { super::metastore::AddSourceRequest, >>::Future: Send + 'static, { - self.add_source_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.add_source_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn toggle_source_layer(mut self, layer: L) -> Self + pub fn stack_toggle_source_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::ToggleSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< super::metastore::ToggleSourceRequest, Response = super::metastore::EmptyResponse, @@ -710,12 +847,18 @@ impl ControlPlaneServiceTowerBlockBuilder { super::metastore::ToggleSourceRequest, >>::Future: Send + 'static, { - self.toggle_source_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.toggle_source_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn delete_source_layer(mut self, layer: L) -> Self + pub fn stack_delete_source_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + super::metastore::DeleteSourceRequest, + super::metastore::EmptyResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< super::metastore::DeleteSourceRequest, Response = super::metastore::EmptyResponse, @@ -725,12 +868,18 @@ impl ControlPlaneServiceTowerBlockBuilder { super::metastore::DeleteSourceRequest, >>::Future: Send + 'static, { - self.delete_source_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.delete_source_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn get_or_create_open_shards_layer(mut self, layer: L) -> Self + pub fn stack_get_or_create_open_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + GetOrCreateOpenShardsRequest, + GetOrCreateOpenShardsResponse, + crate::control_plane::ControlPlaneError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< GetOrCreateOpenShardsRequest, Response = GetOrCreateOpenShardsResponse, @@ -740,10 +889,8 @@ impl ControlPlaneServiceTowerBlockBuilder { GetOrCreateOpenShardsRequest, >>::Future: Send + 'static, { - self - .get_or_create_open_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.get_or_create_open_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> ControlPlaneServiceClient @@ -783,39 +930,55 @@ impl ControlPlaneServiceTowerBlockBuilder { self, boxed_instance: Box, ) -> ControlPlaneServiceClient { - let create_index_svc = if let Some(layer) = self.create_index_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let delete_index_svc = if let Some(layer) = self.delete_index_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let add_source_svc = if let Some(layer) = self.add_source_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let toggle_source_svc = if let Some(layer) = self.toggle_source_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let delete_source_svc = if let Some(layer) = self.delete_source_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let get_or_create_open_shards_svc = if let Some(layer) - = self.get_or_create_open_shards_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tower_block = ControlPlaneServiceTowerBlock { + let create_index_svc = self + .create_index_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let delete_index_svc = self + .delete_index_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let add_source_svc = self + .add_source_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let toggle_source_svc = self + .toggle_source_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let delete_source_svc = self + .delete_source_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let get_or_create_open_shards_svc = self + .get_or_create_open_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = ControlPlaneServiceTowerServiceStack { inner: boxed_instance.clone(), create_index_svc, delete_index_svc, @@ -824,7 +987,7 @@ impl ControlPlaneServiceTowerBlockBuilder { delete_source_svc, get_or_create_open_shards_svc, }; - ControlPlaneServiceClient::new(tower_block) + ControlPlaneServiceClient::new(tower_svc_stack) } } #[derive(Debug, Clone)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index 93d095a2330..90925a09cea 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -108,8 +108,8 @@ impl IndexingServiceClient { { IndexingServiceClient::new(IndexingServiceMailbox::new(mailbox)) } - pub fn tower() -> IndexingServiceTowerBlockBuilder { - IndexingServiceTowerBlockBuilder::default() + pub fn tower() -> IndexingServiceTowerLayerStack { + IndexingServiceTowerLayerStack::default() } #[cfg(any(test, feature = "testsuite"))] pub fn mock() -> MockIndexingService { @@ -169,9 +169,9 @@ impl tower::Service for Box { Box::pin(fut) } } -/// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. +/// A tower service stack is a set of tower services. #[derive(Debug)] -struct IndexingServiceTowerBlock { +struct IndexingServiceTowerServiceStack { inner: Box, apply_indexing_plan_svc: quickwit_common::tower::BoxService< ApplyIndexingPlanRequest, @@ -179,7 +179,7 @@ struct IndexingServiceTowerBlock { crate::indexing::IndexingError, >, } -impl Clone for IndexingServiceTowerBlock { +impl Clone for IndexingServiceTowerServiceStack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -188,7 +188,7 @@ impl Clone for IndexingServiceTowerBlock { } } #[async_trait::async_trait] -impl IndexingService for IndexingServiceTowerBlock { +impl IndexingService for IndexingServiceTowerServiceStack { async fn apply_indexing_plan( &mut self, request: ApplyIndexingPlanRequest, @@ -196,38 +196,62 @@ impl IndexingService for IndexingServiceTowerBlock { self.apply_indexing_plan_svc.ready().await?.call(request).await } } -#[derive(Debug, Default)] -pub struct IndexingServiceTowerBlockBuilder { - #[allow(clippy::type_complexity)] - apply_indexing_plan_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ApplyIndexingPlanRequest, - ApplyIndexingPlanResponse, - crate::indexing::IndexingError, - >, +type ApplyIndexingPlanLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ApplyIndexingPlanRequest, + ApplyIndexingPlanResponse, + crate::indexing::IndexingError, >, + ApplyIndexingPlanRequest, + ApplyIndexingPlanResponse, + crate::indexing::IndexingError, +>; +#[derive(Debug, Default)] +pub struct IndexingServiceTowerLayerStack { + apply_indexing_plan_layers: Vec, } -impl IndexingServiceTowerBlockBuilder { - pub fn shared_layer(mut self, layer: L) -> Self +impl IndexingServiceTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + ApplyIndexingPlanRequest, + ApplyIndexingPlanResponse, + crate::indexing::IndexingError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< ApplyIndexingPlanRequest, Response = ApplyIndexingPlanResponse, Error = crate::indexing::IndexingError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, { - self - .apply_indexing_plan_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.apply_indexing_plan_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } - pub fn apply_indexing_plan_layer(mut self, layer: L) -> Self + pub fn stack_apply_indexing_plan_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ApplyIndexingPlanRequest, + ApplyIndexingPlanResponse, + crate::indexing::IndexingError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ApplyIndexingPlanRequest, Response = ApplyIndexingPlanResponse, @@ -235,10 +259,8 @@ impl IndexingServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self - .apply_indexing_plan_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.apply_indexing_plan_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> IndexingServiceClient @@ -278,17 +300,19 @@ impl IndexingServiceTowerBlockBuilder { self, boxed_instance: Box, ) -> IndexingServiceClient { - let apply_indexing_plan_svc = if let Some(layer) = self.apply_indexing_plan_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tower_block = IndexingServiceTowerBlock { + let apply_indexing_plan_svc = self + .apply_indexing_plan_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = IndexingServiceTowerServiceStack { inner: boxed_instance.clone(), apply_indexing_plan_svc, }; - IndexingServiceClient::new(tower_block) + IndexingServiceClient::new(tower_svc_stack) } } #[derive(Debug, Clone)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index aba68ec4d95..bd1de0d36ca 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -652,8 +652,8 @@ impl IngesterServiceClient { { IngesterServiceClient::new(IngesterServiceMailbox::new(mailbox)) } - pub fn tower() -> IngesterServiceTowerBlockBuilder { - IngesterServiceTowerBlockBuilder::default() + pub fn tower() -> IngesterServiceTowerLayerStack { + IngesterServiceTowerLayerStack::default() } #[cfg(any(test, feature = "testsuite"))] pub fn mock() -> MockIngesterService { @@ -973,9 +973,9 @@ impl tower::Service for Box { Box::pin(fut) } } -/// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. +/// A tower service stack is a set of tower services. #[derive(Debug)] -struct IngesterServiceTowerBlock { +struct IngesterServiceTowerServiceStack { inner: Box, persist_svc: quickwit_common::tower::BoxService< PersistRequest, @@ -1028,7 +1028,7 @@ struct IngesterServiceTowerBlock { crate::ingest::IngestV2Error, >, } -impl Clone for IngesterServiceTowerBlock { +impl Clone for IngesterServiceTowerServiceStack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -1046,7 +1046,7 @@ impl Clone for IngesterServiceTowerBlock { } } #[async_trait::async_trait] -impl IngesterService for IngesterServiceTowerBlock { +impl IngesterService for IngesterServiceTowerServiceStack { async fn persist( &mut self, request: PersistRequest, @@ -1108,204 +1108,406 @@ impl IngesterService for IngesterServiceTowerBlock { self.decommission_svc.ready().await?.call(request).await } } -#[derive(Debug, Default)] -pub struct IngesterServiceTowerBlockBuilder { - #[allow(clippy::type_complexity)] - persist_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - PersistRequest, - PersistResponse, - crate::ingest::IngestV2Error, - >, +type PersistLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + PersistRequest, + PersistResponse, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - open_replication_stream_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - quickwit_common::ServiceStream, - IngesterServiceStream, - crate::ingest::IngestV2Error, - >, + PersistRequest, + PersistResponse, + crate::ingest::IngestV2Error, +>; +type OpenReplicationStreamLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + quickwit_common::ServiceStream, + IngesterServiceStream, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - open_fetch_stream_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - OpenFetchStreamRequest, - IngesterServiceStream, - crate::ingest::IngestV2Error, - >, + quickwit_common::ServiceStream, + IngesterServiceStream, + crate::ingest::IngestV2Error, +>; +type OpenFetchStreamLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + OpenFetchStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - open_observation_stream_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - OpenObservationStreamRequest, - IngesterServiceStream, - crate::ingest::IngestV2Error, - >, + OpenFetchStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, +>; +type OpenObservationStreamLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + OpenObservationStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - init_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - InitShardsRequest, - InitShardsResponse, - crate::ingest::IngestV2Error, - >, + OpenObservationStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, +>; +type InitShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + InitShardsRequest, + InitShardsResponse, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - retain_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - RetainShardsRequest, - RetainShardsResponse, - crate::ingest::IngestV2Error, - >, + InitShardsRequest, + InitShardsResponse, + crate::ingest::IngestV2Error, +>; +type RetainShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + RetainShardsRequest, + RetainShardsResponse, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - truncate_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - TruncateShardsRequest, - TruncateShardsResponse, - crate::ingest::IngestV2Error, - >, + RetainShardsRequest, + RetainShardsResponse, + crate::ingest::IngestV2Error, +>; +type TruncateShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + TruncateShardsRequest, + TruncateShardsResponse, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - close_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - CloseShardsRequest, - CloseShardsResponse, - crate::ingest::IngestV2Error, - >, + TruncateShardsRequest, + TruncateShardsResponse, + crate::ingest::IngestV2Error, +>; +type CloseShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + CloseShardsRequest, + CloseShardsResponse, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - ping_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - PingRequest, - PingResponse, - crate::ingest::IngestV2Error, - >, + CloseShardsRequest, + CloseShardsResponse, + crate::ingest::IngestV2Error, +>; +type PingLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + PingRequest, + PingResponse, + crate::ingest::IngestV2Error, >, - #[allow(clippy::type_complexity)] - decommission_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - DecommissionRequest, - DecommissionResponse, - crate::ingest::IngestV2Error, - >, + PingRequest, + PingResponse, + crate::ingest::IngestV2Error, +>; +type DecommissionLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + DecommissionRequest, + DecommissionResponse, + crate::ingest::IngestV2Error, >, -} -impl IngesterServiceTowerBlockBuilder { - pub fn shared_layer(mut self, layer: L) -> Self + DecommissionRequest, + DecommissionResponse, + crate::ingest::IngestV2Error, +>; +#[derive(Debug, Default)] +pub struct IngesterServiceTowerLayerStack { + persist_layers: Vec, + open_replication_stream_layers: Vec, + open_fetch_stream_layers: Vec, + open_observation_stream_layers: Vec, + init_shards_layers: Vec, + retain_shards_layers: Vec, + truncate_shards_layers: Vec, + close_shards_layers: Vec, + ping_layers: Vec, + decommission_layers: Vec, +} +impl IngesterServiceTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + PersistRequest, + PersistResponse, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< PersistRequest, Response = PersistResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + quickwit_common::ServiceStream, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + >>::Service: tower::Service< quickwit_common::ServiceStream, Response = IngesterServiceStream, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - , + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + >>::Service as tower::Service< quickwit_common::ServiceStream, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + OpenFetchStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + crate::ingest::IngestV2Error, + >, + >>::Service: tower::Service< OpenFetchStreamRequest, Response = IngesterServiceStream, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + crate::ingest::IngestV2Error, + >, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + OpenObservationStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + crate::ingest::IngestV2Error, + >, + >>::Service: tower::Service< OpenObservationStreamRequest, Response = IngesterServiceStream, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - , + crate::ingest::IngestV2Error, + >, + >>::Service as tower::Service< OpenObservationStreamRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + InitShardsRequest, + InitShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< InitShardsRequest, Response = InitShardsResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + RetainShardsRequest, + RetainShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< RetainShardsRequest, Response = RetainShardsResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + TruncateShardsRequest, + TruncateShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< TruncateShardsRequest, Response = TruncateShardsResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + CloseShardsRequest, + CloseShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< CloseShardsRequest, Response = CloseShardsResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + PingRequest, + PingResponse, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< PingRequest, Response = PingResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DecommissionRequest, + DecommissionResponse, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< DecommissionRequest, Response = DecommissionResponse, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, { - self.persist_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - self - .open_replication_stream_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .open_fetch_stream_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .open_observation_stream_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .init_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .retain_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .truncate_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .close_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self.ping_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.decommission_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.persist_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.open_replication_stream_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.open_fetch_stream_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.open_observation_stream_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.init_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.retain_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.truncate_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.close_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.decommission_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } - pub fn persist_layer(mut self, layer: L) -> Self + pub fn stack_persist_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + PersistRequest, + PersistResponse, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< PersistRequest, Response = PersistResponse, @@ -1313,12 +1515,18 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.persist_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.persist_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn open_replication_stream_layer(mut self, layer: L) -> Self + pub fn stack_open_replication_stream_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + quickwit_common::ServiceStream, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< quickwit_common::ServiceStream, Response = IngesterServiceStream, @@ -1328,15 +1536,19 @@ impl IngesterServiceTowerBlockBuilder { quickwit_common::ServiceStream, >>::Future: Send + 'static, { - self - .open_replication_stream_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.open_replication_stream_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn open_fetch_stream_layer(mut self, layer: L) -> Self + pub fn stack_open_fetch_stream_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + OpenFetchStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< OpenFetchStreamRequest, Response = IngesterServiceStream, @@ -1344,15 +1556,18 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self - .open_fetch_stream_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.open_fetch_stream_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn open_observation_stream_layer(mut self, layer: L) -> Self + pub fn stack_open_observation_stream_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + OpenObservationStreamRequest, + IngesterServiceStream, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< OpenObservationStreamRequest, Response = IngesterServiceStream, @@ -1362,15 +1577,19 @@ impl IngesterServiceTowerBlockBuilder { OpenObservationStreamRequest, >>::Future: Send + 'static, { - self - .open_observation_stream_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.open_observation_stream_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn init_shards_layer(mut self, layer: L) -> Self + pub fn stack_init_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + InitShardsRequest, + InitShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< InitShardsRequest, Response = InitShardsResponse, @@ -1378,12 +1597,18 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.init_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.init_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn retain_shards_layer(mut self, layer: L) -> Self + pub fn stack_retain_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + RetainShardsRequest, + RetainShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< RetainShardsRequest, Response = RetainShardsResponse, @@ -1391,12 +1616,18 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.retain_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.retain_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn truncate_shards_layer(mut self, layer: L) -> Self + pub fn stack_truncate_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + TruncateShardsRequest, + TruncateShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< TruncateShardsRequest, Response = TruncateShardsResponse, @@ -1404,12 +1635,18 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.truncate_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.truncate_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn close_shards_layer(mut self, layer: L) -> Self + pub fn stack_close_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + CloseShardsRequest, + CloseShardsResponse, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< CloseShardsRequest, Response = CloseShardsResponse, @@ -1417,12 +1654,18 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.close_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.close_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn ping_layer(mut self, layer: L) -> Self + pub fn stack_ping_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + PingRequest, + PingResponse, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< PingRequest, Response = PingResponse, @@ -1430,12 +1673,18 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.ping_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn decommission_layer(mut self, layer: L) -> Self + pub fn stack_decommission_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DecommissionRequest, + DecommissionResponse, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< DecommissionRequest, Response = DecommissionResponse, @@ -1443,7 +1692,7 @@ impl IngesterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.decommission_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.decommission_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> IngesterServiceClient @@ -1483,61 +1732,87 @@ impl IngesterServiceTowerBlockBuilder { self, boxed_instance: Box, ) -> IngesterServiceClient { - let persist_svc = if let Some(layer) = self.persist_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let open_replication_stream_svc = if let Some(layer) - = self.open_replication_stream_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let open_fetch_stream_svc = if let Some(layer) = self.open_fetch_stream_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let open_observation_stream_svc = if let Some(layer) - = self.open_observation_stream_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let init_shards_svc = if let Some(layer) = self.init_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let retain_shards_svc = if let Some(layer) = self.retain_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let truncate_shards_svc = if let Some(layer) = self.truncate_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let close_shards_svc = if let Some(layer) = self.close_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let ping_svc = if let Some(layer) = self.ping_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let decommission_svc = if let Some(layer) = self.decommission_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tower_block = IngesterServiceTowerBlock { + let persist_svc = self + .persist_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let open_replication_stream_svc = self + .open_replication_stream_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let open_fetch_stream_svc = self + .open_fetch_stream_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let open_observation_stream_svc = self + .open_observation_stream_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let init_shards_svc = self + .init_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let retain_shards_svc = self + .retain_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let truncate_shards_svc = self + .truncate_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let close_shards_svc = self + .close_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let ping_svc = self + .ping_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let decommission_svc = self + .decommission_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = IngesterServiceTowerServiceStack { inner: boxed_instance.clone(), persist_svc, open_replication_stream_svc, @@ -1550,7 +1825,7 @@ impl IngesterServiceTowerBlockBuilder { ping_svc, decommission_svc, }; - IngesterServiceClient::new(tower_block) + IngesterServiceClient::new(tower_svc_stack) } } #[derive(Debug, Clone)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 3dd264ad14d..92781b1ed8c 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -195,8 +195,8 @@ impl IngestRouterServiceClient { { IngestRouterServiceClient::new(IngestRouterServiceMailbox::new(mailbox)) } - pub fn tower() -> IngestRouterServiceTowerBlockBuilder { - IngestRouterServiceTowerBlockBuilder::default() + pub fn tower() -> IngestRouterServiceTowerLayerStack { + IngestRouterServiceTowerLayerStack::default() } #[cfg(any(test, feature = "testsuite"))] pub fn mock() -> MockIngestRouterService { @@ -256,9 +256,9 @@ impl tower::Service for Box { Box::pin(fut) } } -/// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. +/// A tower service stack is a set of tower services. #[derive(Debug)] -struct IngestRouterServiceTowerBlock { +struct IngestRouterServiceTowerServiceStack { inner: Box, ingest_svc: quickwit_common::tower::BoxService< IngestRequestV2, @@ -266,7 +266,7 @@ struct IngestRouterServiceTowerBlock { crate::ingest::IngestV2Error, >, } -impl Clone for IngestRouterServiceTowerBlock { +impl Clone for IngestRouterServiceTowerServiceStack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -275,7 +275,7 @@ impl Clone for IngestRouterServiceTowerBlock { } } #[async_trait::async_trait] -impl IngestRouterService for IngestRouterServiceTowerBlock { +impl IngestRouterService for IngestRouterServiceTowerServiceStack { async fn ingest( &mut self, request: IngestRequestV2, @@ -283,35 +283,61 @@ impl IngestRouterService for IngestRouterServiceTowerBlock { self.ingest_svc.ready().await?.call(request).await } } -#[derive(Debug, Default)] -pub struct IngestRouterServiceTowerBlockBuilder { - #[allow(clippy::type_complexity)] - ingest_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - IngestRequestV2, - IngestResponseV2, - crate::ingest::IngestV2Error, - >, +type IngestLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + IngestRequestV2, + IngestResponseV2, + crate::ingest::IngestV2Error, >, + IngestRequestV2, + IngestResponseV2, + crate::ingest::IngestV2Error, +>; +#[derive(Debug, Default)] +pub struct IngestRouterServiceTowerLayerStack { + ingest_layers: Vec, } -impl IngestRouterServiceTowerBlockBuilder { - pub fn shared_layer(mut self, layer: L) -> Self +impl IngestRouterServiceTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + IngestRequestV2, + IngestResponseV2, + crate::ingest::IngestV2Error, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< IngestRequestV2, Response = IngestResponseV2, Error = crate::ingest::IngestV2Error, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, { - self.ingest_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.ingest_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } - pub fn ingest_layer(mut self, layer: L) -> Self + pub fn stack_ingest_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + IngestRequestV2, + IngestResponseV2, + crate::ingest::IngestV2Error, + >, + > + Send + Sync + 'static, L::Service: tower::Service< IngestRequestV2, Response = IngestResponseV2, @@ -319,7 +345,7 @@ impl IngestRouterServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.ingest_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.ingest_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> IngestRouterServiceClient @@ -359,16 +385,19 @@ impl IngestRouterServiceTowerBlockBuilder { self, boxed_instance: Box, ) -> IngestRouterServiceClient { - let ingest_svc = if let Some(layer) = self.ingest_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tower_block = IngestRouterServiceTowerBlock { + let ingest_svc = self + .ingest_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = IngestRouterServiceTowerServiceStack { inner: boxed_instance.clone(), ingest_svc, }; - IngestRouterServiceClient::new(tower_block) + IngestRouterServiceClient::new(tower_svc_stack) } } #[derive(Debug, Clone)] diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 84eb8f9c24a..9ccabf2e1b6 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -770,8 +770,8 @@ impl MetastoreServiceClient { { MetastoreServiceClient::new(MetastoreServiceMailbox::new(mailbox)) } - pub fn tower() -> MetastoreServiceTowerBlockBuilder { - MetastoreServiceTowerBlockBuilder::default() + pub fn tower() -> MetastoreServiceTowerLayerStack { + MetastoreServiceTowerLayerStack::default() } #[cfg(any(test, feature = "testsuite"))] pub fn mock() -> MockMetastoreService { @@ -1435,9 +1435,9 @@ impl tower::Service for Box { Box::pin(fut) } } -/// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. +/// A tower service stack is a set of tower services. #[derive(Debug)] -struct MetastoreServiceTowerBlock { +struct MetastoreServiceTowerServiceStack { inner: Box, create_index_svc: quickwit_common::tower::BoxService< CreateIndexRequest, @@ -1550,7 +1550,7 @@ struct MetastoreServiceTowerBlock { crate::metastore::MetastoreError, >, } -impl Clone for MetastoreServiceTowerBlock { +impl Clone for MetastoreServiceTowerServiceStack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -1582,7 +1582,7 @@ impl Clone for MetastoreServiceTowerBlock { } } #[async_trait::async_trait] -impl MetastoreService for MetastoreServiceTowerBlock { +impl MetastoreService for MetastoreServiceTowerServiceStack { async fn create_index( &mut self, request: CreateIndexRequest, @@ -1722,442 +1722,868 @@ impl MetastoreService for MetastoreServiceTowerBlock { self.inner.endpoints() } } -#[derive(Debug, Default)] -pub struct MetastoreServiceTowerBlockBuilder { - #[allow(clippy::type_complexity)] - create_index_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - CreateIndexRequest, - CreateIndexResponse, - crate::metastore::MetastoreError, - >, +type CreateIndexLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + CreateIndexRequest, + CreateIndexResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - index_metadata_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - IndexMetadataRequest, - IndexMetadataResponse, - crate::metastore::MetastoreError, - >, + CreateIndexRequest, + CreateIndexResponse, + crate::metastore::MetastoreError, +>; +type IndexMetadataLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + IndexMetadataRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - list_indexes_metadata_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ListIndexesMetadataRequest, - ListIndexesMetadataResponse, - crate::metastore::MetastoreError, - >, + IndexMetadataRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, +>; +type ListIndexesMetadataLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ListIndexesMetadataRequest, + ListIndexesMetadataResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - delete_index_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - DeleteIndexRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + ListIndexesMetadataRequest, + ListIndexesMetadataResponse, + crate::metastore::MetastoreError, +>; +type DeleteIndexLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + DeleteIndexRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - list_splits_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ListSplitsRequest, - MetastoreServiceStream, - crate::metastore::MetastoreError, - >, + DeleteIndexRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type ListSplitsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ListSplitsRequest, + MetastoreServiceStream, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - stage_splits_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - StageSplitsRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + ListSplitsRequest, + MetastoreServiceStream, + crate::metastore::MetastoreError, +>; +type StageSplitsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + StageSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - publish_splits_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - PublishSplitsRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + StageSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type PublishSplitsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + PublishSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - mark_splits_for_deletion_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - MarkSplitsForDeletionRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + PublishSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type MarkSplitsForDeletionLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + MarkSplitsForDeletionRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - delete_splits_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - DeleteSplitsRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + MarkSplitsForDeletionRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type DeleteSplitsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + DeleteSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - add_source_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - AddSourceRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + DeleteSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type AddSourceLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + AddSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - toggle_source_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ToggleSourceRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + AddSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type ToggleSourceLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ToggleSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - delete_source_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - DeleteSourceRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + ToggleSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type DeleteSourceLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + DeleteSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - reset_source_checkpoint_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ResetSourceCheckpointRequest, - EmptyResponse, - crate::metastore::MetastoreError, - >, + DeleteSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type ResetSourceCheckpointLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ResetSourceCheckpointRequest, + EmptyResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - last_delete_opstamp_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - LastDeleteOpstampRequest, - LastDeleteOpstampResponse, - crate::metastore::MetastoreError, - >, + ResetSourceCheckpointRequest, + EmptyResponse, + crate::metastore::MetastoreError, +>; +type LastDeleteOpstampLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + LastDeleteOpstampRequest, + LastDeleteOpstampResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - create_delete_task_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - DeleteQuery, - DeleteTask, - crate::metastore::MetastoreError, - >, + LastDeleteOpstampRequest, + LastDeleteOpstampResponse, + crate::metastore::MetastoreError, +>; +type CreateDeleteTaskLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + DeleteQuery, + DeleteTask, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - update_splits_delete_opstamp_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - UpdateSplitsDeleteOpstampRequest, - UpdateSplitsDeleteOpstampResponse, - crate::metastore::MetastoreError, - >, + DeleteQuery, + DeleteTask, + crate::metastore::MetastoreError, +>; +type UpdateSplitsDeleteOpstampLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - list_delete_tasks_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ListDeleteTasksRequest, - ListDeleteTasksResponse, - crate::metastore::MetastoreError, - >, + UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, + crate::metastore::MetastoreError, +>; +type ListDeleteTasksLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ListDeleteTasksRequest, + ListDeleteTasksResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - list_stale_splits_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ListStaleSplitsRequest, - ListSplitsResponse, - crate::metastore::MetastoreError, - >, + ListDeleteTasksRequest, + ListDeleteTasksResponse, + crate::metastore::MetastoreError, +>; +type ListStaleSplitsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ListStaleSplitsRequest, + ListSplitsResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - open_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - OpenShardsRequest, - OpenShardsResponse, - crate::metastore::MetastoreError, - >, + ListStaleSplitsRequest, + ListSplitsResponse, + crate::metastore::MetastoreError, +>; +type OpenShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + OpenShardsRequest, + OpenShardsResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - acquire_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - AcquireShardsRequest, - AcquireShardsResponse, - crate::metastore::MetastoreError, - >, + OpenShardsRequest, + OpenShardsResponse, + crate::metastore::MetastoreError, +>; +type AcquireShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + AcquireShardsRequest, + AcquireShardsResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - delete_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - DeleteShardsRequest, - DeleteShardsResponse, - crate::metastore::MetastoreError, - >, + AcquireShardsRequest, + AcquireShardsResponse, + crate::metastore::MetastoreError, +>; +type DeleteShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + DeleteShardsRequest, + DeleteShardsResponse, + crate::metastore::MetastoreError, >, - #[allow(clippy::type_complexity)] - list_shards_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - ListShardsRequest, - ListShardsResponse, - crate::metastore::MetastoreError, - >, + DeleteShardsRequest, + DeleteShardsResponse, + crate::metastore::MetastoreError, +>; +type ListShardsLayer = quickwit_common::tower::BoxLayer< + quickwit_common::tower::BoxService< + ListShardsRequest, + ListShardsResponse, + crate::metastore::MetastoreError, >, + ListShardsRequest, + ListShardsResponse, + crate::metastore::MetastoreError, +>; +#[derive(Debug, Default)] +pub struct MetastoreServiceTowerLayerStack { + create_index_layers: Vec, + index_metadata_layers: Vec, + list_indexes_metadata_layers: Vec, + delete_index_layers: Vec, + list_splits_layers: Vec, + stage_splits_layers: Vec, + publish_splits_layers: Vec, + mark_splits_for_deletion_layers: Vec, + delete_splits_layers: Vec, + add_source_layers: Vec, + toggle_source_layers: Vec, + delete_source_layers: Vec, + reset_source_checkpoint_layers: Vec, + last_delete_opstamp_layers: Vec, + create_delete_task_layers: Vec, + update_splits_delete_opstamp_layers: Vec, + list_delete_tasks_layers: Vec, + list_stale_splits_layers: Vec, + open_shards_layers: Vec, + acquire_shards_layers: Vec, + delete_shards_layers: Vec, + list_shards_layers: Vec, } -impl MetastoreServiceTowerBlockBuilder { - pub fn shared_layer(mut self, layer: L) -> Self +impl MetastoreServiceTowerLayerStack { + pub fn stack_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Clone + Send + Sync + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + CreateIndexRequest, + CreateIndexResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< CreateIndexRequest, Response = CreateIndexResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + IndexMetadataRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< IndexMetadataRequest, Response = IndexMetadataResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListIndexesMetadataRequest, + ListIndexesMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< ListIndexesMetadataRequest, Response = ListIndexesMetadataResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< ListIndexesMetadataRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteIndexRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< DeleteIndexRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListSplitsRequest, + MetastoreServiceStream, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + crate::metastore::MetastoreError, + >, + >>::Service: tower::Service< ListSplitsRequest, Response = MetastoreServiceStream, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + crate::metastore::MetastoreError, + >, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + StageSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< StageSplitsRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + PublishSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< PublishSplitsRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + MarkSplitsForDeletionRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< MarkSplitsForDeletionRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< MarkSplitsForDeletionRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< DeleteSplitsRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + AddSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< AddSourceRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ToggleSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< ToggleSourceRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< DeleteSourceRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ResetSourceCheckpointRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< ResetSourceCheckpointRequest, Response = EmptyResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< ResetSourceCheckpointRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + LastDeleteOpstampRequest, + LastDeleteOpstampResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< LastDeleteOpstampRequest, Response = LastDeleteOpstampResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteQuery, + DeleteTask, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< DeleteQuery, Response = DeleteTask, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< UpdateSplitsDeleteOpstampRequest, Response = UpdateSplitsDeleteOpstampResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - , + >>::Service as tower::Service< UpdateSplitsDeleteOpstampRequest, >>::Future: Send + 'static, - L::Service: tower::Service< + L: tower::Layer< + quickwit_common::tower::BoxService< + ListDeleteTasksRequest, + ListDeleteTasksResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< ListDeleteTasksRequest, Response = ListDeleteTasksResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListStaleSplitsRequest, + ListSplitsResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< ListStaleSplitsRequest, Response = ListSplitsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + OpenShardsRequest, + OpenShardsResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< OpenShardsRequest, Response = OpenShardsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + AcquireShardsRequest, + AcquireShardsResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< AcquireShardsRequest, Response = AcquireShardsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteShardsRequest, + DeleteShardsResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< DeleteShardsRequest, Response = DeleteShardsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - L::Service: tower::Service< + <, + >>::Service as tower::Service>::Future: Send + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListShardsRequest, + ListShardsResponse, + crate::metastore::MetastoreError, + >, + > + Clone + Send + Sync + 'static, + , + >>::Service: tower::Service< ListShardsRequest, Response = ListShardsResponse, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, + <, + >>::Service as tower::Service>::Future: Send + 'static, { - self - .create_index_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .index_metadata_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .list_indexes_metadata_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .delete_index_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .list_splits_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .stage_splits_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .publish_splits_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .mark_splits_for_deletion_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .delete_splits_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .add_source_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .toggle_source_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .delete_source_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .reset_source_checkpoint_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .last_delete_opstamp_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .create_delete_task_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .update_splits_delete_opstamp_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .list_delete_tasks_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .list_stale_splits_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .open_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .acquire_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .delete_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self.list_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.create_index_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.index_metadata_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.list_indexes_metadata_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.delete_index_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.list_splits_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.stage_splits_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.publish_splits_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.mark_splits_for_deletion_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.delete_splits_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.add_source_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.toggle_source_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.delete_source_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.reset_source_checkpoint_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.last_delete_opstamp_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.create_delete_task_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.update_splits_delete_opstamp_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.list_delete_tasks_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.list_stale_splits_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.open_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.acquire_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.delete_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); + self.list_shards_layers + .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self } - pub fn create_index_layer(mut self, layer: L) -> Self + pub fn stack_create_index_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + CreateIndexRequest, + CreateIndexResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< CreateIndexRequest, Response = CreateIndexResponse, @@ -2165,12 +2591,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.create_index_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.create_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn index_metadata_layer(mut self, layer: L) -> Self + pub fn stack_index_metadata_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + IndexMetadataRequest, + IndexMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< IndexMetadataRequest, Response = IndexMetadataResponse, @@ -2178,12 +2610,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.index_metadata_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.index_metadata_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn list_indexes_metadata_layer(mut self, layer: L) -> Self + pub fn stack_list_indexes_metadata_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListIndexesMetadataRequest, + ListIndexesMetadataResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ListIndexesMetadataRequest, Response = ListIndexesMetadataResponse, @@ -2193,15 +2631,19 @@ impl MetastoreServiceTowerBlockBuilder { ListIndexesMetadataRequest, >>::Future: Send + 'static, { - self - .list_indexes_metadata_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.list_indexes_metadata_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn delete_index_layer(mut self, layer: L) -> Self + pub fn stack_delete_index_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteIndexRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< DeleteIndexRequest, Response = EmptyResponse, @@ -2209,12 +2651,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.delete_index_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.delete_index_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn list_splits_layer(mut self, layer: L) -> Self + pub fn stack_list_splits_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListSplitsRequest, + MetastoreServiceStream, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ListSplitsRequest, Response = MetastoreServiceStream, @@ -2222,12 +2670,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.list_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.list_splits_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn stage_splits_layer(mut self, layer: L) -> Self + pub fn stack_stage_splits_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + StageSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< StageSplitsRequest, Response = EmptyResponse, @@ -2235,12 +2689,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.stage_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.stage_splits_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn publish_splits_layer(mut self, layer: L) -> Self + pub fn stack_publish_splits_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + PublishSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< PublishSplitsRequest, Response = EmptyResponse, @@ -2248,12 +2708,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.publish_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.publish_splits_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn mark_splits_for_deletion_layer(mut self, layer: L) -> Self + pub fn stack_mark_splits_for_deletion_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + MarkSplitsForDeletionRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< MarkSplitsForDeletionRequest, Response = EmptyResponse, @@ -2263,15 +2729,19 @@ impl MetastoreServiceTowerBlockBuilder { MarkSplitsForDeletionRequest, >>::Future: Send + 'static, { - self - .mark_splits_for_deletion_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.mark_splits_for_deletion_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn delete_splits_layer(mut self, layer: L) -> Self + pub fn stack_delete_splits_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteSplitsRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< DeleteSplitsRequest, Response = EmptyResponse, @@ -2279,12 +2749,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.delete_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.delete_splits_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn add_source_layer(mut self, layer: L) -> Self + pub fn stack_add_source_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + AddSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< AddSourceRequest, Response = EmptyResponse, @@ -2292,12 +2768,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.add_source_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.add_source_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn toggle_source_layer(mut self, layer: L) -> Self + pub fn stack_toggle_source_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ToggleSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ToggleSourceRequest, Response = EmptyResponse, @@ -2305,12 +2787,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.toggle_source_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.toggle_source_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn delete_source_layer(mut self, layer: L) -> Self + pub fn stack_delete_source_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteSourceRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< DeleteSourceRequest, Response = EmptyResponse, @@ -2318,12 +2806,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.delete_source_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.delete_source_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn reset_source_checkpoint_layer(mut self, layer: L) -> Self + pub fn stack_reset_source_checkpoint_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ResetSourceCheckpointRequest, + EmptyResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ResetSourceCheckpointRequest, Response = EmptyResponse, @@ -2333,15 +2827,19 @@ impl MetastoreServiceTowerBlockBuilder { ResetSourceCheckpointRequest, >>::Future: Send + 'static, { - self - .reset_source_checkpoint_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.reset_source_checkpoint_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn last_delete_opstamp_layer(mut self, layer: L) -> Self + pub fn stack_last_delete_opstamp_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + LastDeleteOpstampRequest, + LastDeleteOpstampResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< LastDeleteOpstampRequest, Response = LastDeleteOpstampResponse, @@ -2349,15 +2847,19 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self - .last_delete_opstamp_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.last_delete_opstamp_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn create_delete_task_layer(mut self, layer: L) -> Self + pub fn stack_create_delete_task_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteQuery, + DeleteTask, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< DeleteQuery, Response = DeleteTask, @@ -2365,15 +2867,19 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self - .create_delete_task_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.create_delete_task_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn update_splits_delete_opstamp_layer(mut self, layer: L) -> Self + pub fn stack_update_splits_delete_opstamp_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< UpdateSplitsDeleteOpstampRequest, Response = UpdateSplitsDeleteOpstampResponse, @@ -2383,15 +2889,19 @@ impl MetastoreServiceTowerBlockBuilder { UpdateSplitsDeleteOpstampRequest, >>::Future: Send + 'static, { - self - .update_splits_delete_opstamp_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.update_splits_delete_opstamp_layers + .push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn list_delete_tasks_layer(mut self, layer: L) -> Self + pub fn stack_list_delete_tasks_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListDeleteTasksRequest, + ListDeleteTasksResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ListDeleteTasksRequest, Response = ListDeleteTasksResponse, @@ -2399,15 +2909,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self - .list_delete_tasks_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.list_delete_tasks_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn list_stale_splits_layer(mut self, layer: L) -> Self + pub fn stack_list_stale_splits_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListStaleSplitsRequest, + ListSplitsResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ListStaleSplitsRequest, Response = ListSplitsResponse, @@ -2415,15 +2928,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self - .list_stale_splits_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.list_stale_splits_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn open_shards_layer(mut self, layer: L) -> Self + pub fn stack_open_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + OpenShardsRequest, + OpenShardsResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< OpenShardsRequest, Response = OpenShardsResponse, @@ -2431,12 +2947,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.open_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.open_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn acquire_shards_layer(mut self, layer: L) -> Self + pub fn stack_acquire_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + AcquireShardsRequest, + AcquireShardsResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< AcquireShardsRequest, Response = AcquireShardsResponse, @@ -2444,12 +2966,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.acquire_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.acquire_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn delete_shards_layer(mut self, layer: L) -> Self + pub fn stack_delete_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + DeleteShardsRequest, + DeleteShardsResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< DeleteShardsRequest, Response = DeleteShardsResponse, @@ -2457,12 +2985,18 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.delete_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.delete_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn list_shards_layer(mut self, layer: L) -> Self + pub fn stack_list_shards_layer(mut self, layer: L) -> Self where - L: tower::Layer> + Send + Sync + 'static, + L: tower::Layer< + quickwit_common::tower::BoxService< + ListShardsRequest, + ListShardsResponse, + crate::metastore::MetastoreError, + >, + > + Send + Sync + 'static, L::Service: tower::Service< ListShardsRequest, Response = ListShardsResponse, @@ -2470,7 +3004,7 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.list_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.list_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn build(self, instance: T) -> MetastoreServiceClient @@ -2510,126 +3044,183 @@ impl MetastoreServiceTowerBlockBuilder { self, boxed_instance: Box, ) -> MetastoreServiceClient { - let create_index_svc = if let Some(layer) = self.create_index_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let index_metadata_svc = if let Some(layer) = self.index_metadata_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let list_indexes_metadata_svc = if let Some(layer) - = self.list_indexes_metadata_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let delete_index_svc = if let Some(layer) = self.delete_index_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let list_splits_svc = if let Some(layer) = self.list_splits_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let stage_splits_svc = if let Some(layer) = self.stage_splits_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let publish_splits_svc = if let Some(layer) = self.publish_splits_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let mark_splits_for_deletion_svc = if let Some(layer) - = self.mark_splits_for_deletion_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let delete_splits_svc = if let Some(layer) = self.delete_splits_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let add_source_svc = if let Some(layer) = self.add_source_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let toggle_source_svc = if let Some(layer) = self.toggle_source_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let delete_source_svc = if let Some(layer) = self.delete_source_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let reset_source_checkpoint_svc = if let Some(layer) - = self.reset_source_checkpoint_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let last_delete_opstamp_svc = if let Some(layer) = self.last_delete_opstamp_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let create_delete_task_svc = if let Some(layer) = self.create_delete_task_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let update_splits_delete_opstamp_svc = if let Some(layer) - = self.update_splits_delete_opstamp_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let list_delete_tasks_svc = if let Some(layer) = self.list_delete_tasks_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let list_stale_splits_svc = if let Some(layer) = self.list_stale_splits_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let open_shards_svc = if let Some(layer) = self.open_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let acquire_shards_svc = if let Some(layer) = self.acquire_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let delete_shards_svc = if let Some(layer) = self.delete_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let list_shards_svc = if let Some(layer) = self.list_shards_layer { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; - let tower_block = MetastoreServiceTowerBlock { + let create_index_svc = self + .create_index_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let index_metadata_svc = self + .index_metadata_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let list_indexes_metadata_svc = self + .list_indexes_metadata_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let delete_index_svc = self + .delete_index_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let list_splits_svc = self + .list_splits_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let stage_splits_svc = self + .stage_splits_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let publish_splits_svc = self + .publish_splits_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let mark_splits_for_deletion_svc = self + .mark_splits_for_deletion_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let delete_splits_svc = self + .delete_splits_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let add_source_svc = self + .add_source_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let toggle_source_svc = self + .toggle_source_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let delete_source_svc = self + .delete_source_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let reset_source_checkpoint_svc = self + .reset_source_checkpoint_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let last_delete_opstamp_svc = self + .last_delete_opstamp_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let create_delete_task_svc = self + .create_delete_task_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let update_splits_delete_opstamp_svc = self + .update_splits_delete_opstamp_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let list_delete_tasks_svc = self + .list_delete_tasks_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let list_stale_splits_svc = self + .list_stale_splits_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let open_shards_svc = self + .open_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let acquire_shards_svc = self + .acquire_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let delete_shards_svc = self + .delete_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let list_shards_svc = self + .list_shards_layers + .into_iter() + .rev() + .fold( + quickwit_common::tower::BoxService::new(boxed_instance.clone()), + |svc, layer| layer.layer(svc), + ); + let tower_svc_stack = MetastoreServiceTowerServiceStack { inner: boxed_instance.clone(), create_index_svc, index_metadata_svc, @@ -2654,7 +3245,7 @@ impl MetastoreServiceTowerBlockBuilder { delete_shards_svc, list_shards_svc, }; - MetastoreServiceClient::new(tower_block) + MetastoreServiceClient::new(tower_svc_stack) } } #[derive(Debug, Clone)] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index c753b65de17..9fcff6f8268 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -196,7 +196,7 @@ async fn start_ingest_client_if_needed( let min_rate = ConstantRate::new(ByteSize::mib(1).as_u64(), Duration::from_millis(100)); let rate_modulator = RateModulator::new(rate_estimator.clone(), memory_capacity, min_rate); let ingest_service = IngestServiceClient::tower() - .ingest_layer( + .stack_ingest_layer( ServiceBuilder::new() .layer(EstimateRateLayer::::new(rate_estimator)) .layer(BufferLayer::new(100)) @@ -282,11 +282,11 @@ pub async fn serve_quickwit( .await?; let broker_layer = EventListenerLayer::new(event_broker.clone()); let metastore = MetastoreServiceClient::tower() - .create_index_layer(broker_layer.clone()) - .delete_index_layer(broker_layer.clone()) - .add_source_layer(broker_layer.clone()) - .delete_source_layer(broker_layer.clone()) - .toggle_source_layer(broker_layer) + .stack_create_index_layer(broker_layer.clone()) + .stack_delete_index_layer(broker_layer.clone()) + .stack_add_source_layer(broker_layer.clone()) + .stack_delete_source_layer(broker_layer.clone()) + .stack_toggle_source_layer(broker_layer) .build(metastore); Some(metastore) } else { @@ -316,7 +316,7 @@ pub async fn serve_quickwit( let metastore_client = MetastoreServiceClient::from_balance_channel(balance_channel); let retry_layer = RetryLayer::new(RetryPolicy::default()); MetastoreServiceClient::tower() - .shared_layer(retry_layer) + .stack_layer(retry_layer) .build(metastore_client) };