Skip to content

Commit

Permalink
[CAPPL-197/CAPPL-309] Small fixes to compute (#951)
Browse files Browse the repository at this point in the history
* [CAPPL-197/CAPPL-309] Small fixes to compute

- Add a recovery handler to the runner's Run method. This means we
  preserve stack traces and will help debugging.
- Allow users to explicitly error with an error via ExitWithError.
- Remove owner and name from the factory constructor.

* [CAPPL-197/CAPPL-309] Small fixes to compute

- Add a recovery handler to the runner's Run method. This means we
  preserve stack traces and will help debugging.
- Allow users to explicitly error with an error via ExitWithError.
- Remove owner and name from the factory constructor.
  • Loading branch information
cedric-cordenier authored Nov 25, 2024
1 parent e0189e5 commit 97ceadb
Show file tree
Hide file tree
Showing 27 changed files with 226 additions and 196 deletions.
2 changes: 1 addition & 1 deletion pkg/capabilities/cli/cmd/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestTypeGeneration(t *testing.T) {
})

t.Run("casing is respected from the json schema", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Owner: "owner", Name: "name"})
workflow := sdk.NewWorkflowSpecFactory()
ai := basicaction.ActionConfig{CamelCaseInSchemaForTesting: "foo", SnakeCaseInSchemaForTesting: 12}.
New(workflow, "ref", basicaction.ActionInput{InputThing: sdk.ConstantDefinition[bool](true)})
spec, _ := workflow.Spec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ import (

func TestIdenticalConsensus(t *testing.T) {
t.Parallel()
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{
Owner: "0x1234",
Name: "Test",
})
workflow := sdk.NewWorkflowSpecFactory()

trigger := basictrigger.TriggerConfig{Name: "1234", Number: 1}.New(workflow)

Expand All @@ -43,8 +40,6 @@ func TestIdenticalConsensus(t *testing.T) {
require.NoError(t, err)

expected := sdk.WorkflowSpec{
Name: "Test",
Owner: "0x1234",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ import (

func TestReduceConsensus(t *testing.T) {
t.Parallel()
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{
Owner: "0x1234",
Name: "Test",
})
workflow := sdk.NewWorkflowSpecFactory()

trigger := basictrigger.TriggerConfig{Name: "1234", Number: 1}.New(workflow)

Expand Down Expand Up @@ -73,8 +70,6 @@ func TestReduceConsensus(t *testing.T) {
require.NoError(t, err)

expected := sdk.WorkflowSpec{
Name: "Test",
Owner: "0x1234",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down
11 changes: 1 addition & 10 deletions pkg/workflows/sdk/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,9 @@ func (c *capDefinitionImpl[O]) self() CapDefinition[O] {

func (c *capDefinitionImpl[O]) private() {}

type NewWorkflowParams struct {
Owner string
Name string
}

func NewWorkflowSpecFactory(
params NewWorkflowParams,
) *WorkflowSpecFactory {
func NewWorkflowSpecFactory() *WorkflowSpecFactory {
return &WorkflowSpecFactory{
spec: &WorkflowSpec{
Owner: params.Owner,
Name: params.Name,
Triggers: make([]StepDefinition, 0),
Actions: make([]StepDefinition, 0),
Consensus: make([]StepDefinition, 0),
Expand Down
43 changes: 14 additions & 29 deletions pkg/workflows/sdk/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
// This implicitly tests the code generators functionally, as the generated code is used in the tests.

type Config struct {
Workflow sdk.NewWorkflowParams
Streams *streams.TriggerConfig
Ocr *ocr3.DataFeedsConsensusConfig
ChainWriter *chainwriter.TargetConfig
Expand All @@ -41,7 +40,7 @@ func NewWorkflowSpec(rawConfig []byte) (*sdk.WorkflowSpecFactory, error) {
return nil, err
}

workflow := sdk.NewWorkflowSpecFactory(conf.Workflow)
workflow := sdk.NewWorkflowSpecFactory()
streamsTrigger := conf.Streams.New(workflow)
consensus := conf.Ocr.New(workflow, "ccip_feeds", ocr3.DataFeedsConsensusInput{
Observations: sdk.ListOf[streams.Feed](streamsTrigger)},
Expand All @@ -55,7 +54,6 @@ func NewWorkflowSpec(rawConfig []byte) (*sdk.WorkflowSpecFactory, error) {
// ModifiedConfig, and the test it's used in, show how you can structure config to remove copy/paste issues when data
// needs to be repeated in multiple capability configurations.
type ModifiedConfig struct {
Workflow sdk.NewWorkflowParams
AllowedPartialStaleness string
MaxFrequencyMs uint64
DefaultHeartbeat uint64 `yaml:"default_heartbeat" json:"default_heartbeat"`
Expand Down Expand Up @@ -114,7 +112,7 @@ func NewWorkflowRemapped(rawConfig []byte) (*sdk.WorkflowSpecFactory, error) {
}
ocr3Config.AggregationConfig.Feeds = feeds

workflow := sdk.NewWorkflowSpecFactory(conf.Workflow)
workflow := sdk.NewWorkflowSpecFactory()
streamsTrigger := streamsConfig.New(workflow)

consensus := ocr3Config.New(workflow, "ccip_feeds", ocr3.DataFeedsConsensusInput{
Expand All @@ -134,7 +132,7 @@ func NewWorkflowSpecFromPrimitives(rawConfig []byte) (*sdk.WorkflowSpecFactory,
return nil, err
}

workflow := sdk.NewWorkflowSpecFactory(conf.Workflow)
workflow := sdk.NewWorkflowSpecFactory()
notStreamsTrigger := conf.NotStream.New(workflow)

md := streams.NewSignersMetadataFromFields(
Expand Down Expand Up @@ -213,8 +211,6 @@ func TestBuilder_ValidSpec(t *testing.T) {
require.NoError(t, err)

expected := sdk.WorkflowSpec{
Name: "notccipethsep",
Owner: "0x00000000000000000000000000000000000000aa",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down Expand Up @@ -291,14 +287,12 @@ func TestBuilder_ValidSpec(t *testing.T) {
})

t.Run("maps work correctly", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "1", Number: 1}.New(workflow)
mapaction.ActionConfig{}.New(workflow, "ref", mapaction.ActionInput{Payload: sdk.Map[string, mapaction.ActionInputsPayload](map[string]sdk.CapDefinition[string]{"Foo": trigger.CoolOutput()})})
spec, err := workflow.Spec()
require.NoError(t, err)
testutils.AssertWorkflowSpec(t, sdk.WorkflowSpec{
Name: "name",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down Expand Up @@ -328,14 +322,12 @@ func TestBuilder_ValidSpec(t *testing.T) {
})

t.Run("any maps work correctly", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "1", Number: 1}.New(workflow)
anymapaction.MapActionConfig{}.New(workflow, "ref", anymapaction.MapActionInput{Payload: sdk.AnyMap[anymapaction.MapActionInputsPayload](sdk.CapMap{"Foo": trigger.CoolOutput()})})
spec, err := workflow.Spec()
require.NoError(t, err)
testutils.AssertWorkflowSpec(t, sdk.WorkflowSpec{
Name: "name",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down Expand Up @@ -365,7 +357,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
})

t.Run("ToListDefinition works correctly for list elements", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := listtrigger.TriggerConfig{Name: "1"}.New(workflow)
asList := sdk.ToListDefinition[string](trigger.CoolOutput())
sdk.Compute1(workflow, "compute", sdk.Compute1Inputs[[]string]{Arg0: asList}, func(_ sdk.Runtime, inputs []string) (string, error) {
Expand All @@ -379,8 +371,6 @@ func TestBuilder_ValidSpec(t *testing.T) {
require.NoError(t, err)

testutils.AssertWorkflowSpec(t, sdk.WorkflowSpec{
Name: "name",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down Expand Up @@ -422,7 +412,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
})

t.Run("ToListDefinition works correctly for built up lists", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "1"}.New(workflow)
asList := sdk.ToListDefinition(sdk.ListOf(trigger.CoolOutput()))
sdk.Compute1(workflow, "compute", sdk.Compute1Inputs[[]string]{Arg0: asList}, func(_ sdk.Runtime, inputs []string) (string, error) {
Expand All @@ -436,8 +426,6 @@ func TestBuilder_ValidSpec(t *testing.T) {
require.NoError(t, err)

testutils.AssertWorkflowSpec(t, sdk.WorkflowSpec{
Name: "name",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down Expand Up @@ -479,7 +467,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
})

t.Run("ToListDefinition works correctly for hard-coded lists", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "1"}.New(workflow)
list := sdk.ToListDefinition(sdk.ConstantDefinition([]string{"1", "2"}))
sdk.Compute2(workflow, "compute", sdk.Compute2Inputs[string, []string]{Arg0: trigger.CoolOutput(), Arg1: list}, func(_ sdk.Runtime, t string, l []string) (string, error) {
Expand All @@ -493,8 +481,6 @@ func TestBuilder_ValidSpec(t *testing.T) {
require.NoError(t, err)

testutils.AssertWorkflowSpec(t, sdk.WorkflowSpec{
Name: "name",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down Expand Up @@ -542,14 +528,14 @@ func TestBuilder_ValidSpec(t *testing.T) {
})

t.Run("AnyListOf works like list of but returns a type any", func(t *testing.T) {
workflow1 := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow1 := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "foo", Number: 0}
list := sdk.ListOf(trigger.New(workflow1).CoolOutput())
sdk.Compute1(workflow1, "compute", sdk.Compute1Inputs[[]string]{Arg0: list}, func(_ sdk.Runtime, inputs []string) (string, error) {
return inputs[0], nil
})

workflow2 := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow2 := sdk.NewWorkflowSpecFactory()
anyList := sdk.AnyListOf(trigger.New(workflow2).CoolOutput())
sdk.Compute1(workflow2, "compute", sdk.Compute1Inputs[[]any]{Arg0: anyList}, func(_ sdk.Runtime, inputs []any) (any, error) {
return inputs[0], nil
Expand All @@ -567,7 +553,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
conf, err := UnmarshalYaml[Config](sepoliaConfig)
require.NoError(t, err)

workflow := sdk.NewWorkflowSpecFactory(conf.Workflow)
workflow := sdk.NewWorkflowSpecFactory()
streamsTrigger := conf.Streams.New(workflow)
consensus := conf.Ocr.New(workflow, "ccip_feeds", ocr3.DataFeedsConsensusInput{
Observations: sdk.ListOf[streams.Feed](streamsTrigger)},
Expand All @@ -589,7 +575,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
conf, err := UnmarshalYaml[Config](sepoliaConfig)
require.NoError(t, err)

workflow := sdk.NewWorkflowSpecFactory(conf.Workflow)
workflow := sdk.NewWorkflowSpecFactory()
streamsTrigger := conf.Streams.New(workflow)
consensus := conf.Ocr.New(workflow, "", ocr3.DataFeedsConsensusInput{
Observations: sdk.ListOf[streams.Feed](streamsTrigger)},
Expand All @@ -605,7 +591,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
conf, err := UnmarshalYaml[Config](sepoliaConfig)
require.NoError(t, err)

workflow := sdk.NewWorkflowSpecFactory(conf.Workflow)
workflow := sdk.NewWorkflowSpecFactory()
badStep := sdk.Step[streams.Feed]{
Definition: sdk.StepDefinition{
ID: "[email protected]",
Expand All @@ -632,7 +618,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
conf, err := UnmarshalYaml[Config](sepoliaConfig)
require.NoError(t, err)

workflow := sdk.NewWorkflowSpecFactory(conf.Workflow)
workflow := sdk.NewWorkflowSpecFactory()
streamsTrigger := conf.Streams.New(workflow)
consensus := conf.Ocr.New(workflow, "ccip_feeds", ocr3.DataFeedsConsensusInput{
Observations: sdk.ListOf[streams.Feed](streamsTrigger)},
Expand Down Expand Up @@ -665,7 +651,6 @@ func runSepoliaStagingTest(t *testing.T, config []byte, gen func([]byte) (*sdk.W
}

type NotStreamsConfig struct {
Workflow sdk.NewWorkflowParams
NotStream *notstreams.TriggerConfig `yaml:"not_stream" json:"not_stream"`
Ocr *ModifiedConsensusConfig
ChainWriter *chainwriter.TargetConfig
Expand Down
12 changes: 2 additions & 10 deletions pkg/workflows/sdk/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ func TestCompute(t *testing.T) {
spec, err2 := workflow.Spec()
require.NoError(t, err2)
expectedSpec := sdk.WorkflowSpec{
Name: "name",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Expand Down Expand Up @@ -179,10 +177,7 @@ type ComputeOutput struct {
}

func createComputeWithConfigWorkflow(config ComputeConfig, fn func(_ sdk.Runtime, config ComputeConfig, input basictrigger.TriggerOutputs) (ComputeOutput, error)) *sdk.WorkflowSpecFactory {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{
Owner: "owner",
Name: "name",
})
workflow := sdk.NewWorkflowSpecFactory()

triggerCfg := basictrigger.TriggerConfig{Name: "trigger", Number: 100}
trigger := triggerCfg.New(workflow)
Expand All @@ -202,10 +197,7 @@ func createComputeWithConfigWorkflow(config ComputeConfig, fn func(_ sdk.Runtime
}

func createWorkflow(fn func(_ sdk.Runtime, inputFeed notstreams.Feed) ([]streams.Feed, error)) *sdk.WorkflowSpecFactory {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{
Owner: "owner",
Name: "name",
})
workflow := sdk.NewWorkflowSpecFactory()

trigger := notstreams.TriggerConfig{MaxFrequencyMs: 5000}.New(workflow)
computed := sdk.Compute1(workflow, "Compute", sdk.Compute1Inputs[notstreams.Feed]{Arg0: trigger}, fn)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# At the time of writing, this was taken form the staging deployment
# trigger ref was added so it can match the way it's done by the builder. It's implied as trigger today, and not harmful to add.
# One of the heartbeat and deviation values were modified so that the defaults example can be shown to work.
name: "ccipethsep"
owner: "0x00000000000000000000000000000000000000aa"
triggers:
- id: "[email protected]"
ref: "trigger"
Expand Down Expand Up @@ -76,4 +74,4 @@ targets:
config:
address: "0xE0082363396985ae2FdcC3a9F816A586Eed88416"
deltaStage: "45s"
schedule: "oneAtATime"
schedule: "oneAtATime"
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
workflow:
name: notccipethsep
owner: '0x00000000000000000000000000000000000000aa'
not_stream:
maxFrequencyMs: 5000
ocr:
Expand Down
3 changes: 0 additions & 3 deletions pkg/workflows/sdk/testdata/fixtures/workflows/sepolia.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
workflow:
name: ccipethsep
owner: '0x00000000000000000000000000000000000000aa'
streams:
feedIds:
- '0x0003fbba4fce42f65d6032b18aee53efdf526cc734ad296cb57565979d883bdd'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
workflow:
name: ccipethsep
owner: '0x00000000000000000000000000000000000000aa'
maxFrequencyMs: 5000
default_heartbeat: 3600
default_deviation: '0.05'
Expand Down Expand Up @@ -29,4 +26,4 @@ chainWriter:
deltaStage: 45s
schedule: oneAtATime
targetChain: '[email protected]'
allowedPartialStaleness: '0.5'
allowedPartialStaleness: '0.5'
10 changes: 5 additions & 5 deletions pkg/workflows/sdk/testutils/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRunner(t *testing.T) {
})

t.Run("Run allows hard-coded values", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "tester", Owner: "ryan"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "trigger", Number: 100}.New(workflow)
hardCodedInput := basicaction.NewActionOutputsFromFields(sdk.ConstantDefinition("hard-coded"))
tTransform := sdk.Compute2[basictrigger.TriggerOutputs, basicaction.ActionOutputs, bool](
Expand Down Expand Up @@ -261,7 +261,7 @@ type ComputeConfig struct {

func TestCompute(t *testing.T) {
t.Run("Inputs don't loose integer types when any is deserialized to", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "foo", Number: 100}.New(workflow)
toMap := sdk.Compute1(workflow, "tomap", sdk.Compute1Inputs[string]{Arg0: trigger.CoolOutput()}, func(runtime sdk.Runtime, i0 string) (map[string]any, error) {
v, err := strconv.Atoi(i0)
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestCompute(t *testing.T) {
})

t.Run("Config interpolates secrets", func(t *testing.T) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "name", Owner: "owner"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "foo", Number: 100}.New(workflow)

conf := ComputeConfig{
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestCompute(t *testing.T) {
}

func registrationWorkflow() (*sdk.WorkflowSpecFactory, map[string]any, map[string]any) {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "tester", Owner: "ryan"})
workflow := sdk.NewWorkflowSpecFactory()
testTriggerConfig := map[string]any{"something": "from nothing"}
trigger := sdk.Step[int]{
Definition: sdk.StepDefinition{
Expand Down Expand Up @@ -369,7 +369,7 @@ func setupAllRunnerMocks(t *testing.T, runner *testutils.Runner) (*testutils.Tri
type actionTransform func(sdk sdk.Runtime, outputs basictrigger.TriggerOutputs) (bool, error)

func createBasicTestWorkflow(actionTransform actionTransform) *sdk.WorkflowSpecFactory {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Name: "tester", Owner: "ryan"})
workflow := sdk.NewWorkflowSpecFactory()
trigger := basictrigger.TriggerConfig{Name: "trigger", Number: 100}.New(workflow)
tTransform := sdk.Compute1[basictrigger.TriggerOutputs, bool](
workflow,
Expand Down
Loading

0 comments on commit 97ceadb

Please sign in to comment.