Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stateful example #89

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions cycle/cycle.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cycle

import (
"errors"
"fmt"
"github.com/hovsep/fmesh/common"
"github.com/hovsep/fmesh/component"
"sync"
Expand All @@ -23,44 +25,56 @@ func New() *Cycle {
}

// ActivationResults getter
func (cycle *Cycle) ActivationResults() component.ActivationResultCollection {
return cycle.activationResults
func (c *Cycle) ActivationResults() component.ActivationResultCollection {
return c.activationResults
}

// HasErrors tells whether the cycle is ended wih activation errors (at lease one component returned an error)
func (cycle *Cycle) HasErrors() bool {
return cycle.ActivationResults().HasErrors()
func (c *Cycle) HasErrors() bool {
return c.ActivationResults().HasErrors()
}

// ConsolidatedError returns all errors and panics occurred during activation cycle together as single error
func (c *Cycle) ConsolidatedError() error {
var err error
for componentName, activationResult := range c.ActivationResults() {
if activationResult.IsError() || activationResult.IsPanic() {
err = errors.Join(err, fmt.Errorf("component: %s : %w", componentName, activationResult.ActivationError()))
}
}

return err
}

// HasPanics tells whether the cycle is ended wih panic(at lease one component panicked)
func (cycle *Cycle) HasPanics() bool {
return cycle.ActivationResults().HasPanics()
func (c *Cycle) HasPanics() bool {
return c.ActivationResults().HasPanics()
}

// HasActivatedComponents tells when at least one component in the cycle has activated
func (cycle *Cycle) HasActivatedComponents() bool {
return cycle.ActivationResults().HasActivatedComponents()
func (c *Cycle) HasActivatedComponents() bool {
return c.ActivationResults().HasActivatedComponents()
}

// WithActivationResults adds multiple activation results
func (cycle *Cycle) WithActivationResults(activationResults ...*component.ActivationResult) *Cycle {
cycle.activationResults = cycle.ActivationResults().Add(activationResults...)
return cycle
func (c *Cycle) WithActivationResults(activationResults ...*component.ActivationResult) *Cycle {
c.activationResults = c.ActivationResults().Add(activationResults...)
return c
}

// Number returns sequence number
func (cycle *Cycle) Number() int {
return cycle.number
func (c *Cycle) Number() int {
return c.number
}

// WithNumber sets the sequence number
func (cycle *Cycle) WithNumber(number int) *Cycle {
cycle.number = number
return cycle
func (c *Cycle) WithNumber(number int) *Cycle {
c.number = number
return c
}

// WithErr returns cycle with error
func (cycle *Cycle) WithErr(err error) *Cycle {
cycle.SetErr(err)
return cycle
// WithErr returns cycle with chained error
func (c *Cycle) WithErr(err error) *Cycle {
c.SetErr(err)
return c
}
88 changes: 88 additions & 0 deletions examples/stateful/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"fmt"
"github.com/hovsep/fmesh"
"github.com/hovsep/fmesh/component"
"github.com/hovsep/fmesh/port"
"github.com/hovsep/fmesh/signal"
"os"
)

// This example shows that components can have external state (internal state is in experimental phase and will be added in further versions of f-mesh)
func main() {
starter := component.New("starter").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1"))
})

layer1 := component.New("layer 1").
WithDescription("This dummy bypass layer is needed to continue executing, so we will demonstrate that counter is called multiple times").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1"))
})

layer2 := component.New("layer 2").
WithDescription("This dummy bypass layer is needed to continue executing, so we will demonstrate that counter is called multiple times").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1"))
})

layer3 := component.New("layer 3").
WithDescription("This dummy bypass layer is needed to continue executing, so we will demonstrate that counter is called multiple times").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1"))
})

//This variable is not part of f-mesh and just mutated from activation function
counterExternalState := 0

counter := component.New("counter").
WithDescription("Stateful counter").
WithInputs("i1").
WithOutputs("o1").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
for _, _ = range inputs.ByName("i1").AllSignalsOrNil() {
counterExternalState++
}

//Latest state is always kept on o1
outputs.ByName("o1").Clear().PutSignals(signal.New(counterExternalState))
return nil
})

