From 04f272728250d7cf461e331504710c70f72ea859 Mon Sep 17 00:00:00 2001
From: hovsep
Date: Fri, 4 Oct 2024 01:09:10 +0300
Subject: [PATCH 1/3] Update readme
---
README.md | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git a/README.md b/README.md
index 1fe3423..e426b00 100644
--- a/README.md
+++ b/README.md
@@ -8,25 +8,26 @@
What is it?
F-Mesh is a simplistic FBP-inspired framework in Go.
-It allows you to express your program as a mesh of interconnected components.
+It allows you to express your program as a mesh of interconnected components.
+You can think of it as a simple functions orchestrator.
+
Main concepts:
- F-Mesh consists of multiple Components - the main building blocks
-- Components have unlimited number of input and output
Ports
.
+- Components have unlimited number of input and output
Ports
- The main job of each component is to read inputs and provide outputs
-- Each port of the component can be connected to one or multiple ports of any other component. Such connections are called Pipes
-- It does not matter whether port is input or output, any port can be connected to any other port
+- Any output port can be connected to any input port via Pipes
- The component behaviour is defined by its Activation function
- The framework checks when components are ready to be activated and calls their activation functions concurrently
- One such iteration is called Activation cycle
-- On each activation cycle the framework does same things: activates all the components ready to be activated, flushes the data through pipes and disposes processed Signals (the data chunks flowing between components)
+- On each activation cycle the framework does same things: activates all the components ready for activation, flushes the data through pipes and disposes input Signals (the data chunks flowing between components)
- Ports and pipes are type agnostic, any data can be transferred or aggregated on any port
- The framework works in discrete time, not it wall time. The quant of time is 1 activation cycle, which gives you "logical parallelism" out of the box
- F-Mesh is suitable for logical wireframing, simulation, functional-style computations and implementing simple concurrency patterns without using the concurrency primitives like channels or any sort of locks
What it is not?
-F-mesh is not a classical FBP implementation, and it is not fully async. It does not support long-running components or wall-time events (like timers and tickers).
+F-mesh is not a classical FBP implementation, and it is not fully async. It does not support long-running components or wall-time events (like timers and tickers)
The framework is not suitable for implementing complex concurrent systems
Example:
@@ -54,7 +55,10 @@ It allows you to express your program as a mesh of interconnected components.
Date: Fri, 4 Oct 2024 01:21:44 +0300
Subject: [PATCH 2/3] Component collection refactored
---
component/collection.go | 8 ++++----
component/collection_test.go | 16 ++++++++--------
fmesh.go | 4 ++--
3 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/component/collection.go b/component/collection.go
index cde1f62..af30267 100644
--- a/component/collection.go
+++ b/component/collection.go
@@ -3,8 +3,8 @@ package component
// Collection is a collection of components with useful methods
type Collection map[string]*Component
-// NewComponentCollection creates empty collection
-func NewComponentCollection() Collection {
+// NewCollection creates empty collection
+func NewCollection() Collection {
return make(Collection)
}
@@ -13,8 +13,8 @@ func (collection Collection) ByName(name string) *Component {
return collection[name]
}
-// Add adds components to existing collection
-func (collection Collection) Add(components ...*Component) Collection {
+// With adds components and returns the collection
+func (collection Collection) With(components ...*Component) Collection {
for _, component := range components {
collection[component.Name()] = component
}
diff --git a/component/collection_test.go b/component/collection_test.go
index ff5888e..306b53d 100644
--- a/component/collection_test.go
+++ b/component/collection_test.go
@@ -18,7 +18,7 @@ func TestCollection_ByName(t *testing.T) {
}{
{
name: "component found",
- components: NewComponentCollection().Add(New("c1"), New("c2")),
+ components: NewCollection().With(New("c1"), New("c2")),
args: args{
name: "c2",
},
@@ -32,7 +32,7 @@ func TestCollection_ByName(t *testing.T) {
},
{
name: "component not found",
- components: NewComponentCollection().Add(New("c1"), New("c2")),
+ components: NewCollection().With(New("c1"), New("c2")),
args: args{
name: "c3",
},
@@ -46,7 +46,7 @@ func TestCollection_ByName(t *testing.T) {
}
}
-func TestCollection_Add(t *testing.T) {
+func TestCollection_With(t *testing.T) {
type args struct {
components []*Component
}
@@ -58,7 +58,7 @@ func TestCollection_Add(t *testing.T) {
}{
{
name: "adding nothing to empty collection",
- collection: NewComponentCollection(),
+ collection: NewCollection(),
args: args{
components: nil,
},
@@ -68,7 +68,7 @@ func TestCollection_Add(t *testing.T) {
},
{
name: "adding to empty collection",
- collection: NewComponentCollection(),
+ collection: NewCollection(),
args: args{
components: []*Component{New("c1"), New("c2")},
},
@@ -81,7 +81,7 @@ func TestCollection_Add(t *testing.T) {
},
{
name: "adding to non-empty collection",
- collection: NewComponentCollection().Add(New("c1"), New("c2")),
+ collection: NewCollection().With(New("c1"), New("c2")),
args: args{
components: []*Component{New("c3"), New("c4")},
},
@@ -97,9 +97,9 @@ func TestCollection_Add(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- tt.collection.Add(tt.args.components...)
+ collectionAfter := tt.collection.With(tt.args.components...)
if tt.assertions != nil {
- tt.assertions(t, tt.collection)
+ tt.assertions(t, collectionAfter)
}
})
}
diff --git a/fmesh.go b/fmesh.go
index 398724e..7fecc08 100644
--- a/fmesh.go
+++ b/fmesh.go
@@ -32,7 +32,7 @@ type FMesh struct {
func New(name string) *FMesh {
return &FMesh{
name: name,
- components: component.NewComponentCollection(),
+ components: component.NewCollection(),
config: defaultConfig,
}
}
@@ -60,7 +60,7 @@ func (fm *FMesh) WithDescription(description string) *FMesh {
// WithComponents adds components to f-mesh
func (fm *FMesh) WithComponents(components ...*component.Component) *FMesh {
for _, c := range components {
- fm.components.Add(c)
+ fm.components = fm.components.With(c)
}
return fm
}
From 730104e4550dd6aa400841854ae59d847f0e3a19 Mon Sep 17 00:00:00 2001
From: hovsep
Date: Fri, 4 Oct 2024 02:43:52 +0300
Subject: [PATCH 3/3] Waiting for inputs reimplemented
---
component/activation_result.go | 35 ++++++-
component/activation_result_collection.go | 4 +-
component/component.go | 18 ++--
component/component_test.go | 4 +-
component/errors.go | 19 ++++
fmesh.go | 12 ++-
fmesh_test.go | 4 +-
.../ports/waiting_for_inputs_test.go | 95 +++++++++++++++++++
8 files changed, 173 insertions(+), 18 deletions(-)
create mode 100644 component/errors.go
create mode 100644 integration_tests/ports/waiting_for_inputs_test.go
diff --git a/component/activation_result.go b/component/activation_result.go
index 281fccc..9a64eda 100644
--- a/component/activation_result.go
+++ b/component/activation_result.go
@@ -1,6 +1,7 @@
package component
import (
+ "errors"
"fmt"
)
@@ -30,6 +31,12 @@ const (
// ActivationCodePanicked : component is activated, but panicked
ActivationCodePanicked
+
+ // ActivationCodeWaitingForInputs : component waits for specific inputs, but all input signals in current activation cycle may be cleared (default behaviour)
+ ActivationCodeWaitingForInputsClear
+
+ // ActivationCodeWaitingForInputsKeep : component waits for specific inputs, but wants to keep current input signals for the next cycle
+ ActivationCodeWaitingForInputsKeep
)
// NewActivationResult creates a new activation result for given component
@@ -60,13 +67,13 @@ func (ar *ActivationResult) Code() ActivationResultCode {
return ar.code
}
-// HasError returns true when activation result has an error
-func (ar *ActivationResult) HasError() bool {
+// IsError returns true when activation result has an error
+func (ar *ActivationResult) IsError() bool {
return ar.code == ActivationCodeReturnedError && ar.Error() != nil
}
-// HasPanic returns true when activation result is derived from panic
-func (ar *ActivationResult) HasPanic() bool {
+// IsPanic returns true when activation result is derived from panic
+func (ar *ActivationResult) IsPanic() bool {
return ar.code == ActivationCodePanicked && ar.Error() != nil
}
@@ -125,3 +132,23 @@ func (c *Component) newActivationResultPanicked(err error) *ActivationResult {
WithActivationCode(ActivationCodePanicked).
WithError(err)
}
+
+func (c *Component) newActivationResultWaitingForInputs(err error) *ActivationResult {
+ activationCode := ActivationCodeWaitingForInputsClear
+ if errors.Is(err, errWaitingForInputsKeep) {
+ activationCode = ActivationCodeWaitingForInputsKeep
+ }
+ return NewActivationResult(c.Name()).
+ SetActivated(true).
+ WithActivationCode(activationCode).
+ WithError(err)
+}
+
+func IsWaitingForInput(activationResult *ActivationResult) bool {
+ return activationResult.Code() == ActivationCodeWaitingForInputsClear ||
+ activationResult.Code() == ActivationCodeWaitingForInputsKeep
+}
+
+func WantsToKeepInputs(activationResult *ActivationResult) bool {
+ return activationResult.Code() == ActivationCodeWaitingForInputsKeep
+}
diff --git a/component/activation_result_collection.go b/component/activation_result_collection.go
index 4d7219d..257eff8 100644
--- a/component/activation_result_collection.go
+++ b/component/activation_result_collection.go
@@ -19,7 +19,7 @@ func (collection ActivationResultCollection) Add(activationResults ...*Activatio
// HasErrors tells whether the collection contains at least one activation result with error and respective code
func (collection ActivationResultCollection) HasErrors() bool {
for _, ar := range collection {
- if ar.HasError() {
+ if ar.IsError() {
return true
}
}
@@ -29,7 +29,7 @@ func (collection ActivationResultCollection) HasErrors() bool {
// HasPanics tells whether the collection contains at least one activation result with panic and respective code
func (collection ActivationResultCollection) HasPanics() bool {
for _, ar := range collection {
- if ar.HasPanic() {
+ if ar.IsPanic() {
return true
}
}
diff --git a/component/component.go b/component/component.go
index 4d7014c..2161d70 100644
--- a/component/component.go
+++ b/component/component.go
@@ -1,6 +1,7 @@
package component
import (
+ "errors"
"fmt"
"github.com/hovsep/fmesh/port"
)
@@ -89,10 +90,6 @@ func (c *Component) hasActivationFunction() bool {
// MaybeActivate tries to run the activation function if all required conditions are met
// @TODO: hide this method from user
func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
- defer func() {
- c.Inputs().Clear()
- }()
-
defer func() {
if r := recover(); r != nil {
activationResult = c.newActivationResultPanicked(fmt.Errorf("panicked with: %v", r))
@@ -102,7 +99,6 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
if !c.hasActivationFunction() {
//Activation function is not set (maybe useful while the mesh is under development)
activationResult = c.newActivationResultNoFunction()
-
return
}
@@ -115,14 +111,17 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
//Invoke the activation func
err := c.f(c.Inputs(), c.Outputs())
+ if errors.Is(err, errWaitingForInputs) {
+ activationResult = c.newActivationResultWaitingForInputs(err)
+ return
+ }
+
if err != nil {
activationResult = c.newActivationResultReturnedError(err)
-
return
}
activationResult = c.newActivationResultOK()
-
return
}
@@ -132,3 +131,8 @@ func (c *Component) FlushOutputs() {
out.Flush()
}
}
+
+// ClearInputs clears all input ports
+func (c *Component) ClearInputs() {
+ c.Inputs().Clear()
+}
diff --git a/component/component_test.go b/component/component_test.go
index 44b8417..0c8da6a 100644
--- a/component/component_test.go
+++ b/component/component_test.go
@@ -501,10 +501,10 @@ func TestComponent_MaybeActivate(t *testing.T) {
assert.Equal(t, tt.wantActivationResult.Activated(), gotActivationResult.Activated())
assert.Equal(t, tt.wantActivationResult.ComponentName(), gotActivationResult.ComponentName())
assert.Equal(t, tt.wantActivationResult.Code(), gotActivationResult.Code())
- if tt.wantActivationResult.HasError() {
+ if tt.wantActivationResult.IsError() {
assert.EqualError(t, gotActivationResult.Error(), tt.wantActivationResult.Error().Error())
} else {
- assert.False(t, gotActivationResult.HasError())
+ assert.False(t, gotActivationResult.IsError())
}
})
diff --git a/component/errors.go b/component/errors.go
new file mode 100644
index 0000000..671df5f
--- /dev/null
+++ b/component/errors.go
@@ -0,0 +1,19 @@
+package component
+
+import (
+ "errors"
+ "fmt"
+)
+
+var (
+ errWaitingForInputs = errors.New("component is waiting for some inputs")
+ errWaitingForInputsKeep = fmt.Errorf("%w: do not clear input ports", errWaitingForInputs)
+)
+
+// NewErrWaitForInputs returns respective error
+func NewErrWaitForInputs(keepInputs bool) error {
+ if keepInputs {
+ return errWaitingForInputsKeep
+ }
+ return errWaitingForInputs
+}
diff --git a/fmesh.go b/fmesh.go
index 7fecc08..7736097 100644
--- a/fmesh.go
+++ b/fmesh.go
@@ -106,7 +106,17 @@ func (fm *FMesh) drainComponents(cycle *cycle.Cycle) {
continue
}
+ if component.IsWaitingForInput(activationResult) {
+ if !component.WantsToKeepInputs(activationResult) {
+ c.ClearInputs()
+ }
+ // Components waiting for inputs are not flushed
+ continue
+ }
+
+ // Normally components are fully drained
c.FlushOutputs()
+ c.ClearInputs()
}
}
@@ -127,7 +137,7 @@ func (fm *FMesh) Run() (cycle.Collection, error) {
}
func (fm *FMesh) mustStop(cycleResult *cycle.Cycle, cycleNum int) (bool, error) {
- if (fm.config.CyclesLimit > 0) && (cycleNum >= fm.config.CyclesLimit) {
+ if (fm.config.CyclesLimit > 0) && (cycleNum > fm.config.CyclesLimit) {
return true, ErrReachedMaxAllowedCycles
}
diff --git a/fmesh_test.go b/fmesh_test.go
index e656a45..f773528 100644
--- a/fmesh_test.go
+++ b/fmesh_test.go
@@ -601,10 +601,10 @@ func TestFMesh_Run(t *testing.T) {
assert.Equal(t, tt.want[i].ActivationResults()[componentName].ComponentName(), gotActivationResult.ComponentName())
assert.Equal(t, tt.want[i].ActivationResults()[componentName].Code(), gotActivationResult.Code())
- if tt.want[i].ActivationResults()[componentName].HasError() {
+ if tt.want[i].ActivationResults()[componentName].IsError() {
assert.EqualError(t, tt.want[i].ActivationResults()[componentName].Error(), gotActivationResult.Error().Error())
} else {
- assert.False(t, gotActivationResult.HasError())
+ assert.False(t, gotActivationResult.IsError())
}
}
}
diff --git a/integration_tests/ports/waiting_for_inputs_test.go b/integration_tests/ports/waiting_for_inputs_test.go
new file mode 100644
index 0000000..9c790c2
--- /dev/null
+++ b/integration_tests/ports/waiting_for_inputs_test.go
@@ -0,0 +1,95 @@
+package ports
+
+import (
+ "github.com/hovsep/fmesh"
+ "github.com/hovsep/fmesh/component"
+ "github.com/hovsep/fmesh/cycle"
+ "github.com/hovsep/fmesh/port"
+ "github.com/hovsep/fmesh/signal"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_WaitingForInputs(t *testing.T) {
+ tests := []struct {
+ name string
+ setupFM func() *fmesh.FMesh
+ setInputs func(fm *fmesh.FMesh)
+ assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error)
+ }{
+ {
+ name: "waiting for longer chain",
+ setupFM: func() *fmesh.FMesh {
+ getDoubler := func(name string) *component.Component {
+ return component.New(name).
+ WithDescription("This component just doubles the input").
+ WithInputs("i1").
+ WithOutputs("o1").
+ WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
+ inputNum := inputs.ByName("i1").Signals().FirstPayload().(int)
+ outputs.ByName("o1").PutSignals(signal.New(inputNum * 2))
+ return nil
+ })
+ }
+
+ d1 := getDoubler("d1")
+ d2 := getDoubler("d2")
+ d3 := getDoubler("d3")
+ d4 := getDoubler("d4")
+ d5 := getDoubler("d5")
+
+ s := component.New("sum").
+ WithDescription("This component just sums 2 inputs").
+ WithInputs("i1", "i2").
+ WithOutputs("o1").
+ WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
+ if !inputs.ByNames("i1", "i2").AllHaveSignals() {
+ return component.NewErrWaitForInputs(true)
+ }
+
+ inputNum1 := inputs.ByName("i1").Signals().FirstPayload().(int)
+ inputNum2 := inputs.ByName("i2").Signals().FirstPayload().(int)
+ outputs.ByName("o1").PutSignals(signal.New(inputNum1 + inputNum2))
+ return nil
+ })
+
+ //This chain consist of 3 components: d1->d2->d3
+ d1.Outputs().ByName("o1").PipeTo(d2.Inputs().ByName("i1"))
+ d2.Outputs().ByName("o1").PipeTo(d3.Inputs().ByName("i1"))
+
+ //This chain has only 2: d4->d5
+ d4.Outputs().ByName("o1").PipeTo(d5.Inputs().ByName("i1"))
+
+ //Both chains go into summator
+ d3.Outputs().ByName("o1").PipeTo(s.Inputs().ByName("i1"))
+ d5.Outputs().ByName("o1").PipeTo(s.Inputs().ByName("i2"))
+
+ return fmesh.New("fm").
+ WithComponents(d1, d2, d3, d4, d5, s).
+ WithConfig(fmesh.Config{
+ ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic,
+ CyclesLimit: 5,
+ })
+
+ },
+ setInputs: func(fm *fmesh.FMesh) {
+ //Put 1 signal to each chain so they start in the same cycle
+ fm.Components().ByName("d1").Inputs().ByName("i1").PutSignals(signal.New(1))
+ fm.Components().ByName("d4").Inputs().ByName("i1").PutSignals(signal.New(2))
+ },
+ assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) {
+ assert.NoError(t, err)
+ result := fm.Components().ByName("sum").Outputs().ByName("o1").Signals().FirstPayload().(int)
+ assert.Equal(t, 16, result)
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ fm := tt.setupFM()
+ tt.setInputs(fm)
+ cycles, err := fm.Run()
+ tt.assertions(t, fm, cycles, err)
+ })
+ }
+}