diff --git a/component/activation_result.go b/component/activation_result.go index a801c3d..7db9f95 100644 --- a/component/activation_result.go +++ b/component/activation_result.go @@ -3,15 +3,16 @@ package component import ( "errors" "fmt" + "github.com/hovsep/fmesh/port" ) // ActivationResult defines the result (possibly an error) of the activation of given component in given cycle type ActivationResult struct { - componentName string - activated bool - inputKeys []string //@TODO: check if we can replace this by one int which will show the index of last signal used as input within signals collection (use signal position in the collection as it's unique id) - code ActivationResultCode - err error + componentName string + activated bool + inputsMetadata port.MetadataMap //Contains the info about length of input ports during the activation (required for correct i2i piping) + code ActivationResultCode + err error } // ActivationResultCode denotes a specific info about how a component been activated or why not activated at all @@ -38,6 +39,7 @@ const ( ) // NewActivationResult creates a new activation result for given component +// @TODO Hide this from user func NewActivationResult(componentName string) *ActivationResult { return &ActivationResult{ componentName: componentName, @@ -92,13 +94,13 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult { return ar } -func (ar *ActivationResult) WithInputKeys(keys []string) *ActivationResult { - ar.inputKeys = keys +func (ar *ActivationResult) WithInputsMetadata(meta port.MetadataMap) *ActivationResult { + ar.inputsMetadata = meta return ar } -func (ar *ActivationResult) InputKeys() []string { - return ar.inputKeys +func (ar *ActivationResult) InputsMetadata() port.MetadataMap { + return ar.inputsMetadata } // newActivationResultOK builds a specific activation result @@ -106,7 +108,7 @@ func (c *Component) newActivationResultOK() *ActivationResult { return NewActivationResult(c.Name()). SetActivated(true). WithActivationCode(ActivationCodeOK). - WithInputKeys(c.Inputs().GetSignalKeys()) + WithInputsMetadata(c.Inputs().GetPortsMetadata()) } @@ -137,7 +139,7 @@ func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult SetActivated(true). WithActivationCode(ActivationCodeReturnedError). WithError(fmt.Errorf("component returned an error: %w", err)). - WithInputKeys(c.Inputs().GetSignalKeys()) + WithInputsMetadata(c.Inputs().GetPortsMetadata()) } // newActivationCodePanicked builds a specific activation result @@ -146,7 +148,7 @@ func (c *Component) newActivationCodePanicked(err error) *ActivationResult { SetActivated(true). WithActivationCode(ActivationCodePanicked). WithError(err). - WithInputKeys(c.Inputs().GetSignalKeys()) + WithInputsMetadata(c.Inputs().GetPortsMetadata()) } // isWaitingForInput tells whether component is waiting for specific inputs diff --git a/component/component.go b/component/component.go index f20bdd2..5711bf8 100644 --- a/component/component.go +++ b/component/component.go @@ -34,13 +34,25 @@ func (c *Component) WithDescription(description string) *Component { // WithInputs ads input ports func (c *Component) WithInputs(portNames ...string) *Component { - c.inputs = c.inputs.Add(port.NewGroup(portNames...)...) + c.inputs = c.Inputs().Add(port.NewGroup(portNames...)...) return c } // WithOutputs adds output ports func (c *Component) WithOutputs(portNames ...string) *Component { - c.outputs = c.outputs.Add(port.NewGroup(portNames...)...) + c.outputs = c.Outputs().Add(port.NewGroup(portNames...)...) + return c +} + +// WithInputsIndexed creates multiple prefixed ports +func (c *Component) WithInputsIndexed(prefix string, startIndex int, endIndex int) *Component { + c.inputs = c.Inputs().AddIndexed(prefix, startIndex, endIndex) + return c +} + +// WithOutputsIndexed creates multiple prefixed ports +func (c *Component) WithOutputsIndexed(prefix string, startIndex int, endIndex int) *Component { + c.outputs = c.Outputs().AddIndexed(prefix, startIndex, endIndex) return c } @@ -124,8 +136,8 @@ func (c *Component) FlushInputs() { c.inputs.Flush(false) } -// FlushOutputs ... +// FlushAndClearOutputs flushes output ports and clears flushed ones // @TODO: hide this method from user -func (c *Component) FlushOutputs() { +func (c *Component) FlushAndClearOutputs() { c.outputs.Flush(true) } diff --git a/component/component_test.go b/component/component_test.go index b371d92..605d42c 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -99,7 +99,7 @@ func TestComponent_Description(t *testing.T) { } } -func TestComponent_FlushOutputs(t *testing.T) { +func TestComponent_FlushAndClearOutputs(t *testing.T) { sink := port.New("sink") componentWithAllOutputsSet := NewComponent("c1").WithOutputs("o1", "o2") @@ -145,7 +145,7 @@ func TestComponent_FlushOutputs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.component.FlushOutputs() + tt.component.FlushAndClearOutputs() tt.assertions(t, tt.component, tt.destPort) }) } @@ -239,8 +239,8 @@ func TestComponent_WithActivationFunc(t *testing.T) { assert.Equal(t, err1, err2) //Compare signals without keys (because they are random) - assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals().AsGroup(), testOutputs2.ByName("out1").Signals().AsGroup()) - assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals().AsGroup(), testOutputs2.ByName("out2").Signals().AsGroup()) + assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals(), testOutputs2.ByName("out1").Signals()) + assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals(), testOutputs2.ByName("out2").Signals()) }) } diff --git a/fmesh.go b/fmesh.go index 766abcf..1f4d8b6 100644 --- a/fmesh.go +++ b/fmesh.go @@ -88,12 +88,17 @@ func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) { continue } - c.FlushInputs() - c.FlushOutputs() + c.FlushAndClearOutputs() // Just flush and clear + c.FlushInputs() // Inputs are a bit trickier + //Check if a component wait for inputs and wants to keep existing input keepInputs := c.WantsToKeepInputs(activationResult) + if !keepInputs { - c.Inputs().RemoveSignalsByKeys(activationResult.InputKeys()) + // Inputs can not be just cleared, instead we remove signals which + // have been used (been set on inputs) during the last activation cycle + // thus not affecting ones the component could have been received from i2i pipes + c.Inputs().DisposeProcessedSignals(activationResult.InputsMetadata()) } } diff --git a/fmesh_test.go b/fmesh_test.go index 9fcee29..957a641 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -715,15 +715,27 @@ func TestFMesh_runCycle(t *testing.T) { component.NewActivationResult("c1"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithInputKeys([]string{"1"}), + WithInputsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }), component.NewActivationResult("c2"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithInputKeys([]string{"1"}), + WithInputsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }), component.NewActivationResult("c3"). SetActivated(true). WithActivationCode(component.ActivationCodeOK). - WithInputKeys([]string{"1"}), + WithInputsMetadata(port.MetadataMap{ + "i1": &port.Metadata{ + SignalBufferLen: 1, + }, + }), ), }, } diff --git a/integration_tests/piping/fan_test.go b/integration_tests/piping/fan_test.go index dd9cd0f..73caf33 100644 --- a/integration_tests/piping/fan_test.go +++ b/integration_tests/piping/fan_test.go @@ -135,7 +135,7 @@ func Test_Fan(t *testing.T) { assert.True(t, fm.Components().ByName("consumer").Outputs().ByName("o1").HasSignals()) //The signal is combined and consist of 3 payloads - resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals().AsGroup() + resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals() assert.Len(t, resultSignals, 3) //And they are all different diff --git a/integration_tests/piping/piping_from_inputs_test.go b/integration_tests/piping/piping_from_inputs_test.go index 7f90e40..e98bf41 100644 --- a/integration_tests/piping/piping_from_inputs_test.go +++ b/integration_tests/piping/piping_from_inputs_test.go @@ -23,7 +23,7 @@ func Test_PipingFromInput(t *testing.T) { setupFM: func() *fmesh.FMesh { adder := component.NewComponent("adder"). WithDescription("adds i1 and i2"). - WithInputs("i1", "i2"). + WithInputsIndexed("i", 1, 2). WithOutputs("out"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { i1, i2 := inputs.ByName("i1").Signals().FirstPayload().(int), inputs.ByName("i2").Signals().FirstPayload().(int) @@ -47,7 +47,6 @@ func Test_PipingFromInput(t *testing.T) { WithOutputs("log"). WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { for _, sig := range inputs.ByName("in").Signals() { - fmt.Println(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload())) outputs.ByName("log").PutSignals(signal.New(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload()))) } return nil @@ -80,6 +79,95 @@ func Test_PipingFromInput(t *testing.T) { assert.Len(t, l.Outputs().ByName("log").Signals(), 3) }, }, + { + name: "observing component which waits for inputs", + setupFM: func() *fmesh.FMesh { + starter := component.NewComponent("starter"). + WithDescription("This component just starts the whole f-mesh"). + WithInputs("start"). + WithOutputsIndexed("o", 1, 2). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + //Activate downstream components + outputs.PutSignals(inputs.AllSignals()[0]) + return nil + }) + + incr1 := component.NewComponent("incr1"). + WithDescription("Increments the input"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.PutSignals(signal.New(1 + inputs.AllSignals().FirstPayload().(int))) + return nil + }) + + incr2 := component.NewComponent("incr2"). + WithDescription("Increments the input"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.PutSignals(signal.New(1 + inputs.AllSignals().FirstPayload().(int))) + return nil + }) + + doubler := component.NewComponent("doubler"). + WithDescription("Doubles the input"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.PutSignals(signal.New(2 * inputs.AllSignals().FirstPayload().(int))) + return nil + }) + + agg := component.NewComponent("result_aggregator"). + WithDescription("Adds 2 inputs (only when both are available)"). + WithInputsIndexed("i", 1, 2). + WithOutputs("result"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + if !inputs.ByNames("i1", "i2").AllHaveSignals() { + return component.NewErrWaitForInputs(true) + } + i1 := inputs.ByName("i1").Signals().FirstPayload().(int) + i2 := inputs.ByName("i2").Signals().FirstPayload().(int) + outputs.PutSignals(signal.New(i1 + i2)) + return nil + }) + + observer := component.NewComponent("obsrv"). + WithDescription("Observes inputs of result aggregator"). + WithInputsIndexed("i", 1, 2). + WithOutputs("log"). + WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error { + outputs.ByName("log").PutSignals(inputs.AllSignals()...) + return nil + }) + + fm := fmesh.New("observer").WithComponents(starter, incr1, incr2, doubler, agg, observer) + + starter.Outputs().ByName("o1").PipeTo(incr1.Inputs().ByName("i1")) + starter.Outputs().ByName("o2").PipeTo(incr2.Inputs().ByName("i1")) + incr1.Outputs().ByName("o1").PipeTo(doubler.Inputs().ByName("i1")) + doubler.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i1")) + incr2.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i2")) + agg.Inputs().ByName("i1").PipeTo(observer.Inputs().ByName("i1")) + agg.Inputs().ByName("i2").PipeTo(observer.Inputs().ByName("i2")) + return fm + }, + setInputs: func(fm *fmesh.FMesh) { + fm.Components().ByName("starter").Inputs().PutSignals(signal.New(10)) + }, + assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) { + assert.NoError(t, err) + + //Multiplier result + assert.Equal(t, 33, fm.Components().ByName("result_aggregator").Outputs().ByName("result").Signals().FirstPayload()) + + //Observed signals + assert.Len(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals(), 2) + assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 11) + assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 22) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/port/collection.go b/port/collection.go index 3379da7..b44ab3a 100644 --- a/port/collection.go +++ b/port/collection.go @@ -7,6 +7,13 @@ import ( // Collection is a port collection with useful methods type Collection map[string]*Port +// Metadata contains metadata about the port +type Metadata struct { + SignalBufferLen int +} + +type MetadataMap map[string]*Metadata + // NewCollection creates empty collection func NewCollection() Collection { return make(Collection) @@ -52,13 +59,19 @@ func (collection Collection) AllHaveSignals() bool { return true } -// PutSignals puts a signals to all the port in collection +// PutSignals adds signals to every port in collection func (collection Collection) PutSignals(signals ...*signal.Signal) { for _, p := range collection { p.PutSignals(signals...) } } +// WithSignals adds signals to every port in collection and returns the collection +func (collection Collection) WithSignals(signals ...*signal.Signal) Collection { + collection.PutSignals(signals...) + return collection +} + // ClearSignals removes signals from all ports in collection func (collection Collection) ClearSignals() { for _, p := range collection { @@ -69,17 +82,8 @@ func (collection Collection) ClearSignals() { // Flush flushes all ports in collection func (collection Collection) Flush(clearFlushed bool) { for _, p := range collection { - if portFlushed := p.Flush(); clearFlushed && portFlushed { - p.ClearSignals() - } - } -} - -func (collection Collection) RemoveSignalsByKeys(signalKeys []string) Collection { - for _, p := range collection { - p.Signals().DeleteKeys(signalKeys) + p.Flush(clearFlushed) } - return collection } // PipeTo creates pipes from each port in collection @@ -101,10 +105,32 @@ func (collection Collection) Add(ports ...*Port) Collection { return collection } -func (collection Collection) GetSignalKeys() []string { - keys := make([]string, 0) +// AddIndexed creates ports with names like "o1","o2","o3" and so on +func (collection Collection) AddIndexed(prefix string, startIndex int, endIndex int) Collection { + return collection.Add(NewIndexedGroup(prefix, startIndex, endIndex)...) +} + +func (collection Collection) AllSignals() signal.Group { + group := signal.NewGroup() for _, p := range collection { - keys = append(keys, p.Signals().GetKeys()...) + group = append(group, p.Signals()...) + } + return group +} + +// GetPortsMetadata returns info about current length of each port in collection +func (collection Collection) GetPortsMetadata() MetadataMap { + res := make(MetadataMap) + for _, p := range collection { + res[p.Name()] = &Metadata{ + SignalBufferLen: len(p.Signals()), + } + } + return res +} + +func (collection Collection) DisposeProcessedSignals(portsMetadata MetadataMap) { + for pName, meta := range portsMetadata { + collection.ByName(pName).DisposeFirstNSignals(meta.SignalBufferLen) } - return keys } diff --git a/port/collection_test.go b/port/collection_test.go index fc98770..de1fc35 100644 --- a/port/collection_test.go +++ b/port/collection_test.go @@ -7,12 +7,10 @@ import ( ) func TestCollection_AllHaveSignal(t *testing.T) { - oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...) - oneEmptyPorts.PutSignals(signal.New(123)) + oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) oneEmptyPorts.ByName("p2").ClearSignals() - allWithSignalPorts := NewCollection().Add(NewGroup("out1", "out2", "out3")...) - allWithSignalPorts.PutSignals(signal.New(77)) + allWithSignalPorts := NewCollection().Add(NewGroup("out1", "out2", "out3")...).WithSignals(signal.New(77)) tests := []struct { name string @@ -43,8 +41,7 @@ func TestCollection_AllHaveSignal(t *testing.T) { } func TestCollection_AnyHasSignal(t *testing.T) { - oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...) - oneEmptyPorts.PutSignals(signal.New(123)) + oneEmptyPorts := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.New(123)) oneEmptyPorts.ByName("p2").ClearSignals() tests := []struct { @@ -71,8 +68,7 @@ func TestCollection_AnyHasSignal(t *testing.T) { } func TestCollection_ByName(t *testing.T) { - portsWithSignals := NewCollection().Add(NewGroup("p1", "p2")...) - portsWithSignals.PutSignals(signal.New(12)) + portsWithSignals := NewCollection().Add(NewGroup("p1", "p2")...).WithSignals(signal.New(12)) type args struct { name string @@ -89,7 +85,7 @@ func TestCollection_ByName(t *testing.T) { args: args{ name: "p1", }, - want: &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, + want: &Port{name: "p1", pipes: Group{}, signals: signal.Group{}}, }, { name: "port with signals found", @@ -99,7 +95,7 @@ func TestCollection_ByName(t *testing.T) { }, want: &Port{ name: "p2", - signals: signal.NewCollection().AddPayload(12), + signals: signal.NewGroup().With(signal.New(12)), pipes: Group{}, }, }, @@ -144,7 +140,7 @@ func TestCollection_ByNames(t *testing.T) { "p1": &Port{ name: "p1", pipes: Group{}, - signals: signal.NewCollection(), + signals: signal.Group{}, }, }, }, @@ -155,8 +151,16 @@ func TestCollection_ByNames(t *testing.T) { names: []string{"p1", "p2"}, }, want: Collection{ - "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, - "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}}, + "p1": &Port{ + name: "p1", + pipes: Group{}, + signals: signal.Group{}, + }, + "p2": &Port{ + name: "p2", + pipes: Group{}, + signals: signal.Group{}, + }, }, }, { @@ -174,8 +178,16 @@ func TestCollection_ByNames(t *testing.T) { names: []string{"p1", "p2", "p3"}, }, want: Collection{ - "p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}}, - "p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}}, + "p1": &Port{ + name: "p1", + pipes: Group{}, + signals: signal.Group{}, + }, + "p2": &Port{ + name: "p2", + pipes: Group{}, + signals: signal.Group{}, + }, }, }, } @@ -188,8 +200,7 @@ func TestCollection_ByNames(t *testing.T) { func TestCollection_ClearSignal(t *testing.T) { t.Run("happy path", func(t *testing.T) { - ports := NewCollection().Add(NewGroup("p1", "p2", "p3")...) - ports.PutSignals(signal.NewGroup(1, 2, 3)...) + ports := NewCollection().Add(NewGroup("p1", "p2", "p3")...).WithSignals(signal.NewGroup(1, 2, 3)...) assert.True(t, ports.AllHaveSignals()) ports.ClearSignals() assert.False(t, ports.AnyHasSignals()) diff --git a/port/group.go b/port/group.go index bb7e6d0..fd598db 100644 --- a/port/group.go +++ b/port/group.go @@ -1,5 +1,7 @@ package port +import "fmt" + // Group is just a slice of ports (useful to pass multiple ports as variadic argument) type Group []*Port @@ -12,8 +14,27 @@ func NewGroup(names ...string) Group { return group } -// Add adds ports to group -func (group Group) Add(ports ...*Port) Group { +// NewIndexedGroup is useful when you want to create group of ports with same prefix +func NewIndexedGroup(prefix string, startIndex int, endIndex int) Group { + if prefix == "" { + return nil + } + + if startIndex > endIndex { + return nil + } + + group := make(Group, endIndex-startIndex+1) + + for i := startIndex; i <= endIndex; i++ { + group[i-startIndex] = New(fmt.Sprintf("%s%d", prefix, i)) + } + + return group +} + +// With adds ports to group +func (group Group) With(ports ...*Port) Group { for _, port := range ports { if port == nil { continue diff --git a/port/port.go b/port/port.go index 5d7a979..78838a5 100644 --- a/port/port.go +++ b/port/port.go @@ -7,8 +7,8 @@ import ( // Port defines a connectivity point of a component type Port struct { name string - signals signal.Collection //Current signals set on the port - pipes Group //Refs to all outbound pipes connected to this port + signals signal.Group //Signal buffer + pipes Group //Outbound pipes } // New creates a new port @@ -16,7 +16,7 @@ func New(name string) *Port { return &Port{ name: name, pipes: NewGroup(), - signals: signal.NewCollection(), + signals: signal.NewGroup(), } } @@ -26,19 +26,37 @@ func (p *Port) Name() string { } // Signals getter -func (p *Port) Signals() signal.Collection { +func (p *Port) Signals() signal.Group { return p.signals } -// PutSignals adds a signals to current signals +func (p *Port) setSignals(signals signal.Group) { + p.signals = signals +} + +// PutSignals adds signals // @TODO: rename func (p *Port) PutSignals(signals ...*signal.Signal) { - p.Signals().Add(signals...) + p.setSignals(p.Signals().With(signals...)) +} + +// WithSignals adds signals and returns the port +func (p *Port) WithSignals(signals ...*signal.Signal) *Port { + p.PutSignals(signals...) + return p } // ClearSignals removes all signals from the port func (p *Port) ClearSignals() { - p.signals = signal.NewCollection() + p.setSignals(signal.NewGroup()) +} + +func (p *Port) DisposeFirstNSignals(n int) { + if n > len(p.Signals()) { + p.ClearSignals() + return + } + p.setSignals(p.Signals()[n:]) } // HasSignals says whether port signals is set or not @@ -58,13 +76,13 @@ func (p *Port) PipeTo(toPorts ...*Port) { if toPort == nil { continue } - p.pipes = p.pipes.Add(toPort) + p.pipes = p.pipes.With(toPort) } } -// Flush pushes current signals to pipes and returns true when flushed +// Flush pushes signals to pipes, clears the port if needed and returns true when flushed // @TODO: hide this method from user -func (p *Port) Flush() bool { +func (p *Port) Flush(clearFlushed bool) bool { if !p.HasSignals() || !p.HasPipes() { return false } @@ -73,10 +91,13 @@ func (p *Port) Flush() bool { //Fan-Out ForwardSignals(p, outboundPort) } + if clearFlushed { + p.ClearSignals() + } return true } // ForwardSignals puts signals from source port to destination port, without clearing the source port func ForwardSignals(source *Port, dest *Port) { - dest.PutSignals(source.Signals().AsGroup()...) + dest.PutSignals(source.Signals()...) } diff --git a/port/port_test.go b/port/port_test.go index 2fdfff0..df53b55 100644 --- a/port/port_test.go +++ b/port/port_test.go @@ -7,8 +7,7 @@ import ( ) func TestPort_HasSignals(t *testing.T) { - portWithSignal := New("portWithSignal") - portWithSignal.PutSignals(signal.New(123)) + portWithSignal := New("portWithSignal").WithSignals(signal.New(123)) tests := []struct { name string @@ -34,35 +33,33 @@ func TestPort_HasSignals(t *testing.T) { } func TestPort_Signals(t *testing.T) { - portWithSignal := New("portWithSignal") - portWithSignal.PutSignals(signal.New(123)) + portWithSignal := New("portWithSignal").WithSignals(signal.New(123)) tests := []struct { name string port *Port - want signal.Collection + want signal.Group }{ { name: "no signals", port: New("noSignal"), - want: signal.Collection{}, + want: signal.Group{}, }, { name: "with signal", port: portWithSignal, - want: signal.NewCollection().AddPayload(123), + want: signal.NewGroup(123), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.Equal(t, tt.want.AsGroup(), tt.port.Signals().AsGroup()) + assert.Equal(t, tt.want, tt.port.Signals()) }) } } func TestPort_ClearSignal(t *testing.T) { - portWithSignal := New("portWithSignal") - portWithSignal.PutSignals(signal.New(111)) + portWithSignal := New("portWithSignal").WithSignals(signal.New(111)) tests := []struct { name string @@ -72,12 +69,12 @@ func TestPort_ClearSignal(t *testing.T) { { name: "happy path", before: portWithSignal, - after: &Port{name: "portWithSignal", pipes: Group{}, signals: signal.Collection{}}, + after: &Port{name: "portWithSignal", pipes: Group{}, signals: signal.Group{}}, }, { name: "cleaning empty port", before: New("emptyPort"), - after: &Port{name: "emptyPort", pipes: Group{}, signals: signal.Collection{}}, + after: &Port{name: "emptyPort", pipes: Group{}, signals: signal.Group{}}, }, } for _, tt := range tests { @@ -106,7 +103,7 @@ func TestPort_PipeTo(t *testing.T) { after: &Port{ name: "p1", pipes: Group{p2, p3}, - signals: signal.Collection{}, + signals: signal.Group{}, }, args: args{ toPorts: []*Port{p2, p3}, @@ -118,7 +115,7 @@ func TestPort_PipeTo(t *testing.T) { after: &Port{ name: "p4", pipes: Group{p2}, - signals: signal.Collection{}, + signals: signal.Group{}, }, args: args{ toPorts: []*Port{p2, nil}, @@ -134,14 +131,11 @@ func TestPort_PipeTo(t *testing.T) { } func TestPort_PutSignals(t *testing.T) { - portWithSingleSignal := New("portWithSingleSignal") - portWithSingleSignal.PutSignals(signal.New(11)) + portWithSingleSignal := New("portWithSingleSignal").WithSignals(signal.New(11)) - portWithMultipleSignals := New("portWithMultipleSignals") - portWithMultipleSignals.PutSignals(signal.NewGroup(11, 12)...) + portWithMultipleSignals := New("portWithMultipleSignals").WithSignals(signal.NewGroup(11, 12)...) - portWithMultipleSignals2 := New("portWithMultipleSignals2") - portWithMultipleSignals2.PutSignals(signal.NewGroup(55, 66)...) + portWithMultipleSignals2 := New("portWithMultipleSignals2").WithSignals(signal.NewGroup(55, 66)...) type args struct { signals []*signal.Signal @@ -157,7 +151,7 @@ func TestPort_PutSignals(t *testing.T) { before: New("emptyPort"), after: &Port{ name: "emptyPort", - signals: signal.NewCollection().AddPayload(11), + signals: signal.NewGroup(11), pipes: Group{}, }, args: args{ @@ -169,7 +163,7 @@ func TestPort_PutSignals(t *testing.T) { before: New("p"), after: &Port{ name: "p", - signals: signal.NewCollection().AddPayload(11, 12), + signals: signal.NewGroup(11, 12), pipes: Group{}, }, args: args{ @@ -181,7 +175,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithSingleSignal, after: &Port{ name: "portWithSingleSignal", - signals: signal.NewCollection().AddPayload(11, 12), + signals: signal.NewGroup(11, 12), pipes: Group{}, }, args: args{ @@ -193,7 +187,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithMultipleSignals, after: &Port{ name: "portWithMultipleSignals", - signals: signal.NewCollection().AddPayload(11, 12, 13), + signals: signal.NewGroup(11, 12, 13), pipes: Group{}, }, args: args{ @@ -205,7 +199,7 @@ func TestPort_PutSignals(t *testing.T) { before: portWithMultipleSignals2, after: &Port{ name: "portWithMultipleSignals2", - signals: signal.NewCollection().AddPayload(55, 66, 13, 14), //Notice LIFO order + signals: signal.NewGroup(55, 66, 13, 14), //Notice LIFO order pipes: Group{}, }, args: args{ @@ -216,7 +210,7 @@ func TestPort_PutSignals(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.before.PutSignals(tt.args.signals...) - assert.ElementsMatch(t, tt.after.Signals().AsGroup(), tt.before.Signals().AsGroup()) + assert.ElementsMatch(t, tt.after.Signals(), tt.before.Signals()) }) } } @@ -257,7 +251,7 @@ func TestNewPort(t *testing.T) { want: &Port{ name: "", pipes: Group{}, - signals: signal.Collection{}, + signals: signal.Group{}, }, }, { @@ -268,7 +262,7 @@ func TestNewPort(t *testing.T) { want: &Port{ name: "p1", pipes: Group{}, - signals: signal.Collection{}, + signals: signal.Group{}, }, }, } @@ -280,28 +274,23 @@ func TestNewPort(t *testing.T) { } func TestPort_Flush(t *testing.T) { - portWithSignal1 := New("portWithSignal1") - portWithSignal1.PutSignals(signal.New(777)) - - portWithSignal2 := New("portWithSignal2") - portWithSignal2.PutSignals(signal.New(888)) - - portWithMultipleSignals := New("portWithMultipleSignals") - portWithMultipleSignals.PutSignals(signal.NewGroup(11, 12)...) - - emptyPort := New("emptyPort") - tests := []struct { - name string - source *Port - dest *Port - wantResult bool - assertions func(t *testing.T, source *Port, dest *Port) + name string + getSource func() *Port + getDest func() *Port + clearFlushed bool + wantResult bool + assertions func(t *testing.T, source *Port, dest *Port) }{ { - name: "port with no signals", - source: New("empty_src"), - dest: New("empty_dest"), + name: "port with no signals", + getSource: func() *Port { + return New("empty_src") + }, + getDest: func() *Port { + return New("empty_dest") + }, + clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { assert.False(t, source.HasSignals()) assert.False(t, dest.HasSignals()) @@ -309,42 +298,144 @@ func TestPort_Flush(t *testing.T) { wantResult: false, }, { - name: "flush to empty port", - source: portWithSignal1, - dest: emptyPort, + name: "flush to empty port", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(111)) + }, + getDest: func() *Port { + return New("empty_dest") + }, + clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { //Source port is not cleared during flush assert.True(t, source.HasSignals()) //Signals transferred to destination port assert.True(t, dest.HasSignals()) - assert.Equal(t, dest.Signals().FirstPayload().(int), 777) + assert.Equal(t, dest.Signals().FirstPayload().(int), 111) }, wantResult: true, }, { - name: "flush to port with signals", - source: portWithSignal2, - dest: portWithMultipleSignals, + name: "flush to empty port and clear", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(222)) + }, + getDest: func() *Port { + return New("empty_dest") + }, + clearFlushed: true, + assertions: func(t *testing.T, source *Port, dest *Port) { + //Source port is cleared + assert.False(t, source.HasSignals()) + + //Signals transferred to destination port + assert.True(t, dest.HasSignals()) + assert.Equal(t, dest.Signals().FirstPayload().(int), 222) + }, + wantResult: true, + }, + { + name: "flush to port with signals", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(333)) + }, + getDest: func() *Port { + return New("portWithMultipleSignals").WithSignals(signal.NewGroup(444, 555, 666)...) + }, + clearFlushed: false, assertions: func(t *testing.T, source *Port, dest *Port) { //Source port is not cleared assert.True(t, source.HasSignals()) //Destination port now has 1 more signal assert.True(t, dest.HasSignals()) - assert.Len(t, dest.Signals(), 3) - assert.Contains(t, dest.Signals().AllPayloads(), 888) + assert.Len(t, dest.Signals(), 4) + assert.Contains(t, dest.Signals().AllPayloads(), 333) + }, + wantResult: true, + }, + { + name: "flush to port with signals and clear", + getSource: func() *Port { + return New("portWithSignal").WithSignals(signal.New(777)) + }, + getDest: func() *Port { + return New("portWithMultipleSignals").WithSignals(signal.NewGroup(888, 999, 101010)...) + }, + clearFlushed: true, + assertions: func(t *testing.T, source *Port, dest *Port) { + //Source port is cleared + assert.False(t, source.HasSignals()) + + //Destination port now has 1 more signal + assert.True(t, dest.HasSignals()) + assert.Len(t, dest.Signals(), 4) + assert.Contains(t, dest.Signals().AllPayloads(), 777) }, wantResult: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.source.PipeTo(tt.dest) - assert.Equal(t, tt.wantResult, tt.source.Flush()) + source := tt.getSource() + dest := tt.getDest() + source.PipeTo(dest) + assert.Equal(t, tt.wantResult, source.Flush(tt.clearFlushed)) if tt.assertions != nil { - tt.assertions(t, tt.source, tt.dest) + tt.assertions(t, source, dest) } }) } } + +func TestPort_DisposeFirstNSignals(t *testing.T) { + type args struct { + n int + } + tests := []struct { + name string + port *Port + wantSignals signal.Group + args args + }{ + { + name: "empty port", + port: New("empty"), + wantSignals: signal.Group{}, + args: args{ + n: 0, + }, + }, + { + name: "with signals", + port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), + wantSignals: signal.NewGroup(44, 55, 66), + args: args{ + n: 3, + }, + }, + { + name: "n > len(signals) cleans the signal buffer", + port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), + wantSignals: signal.NewGroup(), + args: args{ + n: 10, + }, + }, + { + name: "n = 0 has no effect", + port: New("p1").WithSignals(signal.NewGroup(11, 22, 33, 44, 55, 66)...), + wantSignals: signal.NewGroup(11, 22, 33, 44, 55, 66), + args: args{ + n: 0, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.port.DisposeFirstNSignals(tt.args.n) + assert.Equal(t, tt.wantSignals, tt.port.Signals()) + }) + } +} diff --git a/signal/collection.go b/signal/collection.go deleted file mode 100644 index 22995d0..0000000 --- a/signal/collection.go +++ /dev/null @@ -1,61 +0,0 @@ -package signal - -import ( - "fmt" -) - -type Collection map[string]*Signal - -func NewCollection() Collection { - return make(Collection) -} - -func (collection Collection) Add(signals ...*Signal) Collection { - for _, sig := range signals { - signalKey := collection.newKey(sig) - collection[signalKey] = sig - } - return collection -} - -func (collection Collection) AddPayload(payloads ...any) Collection { - for _, p := range payloads { - collection.Add(New(p)) - } - return collection -} - -func (collection Collection) newKey(signal *Signal) string { - return fmt.Sprintf("%d", len(collection)+1) -} - -func (collection Collection) AsGroup() Group { - group := NewGroup() - for _, sig := range collection { - group = append(group, sig) - } - return group -} - -func (collection Collection) FirstPayload() any { - return collection.AsGroup().FirstPayload() -} - -func (collection Collection) AllPayloads() []any { - return collection.AsGroup().AllPayloads() -} - -func (collection Collection) GetKeys() []string { - keys := make([]string, 0) - for k, _ := range collection { - keys = append(keys, k) - } - return keys -} - -func (collection Collection) DeleteKeys(keys []string) Collection { - for _, key := range keys { - delete(collection, key) - } - return collection -} diff --git a/signal/group.go b/signal/group.go index 230767c..90c9783 100644 --- a/signal/group.go +++ b/signal/group.go @@ -28,3 +28,22 @@ func (group Group) AllPayloads() []any { } return all } + +// With adds signals to group +func (group Group) With(signals ...*Signal) Group { + for _, sig := range signals { + if sig == nil { + continue + } + group = append(group, sig) + } + + return group +} + +func (group Group) WithPayloads(payloads ...any) Group { + for _, p := range payloads { + group = append(group, New(p)) + } + return group +}