Skip to content

Commit

Permalink
fix(sdk-go): Use int32 on WithRetries, Test `WithExponentialBacko…
Browse files Browse the repository at this point in the history
…ff` (#1263)
  • Loading branch information
Snarr authored Jan 21, 2025
1 parent 5b37303 commit 04aaa5f
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
7 changes: 3 additions & 4 deletions sdk-go/littlehorse/wf_lib_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ func (tn *TaskNodeOutput) withExponentialBackoffImpl(policy *lhproto.Exponential
return tn
}

func (tn *TaskNodeOutput) withRetriesImpl(retries int) *TaskNodeOutput {
func (tn *TaskNodeOutput) withRetriesImpl(retries int32) *TaskNodeOutput {
tn.parent.overrideTaskRetries(tn, retries)
return tn
}
Expand Down Expand Up @@ -1032,16 +1032,15 @@ func (c *WorkflowCondition) getReverse() *lhproto.EdgeCondition {
return out
}

func (t *WorkflowThread) overrideTaskRetries(taskNodeOutput *TaskNodeOutput, retries int) {
func (t *WorkflowThread) overrideTaskRetries(taskNodeOutput *TaskNodeOutput, retries int32) {
t.checkIfIsActive()

node := t.spec.Nodes[taskNodeOutput.Output.nodeName]
if node.GetTask() == nil {
// Error
t.throwError(errors.New("impossible to not have task node here"))
}

node.GetTask().Retries = int32(retries)
node.GetTask().Retries = retries
}

func (t *WorkflowThread) overrideTaskExponentialBackoffPolicy(taskNodeOutput *TaskNodeOutput, policy *lhproto.ExponentialBackoffRetryPolicy) {
Expand Down
36 changes: 36 additions & 0 deletions sdk-go/littlehorse/wf_lib_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,42 @@ func TestAssigningVariablesToOtherVariablesShouldCarryJsonPath(t *testing.T) {
assert.Equal(t, taskNode.OutgoingEdges[0].VariableMutations[0].GetRhsAssignment().GetJsonPath(), "$.hello.there")
}

func TestAddRetriesToTaskNodeOutput(t *testing.T) {
wf := littlehorse.NewWorkflow(func(t *littlehorse.WorkflowThread) {
t.Execute("task-one").WithRetries(5)
}, "my-workflow")

putWf, err := wf.Compile()
if err != nil {
t.Error(err)
}

entrypoint := putWf.ThreadSpecs[putWf.EntrypointThreadName]
taskNode := entrypoint.Nodes["1-task-one-TASK"]
assert.Equal(t, taskNode.GetTask().Retries, int32(5))
}

func TestExponentialBackoffRetryPolicyToTaskNodeOutput(t *testing.T) {
wf := littlehorse.NewWorkflow(func(t *littlehorse.WorkflowThread) {
t.Execute("task-one").WithExponentialBackoff(&lhproto.ExponentialBackoffRetryPolicy{
BaseIntervalMs: 500,
MaxDelayMs: 2000,
Multiplier: 2,
})
}, "my-workflow")

putWf, err := wf.Compile()
if err != nil {
t.Error(err)
}

entrypoint := putWf.ThreadSpecs[putWf.EntrypointThreadName]
taskNode := entrypoint.Nodes["1-task-one-TASK"]
assert.Equal(t, taskNode.GetTask().ExponentialBackoff.BaseIntervalMs, int32(500))
assert.Equal(t, taskNode.GetTask().ExponentialBackoff.MaxDelayMs, int64(2000))
assert.Equal(t, taskNode.GetTask().ExponentialBackoff.Multiplier, float32(2))
}

func TestParallelSpawnThreadsWithInput(t *testing.T) {
wf := littlehorse.NewWorkflow(func(t *littlehorse.WorkflowThread) {
myArr := t.AddVariable("my-arr", lhproto.VariableType_JSON_ARR)
Expand Down
2 changes: 1 addition & 1 deletion sdk-go/littlehorse/wf_lib_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (n *NodeOutput) HandleAnyFailureOnChild(handler ThreadFunc) {
n.handleAnyFailureOnChild(handler)
}

func (t *TaskNodeOutput) WithRetries(retries int) *TaskNodeOutput {
func (t *TaskNodeOutput) WithRetries(retries int32) *TaskNodeOutput {
return t.withRetriesImpl(retries)
}

Expand Down

0 comments on commit 04aaa5f

Please sign in to comment.