Skip to content

Commit

Permalink
Add janitor service to indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Feb 21, 2024
1 parent 518dd37 commit 94cabff
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ quickwit-doc-mapper = { workspace = true }
quickwit-index-management = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-ingest = { workspace = true }
quickwit-janitor = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-rest-client = { workspace = true }
Expand Down
25 changes: 23 additions & 2 deletions quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ use quickwit_indexing::actors::{
use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, SpawnPipeline};
use quickwit_indexing::IndexingPipeline;
use quickwit_ingest::IngesterPool;
use quickwit_janitor::{start_janitor_service, JanitorService};
use quickwit_metastore::CreateIndexRequestExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{
CreateIndexRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::{NodeId, PipelineUid};
use quickwit_search::SearchJobPlacer;
use quickwit_storage::StorageResolver;
use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent};
use tracing::{debug, info, instrument};
Expand Down Expand Up @@ -195,7 +197,11 @@ pub(super) async fn spawn_services(
storage_resolver: StorageResolver,
node_config: &NodeConfig,
runtime_config: RuntimesConfig,
) -> anyhow::Result<(Mailbox<IndexingService>, ActorHandle<IndexingService>)> {
) -> anyhow::Result<(
Mailbox<IndexingService>,
ActorHandle<IndexingService>,
Mailbox<JanitorService>,
)> {
let event_broker = EventBroker::default();

// spawn merge scheduler service
Expand All @@ -222,7 +228,22 @@ pub(super) async fn spawn_services(
let (indexing_server_mailbox, indexing_server_handle) =
universe.spawn_builder().spawn(indexing_server);

Ok((indexing_server_mailbox, indexing_server_handle))
let janitor_service_mailbox = start_janitor_service(
universe,
node_config,
metastore,
SearchJobPlacer::default(),
storage_resolver,
event_broker,
true,
)
.await?;

Ok((
indexing_server_mailbox,
indexing_server_handle,
janitor_service_mailbox,
))
}

pub(super) async fn spawn_pipelines(
Expand Down
22 changes: 13 additions & 9 deletions quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
&HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
)?;

let (indexing_server_mailbox, indexing_server_handle) = spawn_services(
&universe,
cluster,
metastore.clone(),
storage_resolver.clone(),
&config,
runtimes_config,
)
.await?;
let (indexing_server_mailbox, indexing_server_handle, janitor_service_mailbox) =
spawn_services(
&universe,
cluster,
metastore.clone(),
storage_resolver.clone(),
&config,
runtimes_config,
)
.await?;

let (indexing_pipeline_handle, merge_pipeline_handle) =
spawn_pipelines(&indexing_server_mailbox, source_config).await?;
Expand All @@ -95,6 +96,9 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {

debug!("indexing completed, tear down actors");
merge_pipeline_handle.quit().await;
universe
.send_exit_with_success(&janitor_service_mailbox)
.await?;
universe
.send_exit_with_success(&indexing_server_mailbox)
.await?;
Expand Down

0 comments on commit 94cabff

Please sign in to comment.