Skip to content

Commit

Permalink
Embed the supervisee observable state in the supervisor observable state (#3929)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Oct 10, 2023
1 parent c338ef0 commit 194582d
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 37 deletions.
86 changes: 63 additions & 23 deletions quickwit/quickwit-actors/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,51 @@ use crate::{
};

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)]
pub struct SupervisorState {
pub struct SupervisorMetrics {
pub num_panics: usize,
pub num_errors: usize,
pub num_kills: usize,
}

/// Observable state exposed by a `Supervisor`.
///
/// Bundles the supervisor's own counters with the most recent observation
/// of the supervised actor's state, `S`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub struct SupervisorState<S> {
/// Counters of supervised-actor exits, broken down by kind
/// (panics, errors, kills).
pub metrics: SupervisorMetrics,
/// Last observed state of the supervised actor. `None` in the default
/// value, and whenever the supervisor holds no actor handle.
pub state_opt: Option<S>,
}

impl<S> Default for SupervisorState<S> {
fn default() -> Self {
SupervisorState {
metrics: Default::default(),
state_opt: None,
}
}
}

pub struct Supervisor<A: Actor> {
actor_name: String,
actor_factory: Box<dyn Fn() -> A + Send>,
inbox: Inbox<A>,
handle_opt: Option<ActorHandle<A>>,
state: SupervisorState,
metrics: SupervisorMetrics,
}

// Private unit message type.
// NOTE(review): presumably the self-addressed message that triggers one
// supervision pass (see `Supervisor::supervise`); its handler is outside this
// view -- confirm.
#[derive(Debug, Copy, Clone)]
struct SuperviseLoop;

