Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose workflow cancel cause #875

Merged
merged 4 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/src/core_tests/child_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn parent_cancels_child_wf(ctx: WfContext) -> WorkflowResult<()> {
.await
.into_started()
.expect("Child should get started");
start_res.cancel(&ctx);
start_res.cancel(&ctx, "cancel reason".to_string());
let stat = start_res
.result()
.await
Expand Down Expand Up @@ -157,6 +157,7 @@ async fn cancel_child_workflow_lang_thinks_not_started_but_is(
act.run_id,
CancelChildWorkflowExecution {
child_workflow_seq: 1,
reason: "dieee".to_string(),
}
.into(),
))
Expand Down Expand Up @@ -215,6 +216,7 @@ async fn cancel_already_complete_child_ignored() {
vec![
CancelChildWorkflowExecution {
child_workflow_seq: 1,
reason: "go away!".to_string(),
}
.into(),
CompleteWorkflowExecution { result: None }.into(),
Expand Down
84 changes: 41 additions & 43 deletions core/src/worker/workflow/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(clippy::large_enum_variant)]

use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::{
abstractions::dbg_panic,
Expand Down Expand Up @@ -164,6 +164,45 @@ impl ActivityMachine {
is_local: false,
}
}

pub(super) fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<WFMachinesError>> {
if matches!(
self.state(),
ActivityMachineState::Completed(_)
| ActivityMachineState::Canceled(_)
| ActivityMachineState::Failed(_)
| ActivityMachineState::TimedOut(_)
) {
// Ignore attempted cancels in terminal states
debug!(
"Attempted to cancel already resolved activity (seq {})",
self.shared_state.attrs.seq
);
return Ok(vec![]);
}
let event = match self.shared_state.cancellation_type {
ActivityCancellationType::Abandon => ActivityMachineEvents::Abandon,
_ => ActivityMachineEvents::Cancel,
};
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.flat_map(|amc| match amc {
ActivityMachineCommand::RequestCancellation(cmd) => {
self.machine_responses_from_cancel_request(cmd)
}
ActivityMachineCommand::Cancel(details) => {
vec![self.create_cancelation_resolve(details).into()]
}
x => panic!("Invalid cancel event response {x:?}"),
})
.collect();
Ok(res)
}

pub(super) fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state().cancelled_before_sent
}
}

impl TryFrom<HistEventData> for ActivityMachineEvents {
Expand Down Expand Up @@ -299,47 +338,6 @@ impl TryFrom<CommandType> for ActivityMachineEvents {
}
}

impl Cancellable for ActivityMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
if matches!(
self.state(),
ActivityMachineState::Completed(_)
| ActivityMachineState::Canceled(_)
| ActivityMachineState::Failed(_)
| ActivityMachineState::TimedOut(_)
) {
// Ignore attempted cancels in terminal states
debug!(
"Attempted to cancel already resolved activity (seq {})",
self.shared_state.attrs.seq
);
return Ok(vec![]);
}
let event = match self.shared_state.cancellation_type {
ActivityCancellationType::Abandon => ActivityMachineEvents::Abandon,
_ => ActivityMachineEvents::Cancel,
};
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.flat_map(|amc| match amc {
ActivityMachineCommand::RequestCancellation(cmd) => {
self.machine_responses_from_cancel_request(cmd)
}
ActivityMachineCommand::Cancel(details) => {
vec![self.create_cancelation_resolve(details).into()]
}
x => panic!("Invalid cancel event response {x:?}"),
})
.collect();
Ok(res)
}

fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state().cancelled_before_sent
}
}

#[derive(Clone)]
pub(super) struct SharedState {
scheduled_event_id: i64,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -208,8 +208,6 @@ impl WFMachinesAdapter for CancelExternalMachine {
}
}

