From f7b2a01e0697ec556b3a8861825b06ef3ad4a3df Mon Sep 17 00:00:00 2001 From: hovsep Date: Tue, 24 Sep 2024 02:33:18 +0300 Subject: [PATCH] Various refactorings --- .../activation_result_collection_test.go | 12 +- component/collection_test.go | 10 +- component/component.go | 33 +- component/component_test.go | 93 ++- component/state_snapshot.go | 46 ++ fmesh.go | 14 +- fmesh_test.go | 575 +++++++++--------- integration_tests/computation/math_test.go | 4 +- integration_tests/piping/fan_test.go | 16 +- .../piping/piping_from_inputs_test.go | 18 +- .../piping/piping_to_outputs_test.go | 8 +- port/collection.go | 40 +- port/collection_test.go | 6 +- port/metadata.go | 20 + port/port.go | 31 +- port/port_test.go | 77 +-- signal/group.go | 4 +- 17 files changed, 488 insertions(+), 519 deletions(-) create mode 100644 component/state_snapshot.go create mode 100644 port/metadata.go diff --git a/component/activation_result_collection_test.go b/component/activation_result_collection_test.go index 46c00c7..283f6b7 100644 --- a/component/activation_result_collection_test.go +++ b/component/activation_result_collection_test.go @@ -34,8 +34,8 @@ func TestActivationResultCollection_Add(t *testing.T) { collection: NewActivationResultCollection(), args: args{ activationResults: []*ActivationResult{ - NewComponent("c1").newActivationResultOK(), - NewComponent("c2").newActivationResultReturnedError(errors.New("oops")), + New("c1").newActivationResultOK(), + New("c2").newActivationResultReturnedError(errors.New("oops")), }, }, assertions: func(t *testing.T, collection ActivationResultCollection) { @@ -48,13 +48,13 @@ func TestActivationResultCollection_Add(t *testing.T) { { name: "adding to existing collection", collection: NewActivationResultCollection().Add( - NewComponent("c1").newActivationResultOK(), - NewComponent("c2").newActivationResultOK(), + New("c1").newActivationResultOK(), + New("c2").newActivationResultOK(), ), args: args{ activationResults: []*ActivationResult{ - NewComponent("c4").newActivationResultNoInput(), - NewComponent("c5").newActivationResultPanicked(errors.New("panic")), + New("c4").newActivationResultNoInput(), + New("c5").newActivationResultPanicked(errors.New("panic")), }, }, assertions: func(t *testing.T, collection ActivationResultCollection) { diff --git a/component/collection_test.go b/component/collection_test.go index 253ef72..112afab 100644 --- a/component/collection_test.go +++ b/component/collection_test.go @@ -18,7 +18,7 @@ func TestCollection_ByName(t *testing.T) { }{ { name: "component found", - components: NewComponentCollection().Add(NewComponent("c1"), NewComponent("c2")), + components: NewComponentCollection().Add(New("c1"), New("c2")), args: args{ name: "c2", }, @@ -32,7 +32,7 @@ func TestCollection_ByName(t *testing.T) { }, { name: "component not found", - components: NewComponentCollection().Add(NewComponent("c1"), NewComponent("c2")), + components: NewComponentCollection().Add(New("c1"), New("c2")), args: args{ name: "c3", }, @@ -70,7 +70,7 @@ func TestCollection_Add(t *testing.T) { name: "adding to empty collection", collection: NewComponentCollection(), args: args{ - components: []*Component{NewComponent("c1"), NewComponent("c2")}, + components: []*Component{New("c1"), New("c2")}, }, assertions: func(t *testing.T, collection Collection) { assert.Len(t, collection, 2) @@ -81,9 +81,9 @@ func TestCollection_Add(t *testing.T) { }, { name: "adding to existing collection", - collection: NewComponentCollection().Add(NewComponent("c1"), NewComponent("c2")), + collection: NewComponentCollection().Add(New("c1"), New("c2")), args: args{ - components: []*Component{NewComponent("c3"), NewComponent("c4")}, + components: []*Component{New("c3"), New("c4")}, }, assertions: func(t *testing.T, collection Collection) { assert.Len(t, collection, 4) diff --git a/component/component.go b/component/component.go index 0646e3d..3064bc6 100644 --- a/component/component.go +++ b/component/component.go @@ -6,12 +6,6 @@ import ( "github.com/hovsep/fmesh/port" ) -// @TODO add getter\setter and constructor -type StateSnapshot struct { - InputPorts port.MetadataMap - OutputPorts port.MetadataMap -} - type ActivationFunc func(inputs port.Collection, outputs port.Collection) error // Component defines a main building block of FMesh @@ -23,8 +17,8 @@ type Component struct { f ActivationFunc } -// NewComponent creates a new empty component -func NewComponent(name string) *Component { +// New creates initialized component +func New(name string) *Component { return &Component{ name: name, inputs: port.NewCollection(), @@ -93,13 +87,6 @@ func (c *Component) hasActivationFunction() bool { return c.f != nil } -func (c *Component) getStateSnapshot() *StateSnapshot { - return &StateSnapshot{ - InputPorts: c.Inputs().GetPortsMetadata(), - OutputPorts: c.Outputs().GetPortsMetadata(), - } -} - // MaybeActivate tries to run the activation function if all required conditions are met // @TODO: hide this method from user func (c *Component) MaybeActivate() (activationResult *ActivationResult) { @@ -158,12 +145,22 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) { // FlushInputs ... // @TODO: hide this method from user -func (c *Component) FlushInputs() { - c.Inputs().Flush(false) +func (c *Component) FlushInputs(activationResult *ActivationResult, keepInputSignals bool) { + c.Inputs().Flush() + if !keepInputSignals { + // Inputs can not be just cleared, instead we remove signals which + // have been used (been set on inputs) during the last activation cycle + // thus not affecting ones the component could have been received from i2i pipes + for portName, p := range c.Inputs() { + p.DisposeSignals(activationResult.StateBefore().InputPortsMetadata()[portName].SignalBufferLen) + } + } } // FlushOutputs ... // @TODO: hide this method from user func (c *Component) FlushOutputs(activationResult *ActivationResult) { - c.Outputs().FlushProcessedSignals(activationResult.StateAfter().OutputPorts) + for portName, p := range c.Outputs() { + p.FlushAndDispose(activationResult.StateAfter().OutputPortsMetadata()[portName].SignalBufferLen) + } } diff --git a/component/component_test.go b/component/component_test.go index d558631..565dcb0 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -46,7 +46,7 @@ func TestNewComponent(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, NewComponent(tt.args.name), "NewComponent(%v)", tt.args.name) + assert.Equalf(t, tt.want, New(tt.args.name), "New(%v)", tt.args.name) }) } } @@ -59,12 +59,12 @@ func TestComponent_Name(t *testing.T) { }{ { name: "empty name", - component: NewComponent(""), + component: New(""), want: "", }, { name: "with name", - component: NewComponent("c1"), + component: New("c1"), want: "c1", }, } @@ -83,12 +83,12 @@ func TestComponent_Description(t *testing.T) { }{ { name: "no description", - component: NewComponent("c1"), + component: New("c1"), want: "", }, { name: "with description", - component: NewComponent("c1").WithDescription("descr"), + component: New("c1").WithDescription("descr"), want: "descr", }, } @@ -102,10 +102,10 @@ func TestComponent_Description(t *testing.T) { func TestComponent_FlushOutputs(t *testing.T) { sink := port.New("sink") - componentWithNoOutputs := NewComponent("c1") - componentWithCleanOutputs := NewComponent("c1").WithOutputs("o1", "o2") + componentWithNoOutputs := New("c1") + componentWithCleanOutputs := New("c1").WithOutputs("o1", "o2") - componentWithAllOutputsSet := NewComponent("c1").WithOutputs("o1", "o2") + componentWithAllOutputsSet := New("c1").WithOutputs("o1", "o2") componentWithAllOutputsSet.Outputs().ByNames("o1").PutSignals(signal.New(777)) componentWithAllOutputsSet.Outputs().ByNames("o2").PutSignals(signal.New(888)) componentWithAllOutputsSet.Outputs().ByNames("o1", "o2").PipeTo(sink) @@ -121,14 +121,8 @@ func TestComponent_FlushOutputs(t *testing.T) { name: "no outputs", component: componentWithNoOutputs, activationResult: componentWithNoOutputs.newActivationResultOK(). - WithStateBefore(&StateSnapshot{ - InputPorts: port.MetadataMap{}, - OutputPorts: port.MetadataMap{}, - }). - WithStateAfter(&StateSnapshot{ - InputPorts: port.MetadataMap{}, - OutputPorts: port.MetadataMap{}, - }), + WithStateBefore(NewStateSnapshot()). + WithStateAfter(NewStateSnapshot()), destPort: nil, assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) { assert.NotNil(t, componentAfterFlush.Outputs()) @@ -139,14 +133,14 @@ func TestComponent_FlushOutputs(t *testing.T) { name: "output has no signal set", component: componentWithCleanOutputs, activationResult: componentWithCleanOutputs.newActivationResultOK(). - WithStateBefore(&StateSnapshot{ - InputPorts: port.MetadataMap{}, - OutputPorts: port.MetadataMap{}, - }). - WithStateAfter(&StateSnapshot{ - InputPorts: port.MetadataMap{}, - OutputPorts: port.MetadataMap{}, - }), + WithStateBefore(NewStateSnapshot().WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{SignalBufferLen: 0}, + "o2": &port.Metadata{SignalBufferLen: 0}, + })). + WithStateAfter(NewStateSnapshot().WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{SignalBufferLen: 0}, + "o2": &port.Metadata{SignalBufferLen: 0}, + })), destPort: nil, assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) { assert.False(t, componentAfterFlush.Outputs().AnyHasSignals()) @@ -156,21 +150,16 @@ func TestComponent_FlushOutputs(t *testing.T) { name: "happy path", component: componentWithAllOutputsSet, activationResult: componentWithAllOutputsSet.newActivationResultOK(). - WithStateBefore(&StateSnapshot{ - InputPorts: port.MetadataMap{}, - OutputPorts: port.MetadataMap{}, - }). - WithStateAfter(&StateSnapshot{ - InputPorts: port.MetadataMap{}, - OutputPorts: port.MetadataMap{ + WithStateBefore(NewStateSnapshot()). + WithStateAfter(NewStateSnapshot(). + WithOutputPortsMetadata(port.MetadataMap{ "o1": &port.Metadata{ SignalBufferLen: 1, }, "o2": &port.Metadata{ SignalBufferLen: 1, }, - }, - }), + })), destPort: sink, assertions: func(t *testing.T, componentAfterFlush *Component, destPort *port.Port) { assert.Contains(t, destPort.Signals().AllPayloads(), 777) @@ -197,12 +186,12 @@ func TestComponent_Inputs(t *testing.T) { }{ { name: "no inputs", - component: NewComponent("c1"), + component: New("c1"), want: port.Collection{}, }, { name: "with inputs", - component: NewComponent("c1").WithInputs("i1", "i2"), + component: New("c1").WithInputs("i1", "i2"), want: port.Collection{ "i1": port.New("i1"), "i2": port.New("i2"), @@ -224,12 +213,12 @@ func TestComponent_Outputs(t *testing.T) { }{ { name: "no outputs", - component: NewComponent("c1"), + component: New("c1"), want: port.Collection{}, }, { name: "with outputs", - component: NewComponent("c1").WithOutputs("o1", "o2"), + component: New("c1").WithOutputs("o1", "o2"), want: port.Collection{ "o1": port.New("o1"), "o2": port.New("o2"), @@ -254,7 +243,7 @@ func TestComponent_WithActivationFunc(t *testing.T) { }{ { name: "happy path", - component: NewComponent("c1"), + component: New("c1"), args: args{ f: func(inputs port.Collection, outputs port.Collection) error { outputs.ByName("out1").PutSignals(signal.New(23)) @@ -296,7 +285,7 @@ func TestComponent_WithDescription(t *testing.T) { }{ { name: "happy path", - component: NewComponent("c1"), + component: New("c1"), args: args{ description: "descr", }, @@ -328,7 +317,7 @@ func TestComponent_WithInputs(t *testing.T) { }{ { name: "happy path", - component: NewComponent("c1"), + component: New("c1"), args: args{ portNames: []string{"p1", "p2"}, }, @@ -345,7 +334,7 @@ func TestComponent_WithInputs(t *testing.T) { }, { name: "no arg", - component: NewComponent("c1"), + component: New("c1"), args: args{ portNames: nil, }, @@ -377,7 +366,7 @@ func TestComponent_WithOutputs(t *testing.T) { }{ { name: "happy path", - component: NewComponent("c1"), + component: New("c1"), args: args{ portNames: []string{"p1", "p2"}, }, @@ -394,7 +383,7 @@ func TestComponent_WithOutputs(t *testing.T) { }, { name: "no arg", - component: NewComponent("c1"), + component: New("c1"), args: args{ portNames: nil, }, @@ -423,14 +412,14 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "empty component is not activated", getComponent: func() *Component { - return NewComponent("c1") + return New("c1") }, wantActivationResult: NewActivationResult("c1").SetActivated(false).WithActivationCode(ActivationCodeNoFunction), }, { name: "component with inputs set, but no activation func", getComponent: func() *Component { - c := NewComponent("c1").WithInputs("i1") + c := New("c1").WithInputs("i1") c.Inputs().ByName("i1").PutSignals(signal.New(123)) return c }, @@ -441,7 +430,7 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "no input", getComponent: func() *Component { - c := NewComponent("c1"). + c := New("c1"). WithInputs("i1", "i2"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -460,7 +449,7 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "component is waiting for input, reset inputs", getComponent: func() *Component { - c := NewComponent("c1"). + c := New("c1"). WithInputs("i1", "i2"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -481,7 +470,7 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "component is waiting for input, keep inputs", getComponent: func() *Component { - c := NewComponent("c1"). + c := New("c1"). WithInputs("i1", "i2"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -502,7 +491,7 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "activated with error", getComponent: func() *Component { - c := NewComponent("c1"). + c := New("c1"). WithInputs("i1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { return errors.New("test error") @@ -519,7 +508,7 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "activated without error", getComponent: func() *Component { - c := NewComponent("c1"). + c := New("c1"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -537,7 +526,7 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "component panicked with error", getComponent: func() *Component { - c := NewComponent("c1"). + c := New("c1"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -557,7 +546,7 @@ func TestComponent_MaybeActivate(t *testing.T) { { name: "component panicked with string", getComponent: func() *Component { - c := NewComponent("c1"). + c := New("c1"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { diff --git a/component/state_snapshot.go b/component/state_snapshot.go new file mode 100644 index 0000000..3a2731f --- /dev/null +++ b/component/state_snapshot.go @@ -0,0 +1,46 @@ +package component + +import "github.com/hovsep/fmesh/port" + +// StateSnapshot represents the state of a component (used by f-mesh to perform piping correctly) +type StateSnapshot struct { + inputPortsMetadata port.MetadataMap + outputPortsMetadata port.MetadataMap +} + +// NewStateSnapshot creates new component state snapshot +func NewStateSnapshot() *StateSnapshot { + return &StateSnapshot{ + inputPortsMetadata: make(port.MetadataMap), + outputPortsMetadata: make(port.MetadataMap), + } +} + +// InputPortsMetadata ... getter +func (s *StateSnapshot) InputPortsMetadata() port.MetadataMap { + return s.inputPortsMetadata +} + +// OutputPortsMetadata ... getter +func (s *StateSnapshot) OutputPortsMetadata() port.MetadataMap { + return s.outputPortsMetadata +} + +// WithInputPortsMetadata sets important information about input ports +func (s *StateSnapshot) WithInputPortsMetadata(inputPortsMetadata port.MetadataMap) *StateSnapshot { + s.inputPortsMetadata = inputPortsMetadata + return s +} + +// WithOutputPortsMetadata sets important information about output ports +func (s *StateSnapshot) WithOutputPortsMetadata(outputPortsMetadata port.MetadataMap) *StateSnapshot { + s.outputPortsMetadata = outputPortsMetadata + return s +} + +// getStateSnapshot returns a snapshot of component state +func (c *Component) getStateSnapshot() *StateSnapshot { + return NewStateSnapshot(). + WithInputPortsMetadata(c.Inputs().GetPortsMetadata()). + WithOutputPortsMetadata(c.Outputs().GetPortsMetadata()) +} diff --git a/fmesh.go b/fmesh.go index 5460f88..4572c9d 100644 --- a/fmesh.go +++ b/fmesh.go @@ -93,19 +93,7 @@ func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) { } c.FlushOutputs(activationResult) - - c.FlushInputs() // Inputs are a bit trickier - - //Check if a component wait for inputs and wants to keep existing input - keepInputs := c.WantsToKeepInputs(activationResult) - - if !keepInputs { - // Inputs can not be just cleared, instead we remove signals which - // have been used (been set on inputs) during the last activation cycle - // thus not affecting ones the component could have been received from i2i pipes - c.Inputs().DisposeProcessedSignals(activationResult.StateBefore().InputPorts) - } - + c.FlushInputs(activationResult, c.WantsToKeepInputs(activationResult)) // Inputs are a bit trickier } } diff --git a/fmesh_test.go b/fmesh_test.go index 92b02b1..2f50674 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -160,13 +160,13 @@ func TestFMesh_WithComponents(t *testing.T) { fm: New("fm1"), args: args{ components: []*component.Component{ - component.NewComponent("c1"), + component.New("c1"), }, }, want: &FMesh{ name: "fm1", components: component.Collection{ - "c1": component.NewComponent("c1"), + "c1": component.New("c1"), }, }, }, @@ -175,15 +175,15 @@ func TestFMesh_WithComponents(t *testing.T) { fm: New("fm1"), args: args{ components: []*component.Component{ - component.NewComponent("c1"), - component.NewComponent("c2"), + component.New("c1"), + component.New("c2"), }, }, want: &FMesh{ name: "fm1", components: component.Collection{ - "c1": component.NewComponent("c1"), - "c2": component.NewComponent("c2"), + "c1": component.New("c1"), + "c2": component.New("c2"), }, }, }, @@ -192,18 +192,18 @@ func TestFMesh_WithComponents(t *testing.T) { fm: New("fm1"), args: args{ components: []*component.Component{ - component.NewComponent("c1").WithDescription("descr1"), - component.NewComponent("c2").WithDescription("descr2"), - component.NewComponent("c2").WithDescription("descr3"), //This will overwrite the previous one - component.NewComponent("c4").WithDescription("descr4"), + component.New("c1").WithDescription("descr1"), + component.New("c2").WithDescription("descr2"), + component.New("c2").WithDescription("descr3"), //This will overwrite the previous one + component.New("c4").WithDescription("descr4"), }, }, want: &FMesh{ name: "fm1", components: component.Collection{ - "c1": component.NewComponent("c1").WithDescription("descr1"), - "c2": component.NewComponent("c2").WithDescription("descr3"), - "c4": component.NewComponent("c4").WithDescription("descr4"), + "c1": component.New("c1").WithDescription("descr1"), + "c2": component.New("c2").WithDescription("descr3"), + "c4": component.New("c4").WithDescription("descr4"), }, }, }, @@ -281,7 +281,7 @@ func TestFMesh_Run(t *testing.T) { name: "unsupported error handling strategy", fm: New("fm").WithErrorHandlingStrategy(100). WithComponents( - component.NewComponent("c1"). + component.New("c1"). WithDescription("This component simply puts a constant on o1"). WithInputs("i1"). WithOutputs("o1"). @@ -307,7 +307,7 @@ func TestFMesh_Run(t *testing.T) { fm: New("fm"). WithErrorHandlingStrategy(StopOnFirstErrorOrPanic). WithComponents( - component.NewComponent("c1"). + component.New("c1"). WithDescription("This component just returns an unexpected error"). WithInputs("i1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -332,7 +332,7 @@ func TestFMesh_Run(t *testing.T) { fm: New("fm"). WithErrorHandlingStrategy(StopOnFirstPanic). WithComponents( - component.NewComponent("c1"). + component.New("c1"). WithDescription("This component just sends a number to c2"). WithInputs("i1"). WithOutputs("o1"). @@ -340,7 +340,7 @@ func TestFMesh_Run(t *testing.T) { outputs.ByName("o1").PutSignals(signal.New(10)) return nil }), - component.NewComponent("c2"). + component.New("c2"). WithDescription("This component receives a number from c1 and passes it to c4"). WithInputs("i1"). WithOutputs("o1"). @@ -348,14 +348,14 @@ func TestFMesh_Run(t *testing.T) { port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil }), - component.NewComponent("c3"). + component.New("c3"). WithDescription("This component returns an error, but the mesh is configured to ignore errors"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { return errors.New("boom") }), - component.NewComponent("c4"). + component.New("c4"). WithDescription("This component receives a number from c2 and panics"). WithInputs("i1"). WithOutputs("o1"). @@ -430,7 +430,7 @@ func TestFMesh_Run(t *testing.T) { fm: New("fm"). WithErrorHandlingStrategy(IgnoreAll). WithComponents( - component.NewComponent("c1"). + component.New("c1"). WithDescription("This component just sends a number to c2"). WithInputs("i1"). WithOutputs("o1"). @@ -438,7 +438,7 @@ func TestFMesh_Run(t *testing.T) { outputs.ByName("o1").PutSignals(signal.New(10)) return nil }), - component.NewComponent("c2"). + component.New("c2"). WithDescription("This component receives a number from c1 and passes it to c4"). WithInputs("i1"). WithOutputs("o1"). @@ -446,14 +446,14 @@ func TestFMesh_Run(t *testing.T) { port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil }), - component.NewComponent("c3"). + component.New("c3"). WithDescription("This component returns an error, but the mesh is configured to ignore errors"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { return errors.New("boom") }), - component.NewComponent("c4"). + component.New("c4"). WithDescription("This component receives a number from c2 and panics, but the mesh is configured to ignore even panics"). WithInputs("i1"). WithOutputs("o1"). @@ -465,7 +465,7 @@ func TestFMesh_Run(t *testing.T) { panic("no way") return nil }), - component.NewComponent("c5"). + component.New("c5"). WithDescription("This component receives a number from c4"). WithInputs("i1"). WithOutputs("o1"). @@ -636,7 +636,7 @@ func TestFMesh_runCycle(t *testing.T) { { name: "mesh has components, but no one is activated", fm: New("test").WithComponents( - component.NewComponent("c1"). + component.New("c1"). WithDescription("I do not have any input signal set, hence I will never be activated"). WithInputs("i1"). WithOutputs("o1"). @@ -645,12 +645,12 @@ func TestFMesh_runCycle(t *testing.T) { return nil }), - component.NewComponent("c2"). + component.New("c2"). WithDescription("I do not have activation func set"). WithInputs("i1"). WithOutputs("o1"), - component.NewComponent("c3"). + component.New("c3"). WithDescription("I'm waiting for specific input"). WithInputs("i1", "i2"). WithOutputs("o1"). @@ -660,7 +660,7 @@ func TestFMesh_runCycle(t *testing.T) { } return nil }), - component.NewComponent("c4"). + component.New("c4"). WithDescription("I'm waiting for specific input"). WithInputs("i1", "i2"). WithOutputs("o1"). @@ -682,135 +682,133 @@ func TestFMesh_runCycle(t *testing.T) { component.NewActivationResult("c1"). SetActivated(false). WithActivationCode(component.ActivationCodeNoInput). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ + WithStateBefore(component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ "i1": &port.Metadata{ SignalBufferLen: 0, }, - }, - OutputPorts: port.MetadataMap{ + }). + WithOutputPortsMetadata(port.MetadataMap{ "o1": &port.Metadata{ SignalBufferLen: 0, }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ + })). + WithStateAfter(component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ "i1": &port.Metadata{ SignalBufferLen: 0, }, - }, - OutputPorts: port.MetadataMap{ + }). + WithOutputPortsMetadata(port.MetadataMap{ "o1": &port.Metadata{ SignalBufferLen: 0, }, - }, - }), + })), component.NewActivationResult("c2"). SetActivated(false). WithActivationCode(component.ActivationCodeNoFunction). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })), component.NewActivationResult("c3"). SetActivated(false). WithActivationCode(component.ActivationCodeWaitingForInput). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + "i2": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + "i2": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })), component.NewActivationResult("c4"). SetActivated(false). WithActivationCode(component.ActivationCodeWaitingForInput). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - "i2": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - })), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + "i2": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + "i2": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + }))), }, { name: "all components activated in one cycle (concurrently)", fm: New("test").WithComponents( - component.NewComponent("c1"). + component.New("c1"). WithDescription(""). WithInputs("i1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { // No output return nil }), - component.NewComponent("c2"). + component.New("c2"). WithDescription(""). WithInputs("i1"). WithOutputs("o1", "o2"). @@ -821,7 +819,7 @@ func TestFMesh_runCycle(t *testing.T) { outputs.ByName("o2").PutSignals(signal.NewGroup(2, 3, 4, 5)...) return nil }), - component.NewComponent("c3"). + component.New("c3"). WithDescription(""). WithInputs("i1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -838,73 +836,70 @@ func TestFMesh_runCycle(t *testing.T) { component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - OutputPorts: port.MetadataMap{}, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - OutputPorts: port.MetadataMap{}, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + })), component.NewActivationResult("c2"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - "o2": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - "o2": &port.Metadata{ - SignalBufferLen: 4, - }, - }, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + "o2": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 1, + }, + "o2": &port.Metadata{ + SignalBufferLen: 4, + }, + })), component.NewActivationResult("c3"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - OutputPorts: port.MetadataMap{}, - }).WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - OutputPorts: port.MetadataMap{}, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + })), ), }, } @@ -941,8 +936,8 @@ func TestFMesh_drainComponentsAfterCycle(t *testing.T) { component.NewActivationResult("c2").SetActivated(false).WithActivationCode(component.ActivationCodeNoInput), ), fm: New("fm").WithComponents( - component.NewComponent("c1").WithInputs("i1").WithOutputs("o1"), - component.NewComponent("c2").WithInputs("i1").WithOutputs("o1"), + component.New("c1").WithInputs("i1").WithOutputs("o1"), + component.New("c2").WithInputs("i1").WithOutputs("o1"), ), initFM: func(fm *FMesh) { //Create a pipe @@ -963,65 +958,65 @@ func TestFMesh_drainComponentsAfterCycle(t *testing.T) { component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 1, + }, + })), component.NewActivationResult("c2"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - })), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 1, + }, + }))), fm: New("fm").WithComponents( - component.NewComponent("c1"). + component.New("c1"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { return nil }), - component.NewComponent("c2"). + component.New("c2"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -1049,61 +1044,61 @@ func TestFMesh_drainComponentsAfterCycle(t *testing.T) { component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 1, - }, - }, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 1, + }, + })), component.NewActivationResult("c2"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithStateBefore(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }). - WithStateAfter(&component.StateSnapshot{ - InputPorts: port.MetadataMap{ - "i1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - OutputPorts: port.MetadataMap{ - "o1": &port.Metadata{ - SignalBufferLen: 0, - }, - }, - }), + WithStateBefore( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })). + WithStateAfter( + component.NewStateSnapshot(). + WithInputPortsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 0, + }, + }). + WithOutputPortsMetadata(port.MetadataMap{ + "o1": &port.Metadata{ + SignalBufferLen: 0, + }, + })), ), fm: New("fm").WithComponents( - component.NewComponent("c1").WithInputs("i1").WithOutputs("o1"), - component.NewComponent("c2").WithInputs("i1").WithOutputs("o1"), + component.New("c1").WithInputs("i1").WithOutputs("o1"), + component.New("c2").WithInputs("i1").WithOutputs("o1"), ), initFM: func(fm *FMesh) { //Create a pipe diff --git a/integration_tests/computation/math_test.go b/integration_tests/computation/math_test.go index 1758b44..f9169e5 100644 --- a/integration_tests/computation/math_test.go +++ b/integration_tests/computation/math_test.go @@ -20,7 +20,7 @@ func Test_Math(t *testing.T) { { name: "add and multiply", setupFM: func() *fmesh.FMesh { - c1 := component.NewComponent("c1"). + c1 := component.New("c1"). WithDescription("adds 2 to the input"). WithInputs("num"). WithOutputs("res"). @@ -30,7 +30,7 @@ func Test_Math(t *testing.T) { return nil }) - c2 := component.NewComponent("c2"). + c2 := component.New("c2"). WithDescription("multiplies by 3"). WithInputs("num"). WithOutputs("res"). diff --git a/integration_tests/piping/fan_test.go b/integration_tests/piping/fan_test.go index 73caf33..b45fe19 100644 --- a/integration_tests/piping/fan_test.go +++ b/integration_tests/piping/fan_test.go @@ -23,7 +23,7 @@ func Test_Fan(t *testing.T) { name: "fan-out (3 pipes from 1 source port)", setupFM: func() *fmesh.FMesh { fm := fmesh.New("fan-out").WithComponents( - component.NewComponent("producer"). + component.New("producer"). WithInputs("start"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -31,7 +31,7 @@ func Test_Fan(t *testing.T) { return nil }), - component.NewComponent("consumer1"). + component.New("consumer1"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -40,7 +40,7 @@ func Test_Fan(t *testing.T) { return nil }), - component.NewComponent("consumer2"). + component.New("consumer2"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -49,7 +49,7 @@ func Test_Fan(t *testing.T) { return nil }), - component.NewComponent("consumer3"). + component.New("consumer3"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -86,7 +86,7 @@ func Test_Fan(t *testing.T) { { name: "fan-in (3 pipes coming into 1 destination port)", setupFM: func() *fmesh.FMesh { - producer1 := component.NewComponent("producer1"). + producer1 := component.New("producer1"). WithInputs("start"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -94,7 +94,7 @@ func Test_Fan(t *testing.T) { return nil }) - producer2 := component.NewComponent("producer2"). + producer2 := component.New("producer2"). WithInputs("start"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -102,14 +102,14 @@ func Test_Fan(t *testing.T) { return nil }) - producer3 := component.NewComponent("producer3"). + producer3 := component.New("producer3"). WithInputs("start"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { outputs.ByName("o1").PutSignals(signal.New(rand.Int())) return nil }) - consumer := component.NewComponent("consumer"). + consumer := component.New("consumer"). WithInputs("i1"). WithOutputs("o1"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { diff --git a/integration_tests/piping/piping_from_inputs_test.go b/integration_tests/piping/piping_from_inputs_test.go index e98bf41..1932581 100644 --- a/integration_tests/piping/piping_from_inputs_test.go +++ b/integration_tests/piping/piping_from_inputs_test.go @@ -21,7 +21,7 @@ func Test_PipingFromInput(t *testing.T) { { name: "observer pattern", setupFM: func() *fmesh.FMesh { - adder := component.NewComponent("adder"). + adder := component.New("adder"). WithDescription("adds i1 and i2"). WithInputsIndexed("i", 1, 2). WithOutputs("out"). @@ -31,7 +31,7 @@ func Test_PipingFromInput(t *testing.T) { return nil }) - multiplier := component.NewComponent("multiplier"). + multiplier := component.New("multiplier"). WithDescription("multiplies i1 by 10"). WithInputs("i1"). WithOutputs("out"). @@ -41,7 +41,7 @@ func Test_PipingFromInput(t *testing.T) { return nil }) - logger := component.NewComponent("logger"). + logger := component.New("logger"). WithDescription("logs all input signals"). WithInputs("in"). WithOutputs("log"). @@ -82,7 +82,7 @@ func Test_PipingFromInput(t *testing.T) { { name: "observing component which waits for inputs", setupFM: func() *fmesh.FMesh { - starter := component.NewComponent("starter"). + starter := component.New("starter"). WithDescription("This component just starts the whole f-mesh"). WithInputs("start"). WithOutputsIndexed("o", 1, 2). @@ -92,7 +92,7 @@ func Test_PipingFromInput(t *testing.T) { return nil }) - incr1 := component.NewComponent("incr1"). + incr1 := component.New("incr1"). WithDescription("Increments the input"). WithInputs("i1"). WithOutputs("o1"). @@ -101,7 +101,7 @@ func Test_PipingFromInput(t *testing.T) { return nil }) - incr2 := component.NewComponent("incr2"). + incr2 := component.New("incr2"). WithDescription("Increments the input"). WithInputs("i1"). WithOutputs("o1"). @@ -110,7 +110,7 @@ func Test_PipingFromInput(t *testing.T) { return nil }) - doubler := component.NewComponent("doubler"). + doubler := component.New("doubler"). WithDescription("Doubles the input"). WithInputs("i1"). WithOutputs("o1"). @@ -119,7 +119,7 @@ func Test_PipingFromInput(t *testing.T) { return nil }) - agg := component.NewComponent("result_aggregator"). + agg := component.New("result_aggregator"). WithDescription("Adds 2 inputs (only when both are available)"). WithInputsIndexed("i", 1, 2). WithOutputs("result"). @@ -133,7 +133,7 @@ func Test_PipingFromInput(t *testing.T) { return nil }) - observer := component.NewComponent("obsrv"). + observer := component.New("obsrv"). WithDescription("Observes inputs of result aggregator"). WithInputsIndexed("i", 1, 2). WithOutputs("log"). diff --git a/integration_tests/piping/piping_to_outputs_test.go b/integration_tests/piping/piping_to_outputs_test.go index 42d4eab..abc17b2 100644 --- a/integration_tests/piping/piping_to_outputs_test.go +++ b/integration_tests/piping/piping_to_outputs_test.go @@ -21,7 +21,7 @@ func Test_PipingToOutputs(t *testing.T) { // Simple case: both generator and injector are activated only once and in the same cycle name: "single signal injection", setupFM: func() *fmesh.FMesh { - gen := component.NewComponent("generator"). + gen := component.New("generator"). WithDescription("Just generates a signal"). WithInputs("start"). WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -29,7 +29,7 @@ func Test_PipingToOutputs(t *testing.T) { return nil }) - inj := component.NewComponent("injector"). + inj := component.New("injector"). WithDescription("Adds signals to gen.res output port"). WithInputs("start"). WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -58,7 +58,7 @@ func Test_PipingToOutputs(t *testing.T) { // 2 components have symmetrically connected output (both are generators and injectors at the same time) name: "outputs exchange", setupFM: func() *fmesh.FMesh { - c1 := component.NewComponent("c1"). + c1 := component.New("c1"). WithDescription("Generates a signal"). WithInputs("start"). WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { @@ -66,7 +66,7 @@ func Test_PipingToOutputs(t *testing.T) { return nil }) - c2 := component.NewComponent("c2"). + c2 := component.New("c2"). WithDescription("Generates a signal"). WithInputs("start"). WithOutputs("res").WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { diff --git a/port/collection.go b/port/collection.go index e22ca3d..128cc72 100644 --- a/port/collection.go +++ b/port/collection.go @@ -7,13 +7,6 @@ import ( // Collection is a port collection with useful methods type Collection map[string]*Port -// Metadata contains metadata about the port -type Metadata struct { - SignalBufferLen int -} - -type MetadataMap map[string]*Metadata - // NewCollection creates empty collection func NewCollection() Collection { return make(Collection) @@ -72,17 +65,17 @@ func (collection Collection) WithSignals(signals ...*signal.Signal) Collection { return collection } -// ClearSignals removes signals from all ports in collection -func (collection Collection) ClearSignals() { +// Clear clears all ports in collection +func (collection Collection) Clear() { for _, p := range collection { - p.ClearSignals() + p.Clear() } } // Flush flushes all ports in collection -func (collection Collection) Flush(clearFlushed bool) { +func (collection Collection) Flush() { for _, p := range collection { - p.Flush(clearFlushed) + p.Flush() } } @@ -117,26 +110,3 @@ func (collection Collection) AllSignals() signal.Group { } return group } - -// GetPortsMetadata returns info about current length of each port in collection -func (collection Collection) GetPortsMetadata() MetadataMap { - res := make(MetadataMap) - for _, p := range collection { - res[p.Name()] = &Metadata{ - SignalBufferLen: len(p.Signals()), - } - } - return res -} - -func (collection Collection) DisposeProcessedSignals(portsMetadata MetadataMap) { - for pName, meta := range portsMetadata { - collection.ByName(pName).DisposeFirstNSignals(meta.SignalBufferLen) - } -} - -func (collection Collection) FlushProcessedSignals(portsMetadata MetadataMap) { - for pName, meta := range portsMetadata { - collection.ByName(pName).FlushAndDisposeNSignals(meta.SignalBufferLen) - } -} diff --git a/port/collection_test.go b/port/collection_test.go index de1fc35..a68492b 100644 --- a/port/collection_test.go +++ b/port/collection_test.go @@ -8,7 +8,7 @@ import ( func TestCollection_AllHaveSignal(t *testing.T) { oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) - oneEmptyPorts.ByName("p2").ClearSignals() + oneEmptyPorts.ByName("p2").Clear() allWithSignalPorts := NewCollection().Add(NewGroup("out1", "out2", "out3")...).WithSignals(signal.New(77)) @@ -42,7 +42,7 @@ func TestCollection_AllHaveSignal(t *testing.T) { func TestCollection_AnyHasSignal(t *testing.T) { oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) - oneEmptyPorts.ByName("p2").ClearSignals() + oneEmptyPorts.ByName("p2").Clear() tests := []struct { name string @@ -202,7 +202,7 @@ func TestCollection_ClearSignal(t *testing.T) { t.Run("happy path", func(t *testing.T) { ports := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.NewGroup(1, 2, 3)...) assert.True(t, ports.AllHaveSignals()) - ports.ClearSignals() + ports.Clear() assert.False(t, ports.AnyHasSignals()) }) } diff --git a/port/metadata.go b/port/metadata.go new file mode 100644 index 0000000..319b452 --- /dev/null +++ b/port/metadata.go @@ -0,0 +1,20 @@ +package port + +// Metadata contains metadata about the port +type Metadata struct { + SignalBufferLen int +} + +// MetadataMap contains port metadata indexed by port name +type MetadataMap map[string]*Metadata + +// GetPortsMetadata returns info about current length of each port in collection +func (collection Collection) GetPortsMetadata() MetadataMap { + res := make(MetadataMap) + for _, p := range collection { + res[p.Name()] = &Metadata{ + SignalBufferLen: len(p.Signals()), + } + } + return res +} diff --git a/port/port.go b/port/port.go index 9cf0546..8f15441 100644 --- a/port/port.go +++ b/port/port.go @@ -30,6 +30,7 @@ func (p *Port) Signals() signal.Group { return p.signals } +// setSignals sets signals field func (p *Port) setSignals(signals signal.Group) { p.signals = signals } @@ -46,24 +47,22 @@ func (p *Port) WithSignals(signals ...*signal.Signal) *Port { return p } -// ClearSignals removes all signals from the port -func (p *Port) ClearSignals() { +// Clear removes all signals from the port +func (p *Port) Clear() { p.setSignals(signal.NewGroup()) } -func (p *Port) DisposeFirstNSignals(n int) { - if n > len(p.Signals()) { - p.ClearSignals() - return - } +// DisposeSignals removes n signals from the beginning of signal buffer +func (p *Port) DisposeSignals(n int) { p.setSignals(p.Signals()[n:]) } -func (p *Port) FlushAndDisposeNSignals(n int) { +// FlushAndDispose flushes n signals and then disposes them +func (p *Port) FlushAndDispose(n int) { if n > len(p.Signals()) { - //Flush all - p.Flush(false) - p.ClearSignals() + //Flush all signals and clear + p.Flush() + p.Clear() } if !p.HasSignals() || !p.HasPipes() { @@ -75,7 +74,7 @@ func (p *Port) FlushAndDisposeNSignals(n int) { ForwardNSignals(p, outboundPort, n) } - p.DisposeFirstNSignals(n) + p.DisposeSignals(n) } // HasSignals says whether port signals is set or not @@ -101,7 +100,7 @@ func (p *Port) PipeTo(toPorts ...*Port) { // Flush pushes signals to pipes, clears the port if needed and returns true when flushed // @TODO: hide this method from user -func (p *Port) Flush(clearFlushed bool) bool { +func (p *Port) Flush() bool { if !p.HasSignals() || !p.HasPipes() { return false } @@ -110,17 +109,15 @@ func (p *Port) Flush(clearFlushed bool) bool { //Fan-Out ForwardSignals(p, outboundPort) } - if clearFlushed { - p.ClearSignals() - } return true } -// ForwardSignals puts signals from source port to destination port, without clearing the source port +// ForwardSignals copies all signals from source port to destination port, without clearing the source port func ForwardSignals(source *Port, dest *Port) { dest.PutSignals(source.Signals()...) } +// ForwardNSignals forwards n signals func ForwardNSignals(source *Port, dest *Port, n int) { dest.PutSignals(source.Signals()[:n]...) } diff --git a/port/port_test.go b/port/port_test.go index df53b55..521e7df 100644 --- a/port/port_test.go +++ b/port/port_test.go @@ -58,7 +58,7 @@ func TestPort_Signals(t *testing.T) { } } -func TestPort_ClearSignal(t *testing.T) { +func TestPort_Clear(t *testing.T) { portWithSignal := New("portWithSignal").WithSignals(signal.New(111)) tests := []struct { @@ -79,7 +79,7 @@ func TestPort_ClearSignal(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.before.ClearSignals() + tt.before.Clear() assert.Equal(t, tt.after, tt.before) }) } @@ -275,12 +275,11 @@ func TestNewPort(t *testing.T) { func TestPort_Flush(t *testing.T) { tests := []struct { - name string - getSource func() *Port - getDest func() *Port - clearFlushed bool - wantResult bool - assertions func(t *testing.T, source *Port, dest *Port) + name string + getSource func() *Port + getDest func() *Port + wantResult bool + assertions func(t *testing.T, source *Port, dest *Port) }{ { name: "port with no signals", @@ -290,7 +289,6 @@ func TestPort_Flush(t *testing.T) { getDest: func() *Port { return New("empty_dest") }, - clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { assert.False(t, source.HasSignals()) assert.False(t, dest.HasSignals()) @@ -305,7 +303,6 @@ func TestPort_Flush(t *testing.T) { getDest: func() *Port { return New("empty_dest") }, - clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { //Source port is not cleared during flush assert.True(t, source.HasSignals()) @@ -316,25 +313,6 @@ func TestPort_Flush(t *testing.T) { }, wantResult: true, }, - { - name: "flush to empty port and clear", - getSource: func() *Port { - return New("portWithSignal").WithSignals(signal.New(222)) - }, - getDest: func() *Port { - return New("empty_dest") - }, - clearFlushed: true, - assertions: func(t *testing.T, source *Port, dest *Port) { - //Source port is cleared - assert.False(t, source.HasSignals()) - - //Signals transferred to destination port - assert.True(t, dest.HasSignals()) - assert.Equal(t, dest.Signals().FirstPayload().(int), 222) - }, - wantResult: true, - }, { name: "flush to port with signals", getSource: func() *Port { @@ -343,7 +321,6 @@ func TestPort_Flush(t *testing.T) { getDest: func() *Port { return New("portWithMultipleSignals").WithSignals(signal.NewGroup(444, 555, 666)...) }, - clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { //Source port is not cleared assert.True(t, source.HasSignals()) @@ -355,33 +332,13 @@ func TestPort_Flush(t *testing.T) { }, wantResult: true, }, - { - name: "flush to port with signals and clear", - getSource: func() *Port { - return New("portWithSignal").WithSignals(signal.New(777)) - }, - getDest: func() *Port { - return New("portWithMultipleSignals").WithSignals(signal.NewGroup(888, 999, 101010)...) - }, - clearFlushed: true, - assertions: func(t *testing.T, source *Port, dest *Port) { - //Source port is cleared - assert.False(t, source.HasSignals()) - - //Destination port now has 1 more signal - assert.True(t, dest.HasSignals()) - assert.Len(t, dest.Signals(), 4) - assert.Contains(t, dest.Signals().AllPayloads(), 777) - }, - wantResult: true, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { source := tt.getSource() dest := tt.getDest() source.PipeTo(dest) - assert.Equal(t, tt.wantResult, source.Flush(tt.clearFlushed)) + assert.Equal(t, tt.wantResult, source.Flush()) if tt.assertions != nil { tt.assertions(t, source, dest) } @@ -389,7 +346,7 @@ func TestPort_Flush(t *testing.T) { } } -func TestPort_DisposeFirstNSignals(t *testing.T) { +func TestPort_DisposeSignals(t *testing.T) { type args struct { n int } @@ -397,6 +354,7 @@ func TestPort_DisposeFirstNSignals(t *testing.T) { name string port *Port wantSignals signal.Group + wantPanic bool args args }{ { @@ -416,9 +374,10 @@ func TestPort_DisposeFirstNSignals(t *testing.T) { }, }, { - name: "n > len(signals) cleans the signal buffer", + name: "n > len(signals), panic", port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), wantSignals: signal.NewGroup(), + wantPanic: true, args: args{ n: 10, }, @@ -434,8 +393,16 @@ func TestPort_DisposeFirstNSignals(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.port.DisposeFirstNSignals(tt.args.n) - assert.Equal(t, tt.wantSignals, tt.port.Signals()) + + if tt.wantPanic { + assert.Panics(t, func() { + tt.port.DisposeSignals(tt.args.n) + }) + } else { + tt.port.DisposeSignals(tt.args.n) + assert.Equal(t, tt.wantSignals, tt.port.Signals()) + } + }) } } diff --git a/signal/group.go b/signal/group.go index 90c9783..cb09937 100644 --- a/signal/group.go +++ b/signal/group.go @@ -16,13 +16,13 @@ func (group Group) FirstPayload() any { //Intentionally not checking the group len //as the method does not have returning error (api is simpler) //and we can not just return nil, as nil may be a valid payload. - //So just let runtime panic + //Just letting the runtime panic return group[0].Payload() } // AllPayloads returns a slice with all payloads of the all signals in the group func (group Group) AllPayloads() []any { - all := make([]any, 0) + all := make([]any, 0, len(group)) for _, s := range group { all = append(all, s.Payload()) }