diff --git a/README.md b/README.md
index 1fe3423..e426b00 100644
--- a/README.md
+++ b/README.md
@@ -8,25 +8,26 @@
What is it?
F-Mesh is a simplistic FBP-inspired framework in Go.
-It allows you to express your program as a mesh of interconnected components.
+It allows you to express your program as a mesh of interconnected components.
+You can think of it as a simple functions orchestrator.
+
Main concepts:
- F-Mesh consists of multiple Components - the main building blocks
-- Components have unlimited number of input and output
Ports
.
+- Components have unlimited number of input and output
Ports
- The main job of each component is to read inputs and provide outputs
-- Each port of the component can be connected to one or multiple ports of any other component. Such connections are called Pipes
-- It does not matter whether port is input or output, any port can be connected to any other port
+- Any output port can be connected to any input port via Pipes
- The component behaviour is defined by its Activation function
- The framework checks when components are ready to be activated and calls their activation functions concurrently
- One such iteration is called Activation cycle
-- On each activation cycle the framework does same things: activates all the components ready to be activated, flushes the data through pipes and disposes processed Signals (the data chunks flowing between components)
+- On each activation cycle the framework does same things: activates all the components ready for activation, flushes the data through pipes and disposes input Signals (the data chunks flowing between components)
- Ports and pipes are type agnostic, any data can be transferred or aggregated on any port
- The framework works in discrete time, not it wall time. The quant of time is 1 activation cycle, which gives you "logical parallelism" out of the box
- F-Mesh is suitable for logical wireframing, simulation, functional-style computations and implementing simple concurrency patterns without using the concurrency primitives like channels or any sort of locks
What it is not?
-F-mesh is not a classical FBP implementation, and it is not fully async. It does not support long-running components or wall-time events (like timers and tickers).
+F-mesh is not a classical FBP implementation, and it is not fully async. It does not support long-running components or wall-time events (like timers and tickers)
The framework is not suitable for implementing complex concurrent systems
Example:
@@ -54,7 +55,10 @@ It allows you to express your program as a mesh of interconnected components. 0) && (cycleNum >= fm.config.CyclesLimit) {
+ if (fm.config.CyclesLimit > 0) && (cycleNum > fm.config.CyclesLimit) {
return true, ErrReachedMaxAllowedCycles
}
diff --git a/fmesh_test.go b/fmesh_test.go
index e656a45..f773528 100644
--- a/fmesh_test.go
+++ b/fmesh_test.go
@@ -601,10 +601,10 @@ func TestFMesh_Run(t *testing.T) {
assert.Equal(t, tt.want[i].ActivationResults()[componentName].ComponentName(), gotActivationResult.ComponentName())
assert.Equal(t, tt.want[i].ActivationResults()[componentName].Code(), gotActivationResult.Code())
- if tt.want[i].ActivationResults()[componentName].HasError() {
+ if tt.want[i].ActivationResults()[componentName].IsError() {
assert.EqualError(t, tt.want[i].ActivationResults()[componentName].Error(), gotActivationResult.Error().Error())
} else {
- assert.False(t, gotActivationResult.HasError())
+ assert.False(t, gotActivationResult.IsError())
}
}
}
diff --git a/integration_tests/ports/waiting_for_inputs_test.go b/integration_tests/ports/waiting_for_inputs_test.go
new file mode 100644
index 0000000..9c790c2
--- /dev/null
+++ b/integration_tests/ports/waiting_for_inputs_test.go
@@ -0,0 +1,95 @@
+package ports
+
+import (
+ "github.com/hovsep/fmesh"
+ "github.com/hovsep/fmesh/component"
+ "github.com/hovsep/fmesh/cycle"
+ "github.com/hovsep/fmesh/port"
+ "github.com/hovsep/fmesh/signal"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_WaitingForInputs(t *testing.T) {
+ tests := []struct {
+ name string
+ setupFM func() *fmesh.FMesh
+ setInputs func(fm *fmesh.FMesh)
+ assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error)
+ }{
+ {
+ name: "waiting for longer chain",
+ setupFM: func() *fmesh.FMesh {
+ getDoubler := func(name string) *component.Component {
+ return component.New(name).
+ WithDescription("This component just doubles the input").
+ WithInputs("i1").
+ WithOutputs("o1").
+ WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
+ inputNum := inputs.ByName("i1").Signals().FirstPayload().(int)
+ outputs.ByName("o1").PutSignals(signal.New(inputNum * 2))
+ return nil
+ })
+ }
+
+ d1 := getDoubler("d1")
+ d2 := getDoubler("d2")
+ d3 := getDoubler("d3")
+ d4 := getDoubler("d4")
+ d5 := getDoubler("d5")
+
+ s := component.New("sum").
+ WithDescription("This component just sums 2 inputs").
+ WithInputs("i1", "i2").
+ WithOutputs("o1").
+ WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
+ if !inputs.ByNames("i1", "i2").AllHaveSignals() {
+ return component.NewErrWaitForInputs(true)
+ }
+
+ inputNum1 := inputs.ByName("i1").Signals().FirstPayload().(int)
+ inputNum2 := inputs.ByName("i2").Signals().FirstPayload().(int)
+ outputs.ByName("o1").PutSignals(signal.New(inputNum1 + inputNum2))
+ return nil
+ })
+
+ //This chain consist of 3 components: d1->d2->d3
+ d1.Outputs().ByName("o1").PipeTo(d2.Inputs().ByName("i1"))
+ d2.Outputs().ByName("o1").PipeTo(d3.Inputs().ByName("i1"))
+
+ //This chain has only 2: d4->d5
+ d4.Outputs().ByName("o1").PipeTo(d5.Inputs().ByName("i1"))
+
+ //Both chains go into summator
+ d3.Outputs().ByName("o1").PipeTo(s.Inputs().ByName("i1"))
+ d5.Outputs().ByName("o1").PipeTo(s.Inputs().ByName("i2"))
+
+ return fmesh.New("fm").
+ WithComponents(d1, d2, d3, d4, d5, s).
+ WithConfig(fmesh.Config{
+ ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic,
+ CyclesLimit: 5,
+ })
+
+ },
+ setInputs: func(fm *fmesh.FMesh) {
+ //Put 1 signal to each chain so they start in the same cycle
+ fm.Components().ByName("d1").Inputs().ByName("i1").PutSignals(signal.New(1))
+ fm.Components().ByName("d4").Inputs().ByName("i1").PutSignals(signal.New(2))
+ },
+ assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) {
+ assert.NoError(t, err)
+ result := fm.Components().ByName("sum").Outputs().ByName("o1").Signals().FirstPayload().(int)
+ assert.Equal(t, 16, result)
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ fm := tt.setupFM()
+ tt.setInputs(fm)
+ cycles, err := fm.Run()
+ tt.assertions(t, fm, cycles, err)
+ })
+ }
+}