impl Cancellable for CancelExternalMachine {}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -222,11 +220,14 @@ mod tests {

async fn cancel_sender(ctx: WfContext) -> WorkflowResult<()> {
let res = ctx
.cancel_external(NamespacedWorkflowExecution {
namespace: "some_namespace".to_string(),
workflow_id: "fake_wid".to_string(),
run_id: "fake_rid".to_string(),
})
.cancel_external(
NamespacedWorkflowExecution {
namespace: "some_namespace".to_string(),
workflow_id: "fake_wid".to_string(),
run_id: "fake_rid".to_string(),
},
"cancel reason".to_string(),
)
.await;
if res.is_err() {
Err(anyhow::anyhow!("Cancel fail!"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -97,8 +97,6 @@ impl WFMachinesAdapter for CancelWorkflowMachine {
}
}

impl Cancellable for CancelWorkflowMachine {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
80 changes: 42 additions & 38 deletions core/src/worker/workflow/machines/child_workflow_state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::{
internal_flags::CoreInternalFlags,
Expand Down Expand Up @@ -54,7 +54,7 @@ fsm! {
StartCommandCreated --(CommandStartChildWorkflowExecution) --> StartCommandCreated;
StartCommandCreated --(StartChildWorkflowExecutionInitiated(ChildWorkflowInitiatedData),
shared on_start_child_workflow_execution_initiated) --> StartEventRecorded;
StartCommandCreated --(Cancel, shared on_cancelled) --> Cancelled;
StartCommandCreated --(Cancel(String), shared on_cancelled) --> Cancelled;

StartEventRecorded --(ChildWorkflowExecutionStarted(ChildWorkflowExecutionStartedEvent),
shared on_child_workflow_execution_started) --> Started;
Expand All @@ -74,13 +74,13 @@ fsm! {
// If cancelled after started, we need to issue a cancel external workflow command, and then
// the child workflow will resolve somehow, so we want to go back to started and wait for that
// resolution.
Started --(Cancel, shared on_cancelled) --> Started;
Started --(Cancel(String), shared on_cancelled) --> Started;
// Abandon & try cancel modes may immediately move to cancelled
Started --(Cancel, shared on_cancelled) --> Cancelled;
Started --(Cancel(String), shared on_cancelled) --> Cancelled;
Started --(CommandRequestCancelExternalWorkflowExecution) --> Started;

// Ignore any spurious cancellations after resolution
Cancelled --(Cancel) --> Cancelled;
Cancelled --(Cancel(String)) --> Cancelled;
Cancelled --(ChildWorkflowExecutionCancelled,
on_child_workflow_execution_cancelled) --> Cancelled;
// Completions of any kind after cancellation are acceptable for abandoned children
Expand All @@ -92,10 +92,10 @@ fsm! {
shared on_child_workflow_execution_timed_out) --> Cancelled;
Cancelled --(ChildWorkflowExecutionTerminated,
shared on_child_workflow_execution_terminated) --> Cancelled;
Failed --(Cancel) --> Failed;
StartFailed --(Cancel) --> StartFailed;
TimedOut --(Cancel) --> TimedOut;
Completed --(Cancel) --> Completed;
Failed --(Cancel(String)) --> Failed;
StartFailed --(Cancel(String)) --> StartFailed;
TimedOut --(Cancel(String)) --> TimedOut;
Completed --(Cancel(String)) --> Completed;
}

pub(super) struct ChildWorkflowExecutionStartedEvent {
Expand Down Expand Up @@ -255,10 +255,14 @@ impl StartCommandCreated {
pub(super) fn on_cancelled(
self,
state: &mut SharedState,
reason: String,
) -> ChildWorkflowMachineTransition<Cancelled> {
state.cancelled_before_sent = true;
ChildWorkflowMachineTransition::commands(vec![ChildWorkflowCommand::StartCancel(Failure {
message: "Child Workflow execution cancelled before scheduled".to_owned(),
message: format!(
"Child Workflow Execution cancelled before scheduled: {}",
reason
),
cause: Some(Box::new(Failure {
failure_info: Some(FailureInfo::CanceledFailureInfo(
failure::CanceledFailureInfo {
Expand Down Expand Up @@ -385,6 +389,7 @@ impl Started {
fn on_cancelled(
self,
state: &mut SharedState,
reason: String,
) -> ChildWorkflowMachineTransition<StartedOrCancelled> {
let dest = match state.cancel_type {
ChildWorkflowCancellationType::Abandon | ChildWorkflowCancellationType::TryCancel => {
Expand All @@ -393,9 +398,7 @@ impl Started {
_ => StartedOrCancelled::Started(Default::default()),
};
TransitionResult::ok(
[ChildWorkflowCommand::IssueCancelAfterStarted {
reason: "Parent workflow requested cancel".to_string(),
}],
[ChildWorkflowCommand::IssueCancelAfterStarted { reason }],
dest,
)
}
Expand Down Expand Up @@ -483,6 +486,30 @@ impl ChildWorkflowMachine {
}),
}
}

pub(super) fn cancel(
&mut self,
reason: String,
) -> Result<Vec<MachineResponse>, MachineError<WFMachinesError>> {
let event = ChildWorkflowMachineEvents::Cancel(reason);
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.map(|mc| match mc {
c @ ChildWorkflowCommand::StartCancel(_)
| c @ ChildWorkflowCommand::IssueCancelAfterStarted { .. } => {
self.adapt_response(c, None)
}
x => panic!("Invalid cancel event response {x:?}"),
})
.flatten_ok()
.try_collect()?;
Ok(res)
}

pub(super) fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state.cancelled_before_sent
}
}

impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
Expand Down Expand Up @@ -713,29 +740,6 @@ impl TryFrom<CommandType> for ChildWorkflowMachineEvents {
}
}

impl Cancellable for ChildWorkflowMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
let event = ChildWorkflowMachineEvents::Cancel;
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.map(|mc| match mc {
c @ ChildWorkflowCommand::StartCancel(_)
| c @ ChildWorkflowCommand::IssueCancelAfterStarted { .. } => {
self.adapt_response(c, None)
}
x => panic!("Invalid cancel event response {x:?}"),
})
.flatten_ok()
.try_collect()?;
Ok(res)
}

fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state.cancelled_before_sent
}
}

fn failure_info_from_state(state: &SharedState, retry_state: RetryState) -> Option<FailureInfo> {
Some(FailureInfo::ChildWorkflowExecutionFailureInfo(
failure::ChildWorkflowExecutionFailureInfo {
Expand Down Expand Up @@ -988,7 +992,7 @@ mod test {
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
},
);
let cmds = s.cancel().unwrap();
let cmds = s.cancel("cancel reason".to_string()).unwrap();
assert_eq!(cmds.len(), 0);
assert_eq!(discriminant(&state), discriminant(s.state()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -111,5 +111,3 @@ impl WFMachinesAdapter for CompleteWorkflowMachine {
Ok(vec![])
}
}

impl Cancellable for CompleteWorkflowMachine {}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
Cancellable, EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter,
WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -105,8 +105,6 @@ impl WFMachinesAdapter for ContinueAsNewWorkflowMachine {
}
}

impl Cancellable for ContinueAsNewWorkflowMachine {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -113,5 +113,3 @@ impl WFMachinesAdapter for FailWorkflowMachine {
Ok(vec![])
}
}

impl Cancellable for FailWorkflowMachine {}
Loading
Loading