From b8d30fbe711f7ae35d85b3bf50a7d53e92b9589a Mon Sep 17 00:00:00 2001 From: hovsep Date: Wed, 13 Nov 2024 23:51:16 +0200 Subject: [PATCH 1/2] Better errors when f-mesh hit an error or panic --- cycle/cycle.go | 54 +++++++++++++++++++++++++++++++------------------- fmesh.go | 6 ++---- fmesh_test.go | 17 ++++++++-------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/cycle/cycle.go b/cycle/cycle.go index 7051b76..3ad6ab8 100644 --- a/cycle/cycle.go +++ b/cycle/cycle.go @@ -1,6 +1,8 @@ package cycle import ( + "errors" + "fmt" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/component" "sync" @@ -23,44 +25,56 @@ func New() *Cycle { } // ActivationResults getter -func (cycle *Cycle) ActivationResults() component.ActivationResultCollection { - return cycle.activationResults +func (c *Cycle) ActivationResults() component.ActivationResultCollection { + return c.activationResults } // HasErrors tells whether the cycle is ended wih activation errors (at lease one component returned an error) -func (cycle *Cycle) HasErrors() bool { - return cycle.ActivationResults().HasErrors() +func (c *Cycle) HasErrors() bool { + return c.ActivationResults().HasErrors() +} + +// ConsolidatedError returns all errors and panics occurred during activation cycle together as single error +func (c *Cycle) ConsolidatedError() error { + var err error + for componentName, activationResult := range c.ActivationResults() { + if activationResult.IsError() || activationResult.IsPanic() { + err = errors.Join(err, fmt.Errorf("component: %s : %w", componentName, activationResult.ActivationError())) + } + } + + return err } // HasPanics tells whether the cycle is ended wih panic(at lease one component panicked) -func (cycle *Cycle) HasPanics() bool { - return cycle.ActivationResults().HasPanics() +func (c *Cycle) HasPanics() bool { + return c.ActivationResults().HasPanics() } // HasActivatedComponents tells when at least one component in the cycle has activated -func (cycle *Cycle) HasActivatedComponents() bool { - return cycle.ActivationResults().HasActivatedComponents() +func (c *Cycle) HasActivatedComponents() bool { + return c.ActivationResults().HasActivatedComponents() } // WithActivationResults adds multiple activation results -func (cycle *Cycle) WithActivationResults(activationResults ...*component.ActivationResult) *Cycle { - cycle.activationResults = cycle.ActivationResults().Add(activationResults...) - return cycle +func (c *Cycle) WithActivationResults(activationResults ...*component.ActivationResult) *Cycle { + c.activationResults = c.ActivationResults().Add(activationResults...) + return c } // Number returns sequence number -func (cycle *Cycle) Number() int { - return cycle.number +func (c *Cycle) Number() int { + return c.number } // WithNumber sets the sequence number -func (cycle *Cycle) WithNumber(number int) *Cycle { - cycle.number = number - return cycle +func (c *Cycle) WithNumber(number int) *Cycle { + c.number = number + return c } -// WithErr returns cycle with error -func (cycle *Cycle) WithErr(err error) *Cycle { - cycle.SetErr(err) - return cycle +// WithErr returns cycle with chained error +func (c *Cycle) WithErr(err error) *Cycle { + c.SetErr(err) + return c } diff --git a/fmesh.go b/fmesh.go index 02345e0..7b98b60 100644 --- a/fmesh.go +++ b/fmesh.go @@ -265,14 +265,12 @@ func (fm *FMesh) mustStop() (bool, error) { switch fm.config.ErrorHandlingStrategy { case StopOnFirstErrorOrPanic: if lastCycle.HasErrors() || lastCycle.HasPanics() { - //@TODO: add failing components names to error - return true, fmt.Errorf("%w, cycle # %d", ErrHitAnErrorOrPanic, lastCycle.Number()) + return true, fmt.Errorf("%w, cycle # %d", errors.Join(ErrHitAnErrorOrPanic, lastCycle.ConsolidatedError()), lastCycle.Number()) } return false, nil case StopOnFirstPanic: - // @TODO: add more context to error if lastCycle.HasPanics() { - return true, ErrHitAPanic + return true, errors.Join(ErrHitAPanic, lastCycle.ConsolidatedError()) } return false, nil case IgnoreAll: diff --git a/fmesh_test.go b/fmesh_test.go index c97e21e..3c21ac8 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -2,7 +2,6 @@ package fmesh import ( "errors" - "fmt" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/cycle" @@ -674,7 +673,7 @@ func TestFMesh_mustStop(t *testing.T) { name string getFMesh func() *FMesh want bool - wantErr error + wantErr string }{ { name: "with default config, no time to stop", @@ -691,7 +690,7 @@ func TestFMesh_mustStop(t *testing.T) { return fm }, want: false, - wantErr: nil, + wantErr: "", }, { name: "with default config, reached max cycles", @@ -706,7 +705,7 @@ func TestFMesh_mustStop(t *testing.T) { return fm }, want: true, - wantErr: ErrReachedMaxAllowedCycles, + wantErr: "reached max allowed cycles", }, { name: "mesh finished naturally and must stop", @@ -721,7 +720,7 @@ func TestFMesh_mustStop(t *testing.T) { return fm }, want: true, - wantErr: nil, + wantErr: "", }, { name: "mesh hit an error", @@ -740,7 +739,7 @@ func TestFMesh_mustStop(t *testing.T) { return fm }, want: true, - wantErr: fmt.Errorf("%w, cycle # %d", ErrHitAnErrorOrPanic, 5), + wantErr: "c1 activation finished with error", }, { name: "mesh hit a panic", @@ -758,15 +757,15 @@ func TestFMesh_mustStop(t *testing.T) { return fm }, want: true, - wantErr: ErrHitAPanic, + wantErr: "hit a panic and will be stopped", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fm := tt.getFMesh() got, stopErr := fm.mustStop() - if tt.wantErr != nil { - assert.EqualError(t, stopErr, tt.wantErr.Error()) + if tt.wantErr != "" { + assert.ErrorContains(t, stopErr, tt.wantErr) } else { assert.NoError(t, stopErr) } From cfc3d2653352da65d697fa95767cd21dc053a3b7 Mon Sep 17 00:00:00 2001 From: hovsep Date: Wed, 13 Nov 2024 23:51:24 +0200 Subject: [PATCH 2/2] Add example --- examples/stateful/main.go | 88 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 examples/stateful/main.go diff --git a/examples/stateful/main.go b/examples/stateful/main.go new file mode 100644 index 0000000..5bf14fd --- /dev/null +++ b/examples/stateful/main.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + "github.com/hovsep/fmesh" + "github.com/hovsep/fmesh/component" + "github.com/hovsep/fmesh/port" + "github.com/hovsep/fmesh/signal" + "os" +) + +// This example shows that components can have external state (internal state is in experimental phase and will be added in further versions of f-mesh) +func main() { + starter := component.New("starter"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) + }) + + layer1 := component.New("layer 1"). + WithDescription("This dummy bypass layer is needed to continue executing, so we will demonstrate that counter is called multiple times"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) + }) + + layer2 := component.New("layer 2"). + WithDescription("This dummy bypass layer is needed to continue executing, so we will demonstrate that counter is called multiple times"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) + }) + + layer3 := component.New("layer 3"). + WithDescription("This dummy bypass layer is needed to continue executing, so we will demonstrate that counter is called multiple times"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) + }) + + //This variable is not part of f-mesh and just mutated from activation function + counterExternalState := 0 + + counter := component.New("counter"). + WithDescription("Stateful counter"). + WithInputs("i1"). + WithOutputs("o1").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + for _, _ = range inputs.ByName("i1").AllSignalsOrNil() { + counterExternalState++ + } + + //Latest state is always kept on o1 + outputs.ByName("o1").Clear().PutSignals(signal.New(counterExternalState)) + return nil + }) + + // Chain: starter->layer1->layer2->layer3 + starter.OutputByName("o1").PipeTo(layer1.InputByName("i1")) + layer1.OutputByName("o1").PipeTo(layer2.InputByName("i1")) + layer2.OutputByName("o1").PipeTo(layer3.InputByName("i1")) + + // Layers 1-3 are also reporting to the counter + layer1.OutputByName("o1").PipeTo(counter.InputByName("i1")) + layer2.OutputByName("o1").PipeTo(counter.InputByName("i1")) + layer3.OutputByName("o1").PipeTo(counter.InputByName("i1")) + + fm := fmesh.New("stateful").WithComponents(starter, layer1, layer2, layer3, counter) + + //Init data (4 heterogeneous signals, value does not matter) + starter.InputByName("i1").PutSignals(signal.NewGroup(1, "a", 0, nil).SignalsOrNil()...) + + //Run the mesh + _, err := fm.Run() + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + count := fm.Components().ByName("counter").OutputByName("o1").FirstSignalPayloadOrDefault(0) + + //Expected: 12 (4 signals repeated 3 times (on each layer)) + fmt.Printf("Count: %d", count) + +}