Skip to content

Commit

Permalink
Fix Start -> Init Workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 26, 2024
1 parent 7552b85 commit b992dd4
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
24 changes: 12 additions & 12 deletions src/Temporalio/Worker/WorkflowCodecHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ await codec.DecodeFailureAsync(
await DecodeAsync(codec, job.SignalWorkflow.Input).ConfigureAwait(false);
await DecodeAsync(codec, job.SignalWorkflow.Headers).ConfigureAwait(false);
break;
case WorkflowActivationJob.VariantOneofCase.StartWorkflow:
await DecodeAsync(codec, job.StartWorkflow).ConfigureAwait(false);
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
await DecodeAsync(codec, job.InitializeWorkflow).ConfigureAwait(false);
break;
}
}
Expand Down Expand Up @@ -278,21 +278,21 @@ private static async Task DecodeAsync(IPayloadCodec codec, ChildWorkflowResult r
}
}

private static async Task DecodeAsync(IPayloadCodec codec, StartWorkflow start)
private static async Task DecodeAsync(IPayloadCodec codec, InitializeWorkflow init)
{
await DecodeAsync(codec, start.Arguments).ConfigureAwait(false);
if (start.ContinuedFailure != null)
await DecodeAsync(codec, init.Arguments).ConfigureAwait(false);
if (init.ContinuedFailure != null)
{
await codec.DecodeFailureAsync(start.ContinuedFailure).ConfigureAwait(false);
await codec.DecodeFailureAsync(init.ContinuedFailure).ConfigureAwait(false);
}
if (start.Memo != null)
if (init.Memo != null)
{
await DecodeAsync(codec, start.Memo.Fields).ConfigureAwait(false);
await DecodeAsync(codec, init.Memo.Fields).ConfigureAwait(false);
}
await DecodeAsync(codec, start.Headers).ConfigureAwait(false);
if (start.LastCompletionResult != null)
await DecodeAsync(codec, init.Headers).ConfigureAwait(false);
if (init.LastCompletionResult != null)
{
await DecodeAsync(codec, start.LastCompletionResult.Payloads_).ConfigureAwait(false);
await DecodeAsync(codec, init.LastCompletionResult.Payloads_).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -330,4 +330,4 @@ private static async Task DecodeAsync(IPayloadCodec codec, Payload payload)
payload.MergeFrom(decoded.Single());
}
}
}
}
16 changes: 8 additions & 8 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ public WorkflowInstance(WorkflowInstanceDetails details)
mutableQueries = new(() => new(Definition.Queries, OnQueryDefinitionAdded), false);
mutableSignals = new(() => new(Definition.Signals, OnSignalDefinitionAdded), false);
mutableUpdates = new(() => new(Definition.Updates, OnUpdateDefinitionAdded), false);
var initialMemo = details.Start.Memo;
var initialMemo = details.Init.Memo;
memo = new(
() => initialMemo == null ? new Dictionary<string, IRawValue>(0) :
initialMemo.Fields.ToDictionary(
kvp => kvp.Key,
kvp => (IRawValue)new RawValue(kvp.Value)),
false);
var initialSearchAttributes = details.Start.SearchAttributes;
var initialSearchAttributes = details.Init.SearchAttributes;
typedSearchAttributes = new(
() => initialSearchAttributes == null ? new(new()) :
SearchAttributeCollection.FromProto(initialSearchAttributes),
false);
var act = details.InitialActivation;
CurrentBuildId = act.BuildIdForCurrentTask;
var start = details.Start;
var start = details.Init;
startArgs = new(
() => DecodeArgs(
method: Definition.RunMethod,
Expand All @@ -151,7 +151,7 @@ public WorkflowInstance(WorkflowInstanceDetails details)
{ "task_queue", details.TaskQueue },
{ "workflow_type", start.WorkflowType },
})));
initialSearchAttributes = details.Start.SearchAttributes;
initialSearchAttributes = details.Init.SearchAttributes;
WorkflowInfo.ParentInfo? parent = null;
if (start.ParentWorkflowInfo != null)
{
Expand Down Expand Up @@ -188,7 +188,7 @@ public WorkflowInstance(WorkflowInstanceDetails details)
replaySafeLogger = new(logger);
onTaskStarting = details.OnTaskStarting;
onTaskCompleted = details.OnTaskCompleted;
Random = new(details.Start.RandomnessSeed);
Random = new(details.Init.RandomnessSeed);
TracingEventsEnabled = !details.DisableTracingEvents;
workerLevelFailureExceptionTypes = details.WorkerLevelFailureExceptionTypes;
disableEagerActivityExecution = details.DisableEagerActivityExecution;
Expand Down Expand Up @@ -880,8 +880,8 @@ private void Apply(WorkflowActivationJob job)
case WorkflowActivationJob.VariantOneofCase.SignalWorkflow:
ApplySignalWorkflow(job.SignalWorkflow);
break;
case WorkflowActivationJob.VariantOneofCase.StartWorkflow:
ApplyStartWorkflow(job.StartWorkflow);
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
ApplyStartWorkflow(job.InitializeWorkflow);
break;
case WorkflowActivationJob.VariantOneofCase.UpdateRandomSeed:
ApplyUpdateRandomSeed(job.UpdateRandomSeed);
Expand Down Expand Up @@ -1292,7 +1292,7 @@ await inbound.Value.HandleSignalAsync(new(
}));
}

