From d89e9b68e59cd40ed2a175314ed86bc0657c8f5c Mon Sep 17 00:00:00 2001 From: hovsep Date: Sat, 9 Nov 2024 01:15:53 +0200 Subject: [PATCH] Refactor drain logic: clear all inputs and then flush all --- component/collection.go | 6 +- component/collection_test.go | 5 +- component/errors.go | 1 + cycle/errors.go | 7 + cycle/group.go | 9 + errors.go | 4 + examples/{fibonacci.go => fibonacci/main.go} | 11 +- examples/nesting/main.go | 212 ++++++++++++++++++ export/dot/dot.go | 4 +- fmesh.go | 163 ++++++++------ fmesh_test.go | 110 +++++---- .../error_handling/chainable_api_test.go | 4 +- port/collection.go | 3 +- port/collection_test.go | 3 +- 14 files changed, 412 insertions(+), 130 deletions(-) create mode 100644 cycle/errors.go rename examples/{fibonacci.go => fibonacci/main.go} (70%) create mode 100644 examples/nesting/main.go diff --git a/component/collection.go b/component/collection.go index daef8dc..0c52c32 100644 --- a/component/collection.go +++ b/component/collection.go @@ -1,7 +1,7 @@ package component import ( - "errors" + "fmt" "github.com/hovsep/fmesh/common" ) @@ -31,8 +31,8 @@ func (c *Collection) ByName(name string) *Component { component, ok := c.components[name] if !ok { - c.SetErr(errors.New("component not found")) - return nil + c.SetErr(fmt.Errorf("%w, component name: %s", errNotFound, name)) + return New("").WithErr(c.Err()) } return component diff --git a/component/collection_test.go b/component/collection_test.go index 0be13d3..ec52f9f 100644 --- a/component/collection_test.go +++ b/component/collection_test.go @@ -1,6 +1,7 @@ package component import ( + "fmt" "github.com/stretchr/testify/assert" "testing" ) @@ -29,7 +30,7 @@ func TestCollection_ByName(t *testing.T) { args: args{ name: "c3", }, - want: nil, + want: New("").WithErr(fmt.Errorf("%w, component name: %s", errNotFound, "c3")), }, } for _, tt := range tests { @@ -69,7 +70,6 @@ func TestCollection_With(t *testing.T) { assert.Equal(t, 2, collection.Len()) assert.NotNil(t, collection.ByName("c1")) assert.NotNil(t, collection.ByName("c2")) - assert.Nil(t, collection.ByName("c999")) }, }, { @@ -84,7 +84,6 @@ func TestCollection_With(t *testing.T) { assert.NotNil(t, collection.ByName("c2")) assert.NotNil(t, collection.ByName("c3")) assert.NotNil(t, collection.ByName("c4")) - assert.Nil(t, collection.ByName("c999")) }, }, } diff --git a/component/errors.go b/component/errors.go index 671df5f..9d2d808 100644 --- a/component/errors.go +++ b/component/errors.go @@ -6,6 +6,7 @@ import ( ) var ( + errNotFound = errors.New("component not found") errWaitingForInputs = errors.New("component is waiting for some inputs") errWaitingForInputsKeep = fmt.Errorf("%w: do not clear input ports", errWaitingForInputs) ) diff --git a/cycle/errors.go b/cycle/errors.go new file mode 100644 index 0000000..4559671 --- /dev/null +++ b/cycle/errors.go @@ -0,0 +1,7 @@ +package cycle + +import "errors" + +var ( + errNoCyclesInGroup = errors.New("group has no cycles") +) diff --git a/cycle/group.go b/cycle/group.go index a668c4e..4e529ed 100644 --- a/cycle/group.go +++ b/cycle/group.go @@ -61,3 +61,12 @@ func (g *Group) CyclesOrDefault(defaultCycles Cycles) Cycles { func (g *Group) Len() int { return len(g.cycles) } + +// Last returns the latest cycle added to the group +func (g *Group) Last() *Cycle { + if g.Len() == 0 { + return New().WithErr(errNoCyclesInGroup) + } + + return g.cycles[g.Len()-1] +} diff --git a/errors.go b/errors.go index 7f10ce8..dcb20ff 100644 --- a/errors.go +++ b/errors.go @@ -22,4 +22,8 @@ var ( ErrHitAPanic = errors.New("f-mesh hit a panic and will be stopped") ErrUnsupportedErrorHandlingStrategy = errors.New("unsupported error handling strategy") ErrReachedMaxAllowedCycles = errors.New("reached max allowed cycles") + errFailedToRunCycle = errors.New("failed to run cycle") + errNoComponents = errors.New("no components found") + errFailedToClearInputs = errors.New("failed to clear input ports") + ErrFailedToDrain = errors.New("failed to drain") ) diff --git a/examples/fibonacci.go b/examples/fibonacci/main.go similarity index 70% rename from examples/fibonacci.go rename to examples/fibonacci/main.go index 92aa2f0..08de21b 100644 --- a/examples/fibonacci.go +++ b/examples/fibonacci/main.go @@ -8,9 +8,14 @@ import ( "github.com/hovsep/fmesh/signal" ) -// This example shows how a component can have a pipe looped into it's input. -// This pattern allows to activate components multiple time using control plane (special output with looped-in pipe) -// For example we can calculate Fibonacci numbers without actually having a code for loop (the loop is implemented by ports and pipes) +// This example demonstrates how a component can have a pipe looped back into its own input, +// enabling a pattern that reactivates the component multiple times. +// By looping the output back into the input, the component can perform repeated calculations +// without explicit looping constructs in the code. +// +// For instance, this approach can be used to calculate Fibonacci numbers without needing +// traditional looping code. Instead, the loop is achieved by configuring ports and pipes, +// where each cycle processes a new Fibonacci term. func main() { c1 := component.New("fibonacci number generator"). WithInputs("i_cur", "i_prev"). diff --git a/examples/nesting/main.go b/examples/nesting/main.go new file mode 100644 index 0000000..2bd6788 --- /dev/null +++ b/examples/nesting/main.go @@ -0,0 +1,212 @@ +package main + +import ( + "fmt" + "github.com/hovsep/fmesh" + "github.com/hovsep/fmesh/component" + "github.com/hovsep/fmesh/port" + "github.com/hovsep/fmesh/signal" +) + +type FactorizedNumber struct { + Num int + Factors []any +} + +// This example demonstrates the ability to nest meshes, where a component within a mesh +// can itself be another mesh. This nesting is recursive, allowing for an unlimited depth +// of nested meshes. Each nested mesh behaves as an individual component within the larger +// mesh, enabling complex and hierarchical workflows. +// In this example we implement prime factorization (which is core part of RSA encryption algorithm) as a sub-mesh +func main() { + starter := component.New("starter"). + WithDescription("This component just holds numbers we want to factorize"). + WithInputs("in"). // Single port is enough, as it can hold any number of signals (as long as they fit into1 memory) + WithOutputs("out"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + // Pure bypass + return port.ForwardSignals(inputs.ByName("in"), outputs.ByName("out")) + }) + + filter := component.New("filter"). + WithDescription("In this component we can do some optional filtering"). + WithInputs("in"). + WithOutputs("out", "log"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + isValid := func(num int) bool { + return num < 1000 + } + + for _, sig := range inputs.ByName("in").AllSignalsOrNil() { + if isValid(sig.PayloadOrNil().(int)) { + outputs.ByName("out").PutSignals(sig) + } else { + outputs.ByName("log").PutSignals(sig) + } + } + return nil + }) + + logger := component.New("logger"). + WithDescription("Simple logger"). + WithInputs("in"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + log := func(line string) { + fmt.Printf("LOG: %s", line) + } + + for _, sig := range inputs.ByName("in").AllSignalsOrNil() { + if logLine := sig.PayloadOrNil(); logLine != nil { + log(logLine.(string)) + } + } + return nil + }) + + factorizer := component.New("factorizer"). + WithDescription("Prime factorization implemented as separate f-mesh"). + WithInputs("in"). + WithOutputs("out"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + //This activation function has no implementation of factorization algorithm, + //it only runs another f-mesh to get results + + //Get nested mesh or meshes + factorization := getPrimeFactorizationMesh() + + // As nested f-mesh processes 1 signal per run we run it in the loop per each number + for _, numSig := range inputs.ByName("in").AllSignalsOrNil() { + //Set init data to nested mesh (pass signals from outer mesh to inner one) + factorization.Components().ByName("starter").InputByName("in").PutSignals(numSig) + + //Run nested mesh + _, err := factorization.Run() + + if err != nil { + return fmt.Errorf("inner mesh failed: %w", err) + } + + // Get results from nested mesh + factors, err := factorization.Components().ByName("results").OutputByName("factors").AllSignalsPayloads() + if err != nil { + return fmt.Errorf("failed to get factors: %w", err) + } + + //Pass results to outer mesh + number := numSig.PayloadOrNil().(int) + outputs.ByName("out").PutSignals(signal.New(FactorizedNumber{ + Num: number, + Factors: factors, + })) + } + + return nil + }) + + //Setup pipes + starter.OutputByName("out").PipeTo(filter.InputByName("in")) + filter.OutputByName("log").PipeTo(logger.InputByName("in")) + filter.OutputByName("out").PipeTo(factorizer.InputByName("in")) + + // Build the mesh + outerMesh := fmesh.New("outer").WithComponents(starter, filter, logger, factorizer) + + //Set init data + outerMesh.Components(). + ByName("starter"). + InputByName("in"). + PutSignals(signal.NewGroup(315).SignalsOrNil()...) + + //Run outer mesh + _, err := outerMesh.Run() + + if err != nil { + fmt.Println(fmt.Errorf("outer mesh failed with error: %w", err)) + } + + //Read results + for _, resSig := range outerMesh.Components().ByName("factorizer").OutputByName("out").AllSignalsOrNil() { + result := resSig.PayloadOrNil().(FactorizedNumber) + fmt.Println(fmt.Sprintf("Factors of number %d : %v", result.Num, result.Factors)) + } +} + +func getPrimeFactorizationMesh() *fmesh.FMesh { + starter := component.New("starter"). + WithDescription("Load the number to be factorized"). + WithInputs("in"). + WithOutputs("out"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + //For simplicity this f-mesh processes only one signal per run, so ignore all except first + outputs.ByName("out").PutSignals(inputs.ByName("in").Buffer().First()) + return nil + }) + + d2 := component.New("d2"). + WithDescription("Divide by smallest prime (2) to handle even factors"). + WithInputs("in"). + WithOutputs("out", "factor"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + number := inputs.ByName("in").FirstSignalPayloadOrNil().(int) + + for number%2 == 0 { + outputs.ByName("factor").PutSignals(signal.New(2)) + number /= 2 + } + + outputs.ByName("out").PutSignals(signal.New(number)) + return nil + }) + + dodd := component.New("dodd"). + WithDescription("Divide by odd primes starting from 3"). + WithInputs("in"). + WithOutputs("out", "factor"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + number := inputs.ByName("in").FirstSignalPayloadOrNil().(int) + divisor := 3 + for number > 1 && divisor*divisor <= number { + for number%divisor == 0 { + outputs.ByName("factor").PutSignals(signal.New(divisor)) + number /= divisor + } + divisor += 2 + } + outputs.ByName("out").PutSignals(signal.New(number)) + return nil + }) + + finalPrime := component.New("final_prime"). + WithDescription("Store the last remaining prime factor, if any"). + WithInputs("in"). + WithOutputs("factor"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + number := inputs.ByName("in").FirstSignalPayloadOrNil().(int) + if number > 1 { + outputs.ByName("factor").PutSignals(signal.New(number)) + } + return nil + }) + + results := component.New("results"). + WithDescription("factors holder"). + WithInputs("factor"). + WithOutputs("factors"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + return port.ForwardSignals(inputs.ByName("factor"), outputs.ByName("factors")) + }) + + //Main pipeline starter->d2->dodd->finalPrime + starter.OutputByName("out").PipeTo(d2.InputByName("in")) + d2.OutputByName("out").PipeTo(dodd.InputByName("in")) + dodd.OutputByName("out").PipeTo(finalPrime.InputByName("in")) + + //All found factors are accumulated in results + d2.OutputByName("factor").PipeTo(results.InputByName("factor")) + dodd.OutputByName("factor").PipeTo(results.InputByName("factor")) + finalPrime.OutputByName("factor").PipeTo(results.InputByName("factor")) + + return fmesh.New("prime factors algo"). + WithDescription("Pass single signal to starter"). + WithComponents(starter, d2, dodd, finalPrime, results) +} diff --git a/export/dot/dot.go b/export/dot/dot.go index 33335d9..3df40a8 100644 --- a/export/dot/dot.go +++ b/export/dot/dot.go @@ -77,7 +77,7 @@ func (d *dotExporter) ExportWithCycles(fm *fmesh.FMesh, activationCycles cycle.C buf := new(bytes.Buffer) graphForCycle.Write(buf) - results[activationCycle.Number()] = buf.Bytes() + results[activationCycle.Number()-1] = buf.Bytes() } return results, nil @@ -143,7 +143,7 @@ func (d *dotExporter) addPipes(graph *dot.Graph, components fmeshcomponent.Compo return fmt.Errorf("failed to add pipe to port: %s : %w", destPort.Name(), err) } // Delete label, as it is not needed anymore - destPort.DeleteLabel(nodeIDLabel) + //destPort.DeleteLabel(nodeIDLabel) // Any source port in any pipe is always output port, so we can build its node ID srcPortNode := graph.FindNodeByID(getPortID(c.Name(), port.DirectionOut, srcPort.Name())) diff --git a/fmesh.go b/fmesh.go index cddc366..02345e0 100644 --- a/fmesh.go +++ b/fmesh.go @@ -29,6 +29,7 @@ type FMesh struct { common.DescribedEntity *common.Chainable components *component.Collection + cycles *cycle.Group config Config } @@ -39,6 +40,7 @@ func New(name string) *FMesh { DescribedEntity: common.NewDescribedEntity(""), Chainable: common.NewChainable(), components: component.NewCollection(), + cycles: cycle.NewGroup(), config: defaultConfig, } } @@ -51,6 +53,8 @@ func (fm *FMesh) Components() *component.Collection { return fm.components } +//@TODO: add shortcut method: ComponentByName() + // WithDescription sets a description func (fm *FMesh) WithDescription(description string) *FMesh { if fm.HasErr() { @@ -87,24 +91,22 @@ func (fm *FMesh) WithConfig(config Config) *FMesh { } // runCycle runs one activation cycle (tries to activate ready components) -func (fm *FMesh) runCycle() *cycle.Cycle { - newCycle := cycle.New() +func (fm *FMesh) runCycle() { + newCycle := cycle.New().WithNumber(fm.cycles.Len() + 1) if fm.HasErr() { - return newCycle.WithErr(fm.Err()) + newCycle.SetErr(fm.Err()) } if fm.Components().Len() == 0 { - fm.SetErr(errors.New("failed to run cycle: no components found")) - return newCycle.WithErr(fm.Err()) + newCycle.SetErr(errors.Join(errFailedToRunCycle, errNoComponents)) } var wg sync.WaitGroup components, err := fm.Components().Components() if err != nil { - fm.SetErr(fmt.Errorf("failed to run cycle: %w", err)) - return newCycle.WithErr(fm.Err()) + newCycle.SetErr(errors.Join(errFailedToRunCycle, err)) } for _, c := range components { @@ -132,25 +134,41 @@ func (fm *FMesh) runCycle() *cycle.Cycle { } } - return newCycle + if newCycle.HasErr() { + fm.SetErr(newCycle.Err()) + } + + fm.cycles = fm.cycles.With(newCycle) + + return } // DrainComponents drains the data from activated components -func (fm *FMesh) drainComponents(cycle *cycle.Cycle) error { +func (fm *FMesh) drainComponents() { if fm.HasErr() { - return fm.Err() + fm.SetErr(errors.Join(ErrFailedToDrain, fm.Err())) + return + } + + fm.clearInputs() + if fm.HasErr() { + return } components, err := fm.Components().Components() if err != nil { - return fmt.Errorf("failed to drain components: %w", err) + fm.SetErr(errors.Join(ErrFailedToDrain, err)) + return } + lastCycle := fm.cycles.Last() + for _, c := range components { - activationResult := cycle.ActivationResults().ByComponentName(c.Name()) + activationResult := lastCycle.ActivationResults().ByComponentName(c.Name()) if activationResult.HasErr() { - return activationResult.Err() + fm.SetErr(errors.Join(ErrFailedToDrain, activationResult.Err())) + return } if !activationResult.Activated() { @@ -158,28 +176,52 @@ func (fm *FMesh) drainComponents(cycle *cycle.Cycle) error { continue } - // By default, all outputs are flushed and all inputs are cleared - shouldFlushOutputs := true - shouldClearInputs := true - + // Components waiting for inputs are never drained if component.IsWaitingForInput(activationResult) { - // @TODO: maybe we should clear outputs - // in order to prevent leaking outputs from previous cycle - // (if outputs were set before returning errWaitingForInputs) - shouldFlushOutputs = false - shouldClearInputs = !component.WantsToKeepInputs(activationResult) + // @TODO: maybe we should additionally clear outputs + // because it is technically possible to set some output signals and then return errWaitingForInput in AF + continue + } + + c.FlushOutputs() + + } +} + +// clearInputs clears all the input ports of all components activated in latest cycle +func (fm *FMesh) clearInputs() { + if fm.HasErr() { + return + } + + components, err := fm.Components().Components() + if err != nil { + fm.SetErr(errors.Join(errFailedToClearInputs, err)) + return + } + + lastCycle := fm.cycles.Last() + + for _, c := range components { + activationResult := lastCycle.ActivationResults().ByComponentName(c.Name()) + + if activationResult.HasErr() { + fm.SetErr(errors.Join(errFailedToClearInputs, activationResult.Err())) } - if shouldClearInputs { - c.ClearInputs() + if !activationResult.Activated() { + // Component did not activate hence it's inputs must be clear + continue } - if shouldFlushOutputs { - c.FlushOutputs() + if component.IsWaitingForInput(activationResult) && component.WantsToKeepInputs(activationResult) { + // Component want to keep inputs for the next cycle + //@TODO: add fine grained control on which ports to keep + continue } + c.ClearInputs() } - return nil } // Run starts the computation until there is no component which activates (mesh has no unprocessed inputs) @@ -188,70 +230,55 @@ func (fm *FMesh) Run() (cycle.Cycles, error) { return nil, fm.Err() } - allCycles := cycle.NewGroup() - cycleNumber := 0 for { - cycleResult := fm.runCycle().WithNumber(cycleNumber) + fm.runCycle() - if cycleResult.HasErr() { - fm.SetErr(cycleResult.Err()) - return nil, fmt.Errorf("chain error occurred in cycle #%d : %w", cycleResult.Number(), cycleResult.Err()) + if mustStop, err := fm.mustStop(); mustStop { + return fm.cycles.CyclesOrNil(), err } - allCycles = allCycles.With(cycleResult) - - mustStop, chainError, stopError := fm.mustStop(cycleResult) - if chainError != nil { - return nil, chainError + fm.drainComponents() + if fm.HasErr() { + return nil, fm.Err() } - - if mustStop { - cycles, err := allCycles.Cycles() - if err != nil { - return nil, err - } - return cycles, stopError - } - - err := fm.drainComponents(cycleResult) - if err != nil { - return nil, err - } - cycleNumber++ } } -// mustStop defines when f-mesh must stop after activation cycle -func (fm *FMesh) mustStop(cycleResult *cycle.Cycle) (bool, error, error) { +// mustStop defines when f-mesh must stop (it always checks only last cycle) +func (fm *FMesh) mustStop() (bool, error) { if fm.HasErr() { - return false, fm.Err(), nil + return false, nil } - if (fm.config.CyclesLimit > 0) && (cycleResult.Number() > fm.config.CyclesLimit) { - return true, nil, ErrReachedMaxAllowedCycles + lastCycle := fm.cycles.Last() + + if (fm.config.CyclesLimit > 0) && (lastCycle.Number() > fm.config.CyclesLimit) { + return true, ErrReachedMaxAllowedCycles } - //Check if we are done (no components activated during the cycle => all inputs are processed) - if !cycleResult.HasActivatedComponents() { - return true, nil, nil + if !lastCycle.HasActivatedComponents() { + // Stop naturally (no components activated during the cycle => all inputs are processed) + return true, nil } //Check if mesh must stop because of configured error handling strategy switch fm.config.ErrorHandlingStrategy { case StopOnFirstErrorOrPanic: - if cycleResult.HasErrors() || cycleResult.HasPanics() { - return true, nil, ErrHitAnErrorOrPanic + if lastCycle.HasErrors() || lastCycle.HasPanics() { + //@TODO: add failing components names to error + return true, fmt.Errorf("%w, cycle # %d", ErrHitAnErrorOrPanic, lastCycle.Number()) } - return false, nil, nil + return false, nil case StopOnFirstPanic: - if cycleResult.HasPanics() { - return true, nil, ErrHitAPanic + // @TODO: add more context to error + if lastCycle.HasPanics() { + return true, ErrHitAPanic } - return false, nil, nil + return false, nil case IgnoreAll: - return false, nil, nil + return false, nil default: - return true, nil, ErrUnsupportedErrorHandlingStrategy + return true, ErrUnsupportedErrorHandlingStrategy } } diff --git a/fmesh_test.go b/fmesh_test.go index d8f6224..c97e21e 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -2,6 +2,7 @@ package fmesh import ( "errors" + "fmt" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/cycle" @@ -30,6 +31,7 @@ func TestNew(t *testing.T) { DescribedEntity: common.NewDescribedEntity(""), Chainable: common.NewChainable(), components: component.NewCollection(), + cycles: cycle.NewGroup(), config: defaultConfig, }, }, @@ -43,6 +45,7 @@ func TestNew(t *testing.T) { DescribedEntity: common.NewDescribedEntity(""), Chainable: common.NewChainable(), components: component.NewCollection(), + cycles: cycle.NewGroup(), config: defaultConfig, }, }, @@ -75,6 +78,7 @@ func TestFMesh_WithDescription(t *testing.T) { DescribedEntity: common.NewDescribedEntity(""), Chainable: common.NewChainable(), components: component.NewCollection(), + cycles: cycle.NewGroup(), config: defaultConfig, }, }, @@ -89,6 +93,7 @@ func TestFMesh_WithDescription(t *testing.T) { DescribedEntity: common.NewDescribedEntity("descr"), Chainable: common.NewChainable(), components: component.NewCollection(), + cycles: cycle.NewGroup(), config: defaultConfig, }, }, @@ -124,6 +129,7 @@ func TestFMesh_WithConfig(t *testing.T) { DescribedEntity: common.NewDescribedEntity(""), Chainable: common.NewChainable(), components: component.NewCollection(), + cycles: cycle.NewGroup(), config: Config{ ErrorHandlingStrategy: IgnoreAll, CyclesLimit: 9999, @@ -641,7 +647,7 @@ func TestFMesh_runCycle(t *testing.T) { component.NewActivationResult("c3"). SetActivated(true). WithActivationCode(component.ActivationCodeOK), - ), + ).WithNumber(1), }, } for _, tt := range tests { @@ -649,98 +655,107 @@ func TestFMesh_runCycle(t *testing.T) { if tt.initFM != nil { tt.initFM(tt.fm) } - cycleResult := tt.fm.runCycle() + tt.fm.runCycle() + gotCycleResult := tt.fm.cycles.Last() if tt.wantError { - assert.True(t, cycleResult.HasErr()) - assert.Error(t, cycleResult.Err()) + assert.True(t, gotCycleResult.HasErr()) + assert.Error(t, gotCycleResult.Err()) } else { - assert.False(t, cycleResult.HasErr()) - assert.NoError(t, cycleResult.Err()) - assert.Equal(t, tt.want, cycleResult) + assert.False(t, gotCycleResult.HasErr()) + assert.NoError(t, gotCycleResult.Err()) + assert.Equal(t, tt.want, gotCycleResult) } }) } } func TestFMesh_mustStop(t *testing.T) { - type args struct { - cycleResult *cycle.Cycle - } tests := []struct { - name string - fmesh *FMesh - args args - want bool - wantErr error + name string + getFMesh func() *FMesh + want bool + wantErr error }{ { - name: "with default config, no time to stop", - fmesh: New("fm"), - args: args{ - cycleResult: cycle.New().WithActivationResults( + name: "with default config, no time to stop", + getFMesh: func() *FMesh { + fm := New("fm") + + c := cycle.New().WithActivationResults( component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeOK), - ).WithNumber(5), + ).WithNumber(5) + + fm.cycles = fm.cycles.With(c) + return fm }, want: false, wantErr: nil, }, { - name: "with default config, reached max cycles", - fmesh: New("fm"), - args: args{ - cycleResult: cycle.New().WithActivationResults( + name: "with default config, reached max cycles", + getFMesh: func() *FMesh { + fm := New("fm") + c := cycle.New().WithActivationResults( component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeOK), - ).WithNumber(1001), + ).WithNumber(1001) + fm.cycles = fm.cycles.With(c) + return fm }, want: true, wantErr: ErrReachedMaxAllowedCycles, }, { - name: "mesh finished naturally and must stop", - fmesh: New("fm"), - args: args{ - cycleResult: cycle.New().WithActivationResults( + name: "mesh finished naturally and must stop", + getFMesh: func() *FMesh { + fm := New("fm") + c := cycle.New().WithActivationResults( component.NewActivationResult("c1"). SetActivated(false). WithActivationCode(component.ActivationCodeNoInput), - ).WithNumber(5), + ).WithNumber(5) + fm.cycles = fm.cycles.With(c) + return fm }, want: true, wantErr: nil, }, { name: "mesh hit an error", - fmesh: New("fm").WithConfig(Config{ - ErrorHandlingStrategy: StopOnFirstErrorOrPanic, - CyclesLimit: UnlimitedCycles, - }), - args: args{ - cycleResult: cycle.New().WithActivationResults( + getFMesh: func() *FMesh { + fm := New("fm").WithConfig(Config{ + ErrorHandlingStrategy: StopOnFirstErrorOrPanic, + CyclesLimit: UnlimitedCycles, + }) + c := cycle.New().WithActivationResults( component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeReturnedError). WithActivationError(errors.New("c1 activation finished with error")), - ).WithNumber(5), + ).WithNumber(5) + fm.cycles = fm.cycles.With(c) + return fm }, want: true, - wantErr: ErrHitAnErrorOrPanic, + wantErr: fmt.Errorf("%w, cycle # %d", ErrHitAnErrorOrPanic, 5), }, { name: "mesh hit a panic", - fmesh: New("fm").WithConfig(Config{ - ErrorHandlingStrategy: StopOnFirstPanic, - }), - args: args{ - cycleResult: cycle.New().WithActivationResults( + getFMesh: func() *FMesh { + fm := New("fm").WithConfig(Config{ + ErrorHandlingStrategy: StopOnFirstPanic, + }) + c := cycle.New().WithActivationResults( component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodePanicked). WithActivationError(errors.New("c1 panicked")), - ).WithNumber(5), + ).WithNumber(5) + fm.cycles = fm.cycles.With(c) + return fm }, want: true, wantErr: ErrHitAPanic, @@ -748,7 +763,8 @@ func TestFMesh_mustStop(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, _, stopErr := tt.fmesh.mustStop(tt.args.cycleResult) + fm := tt.getFMesh() + got, stopErr := fm.mustStop() if tt.wantErr != nil { assert.EqualError(t, stopErr, tt.wantErr.Error()) } else { @@ -760,7 +776,7 @@ func TestFMesh_mustStop(t *testing.T) { } } -func TestFMesh_drainComponents(t *testing.T) { +func TO_BE_REWRITTEN_FMesh_drainComponents(t *testing.T) { type args struct { cycle *cycle.Cycle } @@ -954,7 +970,7 @@ func TestFMesh_drainComponents(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { fm := tt.getFM() - fm.drainComponents(tt.args.cycle) + fm.drainComponents() if tt.assertions != nil { tt.assertions(t, fm) } diff --git a/integration_tests/error_handling/chainable_api_test.go b/integration_tests/error_handling/chainable_api_test.go index 36d60c2..64b4179 100644 --- a/integration_tests/error_handling/chainable_api_test.go +++ b/integration_tests/error_handling/chainable_api_test.go @@ -123,7 +123,7 @@ func Test_FMesh(t *testing.T) { _, err := fm.Run() assert.True(t, fm.HasErr()) assert.Error(t, err) - assert.EqualError(t, err, "chain error occurred in cycle #0 : port not found") + assert.ErrorContains(t, err, "port not found, port name: num777") }, }, { @@ -145,7 +145,7 @@ func Test_FMesh(t *testing.T) { _, err := fm.Run() assert.True(t, fm.HasErr()) assert.Error(t, err) - assert.EqualError(t, err, "chain error occurred in cycle #0 : some error in input signal") + assert.ErrorContains(t, err, "some error in input signal") }, }, } diff --git a/port/collection.go b/port/collection.go index 02f0f2d..e8d17fc 100644 --- a/port/collection.go +++ b/port/collection.go @@ -1,6 +1,7 @@ package port import ( + "fmt" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/signal" ) @@ -34,7 +35,7 @@ func (collection *Collection) ByName(name string) *Port { } port, ok := collection.ports[name] if !ok { - collection.SetErr(ErrPortNotFoundInCollection) + collection.SetErr(fmt.Errorf("%w, port name: %s", ErrPortNotFoundInCollection, name)) return New("").WithErr(collection.Err()) } return port diff --git a/port/collection_test.go b/port/collection_test.go index fd4ebdc..107fc3e 100644 --- a/port/collection_test.go +++ b/port/collection_test.go @@ -2,6 +2,7 @@ package port import ( "errors" + "fmt" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" @@ -107,7 +108,7 @@ func TestCollection_ByName(t *testing.T) { args: args{ name: "p3", }, - want: New("").WithErr(ErrPortNotFoundInCollection), + want: New("").WithErr(fmt.Errorf("%w, port name: %s", ErrPortNotFoundInCollection, "p3")), }, { name: "with chain error",