From 9324f5971ac92f7aa9b995cb5dafadadf56e949e Mon Sep 17 00:00:00 2001 From: hovsep Date: Thu, 19 Sep 2024 02:24:38 +0300 Subject: [PATCH] Allow to pipe from inputs --- component/activation_result.go | 55 +++++++++-- component/activation_result_collection.go | 9 ++ component/component.go | 27 +++--- component/component_test.go | 7 +- errors.go | 9 +- fmesh.go | 23 ++++- fmesh_test.go | 83 +++++++++++++---- integration_tests/computation/math_test.go | 2 +- .../{multiplexing_test.go => fan_test.go} | 7 +- .../piping/piping_from_inputs_test.go | 92 +++++++++++++++++++ .../piping/piping_to_outputs_test.go | 37 ++++++++ port/collection.go | 32 +++++++ port/collection_test.go | 41 +++++---- port/port.go | 38 ++++---- port/port_test.go | 46 +++++----- signal/collection.go | 61 ++++++++++++ 16 files changed, 460 insertions(+), 109 deletions(-) rename integration_tests/piping/{multiplexing_test.go => fan_test.go} (97%) create mode 100644 integration_tests/piping/piping_from_inputs_test.go create mode 100644 integration_tests/piping/piping_to_outputs_test.go create mode 100644 signal/collection.go diff --git a/component/activation_result.go b/component/activation_result.go index ea5ae85..a801c3d 100644 --- a/component/activation_result.go +++ b/component/activation_result.go @@ -1,11 +1,15 @@ package component -import "fmt" +import ( + "errors" + "fmt" +) // ActivationResult defines the result (possibly an error) of the activation of given component in given cycle type ActivationResult struct { componentName string activated bool + inputKeys []string //@TODO: check if we can replace this by one int which will show the index of last signal used as input within signals collection (use signal position in the collection as it's unique id) code ActivationResultCode err error } @@ -88,32 +92,69 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult { return ar } +func (ar *ActivationResult) WithInputKeys(keys []string) *ActivationResult { + ar.inputKeys = keys + return ar +} + +func (ar *ActivationResult) InputKeys() []string { + return ar.inputKeys +} + // newActivationResultOK builds a specific activation result func (c *Component) newActivationResultOK() *ActivationResult { - return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodeOK) + return NewActivationResult(c.Name()). + SetActivated(true). + WithActivationCode(ActivationCodeOK). + WithInputKeys(c.Inputs().GetSignalKeys()) + } // newActivationCodeNoInput builds a specific activation result func (c *Component) newActivationCodeNoInput() *ActivationResult { - return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeNoInput) + return NewActivationResult(c.Name()). + SetActivated(false). + WithActivationCode(ActivationCodeNoInput) } // newActivationCodeNoFunction builds a specific activation result func (c *Component) newActivationCodeNoFunction() *ActivationResult { - return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeNoFunction) + return NewActivationResult(c.Name()). + SetActivated(false). + WithActivationCode(ActivationCodeNoFunction) } // newActivationCodeWaitingForInput builds a specific activation result func (c *Component) newActivationCodeWaitingForInput() *ActivationResult { - return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeWaitingForInput) + return NewActivationResult(c.Name()). + SetActivated(false). + WithActivationCode(ActivationCodeWaitingForInput) } // newActivationCodeReturnedError builds a specific activation result func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult { - return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodeReturnedError).WithError(fmt.Errorf("component returned an error: %w", err)) + return NewActivationResult(c.Name()). + SetActivated(true). + WithActivationCode(ActivationCodeReturnedError). + WithError(fmt.Errorf("component returned an error: %w", err)). + WithInputKeys(c.Inputs().GetSignalKeys()) } // newActivationCodePanicked builds a specific activation result func (c *Component) newActivationCodePanicked(err error) *ActivationResult { - return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodePanicked).WithError(err) + return NewActivationResult(c.Name()). + SetActivated(true). + WithActivationCode(ActivationCodePanicked). + WithError(err). + WithInputKeys(c.Inputs().GetSignalKeys()) +} + +// isWaitingForInput tells whether component is waiting for specific inputs +func (c *Component) isWaitingForInput(activationResult *ActivationResult) bool { + return activationResult.HasError() && errors.Is(activationResult.Error(), errWaitingForInputs) +} + +// WantsToKeepInputs tells whether component wants to keep signals on input ports for the next cycle +func (c *Component) WantsToKeepInputs(activationResult *ActivationResult) bool { + return c.isWaitingForInput(activationResult) && errors.Is(activationResult.Error(), errWaitingForInputsKeep) } diff --git a/component/activation_result_collection.go b/component/activation_result_collection.go index 956128d..4d7219d 100644 --- a/component/activation_result_collection.go +++ b/component/activation_result_collection.go @@ -45,3 +45,12 @@ func (collection ActivationResultCollection) HasActivatedComponents() bool { } return false } + +// ByComponentName returns the activation result of given component +func (collection ActivationResultCollection) ByComponentName(componentName string) *ActivationResult { + if result, ok := collection[componentName]; ok { + return result + } + + return nil +} diff --git a/component/component.go b/component/component.go index f1e2f7f..f20bdd2 100644 --- a/component/component.go +++ b/component/component.go @@ -76,11 +76,10 @@ func (c *Component) hasActivationFunction() bool { } // MaybeActivate tries to run the activation function if all required conditions are met +// @TODO: hide this method from user func (c *Component) MaybeActivate() (activationResult *ActivationResult) { defer func() { if r := recover(); r != nil { - //Clear inputs and exit - c.inputs.ClearSignals() activationResult = c.newActivationCodePanicked(fmt.Errorf("panicked with: %v", r)) } }() @@ -99,22 +98,15 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) { return } - //Run the computation - err := c.f(c.inputs, c.outputs) + //Invoke the activation func + err := c.f(c.Inputs(), c.Outputs()) if errors.Is(err, errWaitingForInputs) { activationResult = c.newActivationCodeWaitingForInput() - if !errors.Is(err, errWaitingForInputsKeep) { - c.inputs.ClearSignals() - } - return } - //Clear inputs - c.inputs.ClearSignals() - if err != nil { activationResult = c.newActivationCodeReturnedError(err) @@ -126,9 +118,14 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) { return } -// FlushOutputs pushed signals out of the component outputs to pipes and clears outputs +// FlushInputs ... +// @TODO: hide this method from user +func (c *Component) FlushInputs() { + c.inputs.Flush(false) +} + +// FlushOutputs ... +// @TODO: hide this method from user func (c *Component) FlushOutputs() { - for _, out := range c.outputs { - out.Flush() - } + c.outputs.Flush(true) } diff --git a/component/component_test.go b/component/component_test.go index 120104b..b371d92 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -213,7 +213,6 @@ func TestComponent_WithActivationFunc(t *testing.T) { name string component *Component args args - want *Component }{ { name: "happy path", @@ -238,7 +237,11 @@ func TestComponent_WithActivationFunc(t *testing.T) { err1 := componentAfter.f(testInputs1, testOutputs1) err2 := tt.args.f(testInputs2, testOutputs2) assert.Equal(t, err1, err2) - assert.Equal(t, testOutputs1, testOutputs2) + + //Compare signals without keys (because they are random) + assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals().AsGroup(), testOutputs2.ByName("out1").Signals().AsGroup()) + assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals().AsGroup(), testOutputs2.ByName("out2").Signals().AsGroup()) + }) } } diff --git a/errors.go b/errors.go index d134b35..b06689d 100644 --- a/errors.go +++ b/errors.go @@ -7,13 +7,18 @@ import ( type ErrorHandlingStrategy int const ( - StopOnFirstError ErrorHandlingStrategy = iota + // StopOnFirstErrorOrPanic stops the f-mesh on first error or panic + StopOnFirstErrorOrPanic ErrorHandlingStrategy = iota + + // StopOnFirstPanic ignores errors, but stops the f-mesh on first panic StopOnFirstPanic + + // IgnoreAll allows to continue running the f-mesh regardless of how components finish their activation functions IgnoreAll ) var ( - ErrHitAnError = errors.New("f-mesh hit an error and will be stopped") + ErrHitAnErrorOrPanic = errors.New("f-mesh hit an error or panic and will be stopped") ErrHitAPanic = errors.New("f-mesh hit a panic and will be stopped") ErrUnsupportedErrorHandlingStrategy = errors.New("unsupported error handling strategy") ) diff --git a/fmesh.go b/fmesh.go index 7356a9c..766abcf 100644 --- a/fmesh.go +++ b/fmesh.go @@ -79,10 +79,23 @@ func (fm *FMesh) runCycle() *cycle.Cycle { return newCycle } -// DrainComponents drains the data from all components outputs -func (fm *FMesh) drainComponents() { +// DrainComponents drains the data from activated components +func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) { for _, c := range fm.components { + activationResult := cycle.ActivationResults().ByComponentName(c.Name()) + + if !activationResult.Activated() { + continue + } + + c.FlushInputs() c.FlushOutputs() + + keepInputs := c.WantsToKeepInputs(activationResult) + if !keepInputs { + c.Inputs().RemoveSignalsByKeys(activationResult.InputKeys()) + } + } } @@ -98,7 +111,7 @@ func (fm *FMesh) Run() (cycle.Collection, error) { return allCycles, err } - fm.drainComponents() + fm.drainComponentsAfterCycle(cycleResult) } } @@ -110,8 +123,8 @@ func (fm *FMesh) mustStop(cycleResult *cycle.Cycle) (bool, error) { //Check if mesh must stop because of configured error handling strategy switch fm.errorHandlingStrategy { - case StopOnFirstError: - return cycleResult.HasErrors(), ErrHitAnError + case StopOnFirstErrorOrPanic: + return cycleResult.HasErrors() || cycleResult.HasPanics(), ErrHitAnErrorOrPanic case StopOnFirstPanic: return cycleResult.HasPanics(), ErrHitAPanic case IgnoreAll: diff --git a/fmesh_test.go b/fmesh_test.go index 6bfcae2..9fcee29 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -109,7 +109,7 @@ func TestFMesh_WithErrorHandlingStrategy(t *testing.T) { want: &FMesh{ name: "fm1", components: component.Collection{}, - errorHandlingStrategy: StopOnFirstError, + errorHandlingStrategy: StopOnFirstErrorOrPanic, }, }, { @@ -305,7 +305,7 @@ func TestFMesh_Run(t *testing.T) { { name: "stop on first error on first cycle", fm: New("fm"). - WithErrorHandlingStrategy(StopOnFirstError). + WithErrorHandlingStrategy(StopOnFirstErrorOrPanic). WithComponents( component.NewComponent("c1"). WithDescription("This component just returns an unexpected error"). @@ -660,29 +660,51 @@ func TestFMesh_runCycle(t *testing.T) { } return nil }), + component.NewComponent("c4"). + WithDescription("I'm waiting for specific input"). + WithInputs("i1", "i2"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + if !inputs.ByNames("i1", "i2").AllHaveSignals() { + return component.NewErrWaitForInputs(false) + } + return nil + }), ), initFM: func(fm *FMesh) { //Only i1 is set, while component is waiting for both i1 and i2 to be set fm.Components().ByName("c3").Inputs().ByName("i1").PutSignals(signal.New(123)) + //Same for c4 + fm.Components().ByName("c4").Inputs().ByName("i1").PutSignals(signal.New(123)) }, want: cycle.New(). WithActivationResults( component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), component.NewActivationResult("c2").SetActivated(false).WithActivationCode(component.ActivationCodeNoFunction), - component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput)), + component.NewActivationResult("c3").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput), + component.NewActivationResult("c4").SetActivated(false).WithActivationCode(component.ActivationCodeWaitingForInput)), }, { name: "all components activated in one cycle (concurrently)", fm: New("test").WithComponents( - component.NewComponent("c1").WithDescription("").WithInputs("i1").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - return nil - }), - component.NewComponent("c2").WithDescription("").WithInputs("i1").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - return nil - }), - component.NewComponent("c3").WithDescription("").WithInputs("i1").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { - return nil - }), + component.NewComponent("c1"). + WithDescription(""). + WithInputs("i1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + return nil + }), + component.NewComponent("c2"). + WithDescription(""). + WithInputs("i1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + return nil + }), + component.NewComponent("c3"). + WithDescription(""). + WithInputs("i1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + return nil + }), ), initFM: func(fm *FMesh) { fm.Components().ByName("c1").Inputs().ByName("i1").PutSignals(signal.New(1)) @@ -690,9 +712,18 @@ func TestFMesh_runCycle(t *testing.T) { fm.Components().ByName("c3").Inputs().ByName("i1").PutSignals(signal.New(3)) }, want: cycle.New().WithActivationResults( - component.NewActivationResult("c1").SetActivated(true).WithActivationCode(component.ActivationCodeOK), - component.NewActivationResult("c2").SetActivated(true).WithActivationCode(component.ActivationCodeOK), - component.NewActivationResult("c3").SetActivated(true).WithActivationCode(component.ActivationCodeOK), + component.NewActivationResult("c1"). + SetActivated(true). + WithActivationCode(component.ActivationCodeOK). + WithInputKeys([]string{"1"}), + component.NewActivationResult("c2"). + SetActivated(true). + WithActivationCode(component.ActivationCodeOK). + WithInputKeys([]string{"1"}), + component.NewActivationResult("c3"). + SetActivated(true). + WithActivationCode(component.ActivationCodeOK). + WithInputKeys([]string{"1"}), ), }, } @@ -706,22 +737,28 @@ func TestFMesh_runCycle(t *testing.T) { } } -func TestFMesh_drainComponents(t *testing.T) { +func TestFMesh_drainComponentsAfterCycle(t *testing.T) { tests := []struct { name string + cycle *cycle.Cycle fm *FMesh initFM func(fm *FMesh) assertionsAfterDrain func(t *testing.T, fm *FMesh) }{ { - name: "no components", - fm: New("empty_fm"), + name: "no components", + cycle: cycle.New(), + fm: New("empty_fm"), assertionsAfterDrain: func(t *testing.T, fm *FMesh) { assert.Empty(t, fm.Components()) }, }, { name: "no signals to be drained", + cycle: cycle.New().WithActivationResults( + component.NewActivationResult("c1").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), + component.NewActivationResult("c2").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), + ), fm: New("fm").WithComponents( component.NewComponent("c1").WithInputs("i1").WithOutputs("o1"), component.NewComponent("c2").WithInputs("i1").WithOutputs("o1"), @@ -741,6 +778,9 @@ func TestFMesh_drainComponents(t *testing.T) { }, { name: "there are signals on output, but no pipes", + cycle: cycle.New().WithActivationResults( + component.NewActivationResult("c1").SetActivated(true).WithActivationCode(component.ActivationCodeOK), + component.NewActivationResult("c2").SetActivated(true).WithActivationCode(component.ActivationCodeOK)), fm: New("fm").WithComponents( component.NewComponent("c1").WithInputs("i1").WithOutputs("o1"), component.NewComponent("c2").WithInputs("i1").WithOutputs("o1"), @@ -762,6 +802,9 @@ func TestFMesh_drainComponents(t *testing.T) { }, { name: "happy path", + cycle: cycle.New().WithActivationResults( + component.NewActivationResult("c1").SetActivated(true).WithActivationCode(component.ActivationCodeOK), + component.NewActivationResult("c2").SetActivated(true).WithActivationCode(component.ActivationCodeOK)), fm: New("fm").WithComponents( component.NewComponent("c1").WithInputs("i1").WithOutputs("o1"), component.NewComponent("c2").WithInputs("i1").WithOutputs("o1"), @@ -782,13 +825,15 @@ func TestFMesh_drainComponents(t *testing.T) { assert.Equal(t, c2.Inputs().ByName("i1").Signals().FirstPayload().(int), 123) //The signal is correct }, }, + + //TODO:add test cases: "only activated components are drained", "signals from previous cycle are removed" } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.initFM != nil { tt.initFM(tt.fm) } - tt.fm.drainComponents() + tt.fm.drainComponentsAfterCycle(tt.cycle) tt.assertionsAfterDrain(t, tt.fm) }) } diff --git a/integration_tests/computation/math_test.go b/integration_tests/computation/math_test.go index 6bfa36a..1758b44 100644 --- a/integration_tests/computation/math_test.go +++ b/integration_tests/computation/math_test.go @@ -42,7 +42,7 @@ func Test_Math(t *testing.T) { c1.Outputs().ByName("res").PipeTo(c2.Inputs().ByName("num")) - return fmesh.New("fm").WithComponents(c1, c2).WithErrorHandlingStrategy(fmesh.StopOnFirstError) + return fmesh.New("fm").WithComponents(c1, c2).WithErrorHandlingStrategy(fmesh.StopOnFirstErrorOrPanic) }, setInputs: func(fm *fmesh.FMesh) { fm.Components().ByName("c1").Inputs().ByName("num").PutSignals(signal.New(32)) diff --git a/integration_tests/piping/multiplexing_test.go b/integration_tests/piping/fan_test.go similarity index 97% rename from integration_tests/piping/multiplexing_test.go rename to integration_tests/piping/fan_test.go index 8449f67..dd9cd0f 100644 --- a/integration_tests/piping/multiplexing_test.go +++ b/integration_tests/piping/fan_test.go @@ -12,7 +12,7 @@ import ( "time" ) -func Test_Multiplexing(t *testing.T) { +func Test_Fan(t *testing.T) { tests := []struct { name string setupFM func() *fmesh.FMesh @@ -84,7 +84,7 @@ func Test_Multiplexing(t *testing.T) { }, }, { - name: "multiplexing", + name: "fan-in (3 pipes coming into 1 destination port)", setupFM: func() *fmesh.FMesh { producer1 := component.NewComponent("producer1"). WithInputs("start"). @@ -130,11 +130,12 @@ func Test_Multiplexing(t *testing.T) { fm.Components().ByName("producer3").Inputs().ByName("start").PutSignals(signal.New(struct{}{})) }, assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { + assert.NoError(t, err) //Consumer received a signal assert.True(t, fm.Components().ByName("consumer").Outputs().ByName("o1").HasSignals()) //The signal is combined and consist of 3 payloads - resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals() + resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals().AsGroup() assert.Len(t, resultSignals, 3) //And they are all different diff --git a/integration_tests/piping/piping_from_inputs_test.go b/integration_tests/piping/piping_from_inputs_test.go new file mode 100644 index 0000000..7f90e40 --- /dev/null +++ b/integration_tests/piping/piping_from_inputs_test.go @@ -0,0 +1,92 @@ +package integration_tests + +import ( + "fmt" + "github.com/hovsep/fmesh" + "github.com/hovsep/fmesh/component" + "github.com/hovsep/fmesh/cycle" + "github.com/hovsep/fmesh/port" + "github.com/hovsep/fmesh/signal" + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_PipingFromInput(t *testing.T) { + tests := []struct { + name string + setupFM func() *fmesh.FMesh + setInputs func(fm *fmesh.FMesh) + assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) + }{ + { + name: "observer pattern", + setupFM: func() *fmesh.FMesh { + adder := component.NewComponent("adder"). + WithDescription("adds i1 and i2"). + WithInputs("i1", "i2"). + WithOutputs("out"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + i1, i2 := inputs.ByName("i1").Signals().FirstPayload().(int), inputs.ByName("i2").Signals().FirstPayload().(int) + outputs.ByName("out").PutSignals(signal.New(i1 + i2)) + return nil + }) + + multiplier := component.NewComponent("multiplier"). + WithDescription("multiplies i1 by 10"). + WithInputs("i1"). + WithOutputs("out"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + i1 := inputs.ByName("i1").Signals().FirstPayload().(int) + outputs.ByName("out").PutSignals(signal.New(i1 * 10)) + return nil + }) + + logger := component.NewComponent("logger"). + WithDescription("logs all input signals"). + WithInputs("in"). + WithOutputs("log"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + for _, sig := range inputs.ByName("in").Signals() { + fmt.Println(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload())) + outputs.ByName("log").PutSignals(signal.New(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload()))) + } + return nil + }) + + fm := fmesh.New("fm with observer"). + WithDescription("In this f-mesh adder receives 2 numbers, adds them and passes to multiplier. "+ + "The logger component is connected to adder's inputs, so it can observe them"+ + "The cool thing is logger does not need multiple input ports to observe multiple ports of other component"). + WithComponents(adder, multiplier, logger) + + adder.Outputs().ByName("out").PipeTo(multiplier.Inputs().ByName("i1")) + adder.Inputs().ByNames("i1", "i2").PipeTo(logger.Inputs().ByName("in")) + multiplier.Inputs().ByName("i1").PipeTo(logger.Inputs().ByName("in")) + + return fm + }, + setInputs: func(fm *fmesh.FMesh) { + fm.Components().ByName("adder").Inputs().ByName("i1").PutSignals(signal.New(4)) + fm.Components().ByName("adder").Inputs().ByName("i2").PutSignals(signal.New(5)) + }, + assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { + assert.NoError(t, err) + m := fm.Components().ByName("multiplier") + l := fm.Components().ByName("logger") + assert.True(t, m.Outputs().ByName("out").HasSignals()) + assert.Equal(t, 90, m.Outputs().ByName("out").Signals().FirstPayload().(int)) + + assert.True(t, fm.Components().ByName("logger").Outputs().ByName("log").HasSignals()) + assert.Len(t, l.Outputs().ByName("log").Signals(), 3) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fm := tt.setupFM() + tt.setInputs(fm) + cycles, err := fm.Run() + tt.assertions(t, fm, cycles, err) + }) + } +} diff --git a/integration_tests/piping/piping_to_outputs_test.go b/integration_tests/piping/piping_to_outputs_test.go new file mode 100644 index 0000000..9b7bae1 --- /dev/null +++ b/integration_tests/piping/piping_to_outputs_test.go @@ -0,0 +1,37 @@ +package integration_tests + +import ( + "github.com/hovsep/fmesh" + "github.com/hovsep/fmesh/cycle" + "testing" +) + +func Test_PipingToOutputs(t *testing.T) { + tests := []struct { + name string + setupFM func() *fmesh.FMesh + setInputs func(fm *fmesh.FMesh) + assertions func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) + }{ + { + name: "injector", + setupFM: func() *fmesh.FMesh { + return fmesh.New("injector") + }, + setInputs: func(fm *fmesh.FMesh) { + + }, + assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { + + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fm := tt.setupFM() + tt.setInputs(fm) + cycles, err := fm.Run() + tt.assertions(t, fm, cycles, err) + }) + } +} diff --git a/port/collection.go b/port/collection.go index ebc6e6d..3379da7 100644 --- a/port/collection.go +++ b/port/collection.go @@ -66,6 +66,30 @@ func (collection Collection) ClearSignals() { } } +// Flush flushes all ports in collection +func (collection Collection) Flush(clearFlushed bool) { + for _, p := range collection { + if portFlushed := p.Flush(); clearFlushed && portFlushed { + p.ClearSignals() + } + } +} + +func (collection Collection) RemoveSignalsByKeys(signalKeys []string) Collection { + for _, p := range collection { + p.Signals().DeleteKeys(signalKeys) + } + return collection +} + +// PipeTo creates pipes from each port in collection +func (collection Collection) PipeTo(toPorts ...*Port) { + for _, p := range collection { + p.PipeTo(toPorts...) + } +} + +// Add adds ports to collection func (collection Collection) Add(ports ...*Port) Collection { for _, port := range ports { if port == nil { @@ -76,3 +100,11 @@ func (collection Collection) Add(ports ...*Port) Collection { return collection } + +func (collection Collection) GetSignalKeys() []string { + keys := make([]string, 0) + for _, p := range collection { + keys = append(keys, p.Signals().GetKeys()...) + } + return keys +} diff --git a/port/collection_test.go b/port/collection_test.go index aaba717..fc98770 100644 --- a/port/collection_test.go +++ b/port/collection_test.go @@ -78,34 +78,34 @@ func TestCollection_ByName(t *testing.T) { name string } tests := []struct { - name string - ports Collection - args args - want *Port + name string + collection Collection + args args + want *Port }{ { - name: "empty port found", - ports: NewCollection().Add(NewGroup("p1", "p2")...), + name: "empty port found", + collection: NewCollection().Add(NewGroup("p1", "p2")...), args: args{ name: "p1", }, - want: &Port{name: "p1", pipes: Group{}, signals: signal.Group{}}, + want: &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, }, { - name: "port with signals found", - ports: portsWithSignals, + name: "port with signals found", + collection: portsWithSignals, args: args{ name: "p2", }, want: &Port{ name: "p2", - signals: signal.NewGroup(12), + signals: signal.NewCollection().AddPayload(12), pipes: Group{}, }, }, { - name: "port not found", - ports: NewCollection().Add(NewGroup("p1", "p2")...), + name: "port not found", + collection: NewCollection().Add(NewGroup("p1", "p2")...), args: args{ name: "p3", }, @@ -114,7 +114,12 @@ func TestCollection_ByName(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want, tt.ports.ByName(tt.args.name)) + gotPort := tt.collection.ByName(tt.args.name) + if tt.want == nil { + assert.Nil(t, gotPort) + } else { + //Compare everything, but nror + } }) } } @@ -139,7 +144,7 @@ func TestCollection_ByNames(t *testing.T) { "p1": &Port{ name: "p1", pipes: Group{}, - signals: signal.Group{}, + signals: signal.NewCollection(), }, }, }, @@ -150,8 +155,8 @@ func TestCollection_ByNames(t *testing.T) { names: []string{"p1", "p2"}, }, want: Collection{ - "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Group{}}, - "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Group{}}, + "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, + "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}}, }, }, { @@ -169,8 +174,8 @@ func TestCollection_ByNames(t *testing.T) { names: []string{"p1", "p2", "p3"}, }, want: Collection{ - "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Group{}}, - "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Group{}}, + "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, + "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}}, }, }, } diff --git a/port/port.go b/port/port.go index f0aa207..5d7a979 100644 --- a/port/port.go +++ b/port/port.go @@ -7,8 +7,8 @@ import ( // Port defines a connectivity point of a component type Port struct { name string - signals signal.Group //Current signals set on the port - pipes Group //Refs to all outbound pipes connected to this port + signals signal.Collection //Current signals set on the port + pipes Group //Refs to all outbound pipes connected to this port } // New creates a new port @@ -16,7 +16,7 @@ func New(name string) *Port { return &Port{ name: name, pipes: NewGroup(), - signals: signal.NewGroup(), + signals: signal.NewCollection(), } } @@ -26,28 +26,33 @@ func (p *Port) Name() string { } // Signals getter -func (p *Port) Signals() signal.Group { +func (p *Port) Signals() signal.Collection { return p.signals } // PutSignals adds a signals to current signals +// @TODO: rename func (p *Port) PutSignals(signals ...*signal.Signal) { - for _, s := range signals { - p.signals = append(p.signals, s) - } + p.Signals().Add(signals...) } -// ClearSignals removes current signals from the port +// ClearSignals removes all signals from the port func (p *Port) ClearSignals() { - p.signals = signal.NewGroup() + p.signals = signal.NewCollection() } // HasSignals says whether port signals is set or not func (p *Port) HasSignals() bool { - return len(p.signals) > 0 + return len(p.Signals()) > 0 +} + +// HasPipes says whether port has outbound pipes +func (p *Port) HasPipes() bool { + return len(p.pipes) > 0 } // PipeTo creates one or multiple pipes to other port(s) +// @TODO: hide this method from AF func (p *Port) PipeTo(toPorts ...*Port) { for _, toPort := range toPorts { if toPort == nil { @@ -57,20 +62,21 @@ func (p *Port) PipeTo(toPorts ...*Port) { } } -// Flush pushed current signals to pipes and clears the port -func (p *Port) Flush() { - if !p.HasSignals() || len(p.pipes) == 0 { - return +// Flush pushes current signals to pipes and returns true when flushed +// @TODO: hide this method from user +func (p *Port) Flush() bool { + if !p.HasSignals() || !p.HasPipes() { + return false } for _, outboundPort := range p.pipes { //Fan-Out ForwardSignals(p, outboundPort) } - p.ClearSignals() + return true } // ForwardSignals puts signals from source port to destination port, without clearing the source port func ForwardSignals(source *Port, dest *Port) { - dest.PutSignals(source.Signals()...) + dest.PutSignals(source.Signals().AsGroup()...) } diff --git a/port/port_test.go b/port/port_test.go index 675d8a6..2fdfff0 100644 --- a/port/port_test.go +++ b/port/port_test.go @@ -40,22 +40,22 @@ func TestPort_Signals(t *testing.T) { tests := []struct { name string port *Port - want signal.Group + want signal.Collection }{ { name: "no signals", port: New("noSignal"), - want: signal.Group{}, + want: signal.Collection{}, }, { name: "with signal", port: portWithSignal, - want: signal.NewGroup(123), + want: signal.NewCollection().AddPayload(123), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want, tt.port.Signals()) + assert.Equal(t, tt.want.AsGroup(), tt.port.Signals().AsGroup()) }) } } @@ -72,12 +72,12 @@ func TestPort_ClearSignal(t *testing.T) { { name: "happy path", before: portWithSignal, - after: &Port{name: "portWithSignal", pipes: Group{}, signals: signal.Group{}}, + after: &Port{name: "portWithSignal", pipes: Group{}, signals: signal.Collection{}}, }, { name: "cleaning empty port", before: New("emptyPort"), - after: &Port{name: "emptyPort", pipes: Group{}, signals: signal.Group{}}, + after: &Port{name: "emptyPort", pipes: Group{}, signals: signal.Collection{}}, }, } for _, tt := range tests { @@ -106,7 +106,7 @@ func TestPort_PipeTo(t *testing.T) { after: &Port{ name: "p1", pipes: Group{p2, p3}, - signals: signal.Group{}, + signals: signal.Collection{}, }, args: args{ toPorts: []*Port{p2, p3}, @@ -118,7 +118,7 @@ func TestPort_PipeTo(t *testing.T) { after: &Port{ name: "p4", pipes: Group{p2}, - signals: signal.Group{}, + signals: signal.Collection{}, }, args: args{ toPorts: []*Port{p2, nil}, @@ -157,7 +157,7 @@ func TestPort_PutSignals(t *testing.T) { before: New("emptyPort"), after: &Port{ name: "emptyPort", - signals: signal.NewGroup(11), + signals: signal.NewCollection().AddPayload(11), pipes: Group{}, }, args: args{ @@ -169,7 +169,7 @@ func TestPort_PutSignals(t *testing.T) { before: New("p"), after: &Port{ name: "p", - signals: signal.NewGroup(11, 12), + signals: signal.NewCollection().AddPayload(11, 12), pipes: Group{}, }, args: args{ @@ -181,7 +181,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithSingleSignal, after: &Port{ name: "portWithSingleSignal", - signals: signal.NewGroup(11, 12), + signals: signal.NewCollection().AddPayload(11, 12), pipes: Group{}, }, args: args{ @@ -193,7 +193,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithMultipleSignals, after: &Port{ name: "portWithMultipleSignals", - signals: signal.NewGroup(11, 12, 13), //Notice LIFO order + signals: signal.NewCollection().AddPayload(11, 12, 13), pipes: Group{}, }, args: args{ @@ -205,7 +205,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithMultipleSignals2, after: &Port{ name: "portWithMultipleSignals2", - signals: signal.NewGroup(55, 66, 13, 14), //Notice LIFO order + signals: signal.NewCollection().AddPayload(55, 66, 13, 14), //Notice LIFO order pipes: Group{}, }, args: args{ @@ -216,7 +216,7 @@ func TestPort_PutSignals(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.before.PutSignals(tt.args.signals...) - assert.Equal(t, tt.after, tt.before) + assert.ElementsMatch(t, tt.after.Signals().AsGroup(), tt.before.Signals().AsGroup()) }) } } @@ -257,7 +257,7 @@ func TestNewPort(t *testing.T) { want: &Port{ name: "", pipes: Group{}, - signals: signal.Group{}, + signals: signal.Collection{}, }, }, { @@ -268,7 +268,7 @@ func TestNewPort(t *testing.T) { want: &Port{ name: "p1", pipes: Group{}, - signals: signal.Group{}, + signals: signal.Collection{}, }, }, } @@ -295,6 +295,7 @@ func TestPort_Flush(t *testing.T) { name string source *Port dest *Port + wantResult bool assertions func(t *testing.T, source *Port, dest *Port) }{ { @@ -305,39 +306,42 @@ func TestPort_Flush(t *testing.T) { assert.False(t, source.HasSignals()) assert.False(t, dest.HasSignals()) }, + wantResult: false, }, { name: "flush to empty port", source: portWithSignal1, dest: emptyPort, assertions: func(t *testing.T, source *Port, dest *Port) { - //Source port is clear - assert.False(t, source.HasSignals()) + //Source port is not cleared during flush + assert.True(t, source.HasSignals()) //Signals transferred to destination port assert.True(t, dest.HasSignals()) assert.Equal(t, dest.Signals().FirstPayload().(int), 777) }, + wantResult: true, }, { name: "flush to port with signals", source: portWithSignal2, dest: portWithMultipleSignals, assertions: func(t *testing.T, source *Port, dest *Port) { - //Source port is clear - assert.False(t, source.HasSignals()) + //Source port is not cleared + assert.True(t, source.HasSignals()) //Destination port now has 1 more signal assert.True(t, dest.HasSignals()) assert.Len(t, dest.Signals(), 3) assert.Contains(t, dest.Signals().AllPayloads(), 888) }, + wantResult: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.source.PipeTo(tt.dest) - tt.source.Flush() + assert.Equal(t, tt.wantResult, tt.source.Flush()) if tt.assertions != nil { tt.assertions(t, tt.source, tt.dest) } diff --git a/signal/collection.go b/signal/collection.go new file mode 100644 index 0000000..22995d0 --- /dev/null +++ b/signal/collection.go @@ -0,0 +1,61 @@ +package signal + +import ( + "fmt" +) + +type Collection map[string]*Signal + +func NewCollection() Collection { + return make(Collection) +} + +func (collection Collection) Add(signals ...*Signal) Collection { + for _, sig := range signals { + signalKey := collection.newKey(sig) + collection[signalKey] = sig + } + return collection +} + +func (collection Collection) AddPayload(payloads ...any) Collection { + for _, p := range payloads { + collection.Add(New(p)) + } + return collection +} + +func (collection Collection) newKey(signal *Signal) string { + return fmt.Sprintf("%d", len(collection)+1) +} + +func (collection Collection) AsGroup() Group { + group := NewGroup() + for _, sig := range collection { + group = append(group, sig) + } + return group +} + +func (collection Collection) FirstPayload() any { + return collection.AsGroup().FirstPayload() +} + +func (collection Collection) AllPayloads() []any { + return collection.AsGroup().AllPayloads() +} + +func (collection Collection) GetKeys() []string { + keys := make([]string, 0) + for k, _ := range collection { + keys = append(keys, k) + } + return keys +} + +func (collection Collection) DeleteKeys(keys []string) Collection { + for _, key := range keys { + delete(collection, key) + } + return collection +}