private void ApplyStartWorkflow(StartWorkflow start)
private void ApplyStartWorkflow(InitializeWorkflow init)
{
_ = QueueNewTaskAsync(() => RunTopLevelAsync(async () =>
{
Expand Down
6 changes: 3 additions & 3 deletions src/Temporalio/Worker/WorkflowInstanceDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Temporalio.Worker
/// <param name="TaskQueue">Workflow task queue.</param>
/// <param name="Definition">Workflow definition.</param>
/// <param name="InitialActivation">Initial activation for the workflow.</param>
/// <param name="Start">Start attributes for the workflow.</param>
/// <param name="Init">Start attributes for the workflow.</param>
/// <param name="Interceptors">Interceptors.</param>
/// <param name="PayloadConverter">Payload converter.</param>
/// <param name="FailureConverter">Failure converter.</param>
Expand All @@ -32,7 +32,7 @@ internal record WorkflowInstanceDetails(
string TaskQueue,
WorkflowDefinition Definition,
WorkflowActivation InitialActivation,
StartWorkflow Start,
InitializeWorkflow Init,
IReadOnlyCollection<Interceptors.IWorkerInterceptor> Interceptors,
IPayloadConverter PayloadConverter,
IFailureConverter FailureConverter,
Expand All @@ -44,4 +44,4 @@ internal record WorkflowInstanceDetails(
Lazy<MetricMeter> RuntimeMetricMeter,
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
bool DisableEagerActivityExecution);
}
}
10 changes: 5 additions & 5 deletions src/Temporalio/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,16 @@ private async Task HandleActivationAsync(IPayloadCodec? codec, WorkflowActivatio

private IWorkflowInstance CreateInstance(WorkflowActivation act)
{
var start = act.Jobs.Select(j => j.StartWorkflow).FirstOrDefault(s => s != null) ??
var init = act.Jobs.Select(j => j.InitializeWorkflow).FirstOrDefault(s => s != null) ??
throw new InvalidOperationException("Missing workflow start (unexpectedly evicted?)");
if (!workflows.TryGetValue(start.WorkflowType, out var defn))
if (!workflows.TryGetValue(init.WorkflowType, out var defn))
{
defn = dynamicWorkflow;
if (defn == null)
{
var names = string.Join(", ", workflows.Keys.OrderBy(s => s));
throw new ApplicationFailureException(
$"Workflow type {start.WorkflowType} is not registered on this worker, available workflows: {names}",
$"Workflow type {init.WorkflowType} is not registered on this worker, available workflows: {names}",
"NotFoundError");
}
}
Expand All @@ -273,7 +273,7 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
TaskQueue: options.TaskQueue,
Definition: defn,
InitialActivation: act,
Start: start,
Init: init,
Interceptors: options.Interceptors,
PayloadConverter: options.DataConverter.PayloadConverter,
FailureConverter: options.DataConverter.FailureConverter,
Expand All @@ -287,4 +287,4 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
}
}
}
}

0 comments on commit b992dd4

Please sign in to comment.