#[async_trait]
impl<A: Actor> Actor for Supervisor<A> {
type ObservableState = SupervisorState;
type ObservableState = SupervisorState<A::ObservableState>;

fn observable_state(&self) -> Self::ObservableState {
self.state
let state_opt: Option<A::ObservableState> = self
.handle_opt
.as_ref()
.map(|handle| handle.last_observation());
SupervisorState {
metrics: self.metrics,
state_opt,
}
}

fn name(&self) -> String {
Expand Down Expand Up @@ -99,27 +121,26 @@ impl<A: Actor> Supervisor<A> {
inbox: Inbox<A>,
handle: ActorHandle<A>,
) -> Self {
let state = Default::default();
Supervisor {
actor_name,
actor_factory,
inbox,
handle_opt: Some(handle),
state,
metrics: Default::default(),
}
}

async fn supervise(
&mut self,
ctx: &ActorContext<Supervisor<A>>,
) -> Result<(), ActorExitStatus> {
match self
let handle_ref = self
.handle_opt
.as_ref()
.expect("The actor handle should always be set.")
.check_health(true)
{
.expect("The actor handle should always be set.");
match handle_ref.check_health(true) {
Health::Healthy => {
handle_ref.refresh_observe();
return Ok(());
}
Health::FailureOrUnhealthy => {}
Expand Down Expand Up @@ -150,13 +171,13 @@ impl<A: Actor> Supervisor<A> {
return Err(ActorExitStatus::DownstreamClosed);
}
ActorExitStatus::Killed => {
self.state.num_kills += 1;
self.metrics.num_kills += 1;
}
ActorExitStatus::Failure(_err) => {
self.state.num_errors += 1;
self.metrics.num_errors += 1;
}
ActorExitStatus::Panicked => {
self.state.num_panics += 1;
self.metrics.num_panics += 1;
}
}
info!("respawning-actor");
Expand Down Expand Up @@ -193,8 +214,9 @@ mod tests {
use async_trait::async_trait;
use tracing::info;

use crate::supervisor::SupervisorState;
use crate::{Actor, ActorContext, ActorExitStatus, AskError, Handler, Universe};
use crate::supervisor::SupervisorMetrics;
use crate::tests::{Ping, PingReceiverActor};
use crate::{Actor, ActorContext, ActorExitStatus, AskError, Handler, Observe, Universe};

#[derive(Copy, Clone, Debug)]
enum FailingActorMessage {
Expand Down Expand Up @@ -280,8 +302,8 @@ mod tests {
1
);
assert_eq!(
*supervisor_handle.observe().await,
SupervisorState {
supervisor_handle.observe().await.metrics,
SupervisorMetrics {
num_panics: 1,
num_errors: 0,
num_kills: 0
Expand Down Expand Up @@ -312,8 +334,8 @@ mod tests {
1
);
assert_eq!(
*supervisor_handle.observe().await,
SupervisorState {
supervisor_handle.observe().await.metrics,
SupervisorMetrics {
num_panics: 0,
num_errors: 1,
num_kills: 0
Expand All @@ -339,8 +361,8 @@ mod tests {
2
);
assert_eq!(
*supervisor_handle.observe().await,
SupervisorState {
supervisor_handle.observe().await.metrics,
SupervisorMetrics {
num_panics: 0,
num_errors: 0,
num_kills: 0
Expand All @@ -357,8 +379,8 @@ mod tests {
1
);
assert_eq!(
*supervisor_handle.observe().await,
SupervisorState {
supervisor_handle.observe().await.metrics,
SupervisorMetrics {
num_panics: 0,
num_errors: 0,
num_kills: 1
Expand Down Expand Up @@ -420,5 +442,23 @@ mod tests {
let (_, supervisor_handle) = universe.spawn_builder().supervise(actor);
let (exit_status, _state) = supervisor_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Success));
universe.assert_quit().await;
}

// Checks that the supervisor's observable state embeds the supervised actor's
// own observable state (`state_opt`), and that the embedded value catches up
// with the supervised actor after some time has passed.
#[tokio::test]
async fn test_supervisor_state() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();
let ping_actor = PingReceiverActor::default();
let (mailbox, handler) = universe.spawn_builder().supervise(ping_actor);
// Before any ping is sent, the embedded state is expected to be 0.
let obs = handler.observe().await;
assert_eq!(obs.state.state_opt, Some(0));
let _ = mailbox.ask(Ping).await;
// Asking through the mailbox already reports the ping.
assert_eq!(mailbox.ask(Observe).await.unwrap(), 1);
// NOTE(review): presumably this sleep gives the supervisor time to run a
// health check and refresh its cached observation of the supervised actor
// (`refresh_observe` / `last_observation`) -- confirm against the
// supervision interval.
universe.sleep(Duration::from_secs(60)).await;
// The supervisor's view now reflects the ping as well.
let obs = handler.observe().await;
assert_eq!(obs.state.state_opt, Some(1));
handler.quit().await;
universe.assert_quit().await;
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
};

// An actor that receives ping messages.
#[derive(Default)]
#[derive(Default, Clone)]
pub struct PingReceiverActor {
ping_count: usize,
}
Expand Down
28 changes: 15 additions & 13 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use quickwit_common::temp_dir::{self};
use quickwit_common::uri::Uri;
use quickwit_config::build_doc_mapper;
use quickwit_indexing::actors::{
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType,
MergeExecutor, MergeSplitDownloader, Packager, Publisher, PublisherCounters, Uploader,
UploaderCounters, UploaderType,
};
use quickwit_indexing::merge_policy::merge_policy_from_settings;
use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox};
Expand All @@ -45,6 +46,7 @@ use tokio::join;
use tracing::info;

use super::delete_task_planner::DeleteTaskPlanner;
use crate::actors::delete_task_planner::DeleteTaskPlannerState;

const OBSERVE_PIPELINE_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(500)
Expand All @@ -66,12 +68,12 @@ struct DeletePipelineHandle {
/// A Struct to hold all statistical data about deletes.
#[derive(Clone, Debug, Default, Serialize)]
pub struct DeleteTaskPipelineState {
pub delete_task_planner: SupervisorState,
pub downloader: SupervisorState,
pub delete_task_executor: SupervisorState,
pub packager: SupervisorState,
pub uploader: SupervisorState,
pub publisher: SupervisorState,
pub delete_task_planner: SupervisorState<DeleteTaskPlannerState>,
pub downloader: SupervisorState<()>,
pub delete_task_executor: SupervisorState<()>,
pub packager: SupervisorState<()>,
pub uploader: SupervisorState<UploaderCounters>,
pub publisher: SupervisorState<PublisherCounters>,
}

pub struct DeleteTaskPipeline {
Expand Down Expand Up @@ -404,12 +406,12 @@ mod tests {
.sleep(OBSERVE_PIPELINE_INTERVAL * 3)
.await;
let pipeline_state = pipeline_handler.process_pending_and_observe().await.state;
assert_eq!(pipeline_state.delete_task_planner.num_errors, 1);
assert_eq!(pipeline_state.downloader.num_errors, 0);
assert_eq!(pipeline_state.delete_task_executor.num_errors, 0);
assert_eq!(pipeline_state.packager.num_errors, 0);
assert_eq!(pipeline_state.uploader.num_errors, 0);
assert_eq!(pipeline_state.publisher.num_errors, 0);
assert_eq!(pipeline_state.delete_task_planner.metrics.num_errors, 1);
assert_eq!(pipeline_state.downloader.metrics.num_errors, 0);
assert_eq!(pipeline_state.delete_task_executor.metrics.num_errors, 0);
assert_eq!(pipeline_state.packager.metrics.num_errors, 0);
assert_eq!(pipeline_state.uploader.metrics.num_errors, 0);
assert_eq!(pipeline_state.publisher.metrics.num_errors, 0);
let _ = pipeline_mailbox.ask(GracefulShutdown).await;

let splits = metastore.list_all_splits(index_uid).await?;
Expand Down

0 comments on commit 194582d

Please sign in to comment.