From 2ec8ac8e01468caedee9648ef12970089d0fd197 Mon Sep 17 00:00:00 2001 From: hovsep Date: Fri, 20 Sep 2024 00:47:43 +0300 Subject: [PATCH 1/3] Port: new handy method --- .../piping/piping_from_inputs_test.go | 1 - port/collection.go | 12 +- port/collection_test.go | 15 +-- port/port.go | 15 ++- port/port_test.go | 126 ++++++++++++------ 5 files changed, 108 insertions(+), 61 deletions(-) diff --git a/integration_tests/piping/piping_from_inputs_test.go b/integration_tests/piping/piping_from_inputs_test.go index 7f90e40..4d86dfd 100644 --- a/integration_tests/piping/piping_from_inputs_test.go +++ b/integration_tests/piping/piping_from_inputs_test.go @@ -47,7 +47,6 @@ func Test_PipingFromInput(t *testing.T) { WithOutputs("log"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { for _, sig := range inputs.ByName("in").Signals() { - fmt.Println(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload())) outputs.ByName("log").PutSignals(signal.New(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload()))) } return nil diff --git a/port/collection.go b/port/collection.go index 3379da7..78f39ba 100644 --- a/port/collection.go +++ b/port/collection.go @@ -52,13 +52,19 @@ func (collection Collection) AllHaveSignals() bool { return true } -// PutSignals puts a signals to all the port in collection +// PutSignals adds signals to every port in collection func (collection Collection) PutSignals(signals ...*signal.Signal) { for _, p := range collection { p.PutSignals(signals...) } } +// WithSignals adds signals to every port in collection and returns the collection +func (collection Collection) WithSignals(signals ...*signal.Signal) Collection { + collection.PutSignals(signals...) + return collection +} + // ClearSignals removes signals from all ports in collection func (collection Collection) ClearSignals() { for _, p := range collection { @@ -69,9 +75,7 @@ func (collection Collection) ClearSignals() { // Flush flushes all ports in collection func (collection Collection) Flush(clearFlushed bool) { for _, p := range collection { - if portFlushed := p.Flush(); clearFlushed && portFlushed { - p.ClearSignals() - } + p.Flush(clearFlushed) } } diff --git a/port/collection_test.go b/port/collection_test.go index fc98770..2ffa412 100644 --- a/port/collection_test.go +++ b/port/collection_test.go @@ -7,12 +7,10 @@ import ( ) func TestCollection_AllHaveSignal(t *testing.T) { - oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...) - oneEmptyPorts.PutSignals(signal.New(123)) + oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) oneEmptyPorts.ByName("p2").ClearSignals() - allWithSignalPorts := NewCollection().Add(NewGroup("out1", "out2", "out3")...) - allWithSignalPorts.PutSignals(signal.New(77)) + allWithSignalPorts := NewCollection().Add(NewGroup("out1", "out2", "out3")...).WithSignals(signal.New(77)) tests := []struct { name string @@ -43,8 +41,7 @@ func TestCollection_AllHaveSignal(t *testing.T) { } func TestCollection_AnyHasSignal(t *testing.T) { - oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...) - oneEmptyPorts.PutSignals(signal.New(123)) + oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) oneEmptyPorts.ByName("p2").ClearSignals() tests := []struct { @@ -71,8 +68,7 @@ func TestCollection_AnyHasSignal(t *testing.T) { } func TestCollection_ByName(t *testing.T) { - portsWithSignals := NewCollection().Add(NewGroup("p1", "p2")...) - portsWithSignals.PutSignals(signal.New(12)) + portsWithSignals := NewCollection().Add(NewGroup("p1", "p2")...).WithSignals(signal.New(12)) type args struct { name string @@ -188,8 +184,7 @@ func TestCollection_ByNames(t *testing.T) { func TestCollection_ClearSignal(t *testing.T) { t.Run("happy path", func(t *testing.T) { - ports := NewCollection().Add(NewGroup("p1", "p2", "p3")...) - ports.PutSignals(signal.NewGroup(1, 2, 3)...) + ports := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.NewGroup(1, 2, 3)...) assert.True(t, ports.AllHaveSignals()) ports.ClearSignals() assert.False(t, ports.AnyHasSignals()) diff --git a/port/port.go b/port/port.go index 5d7a979..af32d53 100644 --- a/port/port.go +++ b/port/port.go @@ -30,12 +30,18 @@ func (p *Port) Signals() signal.Collection { return p.signals } -// PutSignals adds a signals to current signals +// PutSignals adds signals // @TODO: rename func (p *Port) PutSignals(signals ...*signal.Signal) { p.Signals().Add(signals...) } +// WithSignals adds signals and returns the port +func (p *Port) WithSignals(signals ...*signal.Signal) *Port { + p.PutSignals(signals...) + return p +} + // ClearSignals removes all signals from the port func (p *Port) ClearSignals() { p.signals = signal.NewCollection() @@ -62,9 +68,9 @@ func (p *Port) PipeTo(toPorts ...*Port) { } } -// Flush pushes current signals to pipes and returns true when flushed +// Flush pushes signals to pipes, clears the port if needed and returns true when flushed // @TODO: hide this method from user -func (p *Port) Flush() bool { +func (p *Port) Flush(clearFlushed bool) bool { if !p.HasSignals() || !p.HasPipes() { return false } @@ -73,6 +79,9 @@ func (p *Port) Flush() bool { //Fan-Out ForwardSignals(p, outboundPort) } + if clearFlushed { + p.ClearSignals() + } return true } diff --git a/port/port_test.go b/port/port_test.go index 2fdfff0..5d65b7c 100644 --- a/port/port_test.go +++ b/port/port_test.go @@ -7,8 +7,7 @@ import ( ) func TestPort_HasSignals(t *testing.T) { - portWithSignal := New("portWithSignal") - portWithSignal.PutSignals(signal.New(123)) + portWithSignal := New("portWithSignal").WithSignals(signal.New(123)) tests := []struct { name string @@ -34,8 +33,7 @@ func TestPort_HasSignals(t *testing.T) { } func TestPort_Signals(t *testing.T) { - portWithSignal := New("portWithSignal") - portWithSignal.PutSignals(signal.New(123)) + portWithSignal := New("portWithSignal").WithSignals(signal.New(123)) tests := []struct { name string @@ -61,8 +59,7 @@ func TestPort_Signals(t *testing.T) { } func TestPort_ClearSignal(t *testing.T) { - portWithSignal := New("portWithSignal") - portWithSignal.PutSignals(signal.New(111)) + portWithSignal := New("portWithSignal").WithSignals(signal.New(111)) tests := []struct { name string @@ -134,14 +131,11 @@ func TestPort_PipeTo(t *testing.T) { } func TestPort_PutSignals(t *testing.T) { - portWithSingleSignal := New("portWithSingleSignal") - portWithSingleSignal.PutSignals(signal.New(11)) + portWithSingleSignal := New("portWithSingleSignal").WithSignals(signal.New(11)) - portWithMultipleSignals := New("portWithMultipleSignals") - portWithMultipleSignals.PutSignals(signal.NewGroup(11, 12)...) + portWithMultipleSignals := New("portWithMultipleSignals").WithSignals(signal.NewGroup(11, 12)...) - portWithMultipleSignals2 := New("portWithMultipleSignals2") - portWithMultipleSignals2.PutSignals(signal.NewGroup(55, 66)...) + portWithMultipleSignals2 := New("portWithMultipleSignals2").WithSignals(signal.NewGroup(55, 66)...) type args struct { signals []*signal.Signal @@ -280,28 +274,23 @@ func TestNewPort(t *testing.T) { } func TestPort_Flush(t *testing.T) { - portWithSignal1 := New("portWithSignal1") - portWithSignal1.PutSignals(signal.New(777)) - - portWithSignal2 := New("portWithSignal2") - portWithSignal2.PutSignals(signal.New(888)) - - portWithMultipleSignals := New("portWithMultipleSignals") - portWithMultipleSignals.PutSignals(signal.NewGroup(11, 12)...) - - emptyPort := New("emptyPort") - tests := []struct { - name string - source *Port - dest *Port - wantResult bool - assertions func(t *testing.T, source *Port, dest *Port) + name string + getSource func() *Port + getDest func() *Port + clearFlushed bool + wantResult bool + assertions func(t *testing.T, source *Port, dest *Port) }{ { - name: "port with no signals", - source: New("empty_src"), - dest: New("empty_dest"), + name: "port with no signals", + getSource: func() *Port { + return New("empty_src") + }, + getDest: func() *Port { + return New("empty_dest") + }, + clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { assert.False(t, source.HasSignals()) assert.False(t, dest.HasSignals()) @@ -309,41 +298,92 @@ func TestPort_Flush(t *testing.T) { wantResult: false, }, { - name: "flush to empty port", - source: portWithSignal1, - dest: emptyPort, + name: "flush to empty port", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(111)) + }, + getDest: func() *Port { + return New("empty_dest") + }, + clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { //Source port is not cleared during flush assert.True(t, source.HasSignals()) //Signals transferred to destination port assert.True(t, dest.HasSignals()) - assert.Equal(t, dest.Signals().FirstPayload().(int), 777) + assert.Equal(t, dest.Signals().FirstPayload().(int), 111) + }, + wantResult: true, + }, + { + name: "flush to empty port and clear", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(222)) + }, + getDest: func() *Port { + return New("empty_dest") + }, + clearFlushed: true, + assertions: func(t *testing.T, source *Port, dest *Port) { + //Source port is cleared + assert.False(t, source.HasSignals()) + + //Signals transferred to destination port + assert.True(t, dest.HasSignals()) + assert.Equal(t, dest.Signals().FirstPayload().(int), 222) }, wantResult: true, }, { - name: "flush to port with signals", - source: portWithSignal2, - dest: portWithMultipleSignals, + name: "flush to port with signals", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(333)) + }, + getDest: func() *Port { + return New("portWithMultipleSignals").WithSignals(signal.NewGroup(444, 555, 666)...) + }, + clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { //Source port is not cleared assert.True(t, source.HasSignals()) //Destination port now has 1 more signal assert.True(t, dest.HasSignals()) - assert.Len(t, dest.Signals(), 3) - assert.Contains(t, dest.Signals().AllPayloads(), 888) + assert.Len(t, dest.Signals(), 4) + assert.Contains(t, dest.Signals().AllPayloads(), 333) + }, + wantResult: true, + }, + { + name: "flush to port with signals and clear", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(777)) + }, + getDest: func() *Port { + return New("portWithMultipleSignals").WithSignals(signal.NewGroup(888, 999, 101010)...) + }, + clearFlushed: true, + assertions: func(t *testing.T, source *Port, dest *Port) { + //Source port is cleared + assert.False(t, source.HasSignals()) + + //Destination port now has 1 more signal + assert.True(t, dest.HasSignals()) + assert.Len(t, dest.Signals(), 4) + assert.Contains(t, dest.Signals().AllPayloads(), 777) }, wantResult: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.source.PipeTo(tt.dest) - assert.Equal(t, tt.wantResult, tt.source.Flush()) + source := tt.getSource() + dest := tt.getDest() + source.PipeTo(dest) + assert.Equal(t, tt.wantResult, source.Flush(tt.clearFlushed)) if tt.assertions != nil { - tt.assertions(t, tt.source, tt.dest) + tt.assertions(t, source, dest) } }) } From 83566a220da81b53e541734990a9d3734e4faaa9 Mon Sep 17 00:00:00 2001 From: hovsep Date: Fri, 20 Sep 2024 02:10:44 +0300 Subject: [PATCH 2/3] Add integration test --- component/activation_result.go | 1 + component/component.go | 16 +++- .../piping/piping_from_inputs_test.go | 92 ++++++++++++++++++- port/collection.go | 13 +++ port/group.go | 21 +++++ 5 files changed, 140 insertions(+), 3 deletions(-) diff --git a/component/activation_result.go b/component/activation_result.go index a801c3d..72dcd84 100644 --- a/component/activation_result.go +++ b/component/activation_result.go @@ -38,6 +38,7 @@ const ( ) // NewActivationResult creates a new activation result for given component +// @TODO Hide this from user func NewActivationResult(componentName string) *ActivationResult { return &ActivationResult{ componentName: componentName, diff --git a/component/component.go b/component/component.go index f20bdd2..d66f4e1 100644 --- a/component/component.go +++ b/component/component.go @@ -34,13 +34,25 @@ func (c *Component) WithDescription(description string) *Component { // WithInputs ads input ports func (c *Component) WithInputs(portNames ...string) *Component { - c.inputs = c.inputs.Add(port.NewGroup(portNames...)...) + c.inputs = c.Inputs().Add(port.NewGroup(portNames...)...) return c } // WithOutputs adds output ports func (c *Component) WithOutputs(portNames ...string) *Component { - c.outputs = c.outputs.Add(port.NewGroup(portNames...)...) + c.outputs = c.Outputs().Add(port.NewGroup(portNames...)...) + return c +} + +// WithInputsIndexed creates multiple prefixed ports +func (c *Component) WithInputsIndexed(prefix string, startIndex int, endIndex int) *Component { + c.inputs = c.Inputs().AddIndexed(prefix, startIndex, endIndex) + return c +} + +// WithOutputsIndexed creates multiple prefixed ports +func (c *Component) WithOutputsIndexed(prefix string, startIndex int, endIndex int) *Component { + c.outputs = c.Outputs().AddIndexed(prefix, startIndex, endIndex) return c } diff --git a/integration_tests/piping/piping_from_inputs_test.go b/integration_tests/piping/piping_from_inputs_test.go index 4d86dfd..f88a348 100644 --- a/integration_tests/piping/piping_from_inputs_test.go +++ b/integration_tests/piping/piping_from_inputs_test.go @@ -23,7 +23,7 @@ func Test_PipingFromInput(t *testing.T) { setupFM: func() *fmesh.FMesh { adder := component.NewComponent("adder"). WithDescription("adds i1 and i2"). - WithInputs("i1", "i2"). + WithInputsIndexed("i", 1, 2). WithOutputs("out"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { i1, i2 := inputs.ByName("i1").Signals().FirstPayload().(int), inputs.ByName("i2").Signals().FirstPayload().(int) @@ -79,6 +79,96 @@ func Test_PipingFromInput(t *testing.T) { assert.Len(t, l.Outputs().ByName("log").Signals(), 3) }, }, + + { + name: "observing component which waits for inputs", + setupFM: func() *fmesh.FMesh { + starter := component.NewComponent("starter"). + WithDescription("This component just starts the whole f-mesh"). + WithInputs("start"). + WithOutputsIndexed("o", 1, 2). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + //Activate downstream components + outputs.PutSignals(inputs.AllSignals()[0]) + return nil + }) + + incr1 := component.NewComponent("incr1"). + WithDescription("Increments the input"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.PutSignals(signal.New(1 + inputs.AllSignals().FirstPayload().(int))) + return nil + }) + + incr2 := component.NewComponent("incr2"). + WithDescription("Increments the input"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.PutSignals(signal.New(1 + inputs.AllSignals().FirstPayload().(int))) + return nil + }) + + doubler := component.NewComponent("doubler"). + WithDescription("Doubles the input"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.PutSignals(signal.New(2 * inputs.AllSignals().FirstPayload().(int))) + return nil + }) + + agg := component.NewComponent("result_aggregator"). + WithDescription("Adds 2 inputs (only when both are available)"). + WithInputsIndexed("i", 1, 2). + WithOutputs("result"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + if !inputs.ByNames("i1", "i2").AllHaveSignals() { + return component.NewErrWaitForInputs(true) + } + i1 := inputs.ByName("i1").Signals().FirstPayload().(int) + i2 := inputs.ByName("i2").Signals().FirstPayload().(int) + outputs.PutSignals(signal.New(i1 + i2)) + return nil + }) + + observer := component.NewComponent("obsrv"). + WithDescription("Observes inputs of result aggregator"). + WithInputsIndexed("i", 1, 2). + WithOutputs("log"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.ByName("log").PutSignals(inputs.AllSignals()...) + return nil + }) + + fm := fmesh.New("observer").WithComponents(starter, incr1, incr2, doubler, agg, observer) + + starter.Outputs().ByName("o1").PipeTo(incr1.Inputs().ByName("i1")) + starter.Outputs().ByName("o2").PipeTo(incr2.Inputs().ByName("i1")) + incr1.Outputs().ByName("o1").PipeTo(doubler.Inputs().ByName("i1")) + doubler.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i1")) + incr2.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i2")) + agg.Inputs().ByName("i1").PipeTo(observer.Inputs().ByName("i1")) + agg.Inputs().ByName("i2").PipeTo(observer.Inputs().ByName("i2")) + return fm + }, + setInputs: func(fm *fmesh.FMesh) { + fm.Components().ByName("starter").Inputs().PutSignals(signal.New(10)) + }, + assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { + assert.NoError(t, err) + + //Multiplier result + assert.Equal(t, 33, fm.Components().ByName("result_aggregator").Outputs().ByName("result").Signals().FirstPayload()) + + //Observed signals + assert.Len(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals(), 2) + assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 11) + assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 22) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/port/collection.go b/port/collection.go index 78f39ba..d45e638 100644 --- a/port/collection.go +++ b/port/collection.go @@ -105,6 +105,19 @@ func (collection Collection) Add(ports ...*Port) Collection { return collection } +// AddIndexed creates ports with names like "o1","o2","o3" and so on +func (collection Collection) AddIndexed(prefix string, startIndex int, endIndex int) Collection { + return collection.Add(NewIndexedGroup(prefix, startIndex, endIndex)...) +} + +func (collection Collection) AllSignals() signal.Group { + group := signal.NewGroup() + for _, p := range collection { + group = append(group, p.Signals().AsGroup()...) + } + return group +} + func (collection Collection) GetSignalKeys() []string { keys := make([]string, 0) for _, p := range collection { diff --git a/port/group.go b/port/group.go index bb7e6d0..ad0f6fc 100644 --- a/port/group.go +++ b/port/group.go @@ -1,5 +1,7 @@ package port +import "fmt" + // Group is just a slice of ports (useful to pass multiple ports as variadic argument) type Group []*Port @@ -12,6 +14,25 @@ func NewGroup(names ...string) Group { return group } +// NewIndexedGroup is useful when you want to create group of ports with same prefix +func NewIndexedGroup(prefix string, startIndex int, endIndex int) Group { + if prefix == "" { + return nil + } + + if startIndex > endIndex { + return nil + } + + group := make(Group, endIndex-startIndex+1) + + for i := startIndex; i <= endIndex; i++ { + group[i-startIndex] = New(fmt.Sprintf("%s%d", prefix, i)) + } + + return group +} + // Add adds ports to group func (group Group) Add(ports ...*Port) Group { for _, port := range ports { From 07770cc469b0e3aef24c085a8fe62fa40c2c1d66 Mon Sep 17 00:00:00 2001 From: hovsep Date: Fri, 20 Sep 2024 03:57:49 +0300 Subject: [PATCH 3/3] i2i piping refactored with simpler approach --- component/activation_result.go | 25 +++--- component/component.go | 4 +- component/component_test.go | 8 +- fmesh.go | 11 ++- fmesh_test.go | 18 +++- integration_tests/piping/fan_test.go | 2 +- .../piping/piping_from_inputs_test.go | 1 - port/collection.go | 33 +++++--- port/collection_test.go | 30 +++++-- port/group.go | 4 +- port/port.go | 28 +++++-- port/port_test.go | 83 +++++++++++++++---- signal/collection.go | 61 -------------- signal/group.go | 19 +++++ 14 files changed, 195 insertions(+), 132 deletions(-) delete mode 100644 signal/collection.go diff --git a/component/activation_result.go b/component/activation_result.go index 72dcd84..7db9f95 100644 --- a/component/activation_result.go +++ b/component/activation_result.go @@ -3,15 +3,16 @@ package component import ( "errors" "fmt" + "github.com/hovsep/fmesh/port" ) // ActivationResult defines the result (possibly an error) of the activation of given component in given cycle type ActivationResult struct { - componentName string - activated bool - inputKeys []string //@TODO: check if we can replace this by one int which will show the index of last signal used as input within signals collection (use signal position in the collection as it's unique id) - code ActivationResultCode - err error + componentName string + activated bool + inputsMetadata port.MetadataMap //Contains the info about length of input ports during the activation (required for correct i2i piping) + code ActivationResultCode + err error } // ActivationResultCode denotes a specific info about how a component been activated or why not activated at all @@ -93,13 +94,13 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult { return ar } -func (ar *ActivationResult) WithInputKeys(keys []string) *ActivationResult { - ar.inputKeys = keys +func (ar *ActivationResult) WithInputsMetadata(meta port.MetadataMap) *ActivationResult { + ar.inputsMetadata = meta return ar } -func (ar *ActivationResult) InputKeys() []string { - return ar.inputKeys +func (ar *ActivationResult) InputsMetadata() port.MetadataMap { + return ar.inputsMetadata } // newActivationResultOK builds a specific activation result @@ -107,7 +108,7 @@ func (c *Component) newActivationResultOK() *ActivationResult { return NewActivationResult(c.Name()). SetActivated(true). WithActivationCode(ActivationCodeOK). - WithInputKeys(c.Inputs().GetSignalKeys()) + WithInputsMetadata(c.Inputs().GetPortsMetadata()) } @@ -138,7 +139,7 @@ func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult SetActivated(true). WithActivationCode(ActivationCodeReturnedError). WithError(fmt.Errorf("component returned an error: %w", err)). - WithInputKeys(c.Inputs().GetSignalKeys()) + WithInputsMetadata(c.Inputs().GetPortsMetadata()) } // newActivationCodePanicked builds a specific activation result @@ -147,7 +148,7 @@ func (c *Component) newActivationCodePanicked(err error) *ActivationResult { SetActivated(true). WithActivationCode(ActivationCodePanicked). WithError(err). - WithInputKeys(c.Inputs().GetSignalKeys()) + WithInputsMetadata(c.Inputs().GetPortsMetadata()) } // isWaitingForInput tells whether component is waiting for specific inputs diff --git a/component/component.go b/component/component.go index d66f4e1..5711bf8 100644 --- a/component/component.go +++ b/component/component.go @@ -136,8 +136,8 @@ func (c *Component) FlushInputs() { c.inputs.Flush(false) } -// FlushOutputs ... +// FlushAndClearOutputs flushes output ports and clears flushed ones // @TODO: hide this method from user -func (c *Component) FlushOutputs() { +func (c *Component) FlushAndClearOutputs() { c.outputs.Flush(true) } diff --git a/component/component_test.go b/component/component_test.go index b371d92..605d42c 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -99,7 +99,7 @@ func TestComponent_Description(t *testing.T) { } } -func TestComponent_FlushOutputs(t *testing.T) { +func TestComponent_FlushAndClearOutputs(t *testing.T) { sink := port.New("sink") componentWithAllOutputsSet := NewComponent("c1").WithOutputs("o1", "o2") @@ -145,7 +145,7 @@ func TestComponent_FlushOutputs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.component.FlushOutputs() + tt.component.FlushAndClearOutputs() tt.assertions(t, tt.component, tt.destPort) }) } @@ -239,8 +239,8 @@ func TestComponent_WithActivationFunc(t *testing.T) { assert.Equal(t, err1, err2) //Compare signals without keys (because they are random) - assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals().AsGroup(), testOutputs2.ByName("out1").Signals().AsGroup()) - assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals().AsGroup(), testOutputs2.ByName("out2").Signals().AsGroup()) + assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals(), testOutputs2.ByName("out1").Signals()) + assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals(), testOutputs2.ByName("out2").Signals()) }) } diff --git a/fmesh.go b/fmesh.go index 766abcf..1f4d8b6 100644 --- a/fmesh.go +++ b/fmesh.go @@ -88,12 +88,17 @@ func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) { continue } - c.FlushInputs() - c.FlushOutputs() + c.FlushAndClearOutputs() // Just flush and clear + c.FlushInputs() // Inputs are a bit trickier + //Check if a component wait for inputs and wants to keep existing input keepInputs := c.WantsToKeepInputs(activationResult) + if !keepInputs { - c.Inputs().RemoveSignalsByKeys(activationResult.InputKeys()) + // Inputs can not be just cleared, instead we remove signals which + // have been used (been set on inputs) during the last activation cycle + // thus not affecting ones the component could have been received from i2i pipes + c.Inputs().DisposeProcessedSignals(activationResult.InputsMetadata()) } } diff --git a/fmesh_test.go b/fmesh_test.go index 9fcee29..957a641 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -715,15 +715,27 @@ func TestFMesh_runCycle(t *testing.T) { component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithInputKeys([]string{"1"}), + WithInputsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }), component.NewActivationResult("c2"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithInputKeys([]string{"1"}), + WithInputsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }), component.NewActivationResult("c3"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithInputKeys([]string{"1"}), + WithInputsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }), ), }, } diff --git a/integration_tests/piping/fan_test.go b/integration_tests/piping/fan_test.go index dd9cd0f..73caf33 100644 --- a/integration_tests/piping/fan_test.go +++ b/integration_tests/piping/fan_test.go @@ -135,7 +135,7 @@ func Test_Fan(t *testing.T) { assert.True(t, fm.Components().ByName("consumer").Outputs().ByName("o1").HasSignals()) //The signal is combined and consist of 3 payloads - resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals().AsGroup() + resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals() assert.Len(t, resultSignals, 3) //And they are all different diff --git a/integration_tests/piping/piping_from_inputs_test.go b/integration_tests/piping/piping_from_inputs_test.go index f88a348..e98bf41 100644 --- a/integration_tests/piping/piping_from_inputs_test.go +++ b/integration_tests/piping/piping_from_inputs_test.go @@ -79,7 +79,6 @@ func Test_PipingFromInput(t *testing.T) { assert.Len(t, l.Outputs().ByName("log").Signals(), 3) }, }, - { name: "observing component which waits for inputs", setupFM: func() *fmesh.FMesh { diff --git a/port/collection.go b/port/collection.go index d45e638..b44ab3a 100644 --- a/port/collection.go +++ b/port/collection.go @@ -7,6 +7,13 @@ import ( // Collection is a port collection with useful methods type Collection map[string]*Port +// Metadata contains metadata about the port +type Metadata struct { + SignalBufferLen int +} + +type MetadataMap map[string]*Metadata + // NewCollection creates empty collection func NewCollection() Collection { return make(Collection) @@ -79,13 +86,6 @@ func (collection Collection) Flush(clearFlushed bool) { } } -func (collection Collection) RemoveSignalsByKeys(signalKeys []string) Collection { - for _, p := range collection { - p.Signals().DeleteKeys(signalKeys) - } - return collection -} - // PipeTo creates pipes from each port in collection func (collection Collection) PipeTo(toPorts ...*Port) { for _, p := range collection { @@ -113,15 +113,24 @@ func (collection Collection) AddIndexed(prefix string, startIndex int, endIndex func (collection Collection) AllSignals() signal.Group { group := signal.NewGroup() for _, p := range collection { - group = append(group, p.Signals().AsGroup()...) + group = append(group, p.Signals()...) } return group } -func (collection Collection) GetSignalKeys() []string { - keys := make([]string, 0) +// GetPortsMetadata returns info about current length of each port in collection +func (collection Collection) GetPortsMetadata() MetadataMap { + res := make(MetadataMap) for _, p := range collection { - keys = append(keys, p.Signals().GetKeys()...) + res[p.Name()] = &Metadata{ + SignalBufferLen: len(p.Signals()), + } + } + return res +} + +func (collection Collection) DisposeProcessedSignals(portsMetadata MetadataMap) { + for pName, meta := range portsMetadata { + collection.ByName(pName).DisposeFirstNSignals(meta.SignalBufferLen) } - return keys } diff --git a/port/collection_test.go b/port/collection_test.go index 2ffa412..de1fc35 100644 --- a/port/collection_test.go +++ b/port/collection_test.go @@ -85,7 +85,7 @@ func TestCollection_ByName(t *testing.T) { args: args{ name: "p1", }, - want: &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, + want: &Port{name: "p1", pipes: Group{}, signals: signal.Group{}}, }, { name: "port with signals found", @@ -95,7 +95,7 @@ func TestCollection_ByName(t *testing.T) { }, want: &Port{ name: "p2", - signals: signal.NewCollection().AddPayload(12), + signals: signal.NewGroup().With(signal.New(12)), pipes: Group{}, }, }, @@ -140,7 +140,7 @@ func TestCollection_ByNames(t *testing.T) { "p1": &Port{ name: "p1", pipes: Group{}, - signals: signal.NewCollection(), + signals: signal.Group{}, }, }, }, @@ -151,8 +151,16 @@ func TestCollection_ByNames(t *testing.T) { names: []string{"p1", "p2"}, }, want: Collection{ - "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, - "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}}, + "p1": &Port{ + name: "p1", + pipes: Group{}, + signals: signal.Group{}, + }, + "p2": &Port{ + name: "p2", + pipes: Group{}, + signals: signal.Group{}, + }, }, }, { @@ -170,8 +178,16 @@ func TestCollection_ByNames(t *testing.T) { names: []string{"p1", "p2", "p3"}, }, want: Collection{ - "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, - "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}}, + "p1": &Port{ + name: "p1", + pipes: Group{}, + signals: signal.Group{}, + }, + "p2": &Port{ + name: "p2", + pipes: Group{}, + signals: signal.Group{}, + }, }, }, } diff --git a/port/group.go b/port/group.go index ad0f6fc..fd598db 100644 --- a/port/group.go +++ b/port/group.go @@ -33,8 +33,8 @@ func NewIndexedGroup(prefix string, startIndex int, endIndex int) Group { return group } -// Add adds ports to group -func (group Group) Add(ports ...*Port) Group { +// With adds ports to group +func (group Group) With(ports ...*Port) Group { for _, port := range ports { if port == nil { continue diff --git a/port/port.go b/port/port.go index af32d53..78838a5 100644 --- a/port/port.go +++ b/port/port.go @@ -7,8 +7,8 @@ import ( // Port defines a connectivity point of a component type Port struct { name string - signals signal.Collection //Current signals set on the port - pipes Group //Refs to all outbound pipes connected to this port + signals signal.Group //Signal buffer + pipes Group //Outbound pipes } // New creates a new port @@ -16,7 +16,7 @@ func New(name string) *Port { return &Port{ name: name, pipes: NewGroup(), - signals: signal.NewCollection(), + signals: signal.NewGroup(), } } @@ -26,14 +26,18 @@ func (p *Port) Name() string { } // Signals getter -func (p *Port) Signals() signal.Collection { +func (p *Port) Signals() signal.Group { return p.signals } +func (p *Port) setSignals(signals signal.Group) { + p.signals = signals +} + // PutSignals adds signals // @TODO: rename func (p *Port) PutSignals(signals ...*signal.Signal) { - p.Signals().Add(signals...) + p.setSignals(p.Signals().With(signals...)) } // WithSignals adds signals and returns the port @@ -44,7 +48,15 @@ func (p *Port) WithSignals(signals ...*signal.Signal) *Port { // ClearSignals removes all signals from the port func (p *Port) ClearSignals() { - p.signals = signal.NewCollection() + p.setSignals(signal.NewGroup()) +} + +func (p *Port) DisposeFirstNSignals(n int) { + if n > len(p.Signals()) { + p.ClearSignals() + return + } + p.setSignals(p.Signals()[n:]) } // HasSignals says whether port signals is set or not @@ -64,7 +76,7 @@ func (p *Port) PipeTo(toPorts ...*Port) { if toPort == nil { continue } - p.pipes = p.pipes.Add(toPort) + p.pipes = p.pipes.With(toPort) } } @@ -87,5 +99,5 @@ func (p *Port) Flush(clearFlushed bool) bool { // ForwardSignals puts signals from source port to destination port, without clearing the source port func ForwardSignals(source *Port, dest *Port) { - dest.PutSignals(source.Signals().AsGroup()...) + dest.PutSignals(source.Signals()...) } diff --git a/port/port_test.go b/port/port_test.go index 5d65b7c..df53b55 100644 --- a/port/port_test.go +++ b/port/port_test.go @@ -38,22 +38,22 @@ func TestPort_Signals(t *testing.T) { tests := []struct { name string port *Port - want signal.Collection + want signal.Group }{ { name: "no signals", port: New("noSignal"), - want: signal.Collection{}, + want: signal.Group{}, }, { name: "with signal", port: portWithSignal, - want: signal.NewCollection().AddPayload(123), + want: signal.NewGroup(123), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want.AsGroup(), tt.port.Signals().AsGroup()) + assert.Equal(t, tt.want, tt.port.Signals()) }) } } @@ -69,12 +69,12 @@ func TestPort_ClearSignal(t *testing.T) { { name: "happy path", before: portWithSignal, - after: &Port{name: "portWithSignal", pipes: Group{}, signals: signal.Collection{}}, + after: &Port{name: "portWithSignal", pipes: Group{}, signals: signal.Group{}}, }, { name: "cleaning empty port", before: New("emptyPort"), - after: &Port{name: "emptyPort", pipes: Group{}, signals: signal.Collection{}}, + after: &Port{name: "emptyPort", pipes: Group{}, signals: signal.Group{}}, }, } for _, tt := range tests { @@ -103,7 +103,7 @@ func TestPort_PipeTo(t *testing.T) { after: &Port{ name: "p1", pipes: Group{p2, p3}, - signals: signal.Collection{}, + signals: signal.Group{}, }, args: args{ toPorts: []*Port{p2, p3}, @@ -115,7 +115,7 @@ func TestPort_PipeTo(t *testing.T) { after: &Port{ name: "p4", pipes: Group{p2}, - signals: signal.Collection{}, + signals: signal.Group{}, }, args: args{ toPorts: []*Port{p2, nil}, @@ -151,7 +151,7 @@ func TestPort_PutSignals(t *testing.T) { before: New("emptyPort"), after: &Port{ name: "emptyPort", - signals: signal.NewCollection().AddPayload(11), + signals: signal.NewGroup(11), pipes: Group{}, }, args: args{ @@ -163,7 +163,7 @@ func TestPort_PutSignals(t *testing.T) { before: New("p"), after: &Port{ name: "p", - signals: signal.NewCollection().AddPayload(11, 12), + signals: signal.NewGroup(11, 12), pipes: Group{}, }, args: args{ @@ -175,7 +175,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithSingleSignal, after: &Port{ name: "portWithSingleSignal", - signals: signal.NewCollection().AddPayload(11, 12), + signals: signal.NewGroup(11, 12), pipes: Group{}, }, args: args{ @@ -187,7 +187,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithMultipleSignals, after: &Port{ name: "portWithMultipleSignals", - signals: signal.NewCollection().AddPayload(11, 12, 13), + signals: signal.NewGroup(11, 12, 13), pipes: Group{}, }, args: args{ @@ -199,7 +199,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithMultipleSignals2, after: &Port{ name: "portWithMultipleSignals2", - signals: signal.NewCollection().AddPayload(55, 66, 13, 14), //Notice LIFO order + signals: signal.NewGroup(55, 66, 13, 14), //Notice LIFO order pipes: Group{}, }, args: args{ @@ -210,7 +210,7 @@ func TestPort_PutSignals(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.before.PutSignals(tt.args.signals...) - assert.ElementsMatch(t, tt.after.Signals().AsGroup(), tt.before.Signals().AsGroup()) + assert.ElementsMatch(t, tt.after.Signals(), tt.before.Signals()) }) } } @@ -251,7 +251,7 @@ func TestNewPort(t *testing.T) { want: &Port{ name: "", pipes: Group{}, - signals: signal.Collection{}, + signals: signal.Group{}, }, }, { @@ -262,7 +262,7 @@ func TestNewPort(t *testing.T) { want: &Port{ name: "p1", pipes: Group{}, - signals: signal.Collection{}, + signals: signal.Group{}, }, }, } @@ -388,3 +388,54 @@ func TestPort_Flush(t *testing.T) { }) } } + +func TestPort_DisposeFirstNSignals(t *testing.T) { + type args struct { + n int + } + tests := []struct { + name string + port *Port + wantSignals signal.Group + args args + }{ + { + name: "empty port", + port: New("empty"), + wantSignals: signal.Group{}, + args: args{ + n: 0, + }, + }, + { + name: "with signals", + port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), + wantSignals: signal.NewGroup(44, 55, 66), + args: args{ + n: 3, + }, + }, + { + name: "n > len(signals) cleans the signal buffer", + port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), + wantSignals: signal.NewGroup(), + args: args{ + n: 10, + }, + }, + { + name: "n = 0 has no effect", + port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), + wantSignals: signal.NewGroup(11, 22, 33, 44, 55, 66), + args: args{ + n: 0, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.port.DisposeFirstNSignals(tt.args.n) + assert.Equal(t, tt.wantSignals, tt.port.Signals()) + }) + } +} diff --git a/signal/collection.go b/signal/collection.go deleted file mode 100644 index 22995d0..0000000 --- a/signal/collection.go +++ /dev/null @@ -1,61 +0,0 @@ -package signal - -import ( - "fmt" -) - -type Collection map[string]*Signal - -func NewCollection() Collection { - return make(Collection) -} - -func (collection Collection) Add(signals ...*Signal) Collection { - for _, sig := range signals { - signalKey := collection.newKey(sig) - collection[signalKey] = sig - } - return collection -} - -func (collection Collection) AddPayload(payloads ...any) Collection { - for _, p := range payloads { - collection.Add(New(p)) - } - return collection -} - -func (collection Collection) newKey(signal *Signal) string { - return fmt.Sprintf("%d", len(collection)+1) -} - -func (collection Collection) AsGroup() Group { - group := NewGroup() - for _, sig := range collection { - group = append(group, sig) - } - return group -} - -func (collection Collection) FirstPayload() any { - return collection.AsGroup().FirstPayload() -} - -func (collection Collection) AllPayloads() []any { - return collection.AsGroup().AllPayloads() -} - -func (collection Collection) GetKeys() []string { - keys := make([]string, 0) - for k, _ := range collection { - keys = append(keys, k) - } - return keys -} - -func (collection Collection) DeleteKeys(keys []string) Collection { - for _, key := range keys { - delete(collection, key) - } - return collection -} diff --git a/signal/group.go b/signal/group.go index 230767c..90c9783 100644 --- a/signal/group.go +++ b/signal/group.go @@ -28,3 +28,22 @@ func (group Group) AllPayloads() []any { } return all } + +// With adds signals to group +func (group Group) With(signals ...*Signal) Group { + for _, sig := range signals { + if sig == nil { + continue + } + group = append(group, sig) + } + + return group +} + +func (group Group) WithPayloads(payloads ...any) Group { + for _, p := range payloads { + group = append(group, New(p)) + } + return group +}