From b5ef1ea707be747874c51217f2f35ca9c1e9a45a Mon Sep 17 00:00:00 2001 From: hovsep Date: Wed, 25 Sep 2024 01:13:41 +0300 Subject: [PATCH 1/6] Add (temporarily failing) integration test --- .../ports/waiting_for_inputs_test.go | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 integration_tests/ports/waiting_for_inputs_test.go diff --git a/integration_tests/ports/waiting_for_inputs_test.go b/integration_tests/ports/waiting_for_inputs_test.go new file mode 100644 index 0000000..1fb684e --- /dev/null +++ b/integration_tests/ports/waiting_for_inputs_test.go @@ -0,0 +1,80 @@ +package integration_tests + +import ( + "github.com/hovsep/fmesh" + "github.com/hovsep/fmesh/component" + "github.com/hovsep/fmesh/cycle" + "github.com/hovsep/fmesh/port" + "github.com/hovsep/fmesh/signal" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_WaitingForInputs(t *testing.T) { + tests := []struct { + name string + setupFM func() *fmesh.FMesh + setInputs func(fm *fmesh.FMesh) + assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) + }{ + { + name: "waits for single input and keep signals", + setupFM: func() *fmesh.FMesh { + return fmesh.New("fm").WithComponents( + component.New("waiter"). + WithInputs("i1", "i2"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + if !inputs.ByNames("i1", "i2").AllHaveSignals() { + return component.NewErrWaitForInputs(true) + } + return nil + }), + ) + }, + setInputs: func(fm *fmesh.FMesh) { + //Only one input set + fm.Components().ByName("waiter").Inputs().ByName("i1").PutSignals(signal.New(1)) + }, + assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { + assert.NoError(t, err) + + // Signal is kept on input port + assert.True(t, fm.Components().ByName("waiter").Inputs().ByName("i1").HasSignals()) + }, + }, + { + //@TODO:make this test pass + name: "waits for multiple input", + setupFM: func() *fmesh.FMesh { + return fmesh.New("fm").WithComponents( + component.New("waiter"). + WithInputs("i1", "i2", "i3"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + if !inputs.ByNames("i2", "i3").AllHaveSignals() { + return component.NewErrWaitForInputs(false) + } + return nil + }), + ) + }, + setInputs: func(fm *fmesh.FMesh) { + //Only one input set + fm.Components().ByName("waiter").Inputs().ByName("i1").PutSignals(signal.New(1)) + }, + assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { + assert.NoError(t, err) + + // Signal is not kept on input port + assert.False(t, fm.Components().ByName("waiter").Inputs().ByName("i1").HasSignals()) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fm := tt.setupFM() + tt.setInputs(fm) + cycles, err := fm.Run() + tt.assertions(t, fm, cycles, err) + }) + } +} From 9c5e35161c76606821ce982e91f38f47dafae893 Mon Sep 17 00:00:00 2001 From: hovsep Date: Wed, 2 Oct 2024 01:41:59 +0300 Subject: [PATCH 2/6] [WIP] Rollback: o2o and i2i piping, remove ability to wait for input (need to re-implement) --- component/activation_result.go | 41 -- component/component.go | 56 +-- component/component_test.go | 101 +--- component/errors.go | 19 - component/state_snapshot.go | 46 -- cycle/cycle_test.go | 12 +- fmesh.go | 9 +- fmesh_test.go | 444 +----------------- .../piping/piping_from_inputs_test.go | 180 ------- .../piping/piping_to_outputs_test.go | 106 ----- .../ports/waiting_for_inputs_test.go | 80 ---- port/port.go | 38 +- port/port_test.go | 134 ------ 13 files changed, 37 insertions(+), 1229 deletions(-) delete mode 100644 component/errors.go delete mode 100644 component/state_snapshot.go delete mode 100644 integration_tests/piping/piping_from_inputs_test.go delete mode 100644 integration_tests/piping/piping_to_outputs_test.go delete mode 100644 integration_tests/ports/waiting_for_inputs_test.go diff --git a/component/activation_result.go b/component/activation_result.go index e74aa58..281fccc 100644 --- a/component/activation_result.go +++ b/component/activation_result.go @@ -1,7 +1,6 @@ package component import ( - "errors" "fmt" ) @@ -9,8 +8,6 @@ import ( type ActivationResult struct { componentName string activated bool - stateBefore *StateSnapshot //Contains the info about length of input ports during the activation (required for correct i2i piping) - stateAfter *StateSnapshot code ActivationResultCode err error } @@ -28,9 +25,6 @@ const ( // ActivationCodeNoFunction : component activation function is not set, so we can not activate it ActivationCodeNoFunction - // ActivationCodeWaitingForInput : component is waiting for more inputs on some ports - ActivationCodeWaitingForInput - // ActivationCodeReturnedError : component is activated, but returned an error ActivationCodeReturnedError @@ -94,24 +88,6 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult { return ar } -func (ar *ActivationResult) WithStateBefore(snapshot *StateSnapshot) *ActivationResult { - ar.stateBefore = snapshot - return ar -} - -func (ar *ActivationResult) StateBefore() *StateSnapshot { - return ar.stateBefore -} - -func (ar *ActivationResult) WithStateAfter(snapshot *StateSnapshot) *ActivationResult { - ar.stateAfter = snapshot - return ar -} - -func (ar *ActivationResult) StateAfter() *StateSnapshot { - return ar.stateAfter -} - // newActivationResultOK builds a specific activation result func (c *Component) newActivationResultOK() *ActivationResult { return NewActivationResult(c.Name()). @@ -134,13 +110,6 @@ func (c *Component) newActivationResultNoFunction() *ActivationResult { WithActivationCode(ActivationCodeNoFunction) } -// newActivationResultWaitingForInput builds a specific activation result -func (c *Component) newActivationResultWaitingForInput() *ActivationResult { - return NewActivationResult(c.Name()). - SetActivated(false). - WithActivationCode(ActivationCodeWaitingForInput) -} - // newActivationResultReturnedError builds a specific activation result func (c *Component) newActivationResultReturnedError(err error) *ActivationResult { return NewActivationResult(c.Name()). @@ -156,13 +125,3 @@ func (c *Component) newActivationResultPanicked(err error) *ActivationResult { WithActivationCode(ActivationCodePanicked). WithError(err) } - -// isWaitingForInput tells whether component is waiting for specific inputs -func (c *Component) isWaitingForInput(activationResult *ActivationResult) bool { - return activationResult.HasError() && errors.Is(activationResult.Error(), errWaitingForInputs) -} - -// WantsToKeepInputs tells whether component wants to keep signals on input ports for the next cycle -func (c *Component) WantsToKeepInputs(activationResult *ActivationResult) bool { - return c.isWaitingForInput(activationResult) && errors.Is(activationResult.Error(), errWaitingForInputsKeep) -} diff --git a/component/component.go b/component/component.go index 19f6f35..42446c3 100644 --- a/component/component.go +++ b/component/component.go @@ -1,7 +1,6 @@ package component import ( - "errors" "fmt" "github.com/hovsep/fmesh/port" ) @@ -90,77 +89,46 @@ func (c *Component) hasActivationFunction() bool { // MaybeActivate tries to run the activation function if all required conditions are met // @TODO: hide this method from user func (c *Component) MaybeActivate() (activationResult *ActivationResult) { - stateBeforeActivation := c.getStateSnapshot() + defer func() { + c.Inputs().Clear() + }() defer func() { if r := recover(); r != nil { - activationResult = c.newActivationResultPanicked(fmt.Errorf("panicked with: %v", r)). - WithStateBefore(stateBeforeActivation). - WithStateAfter(c.getStateSnapshot()) + activationResult = c.newActivationResultPanicked(fmt.Errorf("panicked with: %v", r)) } }() if !c.hasActivationFunction() { //Activation function is not set (maybe useful while the mesh is under development) - activationResult = c.newActivationResultNoFunction(). - WithStateBefore(stateBeforeActivation). - WithStateAfter(c.getStateSnapshot()) + activationResult = c.newActivationResultNoFunction() return } if !c.inputs.AnyHasSignals() { //No inputs set, stop here - activationResult = c.newActivationResultNoInput(). - WithStateBefore(stateBeforeActivation). - WithStateAfter(c.getStateSnapshot()) + activationResult = c.newActivationResultNoInput() return } //Invoke the activation func err := c.f(c.Inputs(), c.Outputs()) - if errors.Is(err, errWaitingForInputs) { - activationResult = c.newActivationResultWaitingForInput(). - WithStateBefore(stateBeforeActivation). - WithStateAfter(c.getStateSnapshot()) - - return - } - if err != nil { - activationResult = c.newActivationResultReturnedError(err). - WithStateBefore(stateBeforeActivation). - WithStateAfter(c.getStateSnapshot()) + activationResult = c.newActivationResultReturnedError(err) return } - activationResult = c.newActivationResultOK(). - WithStateBefore(stateBeforeActivation). - WithStateAfter(c.getStateSnapshot()) + activationResult = c.newActivationResultOK() return } -// FlushInputs flushes and clears (when needed) input ports -// @TODO: hide this method from user -func (c *Component) FlushInputs(activationResult *ActivationResult, keepInputSignals bool) { - c.Inputs().Flush() - if !keepInputSignals { - // Inputs can not be just cleared, instead we remove signals which - // have been used (been set on inputs) during the last activation cycle - // thus not affecting ones the component could have been received from i2i pipes - for portName, p := range c.Inputs() { - p.DisposeSignals(activationResult.StateBefore().InputPortsMetadata()[portName].SignalBufferLen) - } - } -} - -// FlushOutputs flushes output ports and disposes processed signals -// @TODO: hide this method from user -func (c *Component) FlushOutputs(activationResult *ActivationResult) { - for portName, p := range c.Outputs() { - p.FlushAndDispose(activationResult.StateAfter().OutputPortsMetadata()[portName].SignalBufferLen) +// FlushOutputs pushed signals out of the component outputs to pipes and clears outputs +func (c *Component) FlushOutputs() { + for _, out := range c.outputs { + out.Flush() } } diff --git a/component/component_test.go b/component/component_test.go index 565dcb0..bc12241 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -111,19 +111,15 @@ func TestComponent_FlushOutputs(t *testing.T) { componentWithAllOutputsSet.Outputs().ByNames("o1", "o2").PipeTo(sink) tests := []struct { - name string - component *Component - activationResult *ActivationResult - destPort *port.Port //Where the component flushes ALL it's inputs - assertions func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) + name string + component *Component + destPort *port.Port //Where the component flushes ALL it's inputs + assertions func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) }{ { name: "no outputs", component: componentWithNoOutputs, - activationResult: componentWithNoOutputs.newActivationResultOK(). - WithStateBefore(NewStateSnapshot()). - WithStateAfter(NewStateSnapshot()), - destPort: nil, + destPort: nil, assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) { assert.NotNil(t, componentAfterFlush.Outputs()) assert.Empty(t, componentAfterFlush.Outputs()) @@ -132,16 +128,7 @@ func TestComponent_FlushOutputs(t *testing.T) { { name: "output has no signal set", component: componentWithCleanOutputs, - activationResult: componentWithCleanOutputs.newActivationResultOK(). - WithStateBefore(NewStateSnapshot().WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{SignalBufferLen: 0}, - "o2": &port.Metadata{SignalBufferLen: 0}, - })). - WithStateAfter(NewStateSnapshot().WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{SignalBufferLen: 0}, - "o2": &port.Metadata{SignalBufferLen: 0}, - })), - destPort: nil, + destPort: nil, assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) { assert.False(t, componentAfterFlush.Outputs().AnyHasSignals()) }, @@ -149,18 +136,7 @@ func TestComponent_FlushOutputs(t *testing.T) { { name: "happy path", component: componentWithAllOutputsSet, - activationResult: componentWithAllOutputsSet.newActivationResultOK(). - WithStateBefore(NewStateSnapshot()). - WithStateAfter(NewStateSnapshot(). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - "o2": &port.Metadata{ - SignalBufferLen: 1, - }, - })), - destPort: sink, + destPort: sink, assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) { assert.Contains(t, destPort.Signals().AllPayloads(), 777) assert.Contains(t, destPort.Signals().AllPayloads(), 888) @@ -172,7 +148,7 @@ func TestComponent_FlushOutputs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.component.FlushOutputs(tt.activationResult) + tt.component.FlushOutputs() tt.assertions(t, tt.component, tt.destPort) }) } @@ -427,67 +403,6 @@ func TestComponent_MaybeActivate(t *testing.T) { SetActivated(false). WithActivationCode(ActivationCodeNoFunction), }, - { - name: "no input", - getComponent: func() *Component { - c := New("c1"). - WithInputs("i1", "i2"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - - if !inputs.ByNames("i1", "i2").AllHaveSignals() { - return NewErrWaitForInputs(false) - } - - return nil - }) - return c - }, - wantActivationResult: NewActivationResult("c1"). - SetActivated(false). - WithActivationCode(ActivationCodeNoInput), - }, - { - name: "component is waiting for input, reset inputs", - getComponent: func() *Component { - c := New("c1"). - WithInputs("i1", "i2"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - - if !inputs.ByNames("i1", "i2").AllHaveSignals() { - return NewErrWaitForInputs(false) - } - - return nil - }) - //Only one input set - c.Inputs().ByName("i1").PutSignals(signal.New(123)) - return c - }, - wantActivationResult: NewActivationResult("c1"). - SetActivated(false). - WithActivationCode(ActivationCodeWaitingForInput), - }, - { - name: "component is waiting for input, keep inputs", - getComponent: func() *Component { - c := New("c1"). - WithInputs("i1", "i2"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - - if !inputs.ByNames("i1", "i2").AllHaveSignals() { - return NewErrWaitForInputs(true) - } - - return nil - }) - //Only one input set - c.Inputs().ByName("i1").PutSignals(signal.New(123)) - return c - }, - wantActivationResult: NewActivationResult("c1"). - SetActivated(false). - WithActivationCode(ActivationCodeWaitingForInput), - }, { name: "activated with error", getComponent: func() *Component { diff --git a/component/errors.go b/component/errors.go deleted file mode 100644 index 671df5f..0000000 --- a/component/errors.go +++ /dev/null @@ -1,19 +0,0 @@ -package component - -import ( - "errors" - "fmt" -) - -var ( - errWaitingForInputs = errors.New("component is waiting for some inputs") - errWaitingForInputsKeep = fmt.Errorf("%w: do not clear input ports", errWaitingForInputs) -) - -// NewErrWaitForInputs returns respective error -func NewErrWaitForInputs(keepInputs bool) error { - if keepInputs { - return errWaitingForInputsKeep - } - return errWaitingForInputs -} diff --git a/component/state_snapshot.go b/component/state_snapshot.go deleted file mode 100644 index 46816a2..0000000 --- a/component/state_snapshot.go +++ /dev/null @@ -1,46 +0,0 @@ -package component - -import "github.com/hovsep/fmesh/port" - -// StateSnapshot represents the state of a component (used by f-mesh to perform piping correctly) -type StateSnapshot struct { - inputPortsMetadata port.MetadataMap - outputPortsMetadata port.MetadataMap -} - -// NewStateSnapshot creates new component state snapshot -func NewStateSnapshot() *StateSnapshot { - return &StateSnapshot{ - inputPortsMetadata: make(port.MetadataMap), - outputPortsMetadata: make(port.MetadataMap), - } -} - -// InputPortsMetadata getter -func (s *StateSnapshot) InputPortsMetadata() port.MetadataMap { - return s.inputPortsMetadata -} - -// OutputPortsMetadata getter -func (s *StateSnapshot) OutputPortsMetadata() port.MetadataMap { - return s.outputPortsMetadata -} - -// WithInputPortsMetadata sets important information about input ports -func (s *StateSnapshot) WithInputPortsMetadata(inputPortsMetadata port.MetadataMap) *StateSnapshot { - s.inputPortsMetadata = inputPortsMetadata - return s -} - -// WithOutputPortsMetadata sets important information about output ports -func (s *StateSnapshot) WithOutputPortsMetadata(outputPortsMetadata port.MetadataMap) *StateSnapshot { - s.outputPortsMetadata = outputPortsMetadata - return s -} - -// getStateSnapshot returns a snapshot of component state -func (c *Component) getStateSnapshot() *StateSnapshot { - return NewStateSnapshot(). - WithInputPortsMetadata(c.Inputs().GetPortsMetadata()). - WithOutputPortsMetadata(c.Outputs().GetPortsMetadata()) -} diff --git a/cycle/cycle_test.go b/cycle/cycle_test.go index d23facb..21c3a29 100644 --- a/cycle/cycle_test.go +++ b/cycle/cycle_test.go @@ -68,7 +68,7 @@ func TestCycle_HasActivatedComponents(t *testing.T) { cycleResult: New().WithActivationResults( component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c2").SetActivated(false).WithActivationCode(component.ActivationCodeNoFunction), - component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput), + component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), ), want: false, }, @@ -77,7 +77,7 @@ func TestCycle_HasActivatedComponents(t *testing.T) { cycleResult: New().WithActivationResults( component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c2").SetActivated(true).WithActivationCode(component.ActivationCodeOK), - component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput), + component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), ), want: true, }, @@ -105,7 +105,7 @@ func TestCycle_HasErrors(t *testing.T) { cycleResult: New().WithActivationResults( component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c2").SetActivated(false).WithActivationCode(component.ActivationCodeNoFunction), - component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput), + component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), ), want: false, }, @@ -114,7 +114,7 @@ func TestCycle_HasErrors(t *testing.T) { cycleResult: New().WithActivationResults( component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c2").SetActivated(true).WithActivationCode(component.ActivationCodeReturnedError).WithError(errors.New("some error")), - component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput), + component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), ), want: true, }, @@ -142,7 +142,7 @@ func TestCycle_HasPanics(t *testing.T) { cycleResult: New().WithActivationResults( component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c2").SetActivated(false).WithActivationCode(component.ActivationCodeNoFunction), - component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput), + component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c4").SetActivated(true).WithActivationCode(component.ActivationCodeReturnedError).WithError(errors.New("some error")), ), want: false, @@ -152,7 +152,7 @@ func TestCycle_HasPanics(t *testing.T) { cycleResult: New().WithActivationResults( component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c2").SetActivated(true).WithActivationCode(component.ActivationCodeReturnedError).WithError(errors.New("some error")), - component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput), + component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c4").SetActivated(true).WithActivationCode(component.ActivationCodePanicked).WithError(errors.New("some panic")), ), want: true, diff --git a/fmesh.go b/fmesh.go index af43acd..c00a575 100644 --- a/fmesh.go +++ b/fmesh.go @@ -84,16 +84,15 @@ func (fm *FMesh) runCycle() *cycle.Cycle { } // DrainComponents drains the data from activated components -func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) { - for _, c := range fm.components { +func (fm *FMesh) drainComponents(cycle *cycle.Cycle) { + for _, c := range fm.Components() { activationResult := cycle.ActivationResults().ByComponentName(c.Name()) if !activationResult.Activated() { continue } - c.FlushOutputs(activationResult) - c.FlushInputs(activationResult, c.WantsToKeepInputs(activationResult)) // Inputs are a bit trickier + c.FlushOutputs() } } @@ -109,7 +108,7 @@ func (fm *FMesh) Run() (cycle.Collection, error) { return allCycles, err } - fm.drainComponentsAfterCycle(cycleResult) + fm.drainComponents(cycleResult) } } diff --git a/fmesh_test.go b/fmesh_test.go index e09efb7..aeff95a 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -633,171 +633,6 @@ func TestFMesh_runCycle(t *testing.T) { fm: New("empty mesh"), want: cycle.New(), }, - { - name: "mesh has components, but no one is activated", - fm: New("test").WithComponents( - component.New("c1"). - WithDescription("I do not have any input signal set, hence I will never be activated"). - WithInputs("i1"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.ByName("o1").PutSignals(signal.New("this signal will never be sent")) - return nil - }), - - component.New("c2"). - WithDescription("I do not have activation func set"). - WithInputs("i1"). - WithOutputs("o1"), - - component.New("c3"). - WithDescription("I'm waiting for specific input"). - WithInputs("i1", "i2"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - if !inputs.ByNames("i1", "i2").AllHaveSignals() { - return component.NewErrWaitForInputs(true) - } - return nil - }), - component.New("c4"). - WithDescription("I'm waiting for specific input"). - WithInputs("i1", "i2"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - if !inputs.ByNames("i1", "i2").AllHaveSignals() { - return component.NewErrWaitForInputs(false) - } - return nil - }), - ), - initFM: func(fm *FMesh) { - //Only i1 is set, while component is waiting for both i1 and i2 to be set - fm.Components().ByName("c3").Inputs().ByName("i1").PutSignals(signal.New(123)) - //Same for c4 - fm.Components().ByName("c4").Inputs().ByName("i1").PutSignals(signal.New(456)) - }, - want: cycle.New(). - WithActivationResults( - component.NewActivationResult("c1"). - SetActivated(false). - WithActivationCode(component.ActivationCodeNoInput). - WithStateBefore(component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter(component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })), - component.NewActivationResult("c2"). - SetActivated(false). - WithActivationCode(component.ActivationCodeNoFunction). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })), - component.NewActivationResult("c3"). - SetActivated(false). - WithActivationCode(component.ActivationCodeWaitingForInput). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })), - component.NewActivationResult("c4"). - SetActivated(false). - WithActivationCode(component.ActivationCodeWaitingForInput). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }))), - }, { name: "all components activated in one cycle (concurrently)", fm: New("test").WithComponents( @@ -835,71 +670,13 @@ func TestFMesh_runCycle(t *testing.T) { want: cycle.New().WithActivationResults( component.NewActivationResult("c1"). SetActivated(true). - WithActivationCode(component.ActivationCodeOK). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - })), + WithActivationCode(component.ActivationCodeOK), component.NewActivationResult("c2"). SetActivated(true). - WithActivationCode(component.ActivationCodeOK). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - "o2": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - "o2": &port.Metadata{ - SignalBufferLen: 4, - }, - })), + WithActivationCode(component.ActivationCodeOK), component.NewActivationResult("c3"). SetActivated(true). - WithActivationCode(component.ActivationCodeOK). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - })), + WithActivationCode(component.ActivationCodeOK), ), }, } @@ -912,218 +689,3 @@ func TestFMesh_runCycle(t *testing.T) { }) } } - -func TestFMesh_drainComponentsAfterCycle(t *testing.T) { - tests := []struct { - name string - cycle *cycle.Cycle - fm *FMesh - initFM func(fm *FMesh) - assertionsAfterDrain func(t *testing.T, fm *FMesh) - }{ - { - name: "no components", - cycle: cycle.New(), - fm: New("empty_fm"), - assertionsAfterDrain: func(t *testing.T, fm *FMesh) { - assert.Empty(t, fm.Components()) - }, - }, - { - name: "no signals to be drained", - cycle: cycle.New().WithActivationResults( - component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), - component.NewActivationResult("c2").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), - ), - fm: New("fm").WithComponents( - component.New("c1").WithInputs("i1").WithOutputs("o1"), - component.New("c2").WithInputs("i1").WithOutputs("o1"), - ), - initFM: func(fm *FMesh) { - //Create a pipe - c1, c2 := fm.Components().ByName("c1"), fm.Components().ByName("c2") - c1.Outputs().ByName("o1").PipeTo(c2.Inputs().ByName("i1")) - }, - assertionsAfterDrain: func(t *testing.T, fm *FMesh) { - //All ports in all components are empty - assert.False(t, fm.Components().ByName("c1").Inputs().AnyHasSignals()) - assert.False(t, fm.Components().ByName("c1").Outputs().AnyHasSignals()) - assert.False(t, fm.Components().ByName("c2").Inputs().AnyHasSignals()) - assert.False(t, fm.Components().ByName("c2").Outputs().AnyHasSignals()) - }, - }, - { - name: "there are signals on output, but no pipes", - cycle: cycle.New().WithActivationResults( - component.NewActivationResult("c1"). - SetActivated(true). - WithActivationCode(component.ActivationCodeOK). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - })), - component.NewActivationResult("c2"). - SetActivated(true). - WithActivationCode(component.ActivationCodeOK). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - }))), - fm: New("fm").WithComponents( - component.New("c1"). - WithInputs("i1"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - return nil - }), - component.New("c2"). - WithInputs("i1"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - return nil - }), - ), - initFM: func(fm *FMesh) { - //Both components have signals on their outputs - fm.Components().ByName("c1").Outputs().ByName("o1").PutSignals(signal.New(1)) - fm.Components().ByName("c2").Outputs().ByName("o1").PutSignals(signal.New(1)) - }, - assertionsAfterDrain: func(t *testing.T, fm *FMesh) { - //Output signals are still there - assert.True(t, fm.Components().ByName("c1").Outputs().ByName("o1").HasSignals()) - assert.True(t, fm.Components().ByName("c2").Outputs().ByName("o1").HasSignals()) - - //Inputs are clear - assert.False(t, fm.Components().ByName("c1").Inputs().ByName("i1").HasSignals()) - assert.False(t, fm.Components().ByName("c2").Inputs().ByName("i1").HasSignals()) - }, - }, - { - name: "happy path", - cycle: cycle.New().WithActivationResults( - component.NewActivationResult("c1"). - SetActivated(true). - WithActivationCode(component.ActivationCodeOK). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - })), - component.NewActivationResult("c2"). - SetActivated(true). - WithActivationCode(component.ActivationCodeOK). - WithStateBefore( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })). - WithStateAfter( - component.NewStateSnapshot(). - WithInputPortsMetadata(port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }). - WithOutputPortsMetadata(port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - })), - ), - fm: New("fm").WithComponents( - component.New("c1").WithInputs("i1").WithOutputs("o1"), - component.New("c2").WithInputs("i1").WithOutputs("o1"), - ), - initFM: func(fm *FMesh) { - //Create a pipe - c1, c2 := fm.Components().ByName("c1"), fm.Components().ByName("c2") - c1.Outputs().ByName("o1").PipeTo(c2.Inputs().ByName("i1")) - - //c1 has a signal which must go to c2.i1 after drain - c1.Outputs().ByName("o1").PutSignals(signal.New(123)) - }, - assertionsAfterDrain: func(t *testing.T, fm *FMesh) { - c1, c2 := fm.Components().ByName("c1"), fm.Components().ByName("c2") - - assert.True(t, c2.Inputs().ByName("i1").HasSignals()) //Signal is transferred to destination port - assert.False(t, c1.Outputs().ByName("o1").HasSignals()) //Source port is cleaned up - assert.Equal(t, c2.Inputs().ByName("i1").Signals().FirstPayload().(int), 123) //The signal is correct - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.initFM != nil { - tt.initFM(tt.fm) - } - tt.fm.drainComponentsAfterCycle(tt.cycle) - tt.assertionsAfterDrain(t, tt.fm) - }) - } -} diff --git a/integration_tests/piping/piping_from_inputs_test.go b/integration_tests/piping/piping_from_inputs_test.go deleted file mode 100644 index 65d1277..0000000 --- a/integration_tests/piping/piping_from_inputs_test.go +++ /dev/null @@ -1,180 +0,0 @@ -package integration_tests - -import ( - "fmt" - "github.com/hovsep/fmesh" - "github.com/hovsep/fmesh/component" - "github.com/hovsep/fmesh/cycle" - "github.com/hovsep/fmesh/port" - "github.com/hovsep/fmesh/signal" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_PipingFromInput(t *testing.T) { - tests := []struct { - name string - setupFM func() *fmesh.FMesh - setInputs func(fm *fmesh.FMesh) - assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) - }{ - { - name: "observer pattern", - setupFM: func() *fmesh.FMesh { - adder := component.New("adder"). - WithDescription("adds i1 and i2"). - WithInputsIndexed("i", 1, 2). - WithOutputs("out"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - i1, i2 := inputs.ByName("i1").Signals().FirstPayload().(int), inputs.ByName("i2").Signals().FirstPayload().(int) - outputs.ByName("out").PutSignals(signal.New(i1 + i2)) - return nil - }) - - multiplier := component.New("multiplier"). - WithDescription("multiplies i1 by 10"). - WithInputs("i1"). - WithOutputs("out"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - i1 := inputs.ByName("i1").Signals().FirstPayload().(int) - outputs.ByName("out").PutSignals(signal.New(i1 * 10)) - return nil - }) - - logger := component.New("logger"). - WithDescription("logs all input signals"). - WithInputs("in"). - WithOutputs("log"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - for _, sig := range inputs.ByName("in").Signals() { - outputs.ByName("log").PutSignals(signal.New(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload()))) - } - return nil - }) - - fm := fmesh.New("fm with observer"). - WithDescription("In this f-mesh adder receives 2 numbers, adds them and passes to multiplier. "+ - "The logger component is connected to adder's inputs, so it can observe them"+ - "The cool thing is logger does not need multiple input ports to observe multiple ports of other component"). - WithComponents(adder, multiplier, logger) - - adder.Outputs().ByName("out").PipeTo(multiplier.Inputs().ByName("i1")) - adder.Inputs().ByNames("i1", "i2").PipeTo(logger.Inputs().ByName("in")) - multiplier.Inputs().ByName("i1").PipeTo(logger.Inputs().ByName("in")) - - return fm - }, - setInputs: func(fm *fmesh.FMesh) { - fm.Components().ByName("adder").Inputs().ByName("i1").PutSignals(signal.New(4)) - fm.Components().ByName("adder").Inputs().ByName("i2").PutSignals(signal.New(5)) - }, - assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { - assert.NoError(t, err) - m := fm.Components().ByName("multiplier") - l := fm.Components().ByName("logger") - assert.True(t, m.Outputs().ByName("out").HasSignals()) - assert.Equal(t, 90, m.Outputs().ByName("out").Signals().FirstPayload().(int)) - - assert.True(t, fm.Components().ByName("logger").Outputs().ByName("log").HasSignals()) - assert.Len(t, l.Outputs().ByName("log").Signals(), 3) - }, - }, - { - name: "observing component which waits for inputs", - setupFM: func() *fmesh.FMesh { - starter := component.New("starter"). - WithDescription("This component just starts the whole f-mesh"). - WithInputs("start"). - WithOutputsIndexed("o", 1, 2). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - //Activate downstream components - outputs.PutSignals(inputs.ByName("start").Signals().First()) - return nil - }) - - incr1 := component.New("incr1"). - WithDescription("Increments the input"). - WithInputs("i1"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.PutSignals(signal.New(1 + inputs.ByName("i1").Signals().FirstPayload().(int))) - return nil - }) - - incr2 := component.New("incr2"). - WithDescription("Increments the input"). - WithInputs("i1"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.PutSignals(signal.New(1 + inputs.ByName("i1").Signals().FirstPayload().(int))) - return nil - }) - - doubler := component.New("doubler"). - WithDescription("Doubles the input"). - WithInputs("i1"). - WithOutputs("o1"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.PutSignals(signal.New(2 * inputs.ByName("i1").Signals().FirstPayload().(int))) - return nil - }) - - agg := component.New("result_aggregator"). - WithDescription("Adds 2 inputs (only when both are available)"). - WithInputsIndexed("i", 1, 2). - WithOutputs("result"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - if !inputs.ByNames("i1", "i2").AllHaveSignals() { - return component.NewErrWaitForInputs(true) - } - i1 := inputs.ByName("i1").Signals().FirstPayload().(int) - i2 := inputs.ByName("i2").Signals().FirstPayload().(int) - outputs.PutSignals(signal.New(i1 + i2)) - return nil - }) - - observer := component.New("obsrv"). - WithDescription("Observes inputs of result aggregator"). - WithInputsIndexed("i", 1, 2). - WithOutputs("log"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.ByName("log").PutSignals(inputs.ByNames("i1", "i2").Signals()...) - return nil - }) - - fm := fmesh.New("observer").WithComponents(starter, incr1, incr2, doubler, agg, observer) - - starter.Outputs().ByName("o1").PipeTo(incr1.Inputs().ByName("i1")) - starter.Outputs().ByName("o2").PipeTo(incr2.Inputs().ByName("i1")) - incr1.Outputs().ByName("o1").PipeTo(doubler.Inputs().ByName("i1")) - doubler.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i1")) - incr2.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i2")) - agg.Inputs().ByName("i1").PipeTo(observer.Inputs().ByName("i1")) - agg.Inputs().ByName("i2").PipeTo(observer.Inputs().ByName("i2")) - return fm - }, - setInputs: func(fm *fmesh.FMesh) { - fm.Components().ByName("starter").Inputs().PutSignals(signal.New(10)) - }, - assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { - assert.NoError(t, err) - - //Multiplier result - assert.Equal(t, 33, fm.Components().ByName("result_aggregator").Outputs().ByName("result").Signals().FirstPayload()) - - //Observed signals - assert.Len(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals(), 2) - assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 11) - assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 22) - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - fm := tt.setupFM() - tt.setInputs(fm) - cycles, err := fm.Run() - tt.assertions(t, fm, cycles, err) - }) - } -} diff --git a/integration_tests/piping/piping_to_outputs_test.go b/integration_tests/piping/piping_to_outputs_test.go deleted file mode 100644 index abc17b2..0000000 --- a/integration_tests/piping/piping_to_outputs_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package integration_tests - -import ( - "github.com/hovsep/fmesh" - "github.com/hovsep/fmesh/component" - "github.com/hovsep/fmesh/cycle" - "github.com/hovsep/fmesh/port" - "github.com/hovsep/fmesh/signal" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_PipingToOutputs(t *testing.T) { - tests := []struct { - name string - setupFM func() *fmesh.FMesh - setInputs func(fm *fmesh.FMesh) - assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) - }{ - { - // Simple case: both generator and injector are activated only once and in the same cycle - name: "single signal injection", - setupFM: func() *fmesh.FMesh { - gen := component.New("generator"). - WithDescription("Just generates a signal"). - WithInputs("start"). - WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.PutSignals(signal.New(111)) - return nil - }) - - inj := component.New("injector"). - WithDescription("Adds signals to gen.res output port"). - WithInputs("start"). - WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.PutSignals(signal.New(222)) - return nil - }) - - // o2o pipe: - inj.Outputs().ByName("res").PipeTo(gen.Outputs().ByName("res")) - - fm := fmesh.New("injector").WithComponents(gen, inj) - - return fm - }, - setInputs: func(fm *fmesh.FMesh) { - fm.Components().ByName("generator").Inputs().ByName("start").PutSignals(signal.New("start gen")) - fm.Components().ByName("injector").Inputs().ByName("start").PutSignals(signal.New("start inj")) - }, - assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { - assert.NoError(t, err) - assert.Len(t, fm.Components().ByName("generator").Outputs().ByName("res").Signals(), 2) - assert.False(t, fm.Components().ByName("injector").Outputs().ByName("res").HasSignals()) - }, - }, - { - // 2 components have symmetrically connected output (both are generators and injectors at the same time) - name: "outputs exchange", - setupFM: func() *fmesh.FMesh { - c1 := component.New("c1"). - WithDescription("Generates a signal"). - WithInputs("start"). - WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.PutSignals(signal.New("signal from c1")) - return nil - }) - - c2 := component.New("c2"). - WithDescription("Generates a signal"). - WithInputs("start"). - WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - outputs.PutSignals(signal.New("signal from c2")) - return nil - }) - - // o2o pipe 1: - c1.Outputs().ByName("res").PipeTo(c2.Outputs().ByName("res")) - c2.Outputs().ByName("res").PipeTo(c1.Outputs().ByName("res")) - - fm := fmesh.New("exchange").WithComponents(c1, c2) - - return fm - }, - setInputs: func(fm *fmesh.FMesh) { - fm.Components().ByName("c1").Inputs().ByName("start").PutSignals(signal.New("start c1")) - fm.Components().ByName("c2").Inputs().ByName("start").PutSignals(signal.New("start c2")) - }, - assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { - assert.NoError(t, err) - assert.Len(t, fm.Components().ByName("c1").Outputs().ByName("res").Signals(), 1) - assert.Len(t, fm.Components().ByName("c2").Outputs().ByName("res").Signals(), 1) - assert.Contains(t, fm.Components().ByName("c1").Outputs().ByName("res").Signals().AllPayloads(), "signal from c2") - assert.Contains(t, fm.Components().ByName("c2").Outputs().ByName("res").Signals().AllPayloads(), "signal from c1") - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - fm := tt.setupFM() - tt.setInputs(fm) - cycles, err := fm.Run() - tt.assertions(t, fm, cycles, err) - }) - } -} diff --git a/integration_tests/ports/waiting_for_inputs_test.go b/integration_tests/ports/waiting_for_inputs_test.go deleted file mode 100644 index 1fb684e..0000000 --- a/integration_tests/ports/waiting_for_inputs_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package integration_tests - -import ( - "github.com/hovsep/fmesh" - "github.com/hovsep/fmesh/component" - "github.com/hovsep/fmesh/cycle" - "github.com/hovsep/fmesh/port" - "github.com/hovsep/fmesh/signal" - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_WaitingForInputs(t *testing.T) { - tests := []struct { - name string - setupFM func() *fmesh.FMesh - setInputs func(fm *fmesh.FMesh) - assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) - }{ - { - name: "waits for single input and keep signals", - setupFM: func() *fmesh.FMesh { - return fmesh.New("fm").WithComponents( - component.New("waiter"). - WithInputs("i1", "i2"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - if !inputs.ByNames("i1", "i2").AllHaveSignals() { - return component.NewErrWaitForInputs(true) - } - return nil - }), - ) - }, - setInputs: func(fm *fmesh.FMesh) { - //Only one input set - fm.Components().ByName("waiter").Inputs().ByName("i1").PutSignals(signal.New(1)) - }, - assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { - assert.NoError(t, err) - - // Signal is kept on input port - assert.True(t, fm.Components().ByName("waiter").Inputs().ByName("i1").HasSignals()) - }, - }, - { - //@TODO:make this test pass - name: "waits for multiple input", - setupFM: func() *fmesh.FMesh { - return fmesh.New("fm").WithComponents( - component.New("waiter"). - WithInputs("i1", "i2", "i3"). - WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - if !inputs.ByNames("i2", "i3").AllHaveSignals() { - return component.NewErrWaitForInputs(false) - } - return nil - }), - ) - }, - setInputs: func(fm *fmesh.FMesh) { - //Only one input set - fm.Components().ByName("waiter").Inputs().ByName("i1").PutSignals(signal.New(1)) - }, - assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { - assert.NoError(t, err) - - // Signal is not kept on input port - assert.False(t, fm.Components().ByName("waiter").Inputs().ByName("i1").HasSignals()) - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - fm := tt.setupFM() - tt.setInputs(fm) - cycles, err := fm.Run() - tt.assertions(t, fm, cycles, err) - }) - } -} diff --git a/port/port.go b/port/port.go index 3a5279f..22a459a 100644 --- a/port/port.go +++ b/port/port.go @@ -52,43 +52,18 @@ func (p *Port) Clear() { p.setSignals(signal.NewGroup()) } -// DisposeSignals removes n signals from the beginning of signal buffer -func (p *Port) DisposeSignals(n int) { - p.setSignals(p.Signals()[n:]) -} - -// FlushAndDispose flushes n signals and then disposes them -func (p *Port) FlushAndDispose(n int) { - if n > len(p.Signals()) { - //Flush all signals and clear - p.Flush() - p.Clear() - } - - if !p.HasSignals() || !p.HasPipes() { - return - } - - for _, outboundPort := range p.pipes { - //Fan-Out - ForwardNSignals(p, outboundPort, n) - } - - p.DisposeSignals(n) -} - -// Flush pushes signals to pipes and returns true when flushed +// Flush pushes signals to pipes and clears the port // @TODO: hide this method from user -func (p *Port) Flush() bool { +func (p *Port) Flush() { if !p.HasSignals() || !p.HasPipes() { - return false + return } for _, outboundPort := range p.pipes { //Fan-Out ForwardSignals(p, outboundPort) } - return true + p.Clear() } // HasSignals says whether port signals is set or not @@ -116,8 +91,3 @@ func (p *Port) PipeTo(toPorts ...*Port) { func ForwardSignals(source *Port, dest *Port) { dest.PutSignals(source.Signals()...) } - -// ForwardNSignals forwards n signals -func ForwardNSignals(source *Port, dest *Port, n int) { - dest.PutSignals(source.Signals()[:n]...) -} diff --git a/port/port_test.go b/port/port_test.go index 521e7df..5fa5cd9 100644 --- a/port/port_test.go +++ b/port/port_test.go @@ -272,137 +272,3 @@ func TestNewPort(t *testing.T) { }) } } - -func TestPort_Flush(t *testing.T) { - tests := []struct { - name string - getSource func() *Port - getDest func() *Port - wantResult bool - assertions func(t *testing.T, source *Port, dest *Port) - }{ - { - name: "port with no signals", - getSource: func() *Port { - return New("empty_src") - }, - getDest: func() *Port { - return New("empty_dest") - }, - assertions: func(t *testing.T, source *Port, dest *Port) { - assert.False(t, source.HasSignals()) - assert.False(t, dest.HasSignals()) - }, - wantResult: false, - }, - { - name: "flush to empty port", - getSource: func() *Port { - return New("portWithSignal").WithSignals(signal.New(111)) - }, - getDest: func() *Port { - return New("empty_dest") - }, - assertions: func(t *testing.T, source *Port, dest *Port) { - //Source port is not cleared during flush - assert.True(t, source.HasSignals()) - - //Signals transferred to destination port - assert.True(t, dest.HasSignals()) - assert.Equal(t, dest.Signals().FirstPayload().(int), 111) - }, - wantResult: true, - }, - { - name: "flush to port with signals", - getSource: func() *Port { - return New("portWithSignal").WithSignals(signal.New(333)) - }, - getDest: func() *Port { - return New("portWithMultipleSignals").WithSignals(signal.NewGroup(444, 555, 666)...) - }, - assertions: func(t *testing.T, source *Port, dest *Port) { - //Source port is not cleared - assert.True(t, source.HasSignals()) - - //Destination port now has 1 more signal - assert.True(t, dest.HasSignals()) - assert.Len(t, dest.Signals(), 4) - assert.Contains(t, dest.Signals().AllPayloads(), 333) - }, - wantResult: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - source := tt.getSource() - dest := tt.getDest() - source.PipeTo(dest) - assert.Equal(t, tt.wantResult, source.Flush()) - if tt.assertions != nil { - tt.assertions(t, source, dest) - } - }) - } -} - -func TestPort_DisposeSignals(t *testing.T) { - type args struct { - n int - } - tests := []struct { - name string - port *Port - wantSignals signal.Group - wantPanic bool - args args - }{ - { - name: "empty port", - port: New("empty"), - wantSignals: signal.Group{}, - args: args{ - n: 0, - }, - }, - { - name: "with signals", - port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), - wantSignals: signal.NewGroup(44, 55, 66), - args: args{ - n: 3, - }, - }, - { - name: "n > len(signals), panic", - port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), - wantSignals: signal.NewGroup(), - wantPanic: true, - args: args{ - n: 10, - }, - }, - { - name: "n = 0 has no effect", - port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), - wantSignals: signal.NewGroup(11, 22, 33, 44, 55, 66), - args: args{ - n: 0, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - - if tt.wantPanic { - assert.Panics(t, func() { - tt.port.DisposeSignals(tt.args.n) - }) - } else { - tt.port.DisposeSignals(tt.args.n) - assert.Equal(t, tt.wantSignals, tt.port.Signals()) - } - - }) - } -} From e4b665b99a815105e2a5123d50f037c185bfbc83 Mon Sep 17 00:00:00 2001 From: hovsep Date: Thu, 3 Oct 2024 11:39:20 +0300 Subject: [PATCH 3/6] Signal: add tests --- signal/group.go | 3 -- signal/group_test.go | 100 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/signal/group.go b/signal/group.go index af85347..eb5d5ea 100644 --- a/signal/group.go +++ b/signal/group.go @@ -36,9 +36,6 @@ func (group Group) With(signals ...*Signal) Group { newGroup := make(Group, len(group)+len(signals)) copy(newGroup, group) for i, sig := range signals { - if sig == nil { - continue - } newGroup[len(group)+i] = sig } diff --git a/signal/group_test.go b/signal/group_test.go index fc35e01..b57984e 100644 --- a/signal/group_test.go +++ b/signal/group_test.go @@ -108,3 +108,103 @@ func TestGroup_AllPayloads(t *testing.T) { }) } } + +func TestGroup_With(t *testing.T) { + type args struct { + signals []*Signal + } + tests := []struct { + name string + group Group + args args + want Group + }{ + { + name: "no addition to empty group", + group: NewGroup(), + args: args{ + signals: nil, + }, + want: NewGroup(), + }, + { + name: "no addition to group", + group: NewGroup(1, 2, 3), + args: args{ + signals: nil, + }, + want: NewGroup(1, 2, 3), + }, + { + name: "addition to empty group", + group: NewGroup(), + args: args{ + signals: NewGroup(3, 4, 5), + }, + want: NewGroup(3, 4, 5), + }, + { + name: "addition to group", + group: NewGroup(1, 2, 3), + args: args{ + signals: NewGroup(4, 5, 6), + }, + want: NewGroup(1, 2, 3, 4, 5, 6), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, tt.group.With(tt.args.signals...)) + }) + } +} + +func TestGroup_WithPayloads(t *testing.T) { + type args struct { + payloads []any + } + tests := []struct { + name string + group Group + args args + want Group + }{ + { + name: "no addition to empty group", + group: NewGroup(), + args: args{ + payloads: nil, + }, + want: NewGroup(), + }, + { + name: "addition to empty group", + group: NewGroup(), + args: args{ + payloads: []any{1, 2, 3}, + }, + want: NewGroup(1, 2, 3), + }, + { + name: "no addition to group", + group: NewGroup(1, 2, 3), + args: args{ + payloads: nil, + }, + want: NewGroup(1, 2, 3), + }, + { + name: "addition to group", + group: NewGroup(1, 2, 3), + args: args{ + payloads: []any{4, 5, 6}, + }, + want: NewGroup(1, 2, 3, 4, 5, 6), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, tt.group.WithPayloads(tt.args.payloads...)) + }) + } +} From 93565405f1ee411f6bfb480ccbe2bba393d50e4c Mon Sep 17 00:00:00 2001 From: hovsep Date: Thu, 3 Oct 2024 15:18:11 +0300 Subject: [PATCH 4/6] Port: add tests --- .../activation_result_collection_test.go | 2 +- component/collection_test.go | 2 +- component/component.go | 8 +- component/component_test.go | 10 +- cycle/cycle_test.go | 2 +- port/collection.go | 23 +- port/collection_test.go | 199 ++++++++++++++++-- port/group.go | 7 +- port/group_test.go | 137 ++++++++++++ port/metadata.go | 20 -- port/port.go | 16 +- port/port_test.go | 176 +++++++++++----- 12 files changed, 470 insertions(+), 132 deletions(-) create mode 100644 port/group_test.go delete mode 100644 port/metadata.go diff --git a/component/activation_result_collection_test.go b/component/activation_result_collection_test.go index 283f6b7..4882b3c 100644 --- a/component/activation_result_collection_test.go +++ b/component/activation_result_collection_test.go @@ -46,7 +46,7 @@ func TestActivationResultCollection_Add(t *testing.T) { }, }, { - name: "adding to existing collection", + name: "adding to non-empty collection", collection: NewActivationResultCollection().Add( New("c1").newActivationResultOK(), New("c2").newActivationResultOK(), diff --git a/component/collection_test.go b/component/collection_test.go index 112afab..ff5888e 100644 --- a/component/collection_test.go +++ b/component/collection_test.go @@ -80,7 +80,7 @@ func TestCollection_Add(t *testing.T) { }, }, { - name: "adding to existing collection", + name: "adding to non-empty collection", collection: NewComponentCollection().Add(New("c1"), New("c2")), args: args{ components: []*Component{New("c3"), New("c4")}, diff --git a/component/component.go b/component/component.go index 42446c3..4d7014c 100644 --- a/component/component.go +++ b/component/component.go @@ -33,25 +33,25 @@ func (c *Component) WithDescription(description string) *Component { // WithInputs ads input ports func (c *Component) WithInputs(portNames ...string) *Component { - c.inputs = c.Inputs().Add(port.NewGroup(portNames...)...) + c.inputs = c.Inputs().With(port.NewGroup(portNames...)...) return c } // WithOutputs adds output ports func (c *Component) WithOutputs(portNames ...string) *Component { - c.outputs = c.Outputs().Add(port.NewGroup(portNames...)...) + c.outputs = c.Outputs().With(port.NewGroup(portNames...)...) return c } // WithInputsIndexed creates multiple prefixed ports func (c *Component) WithInputsIndexed(prefix string, startIndex int, endIndex int) *Component { - c.inputs = c.Inputs().AddIndexed(prefix, startIndex, endIndex) + c.inputs = c.Inputs().WithIndexed(prefix, startIndex, endIndex) return c } // WithOutputsIndexed creates multiple prefixed ports func (c *Component) WithOutputsIndexed(prefix string, startIndex int, endIndex int) *Component { - c.outputs = c.Outputs().AddIndexed(prefix, startIndex, endIndex) + c.outputs = c.Outputs().WithIndexed(prefix, startIndex, endIndex) return c } diff --git a/component/component_test.go b/component/component_test.go index bc12241..6600340 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -46,7 +46,7 @@ func TestNewComponent(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, New(tt.args.name), "New(%v)", tt.args.name) + assert.Equal(t, tt.want, New(tt.args.name)) }) } } @@ -233,10 +233,10 @@ func TestComponent_WithActivationFunc(t *testing.T) { componentAfter := tt.component.WithActivationFunc(tt.args.f) //Compare activation functions by they result and error - testInputs1 := port.NewCollection().Add(port.NewGroup("in1", "in2")...) - testInputs2 := port.NewCollection().Add(port.NewGroup("in1", "in2")...) - testOutputs1 := port.NewCollection().Add(port.NewGroup("out1", "out2")...) - testOutputs2 := port.NewCollection().Add(port.NewGroup("out1", "out2")...) + testInputs1 := port.NewCollection().With(port.NewGroup("in1", "in2")...) + testInputs2 := port.NewCollection().With(port.NewGroup("in1", "in2")...) + testOutputs1 := port.NewCollection().With(port.NewGroup("out1", "out2")...) + testOutputs2 := port.NewCollection().With(port.NewGroup("out1", "out2")...) err1 := componentAfter.f(testInputs1, testOutputs1) err2 := tt.args.f(testInputs2, testOutputs2) assert.Equal(t, err1, err2) diff --git a/cycle/cycle_test.go b/cycle/cycle_test.go index 21c3a29..88e3f91 100644 --- a/cycle/cycle_test.go +++ b/cycle/cycle_test.go @@ -200,7 +200,7 @@ func TestCycle_WithActivationResults(t *testing.T) { }, }, { - name: "adding to existing collection", + name: "adding to non-empty collection", cycleResult: New().WithActivationResults( component.NewActivationResult("c1"). SetActivated(false). diff --git a/port/collection.go b/port/collection.go index e404bfe..b361027 100644 --- a/port/collection.go +++ b/port/collection.go @@ -59,8 +59,8 @@ func (collection Collection) PutSignals(signals ...*signal.Signal) { } } -// WithSignals adds signals to every port in collection and returns the collection -func (collection Collection) WithSignals(signals ...*signal.Signal) Collection { +// withSignals adds signals to every port in collection and returns the collection +func (collection Collection) withSignals(signals ...*signal.Signal) Collection { collection.PutSignals(signals...) return collection } @@ -79,28 +79,25 @@ func (collection Collection) Flush() { } } -// PipeTo creates pipes from each port in collection -func (collection Collection) PipeTo(toPorts ...*Port) { +// PipeTo creates pipes from each port in collection to given destination ports +func (collection Collection) PipeTo(destPorts ...*Port) { for _, p := range collection { - p.PipeTo(toPorts...) + p.PipeTo(destPorts...) } } -// Add adds ports to collection -func (collection Collection) Add(ports ...*Port) Collection { +// With adds ports to collection and returns it +func (collection Collection) With(ports ...*Port) Collection { for _, port := range ports { - if port == nil { - continue - } collection[port.Name()] = port } return collection } -// AddIndexed creates ports with names like "o1","o2","o3" and so on -func (collection Collection) AddIndexed(prefix string, startIndex int, endIndex int) Collection { - return collection.Add(NewIndexedGroup(prefix, startIndex, endIndex)...) +// WithIndexed creates ports with names like "o1","o2","o3" and so on +func (collection Collection) WithIndexed(prefix string, startIndex int, endIndex int) Collection { + return collection.With(NewIndexedGroup(prefix, startIndex, endIndex)...) } // Signals returns all signals of all ports in the group diff --git a/port/collection_test.go b/port/collection_test.go index a68492b..e33ebb3 100644 --- a/port/collection_test.go +++ b/port/collection_test.go @@ -7,11 +7,9 @@ import ( ) func TestCollection_AllHaveSignal(t *testing.T) { - oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) + oneEmptyPorts := NewCollection().With(NewGroup("p1", "p2", "p3")...).withSignals(signal.New(123)) oneEmptyPorts.ByName("p2").Clear() - allWithSignalPorts := NewCollection().Add(NewGroup("out1", "out2", "out3")...).WithSignals(signal.New(77)) - tests := []struct { name string ports Collection @@ -19,7 +17,7 @@ func TestCollection_AllHaveSignal(t *testing.T) { }{ { name: "all empty", - ports: NewCollection().Add(NewGroup("p1", "p2")...), + ports: NewCollection().With(NewGroup("p1", "p2")...), want: false, }, { @@ -29,7 +27,7 @@ func TestCollection_AllHaveSignal(t *testing.T) { }, { name: "all set", - ports: allWithSignalPorts, + ports: NewCollection().With(NewGroup("out1", "out2", "out3")...).withSignals(signal.New(77)), want: true, }, } @@ -41,7 +39,7 @@ func TestCollection_AllHaveSignal(t *testing.T) { } func TestCollection_AnyHasSignal(t *testing.T) { - oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) + oneEmptyPorts := NewCollection().With(NewGroup("p1", "p2", "p3")...).withSignals(signal.New(123)) oneEmptyPorts.ByName("p2").Clear() tests := []struct { @@ -56,7 +54,7 @@ func TestCollection_AnyHasSignal(t *testing.T) { }, { name: "all empty", - ports: NewCollection().Add(NewGroup("p1", "p2", "p3")...), + ports: NewCollection().With(NewGroup("p1", "p2", "p3")...), want: false, }, } @@ -68,8 +66,6 @@ func TestCollection_AnyHasSignal(t *testing.T) { } func TestCollection_ByName(t *testing.T) { - portsWithSignals := NewCollection().Add(NewGroup("p1", "p2")...).WithSignals(signal.New(12)) - type args struct { name string } @@ -81,7 +77,7 @@ func TestCollection_ByName(t *testing.T) { }{ { name: "empty port found", - collection: NewCollection().Add(NewGroup("p1", "p2")...), + collection: NewCollection().With(NewGroup("p1", "p2")...), args: args{ name: "p1", }, @@ -89,7 +85,7 @@ func TestCollection_ByName(t *testing.T) { }, { name: "port with signals found", - collection: portsWithSignals, + collection: NewCollection().With(NewGroup("p1", "p2")...).withSignals(signal.New(12)), args: args{ name: "p2", }, @@ -101,7 +97,7 @@ func TestCollection_ByName(t *testing.T) { }, { name: "port not found", - collection: NewCollection().Add(NewGroup("p1", "p2")...), + collection: NewCollection().With(NewGroup("p1", "p2")...), args: args{ name: "p3", }, @@ -132,7 +128,7 @@ func TestCollection_ByNames(t *testing.T) { }{ { name: "single port found", - ports: NewCollection().Add(NewGroup("p1", "p2")...), + ports: NewCollection().With(NewGroup("p1", "p2")...), args: args{ names: []string{"p1"}, }, @@ -146,7 +142,7 @@ func TestCollection_ByNames(t *testing.T) { }, { name: "multiple ports found", - ports: NewCollection().Add(NewGroup("p1", "p2")...), + ports: NewCollection().With(NewGroup("p1", "p2")...), args: args{ names: []string{"p1", "p2"}, }, @@ -165,7 +161,7 @@ func TestCollection_ByNames(t *testing.T) { }, { name: "single port not found", - ports: NewCollection().Add(NewGroup("p1", "p2")...), + ports: NewCollection().With(NewGroup("p1", "p2")...), args: args{ names: []string{"p7"}, }, @@ -173,7 +169,7 @@ func TestCollection_ByNames(t *testing.T) { }, { name: "some ports not found", - ports: NewCollection().Add(NewGroup("p1", "p2")...), + ports: NewCollection().With(NewGroup("p1", "p2")...), args: args{ names: []string{"p1", "p2", "p3"}, }, @@ -200,14 +196,14 @@ func TestCollection_ByNames(t *testing.T) { func TestCollection_ClearSignal(t *testing.T) { t.Run("happy path", func(t *testing.T) { - ports := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.NewGroup(1, 2, 3)...) + ports := NewCollection().With(NewGroup("p1", "p2", "p3")...).withSignals(signal.NewGroup(1, 2, 3)...) assert.True(t, ports.AllHaveSignals()) ports.Clear() assert.False(t, ports.AnyHasSignals()) }) } -func TestCollection_Add(t *testing.T) { +func TestCollection_With(t *testing.T) { type args struct { ports []*Port } @@ -239,8 +235,8 @@ func TestCollection_Add(t *testing.T) { }, }, { - name: "adding to existing collection", - collection: NewCollection().Add(NewGroup("p1", "p2")...), + name: "adding to non-empty collection", + collection: NewCollection().With(NewGroup("p1", "p2")...), args: args{ ports: NewGroup("p3", "p4"), }, @@ -252,10 +248,171 @@ func TestCollection_Add(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.collection = tt.collection.Add(tt.args.ports...) + tt.collection = tt.collection.With(tt.args.ports...) + if tt.assertions != nil { + tt.assertions(t, tt.collection) + } + }) + } +} + +func TestCollection_Flush(t *testing.T) { + tests := []struct { + name string + collection Collection + assertions func(t *testing.T, collection Collection) + }{ + { + name: "empty collection", + collection: NewCollection(), + assertions: func(t *testing.T, collection Collection) { + assert.Len(t, collection, 0) + }, + }, + { + name: "all ports in collection are flushed", + collection: NewCollection().With( + New("src"). + WithSignals(signal.NewGroup(1, 2, 3)...). + withPipes(New("dst1"), New("dst2")), + ), + assertions: func(t *testing.T, collection Collection) { + assert.Len(t, collection, 1) + assert.False(t, collection.ByName("src").HasSignals()) + for _, destPort := range collection.ByName("src").pipes { + assert.Len(t, destPort.Signals(), 3) + assert.Contains(t, destPort.Signals().AllPayloads(), 1) + assert.Contains(t, destPort.Signals().AllPayloads(), 2) + assert.Contains(t, destPort.Signals().AllPayloads(), 3) + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.collection.Flush() + if tt.assertions != nil { + tt.assertions(t, tt.collection) + } + }) + } +} + +func TestCollection_PipeTo(t *testing.T) { + type args struct { + destPorts []*Port + } + tests := []struct { + name string + collection Collection + args args + assertions func(t *testing.T, collection Collection) + }{ + { + name: "empty collection", + collection: NewCollection(), + args: args{ + destPorts: NewIndexedGroup("dest_", 1, 3), + }, + assertions: func(t *testing.T, collection Collection) { + assert.Len(t, collection, 0) + }, + }, + { + name: "add pipes to each port in collection", + collection: NewCollection().With(NewIndexedGroup("p", 1, 3)...), + args: args{ + destPorts: NewIndexedGroup("dest", 1, 5), + }, + assertions: func(t *testing.T, collection Collection) { + assert.Len(t, collection, 3) + for _, p := range collection { + assert.True(t, p.HasPipes()) + assert.Len(t, p.pipes, 5) + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.collection.PipeTo(tt.args.destPorts...) if tt.assertions != nil { tt.assertions(t, tt.collection) } }) } } + +func TestCollection_WithIndexed(t *testing.T) { + type args struct { + prefix string + startIndex int + endIndex int + } + tests := []struct { + name string + collection Collection + args args + assertions func(t *testing.T, collection Collection) + }{ + { + name: "adding to empty collection", + collection: NewCollection(), + args: args{ + prefix: "p", + startIndex: 1, + endIndex: 3, + }, + assertions: func(t *testing.T, collection Collection) { + assert.Len(t, collection, 3) + }, + }, + { + name: "adding to non-empty collection", + collection: NewCollection().With(NewGroup("p1", "p2", "p3")...), + args: args{ + prefix: "p", + startIndex: 4, + endIndex: 5, + }, + assertions: func(t *testing.T, collection Collection) { + assert.Len(t, collection, 5) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + collectionAfter := tt.collection.WithIndexed(tt.args.prefix, tt.args.startIndex, tt.args.endIndex) + if tt.assertions != nil { + tt.assertions(t, collectionAfter) + } + }) + } +} + +func TestCollection_Signals(t *testing.T) { + tests := []struct { + name string + collection Collection + want signal.Group + }{ + { + name: "empty collection", + collection: NewCollection(), + want: signal.NewGroup(), + }, + { + name: "non-empty collection", + collection: NewCollection(). + WithIndexed("p", 1, 3). + withSignals(signal.NewGroup(1, 2, 3)...). + withSignals(signal.New("test")), + want: signal.NewGroup(1, 2, 3, "test", 1, 2, 3, "test", 1, 2, 3, "test"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, tt.collection.Signals()) + }) + } +} diff --git a/port/group.go b/port/group.go index 7caf63b..988e01f 100644 --- a/port/group.go +++ b/port/group.go @@ -14,12 +14,9 @@ func NewGroup(names ...string) Group { return group } -// NewIndexedGroup is useful when you want to create group of ports with same prefix +// NewIndexedGroup is useful to create group of ports with same prefix +// NOTE: endIndex is inclusive, e.g. NewIndexedGroup("p", 0, 0) will create one port with name "p0" func NewIndexedGroup(prefix string, startIndex int, endIndex int) Group { - if prefix == "" { - return nil - } - if startIndex > endIndex { return nil } diff --git a/port/group_test.go b/port/group_test.go new file mode 100644 index 0000000..3fb8a4f --- /dev/null +++ b/port/group_test.go @@ -0,0 +1,137 @@ +package port + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestNewGroup(t *testing.T) { + type args struct { + names []string + } + tests := []struct { + name string + args args + want Group + }{ + { + name: "empty group", + args: args{ + names: nil, + }, + want: Group{}, + }, + { + name: "non-empty group", + args: args{ + names: []string{"p1", "p2"}, + }, + want: Group{ + New("p1"), + New("p2"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, NewGroup(tt.args.names...)) + }) + } +} + +func TestNewIndexedGroup(t *testing.T) { + type args struct { + prefix string + startIndex int + endIndex int + } + tests := []struct { + name string + args args + want Group + }{ + { + name: "empty prefix is valid", + args: args{ + prefix: "", + startIndex: 0, + endIndex: 3, + }, + want: NewGroup("0", "1", "2", "3"), + }, + { + name: "with prefix", + args: args{ + prefix: "in_", + startIndex: 4, + endIndex: 5, + }, + want: NewGroup("in_4", "in_5"), + }, + { + name: "with invalid start index", + args: args{ + prefix: "", + startIndex: 999, + endIndex: 5, + }, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, NewIndexedGroup(tt.args.prefix, tt.args.startIndex, tt.args.endIndex)) + }) + } +} + +func TestGroup_With(t *testing.T) { + type args struct { + ports []*Port + } + tests := []struct { + name string + group Group + args args + assertions func(t *testing.T, group Group) + }{ + { + name: "adding nothing to empty group", + group: NewGroup(), + args: args{ + ports: nil, + }, + assertions: func(t *testing.T, group Group) { + assert.Len(t, group, 0) + }, + }, + { + name: "adding to empty group", + group: NewGroup(), + args: args{ + ports: NewGroup("p1", "p2", "p3"), + }, + assertions: func(t *testing.T, group Group) { + assert.Len(t, group, 3) + }, + }, + { + name: "adding to non-empty group", + group: NewIndexedGroup("p", 1, 3), + args: args{ + ports: NewGroup("p4", "p5", "p6"), + }, + assertions: func(t *testing.T, group Group) { + assert.Len(t, group, 6) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + groupAfter := tt.group.With(tt.args.ports...) + if tt.assertions != nil { + tt.assertions(t, groupAfter) + } + }) + } +} diff --git a/port/metadata.go b/port/metadata.go deleted file mode 100644 index 319b452..0000000 --- a/port/metadata.go +++ /dev/null @@ -1,20 +0,0 @@ -package port - -// Metadata contains metadata about the port -type Metadata struct { - SignalBufferLen int -} - -// MetadataMap contains port metadata indexed by port name -type MetadataMap map[string]*Metadata - -// GetPortsMetadata returns info about current length of each port in collection -func (collection Collection) GetPortsMetadata() MetadataMap { - res := make(MetadataMap) - for _, p := range collection { - res[p.Name()] = &Metadata{ - SignalBufferLen: len(p.Signals()), - } - } - return res -} diff --git a/port/port.go b/port/port.go index 22a459a..5b26c4e 100644 --- a/port/port.go +++ b/port/port.go @@ -78,15 +78,23 @@ func (p *Port) HasPipes() bool { // PipeTo creates one or multiple pipes to other port(s) // @TODO: hide this method from AF -func (p *Port) PipeTo(toPorts ...*Port) { - for _, toPort := range toPorts { - if toPort == nil { +func (p *Port) PipeTo(destPorts ...*Port) { + for _, destPort := range destPorts { + if destPort == nil { continue } - p.pipes = p.pipes.With(toPort) + p.pipes = p.pipes.With(destPort) } } +// withPipes adds pipes and returns the port +func (p *Port) withPipes(destPorts ...*Port) *Port { + for _, destPort := range destPorts { + p.PipeTo(destPort) + } + return p +} + // ForwardSignals copies all signals from source port to destination port, without clearing the source port func ForwardSignals(source *Port, dest *Port) { dest.PutSignals(source.Signals()...) diff --git a/port/port_test.go b/port/port_test.go index 5fa5cd9..98b12cb 100644 --- a/port/port_test.go +++ b/port/port_test.go @@ -7,8 +7,6 @@ import ( ) func TestPort_HasSignals(t *testing.T) { - portWithSignal := New("portWithSignal").WithSignals(signal.New(123)) - tests := []struct { name string port *Port @@ -21,7 +19,7 @@ func TestPort_HasSignals(t *testing.T) { }, { name: "port has normal signals", - port: portWithSignal, + port: New("p").WithSignals(signal.New(123)), want: true, }, } @@ -33,8 +31,6 @@ func TestPort_HasSignals(t *testing.T) { } func TestPort_Signals(t *testing.T) { - portWithSignal := New("portWithSignal").WithSignals(signal.New(123)) - tests := []struct { name string port *Port @@ -47,7 +43,7 @@ func TestPort_Signals(t *testing.T) { }, { name: "with signal", - port: portWithSignal, + port: New("p").WithSignals(signal.New(123)), want: signal.NewGroup(123), }, } @@ -59,8 +55,6 @@ func TestPort_Signals(t *testing.T) { } func TestPort_Clear(t *testing.T) { - portWithSignal := New("portWithSignal").WithSignals(signal.New(111)) - tests := []struct { name string before *Port @@ -68,8 +62,8 @@ func TestPort_Clear(t *testing.T) { }{ { name: "happy path", - before: portWithSignal, - after: &Port{name: "portWithSignal", pipes: Group{}, signals: signal.Group{}}, + before: New("p").WithSignals(signal.New(111)), + after: &Port{name: "p", pipes: Group{}, signals: signal.Group{}}, }, { name: "cleaning empty port", @@ -131,77 +125,51 @@ func TestPort_PipeTo(t *testing.T) { } func TestPort_PutSignals(t *testing.T) { - portWithSingleSignal := New("portWithSingleSignal").WithSignals(signal.New(11)) - - portWithMultipleSignals := New("portWithMultipleSignals").WithSignals(signal.NewGroup(11, 12)...) - - portWithMultipleSignals2 := New("portWithMultipleSignals2").WithSignals(signal.NewGroup(55, 66)...) - type args struct { signals []*signal.Signal } tests := []struct { - name string - before *Port - after *Port - args args + name string + port *Port + signalsAfter signal.Group + args args }{ { - name: "single signal to empty port", - before: New("emptyPort"), - after: &Port{ - name: "emptyPort", - signals: signal.NewGroup(11), - pipes: Group{}, - }, + name: "single signal to empty port", + port: New("emptyPort"), + signalsAfter: signal.NewGroup(11), args: args{ signals: signal.NewGroup(11), }, }, { - name: "multiple signals to empty port", - before: New("p"), - after: &Port{ - name: "p", - signals: signal.NewGroup(11, 12), - pipes: Group{}, - }, + name: "multiple signals to empty port", + port: New("p"), + signalsAfter: signal.NewGroup(11, 12), args: args{ signals: signal.NewGroup(11, 12), }, }, { - name: "single signal to port with single signal", - before: portWithSingleSignal, - after: &Port{ - name: "portWithSingleSignal", - signals: signal.NewGroup(11, 12), - pipes: Group{}, - }, + name: "single signal to port with single signal", + port: New("p").WithSignals(signal.New(11)), + signalsAfter: signal.NewGroup(11, 12), args: args{ signals: signal.NewGroup(12), }, }, { - name: "single signals to port with multiple signals", - before: portWithMultipleSignals, - after: &Port{ - name: "portWithMultipleSignals", - signals: signal.NewGroup(11, 12, 13), - pipes: Group{}, - }, + name: "single signals to port with multiple signals", + port: New("p").WithSignals(signal.NewGroup(11, 12)...), + signalsAfter: signal.NewGroup(11, 12, 13), args: args{ signals: signal.NewGroup(13), }, }, { - name: "multiple signals to port with multiple signals", - before: portWithMultipleSignals2, - after: &Port{ - name: "portWithMultipleSignals2", - signals: signal.NewGroup(55, 66, 13, 14), //Notice LIFO order - pipes: Group{}, - }, + name: "multiple signals to port with multiple signals", + port: New("p").WithSignals(signal.NewGroup(55, 66)...), + signalsAfter: signal.NewGroup(55, 66, 13, 14), //Notice LIFO order args: args{ signals: signal.NewGroup(13, 14), }, @@ -209,8 +177,8 @@ func TestPort_PutSignals(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.before.PutSignals(tt.args.signals...) - assert.ElementsMatch(t, tt.after.Signals(), tt.before.Signals()) + tt.port.PutSignals(tt.args.signals...) + assert.ElementsMatch(t, tt.signalsAfter, tt.port.Signals()) }) } } @@ -272,3 +240,97 @@ func TestNewPort(t *testing.T) { }) } } + +func TestPort_HasPipes(t *testing.T) { + tests := []struct { + name string + port *Port + want bool + }{ + { + name: "no pipes", + port: New("p"), + want: false, + }, + { + name: "with pipes", + port: New("p1").withPipes(New("p2")), + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, tt.port.HasPipes()) + }) + } +} + +func TestPort_Flush(t *testing.T) { + tests := []struct { + name string + srcPort *Port + assertions func(t *testing.T, srcPort *Port) + }{ + { + name: "port with signals and no pipes is not flushed", + srcPort: New("p").WithSignals(signal.NewGroup(1, 2, 3)...), + assertions: func(t *testing.T, srcPort *Port) { + assert.True(t, srcPort.HasSignals()) + assert.Len(t, srcPort.Signals(), 3) + assert.False(t, srcPort.HasPipes()) + }, + }, + { + name: "empty port with pipes is not flushed", + srcPort: New("p").withPipes(New("p1"), New("p2")), + assertions: func(t *testing.T, srcPort *Port) { + assert.False(t, srcPort.HasSignals()) + assert.True(t, srcPort.HasPipes()) + }, + }, + { + name: "flush to empty ports", + srcPort: New("p").WithSignals(signal.NewGroup(1, 2, 3)...). + withPipes( + New("p1"), + New("p2")), + assertions: func(t *testing.T, srcPort *Port) { + assert.False(t, srcPort.HasSignals()) + assert.True(t, srcPort.HasPipes()) + for _, destPort := range srcPort.pipes { + assert.True(t, destPort.HasSignals()) + assert.Len(t, destPort.Signals(), 3) + assert.Contains(t, destPort.Signals().AllPayloads(), 1) + assert.Contains(t, destPort.Signals().AllPayloads(), 2) + assert.Contains(t, destPort.Signals().AllPayloads(), 3) + } + }, + }, + { + name: "flush to non empty ports", + srcPort: New("p").WithSignals(signal.NewGroup(1, 2, 3)...). + withPipes( + New("p1").WithSignals(signal.NewGroup(4, 5, 6)...), + New("p2").WithSignals(signal.NewGroup(7, 8, 9)...)), + assertions: func(t *testing.T, srcPort *Port) { + assert.False(t, srcPort.HasSignals()) + assert.True(t, srcPort.HasPipes()) + for _, destPort := range srcPort.pipes { + assert.True(t, destPort.HasSignals()) + assert.Len(t, destPort.Signals(), 6) + assert.Contains(t, destPort.Signals().AllPayloads(), 1) + assert.Contains(t, destPort.Signals().AllPayloads(), 2) + assert.Contains(t, destPort.Signals().AllPayloads(), 3) + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.srcPort.Flush() + if tt.assertions != nil { + tt.assertions(t, tt.srcPort) + } + }) + } +} From f46aa85a43a11da93cbaaccac19a40b3f504b62a Mon Sep 17 00:00:00 2001 From: hovsep Date: Thu, 3 Oct 2024 23:06:40 +0300 Subject: [PATCH 5/6] Component: add tests --- component/component_test.go | 122 ++++++++++++++++++++++++++++++++++-- 1 file changed, 118 insertions(+), 4 deletions(-) diff --git a/component/component_test.go b/component/component_test.go index 6600340..44b8417 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -386,7 +386,7 @@ func TestComponent_MaybeActivate(t *testing.T) { wantActivationResult *ActivationResult }{ { - name: "empty component is not activated", + name: "component with no activation function and no inputs", getComponent: func() *Component { return New("c1") }, @@ -403,6 +403,22 @@ func TestComponent_MaybeActivate(t *testing.T) { SetActivated(false). WithActivationCode(ActivationCodeNoFunction), }, + { + name: "component with activation func, but no inputs", + getComponent: func() *Component { + c := New("c1"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) + return nil + }) + return c + }, + wantActivationResult: NewActivationResult("c1"). + SetActivated(false). + WithActivationCode(ActivationCodeNoInput), + }, { name: "activated with error", getComponent: func() *Component { @@ -482,9 +498,9 @@ func TestComponent_MaybeActivate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { gotActivationResult := tt.getComponent().MaybeActivate() - assert.Equal(t, gotActivationResult.Activated(), tt.wantActivationResult.Activated()) - assert.Equal(t, gotActivationResult.ComponentName(), tt.wantActivationResult.ComponentName()) - assert.Equal(t, gotActivationResult.Code(), tt.wantActivationResult.Code()) + assert.Equal(t, tt.wantActivationResult.Activated(), gotActivationResult.Activated()) + assert.Equal(t, tt.wantActivationResult.ComponentName(), gotActivationResult.ComponentName()) + assert.Equal(t, tt.wantActivationResult.Code(), gotActivationResult.Code()) if tt.wantActivationResult.HasError() { assert.EqualError(t, gotActivationResult.Error(), tt.wantActivationResult.Error().Error()) } else { @@ -494,3 +510,101 @@ func TestComponent_MaybeActivate(t *testing.T) { }) } } + +func TestComponent_WithInputsIndexed(t *testing.T) { + type args struct { + prefix string + startIndex int + endIndex int + } + tests := []struct { + name string + component *Component + args args + assertions func(t *testing.T, component *Component) + }{ + { + name: "component has no ports before", + component: New("c").WithOutputs("o1", "o2"), + args: args{ + prefix: "p", + startIndex: 1, + endIndex: 3, + }, + assertions: func(t *testing.T, component *Component) { + assert.Len(t, component.Outputs(), 2) + assert.Len(t, component.Inputs(), 3) + }, + }, + { + name: "component has ports before", + component: New("c").WithInputs("i1", "i2").WithOutputs("o1", "o2"), + args: args{ + prefix: "p", + startIndex: 1, + endIndex: 3, + }, + assertions: func(t *testing.T, component *Component) { + assert.Len(t, component.Outputs(), 2) + assert.Len(t, component.Inputs(), 5) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + componentAfter := tt.component.WithInputsIndexed(tt.args.prefix, tt.args.startIndex, tt.args.endIndex) + if tt.assertions != nil { + tt.assertions(t, componentAfter) + } + }) + } +} + +func TestComponent_WithOutputsIndexed(t *testing.T) { + type args struct { + prefix string + startIndex int + endIndex int + } + tests := []struct { + name string + component *Component + args args + assertions func(t *testing.T, component *Component) + }{ + { + name: "component has no ports before", + component: New("c").WithInputs("i1", "i2"), + args: args{ + prefix: "p", + startIndex: 1, + endIndex: 3, + }, + assertions: func(t *testing.T, component *Component) { + assert.Len(t, component.Inputs(), 2) + assert.Len(t, component.Outputs(), 3) + }, + }, + { + name: "component has ports before", + component: New("c").WithInputs("i1", "i2").WithOutputs("o1", "o2"), + args: args{ + prefix: "p", + startIndex: 1, + endIndex: 3, + }, + assertions: func(t *testing.T, component *Component) { + assert.Len(t, component.Inputs(), 2) + assert.Len(t, component.Outputs(), 5) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + componentAfter := tt.component.WithOutputsIndexed(tt.args.prefix, tt.args.startIndex, tt.args.endIndex) + if tt.assertions != nil { + tt.assertions(t, componentAfter) + } + }) + } +} From 8bd7eecacd3d4b762efa42d57a9d02ae58eb1a58 Mon Sep 17 00:00:00 2001 From: hovsep Date: Fri, 4 Oct 2024 00:04:02 +0300 Subject: [PATCH 6/6] Add f-mesh configuration --- errors.go | 1 + fmesh.go | 52 +++-- fmesh_test.go | 217 +++++++++++++++------ integration_tests/computation/math_test.go | 5 +- 4 files changed, 197 insertions(+), 78 deletions(-) diff --git a/errors.go b/errors.go index b06689d..7f10ce8 100644 --- a/errors.go +++ b/errors.go @@ -21,4 +21,5 @@ var ( ErrHitAnErrorOrPanic = errors.New("f-mesh hit an error or panic and will be stopped") ErrHitAPanic = errors.New("f-mesh hit a panic and will be stopped") ErrUnsupportedErrorHandlingStrategy = errors.New("unsupported error handling strategy") + ErrReachedMaxAllowedCycles = errors.New("reached max allowed cycles") ) diff --git a/fmesh.go b/fmesh.go index c00a575..398724e 100644 --- a/fmesh.go +++ b/fmesh.go @@ -1,26 +1,40 @@ package fmesh import ( - "errors" "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/cycle" "sync" ) -// @TODO: move this to fm.Config -const maxCyclesAllowed = 100 //Dev mode +const UnlimitedCycles = 0 + +type Config struct { + // ErrorHandlingStrategy defines how f-mesh will handle errors and panics + ErrorHandlingStrategy ErrorHandlingStrategy + // CyclesLimit defines max number of activation cycles, 0 means no limit + CyclesLimit int +} + +var defaultConfig = Config{ + ErrorHandlingStrategy: StopOnFirstErrorOrPanic, + CyclesLimit: 1000, +} // FMesh is the functional mesh type FMesh struct { - name string - description string - components component.Collection - errorHandlingStrategy ErrorHandlingStrategy + name string + description string + components component.Collection + config Config } // New creates a new f-mesh func New(name string) *FMesh { - return &FMesh{name: name, components: component.NewComponentCollection()} + return &FMesh{ + name: name, + components: component.NewComponentCollection(), + config: defaultConfig, + } } // Name getter @@ -51,9 +65,9 @@ func (fm *FMesh) WithComponents(components ...*component.Component) *FMesh { return fm } -// WithErrorHandlingStrategy defines how the mesh will handle errors -func (fm *FMesh) WithErrorHandlingStrategy(strategy ErrorHandlingStrategy) *FMesh { - fm.errorHandlingStrategy = strategy +// WithConfig sets the configuration and returns the f-mesh +func (fm *FMesh) WithConfig(config Config) *FMesh { + fm.config = config return fm } @@ -113,8 +127,8 @@ func (fm *FMesh) Run() (cycle.Collection, error) { } func (fm *FMesh) mustStop(cycleResult *cycle.Cycle, cycleNum int) (bool, error) { - if cycleNum >= maxCyclesAllowed { - return true, errors.New("reached max allowed cycles") + if (fm.config.CyclesLimit > 0) && (cycleNum >= fm.config.CyclesLimit) { + return true, ErrReachedMaxAllowedCycles } //Check if we are done (no components activated during the cycle => all inputs are processed) @@ -123,11 +137,17 @@ func (fm *FMesh) mustStop(cycleResult *cycle.Cycle, cycleNum int) (bool, error) } //Check if mesh must stop because of configured error handling strategy - switch fm.errorHandlingStrategy { + switch fm.config.ErrorHandlingStrategy { case StopOnFirstErrorOrPanic: - return cycleResult.HasErrors() || cycleResult.HasPanics(), ErrHitAnErrorOrPanic + if cycleResult.HasErrors() || cycleResult.HasPanics() { + return true, ErrHitAnErrorOrPanic + } + return false, nil case StopOnFirstPanic: - return cycleResult.HasPanics(), ErrHitAPanic + if cycleResult.HasPanics() { + return true, ErrHitAPanic + } + return false, nil case IgnoreAll: return false, nil default: diff --git a/fmesh_test.go b/fmesh_test.go index aeff95a..e656a45 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -26,6 +26,7 @@ func TestNew(t *testing.T) { }, want: &FMesh{ components: component.Collection{}, + config: defaultConfig, }, }, { @@ -36,6 +37,7 @@ func TestNew(t *testing.T) { want: &FMesh{ name: "fm1", components: component.Collection{}, + config: defaultConfig, }, }, } @@ -63,10 +65,10 @@ func TestFMesh_WithDescription(t *testing.T) { description: "", }, want: &FMesh{ - name: "fm1", - description: "", - components: component.Collection{}, - errorHandlingStrategy: 0, + name: "fm1", + description: "", + components: component.Collection{}, + config: defaultConfig, }, }, { @@ -76,10 +78,10 @@ func TestFMesh_WithDescription(t *testing.T) { description: "descr", }, want: &FMesh{ - name: "fm1", - description: "descr", - components: component.Collection{}, - errorHandlingStrategy: 0, + name: "fm1", + description: "descr", + components: component.Collection{}, + config: defaultConfig, }, }, } @@ -90,9 +92,9 @@ func TestFMesh_WithDescription(t *testing.T) { } } -func TestFMesh_WithErrorHandlingStrategy(t *testing.T) { +func TestFMesh_WithConfig(t *testing.T) { type args struct { - strategy ErrorHandlingStrategy + config Config } tests := []struct { name string @@ -101,33 +103,27 @@ func TestFMesh_WithErrorHandlingStrategy(t *testing.T) { want *FMesh }{ { - name: "default strategy", - fm: New("fm1"), - args: args{ - strategy: 0, - }, - want: &FMesh{ - name: "fm1", - components: component.Collection{}, - errorHandlingStrategy: StopOnFirstErrorOrPanic, - }, - }, - { - name: "custom strategy", + name: "custom config", fm: New("fm1"), args: args{ - strategy: IgnoreAll, + config: Config{ + ErrorHandlingStrategy: IgnoreAll, + CyclesLimit: 9999, + }, }, want: &FMesh{ - name: "fm1", - components: component.Collection{}, - errorHandlingStrategy: IgnoreAll, + name: "fm1", + components: component.Collection{}, + config: Config{ + ErrorHandlingStrategy: IgnoreAll, + CyclesLimit: 9999, + }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want, tt.fm.WithErrorHandlingStrategy(tt.args.strategy)) + assert.Equal(t, tt.want, tt.fm.WithConfig(tt.args.config)) }) } } @@ -137,10 +133,10 @@ func TestFMesh_WithComponents(t *testing.T) { components []*component.Component } tests := []struct { - name string - fm *FMesh - args args - want *FMesh + name string + fm *FMesh + args args + wantComponents component.Collection }{ { name: "no components", @@ -148,12 +144,7 @@ func TestFMesh_WithComponents(t *testing.T) { args: args{ components: nil, }, - want: &FMesh{ - name: "fm1", - description: "", - components: component.Collection{}, - errorHandlingStrategy: 0, - }, + wantComponents: component.Collection{}, }, { name: "with single component", @@ -163,11 +154,8 @@ func TestFMesh_WithComponents(t *testing.T) { component.New("c1"), }, }, - want: &FMesh{ - name: "fm1", - components: component.Collection{ - "c1": component.New("c1"), - }, + wantComponents: component.Collection{ + "c1": component.New("c1"), }, }, { @@ -179,12 +167,9 @@ func TestFMesh_WithComponents(t *testing.T) { component.New("c2"), }, }, - want: &FMesh{ - name: "fm1", - components: component.Collection{ - "c1": component.New("c1"), - "c2": component.New("c2"), - }, + wantComponents: component.Collection{ + "c1": component.New("c1"), + "c2": component.New("c2"), }, }, { @@ -198,19 +183,16 @@ func TestFMesh_WithComponents(t *testing.T) { component.New("c4").WithDescription("descr4"), }, }, - want: &FMesh{ - name: "fm1", - components: component.Collection{ - "c1": component.New("c1").WithDescription("descr1"), - "c2": component.New("c2").WithDescription("descr3"), - "c4": component.New("c4").WithDescription("descr4"), - }, + wantComponents: component.Collection{ + "c1": component.New("c1").WithDescription("descr1"), + "c2": component.New("c2").WithDescription("descr3"), + "c4": component.New("c4").WithDescription("descr4"), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want, tt.fm.WithComponents(tt.args.components...)) + assert.Equal(t, tt.wantComponents, tt.fm.WithComponents(tt.args.components...).Components()) }) } } @@ -279,7 +261,10 @@ func TestFMesh_Run(t *testing.T) { }, { name: "unsupported error handling strategy", - fm: New("fm").WithErrorHandlingStrategy(100). + fm: New("fm").WithConfig(Config{ + ErrorHandlingStrategy: 100, + CyclesLimit: 0, + }). WithComponents( component.New("c1"). WithDescription("This component simply puts a constant on o1"). @@ -305,7 +290,9 @@ func TestFMesh_Run(t *testing.T) { { name: "stop on first error on first cycle", fm: New("fm"). - WithErrorHandlingStrategy(StopOnFirstErrorOrPanic). + WithConfig(Config{ + ErrorHandlingStrategy: StopOnFirstErrorOrPanic, + }). WithComponents( component.New("c1"). WithDescription("This component just returns an unexpected error"). @@ -330,7 +317,9 @@ func TestFMesh_Run(t *testing.T) { { name: "stop on first panic on cycle 3", fm: New("fm"). - WithErrorHandlingStrategy(StopOnFirstPanic). + WithConfig(Config{ + ErrorHandlingStrategy: StopOnFirstPanic, + }). WithComponents( component.New("c1"). WithDescription("This component just sends a number to c2"). @@ -428,7 +417,9 @@ func TestFMesh_Run(t *testing.T) { { name: "all errors and panics are ignored", fm: New("fm"). - WithErrorHandlingStrategy(IgnoreAll). + WithConfig(Config{ + ErrorHandlingStrategy: IgnoreAll, + }). WithComponents( component.New("c1"). WithDescription("This component just sends a number to c2"). @@ -689,3 +680,107 @@ func TestFMesh_runCycle(t *testing.T) { }) } } + +func TestFMesh_mustStop(t *testing.T) { + type args struct { + cycleResult *cycle.Cycle + cycleNum int + } + tests := []struct { + name string + fmesh *FMesh + args args + want bool + wantErr error + }{ + { + name: "with default config, no time to stop", + fmesh: New("fm"), + args: args{ + cycleResult: cycle.New().WithActivationResults( + component.NewActivationResult("c1"). + SetActivated(true). + WithActivationCode(component.ActivationCodeOK), + ), + cycleNum: 5, + }, + want: false, + wantErr: nil, + }, + { + name: "with default config, reached max cycles", + fmesh: New("fm"), + args: args{ + cycleResult: cycle.New().WithActivationResults( + component.NewActivationResult("c1"). + SetActivated(true). + WithActivationCode(component.ActivationCodeOK), + ), + cycleNum: 1001, + }, + want: true, + wantErr: ErrReachedMaxAllowedCycles, + }, + { + name: "mesh finished naturally and must stop", + fmesh: New("fm"), + args: args{ + cycleResult: cycle.New().WithActivationResults( + component.NewActivationResult("c1"). + SetActivated(false). + WithActivationCode(component.ActivationCodeNoInput), + ), + cycleNum: 5, + }, + want: true, + wantErr: nil, + }, + { + name: "mesh hit an error", + fmesh: New("fm").WithConfig(Config{ + ErrorHandlingStrategy: StopOnFirstErrorOrPanic, + CyclesLimit: UnlimitedCycles, + }), + args: args{ + cycleResult: cycle.New().WithActivationResults( + component.NewActivationResult("c1"). + SetActivated(true). + WithActivationCode(component.ActivationCodeReturnedError). + WithError(errors.New("c1 activation finished with error")), + ), + cycleNum: 5, + }, + want: true, + wantErr: ErrHitAnErrorOrPanic, + }, + { + name: "mesh hit a panic", + fmesh: New("fm").WithConfig(Config{ + ErrorHandlingStrategy: StopOnFirstPanic, + }), + args: args{ + cycleResult: cycle.New().WithActivationResults( + component.NewActivationResult("c1"). + SetActivated(true). + WithActivationCode(component.ActivationCodePanicked). + WithError(errors.New("c1 panicked")), + ), + cycleNum: 5, + }, + want: true, + wantErr: ErrHitAPanic, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.fmesh.mustStop(tt.args.cycleResult, tt.args.cycleNum) + if tt.wantErr != nil { + assert.EqualError(t, err, tt.wantErr.Error()) + } else { + assert.NoError(t, err) + } + + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/integration_tests/computation/math_test.go b/integration_tests/computation/math_test.go index f9169e5..38c5fce 100644 --- a/integration_tests/computation/math_test.go +++ b/integration_tests/computation/math_test.go @@ -42,7 +42,10 @@ func Test_Math(t *testing.T) { c1.Outputs().ByName("res").PipeTo(c2.Inputs().ByName("num")) - return fmesh.New("fm").WithComponents(c1, c2).WithErrorHandlingStrategy(fmesh.StopOnFirstErrorOrPanic) + return fmesh.New("fm").WithComponents(c1, c2).WithConfig(fmesh.Config{ + ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic, + CyclesLimit: 10, + }) }, setInputs: func(fm *fmesh.FMesh) { fm.Components().ByName("c1").Inputs().ByName("num").PutSignals(signal.New(32))