Skip to content

Commit

Permalink
add env var to disable delete tasks service (#4559)
Browse files Browse the repository at this point in the history
Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
trinity-1686a and guilload authored Feb 9, 2024
1 parent 0bf0d6f commit 7464ec9
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
10 changes: 7 additions & 3 deletions quickwit/quickwit-janitor/src/janitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use serde_json::{json, Value as JsonValue};
use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor};

pub struct JanitorService {
delete_task_service_handle: ActorHandle<DeleteTaskService>,
delete_task_service_handle: Option<ActorHandle<DeleteTaskService>>,
garbage_collector_handle: ActorHandle<GarbageCollector>,
retention_policy_executor_handle: ActorHandle<RetentionPolicyExecutor>,
}

impl JanitorService {
pub fn new(
delete_task_service_handle: ActorHandle<DeleteTaskService>,
delete_task_service_handle: Option<ActorHandle<DeleteTaskService>>,
garbage_collector_handle: ActorHandle<GarbageCollector>,
retention_policy_executor_handle: ActorHandle<RetentionPolicyExecutor>,
) -> Self {
Expand All @@ -45,7 +45,11 @@ impl JanitorService {
}

fn is_healthy(&self) -> bool {
self.delete_task_service_handle.state() != ActorState::Failure
self.delete_task_service_handle
.as_ref()
.map_or(true, |delete_task_service_handle| {
delete_task_service_handle.state() != ActorState::Failure
})
&& self.garbage_collector_handle.state() != ActorState::Failure
&& self.retention_policy_executor_handle.state() != ActorState::Failure
}
Expand Down
29 changes: 18 additions & 11 deletions quickwit/quickwit-janitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub async fn start_janitor_service(
search_job_placer: SearchJobPlacer,
storage_resolver: StorageResolver,
event_broker: EventBroker,
run_delete_task_service: bool,
) -> anyhow::Result<Mailbox<JanitorService>> {
info!("starting janitor service");
let garbage_collector = GarbageCollector::new(metastore.clone(), storage_resolver.clone());
Expand All @@ -59,17 +60,23 @@ pub async fn start_janitor_service(
let retention_policy_executor = RetentionPolicyExecutor::new(metastore.clone());
let (_, retention_policy_executor_handle) =
universe.spawn_builder().spawn(retention_policy_executor);
let delete_task_service = DeleteTaskService::new(
metastore,
search_job_placer,
storage_resolver,
config.data_dir_path.clone(),
config.indexer_config.max_concurrent_split_uploads,
universe.get_or_spawn_one::<MergeSchedulerService>(),
event_broker,
)
.await?;
let (_, delete_task_service_handle) = universe.spawn_builder().spawn(delete_task_service);
let delete_task_service_handle = if run_delete_task_service {
let delete_task_service = DeleteTaskService::new(
metastore,
search_job_placer,
storage_resolver,
config.data_dir_path.clone(),
config.indexer_config.max_concurrent_split_uploads,
universe.get_or_spawn_one::<MergeSchedulerService>(),
event_broker,
)
.await?;
let (_, delete_task_service_handle) = universe.spawn_builder().spawn(delete_task_service);
Some(delete_task_service_handle)
} else {
tracing::warn!("delete task service is disabled: delete queries will not be processed");
None
};

let janitor_service = JanitorService::new(
delete_task_service_handle,
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ const READINESS_REPORTING_INTERVAL: Duration = if cfg!(any(test, feature = "test

const METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY: &str = "QW_METASTORE_CLIENT_MAX_CONCURRENCY";
const DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY: usize = 6;
const DISABLE_DELETE_TASK_SERVICE_ENV_KEY: &str = "QW_DISABLE_DELETE_TASK_SERVICE";

fn get_metastore_client_max_concurrency() -> usize {
std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok()
Expand Down Expand Up @@ -514,6 +515,7 @@ pub async fn serve_quickwit(
search_job_placer,
storage_resolver.clone(),
event_broker.clone(),
std::env::var(DISABLE_DELETE_TASK_SERVICE_ENV_KEY).is_err(),
)
.await?;
Some(janitor_service)
Expand Down

0 comments on commit 7464ec9

Please sign in to comment.