// Chain: starter->layer1->layer2->layer3
starter.OutputByName("o1").PipeTo(layer1.InputByName("i1"))
layer1.OutputByName("o1").PipeTo(layer2.InputByName("i1"))
layer2.OutputByName("o1").PipeTo(layer3.InputByName("i1"))

// Layers 1-3 are also reporting to the counter
layer1.OutputByName("o1").PipeTo(counter.InputByName("i1"))
layer2.OutputByName("o1").PipeTo(counter.InputByName("i1"))
layer3.OutputByName("o1").PipeTo(counter.InputByName("i1"))

fm := fmesh.New("stateful").WithComponents(starter, layer1, layer2, layer3, counter)

//Init data (4 heterogeneous signals, value does not matter)
starter.InputByName("i1").PutSignals(signal.NewGroup(1, "a", 0, nil).SignalsOrNil()...)

//Run the mesh
_, err := fm.Run()
if err != nil {
fmt.Println(err)
os.Exit(1)
}

count := fm.Components().ByName("counter").OutputByName("o1").FirstSignalPayloadOrDefault(0)

//Expected: 12 (4 signals repeated 3 times (on each layer))
fmt.Printf("Count: %d", count)

}
6 changes: 2 additions & 4 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,12 @@ func (fm *FMesh) mustStop() (bool, error) {
switch fm.config.ErrorHandlingStrategy {
case StopOnFirstErrorOrPanic:
if lastCycle.HasErrors() || lastCycle.HasPanics() {
//@TODO: add failing components names to error
return true, fmt.Errorf("%w, cycle # %d", ErrHitAnErrorOrPanic, lastCycle.Number())
return true, fmt.Errorf("%w, cycle # %d", errors.Join(ErrHitAnErrorOrPanic, lastCycle.ConsolidatedError()), lastCycle.Number())
}
return false, nil
case StopOnFirstPanic:
// @TODO: add more context to error
if lastCycle.HasPanics() {
return true, ErrHitAPanic
return true, errors.Join(ErrHitAPanic, lastCycle.ConsolidatedError())
}
return false, nil
case IgnoreAll:
Expand Down
17 changes: 8 additions & 9 deletions fmesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fmesh

import (
"errors"
"fmt"
"github.com/hovsep/fmesh/common"
"github.com/hovsep/fmesh/component"
"github.com/hovsep/fmesh/cycle"
Expand Down Expand Up @@ -674,7 +673,7 @@ func TestFMesh_mustStop(t *testing.T) {
name string
getFMesh func() *FMesh
want bool
wantErr error
wantErr string
}{
{
name: "with default config, no time to stop",
Expand All @@ -691,7 +690,7 @@ func TestFMesh_mustStop(t *testing.T) {
return fm
},
want: false,
wantErr: nil,
wantErr: "",
},
{
name: "with default config, reached max cycles",
Expand All @@ -706,7 +705,7 @@ func TestFMesh_mustStop(t *testing.T) {
return fm
},
want: true,
wantErr: ErrReachedMaxAllowedCycles,
wantErr: "reached max allowed cycles",
},
{
name: "mesh finished naturally and must stop",
Expand All @@ -721,7 +720,7 @@ func TestFMesh_mustStop(t *testing.T) {
return fm
},
want: true,
wantErr: nil,
wantErr: "",
},
{
name: "mesh hit an error",
Expand All @@ -740,7 +739,7 @@ func TestFMesh_mustStop(t *testing.T) {
return fm
},
want: true,
wantErr: fmt.Errorf("%w, cycle # %d", ErrHitAnErrorOrPanic, 5),
wantErr: "c1 activation finished with error",
},
{
name: "mesh hit a panic",
Expand All @@ -758,15 +757,15 @@ func TestFMesh_mustStop(t *testing.T) {
return fm
},
want: true,
wantErr: ErrHitAPanic,
wantErr: "hit a panic and will be stopped",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fm := tt.getFMesh()
got, stopErr := fm.mustStop()
if tt.wantErr != nil {
assert.EqualError(t, stopErr, tt.wantErr.Error())
if tt.wantErr != "" {
assert.ErrorContains(t, stopErr, tt.wantErr)
} else {
assert.NoError(t, stopErr)
}
Expand Down
Loading