Skip to content

Commit

Permalink
i2i piping refactored with simpler approach
Browse files Browse the repository at this point in the history
  • Loading branch information
hovsep committed Sep 20, 2024
1 parent 83566a2 commit 07770cc
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 132 deletions.
25 changes: 13 additions & 12 deletions component/activation_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package component
import (
"errors"
"fmt"
"github.com/hovsep/fmesh/port"
)

// ActivationResult defines the result (possibly an error) of the activation of given component in given cycle
type ActivationResult struct {
componentName string
activated bool
inputKeys []string //@TODO: check if we can replace this by one int which will show the index of last signal used as input within signals collection (use signal position in the collection as it's unique id)
code ActivationResultCode
err error
componentName string
activated bool
inputsMetadata port.MetadataMap //Contains the info about length of input ports during the activation (required for correct i2i piping)
code ActivationResultCode
err error
}

// ActivationResultCode denotes a specific info about how a component been activated or why not activated at all
Expand Down Expand Up @@ -93,21 +94,21 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult {
return ar
}

func (ar *ActivationResult) WithInputKeys(keys []string) *ActivationResult {
ar.inputKeys = keys
func (ar *ActivationResult) WithInputsMetadata(meta port.MetadataMap) *ActivationResult {
ar.inputsMetadata = meta
return ar
}

func (ar *ActivationResult) InputKeys() []string {
return ar.inputKeys
func (ar *ActivationResult) InputsMetadata() port.MetadataMap {
return ar.inputsMetadata
}

// newActivationResultOK builds a specific activation result
func (c *Component) newActivationResultOK() *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeOK).
WithInputKeys(c.Inputs().GetSignalKeys())
WithInputsMetadata(c.Inputs().GetPortsMetadata())

}

Expand Down Expand Up @@ -138,7 +139,7 @@ func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult
SetActivated(true).
WithActivationCode(ActivationCodeReturnedError).
WithError(fmt.Errorf("component returned an error: %w", err)).
WithInputKeys(c.Inputs().GetSignalKeys())
WithInputsMetadata(c.Inputs().GetPortsMetadata())
}

// newActivationCodePanicked builds a specific activation result
Expand All @@ -147,7 +148,7 @@ func (c *Component) newActivationCodePanicked(err error) *ActivationResult {
SetActivated(true).
WithActivationCode(ActivationCodePanicked).
WithError(err).
WithInputKeys(c.Inputs().GetSignalKeys())
WithInputsMetadata(c.Inputs().GetPortsMetadata())
}

// isWaitingForInput tells whether component is waiting for specific inputs
Expand Down
4 changes: 2 additions & 2 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func (c *Component) FlushInputs() {
c.inputs.Flush(false)
}

// FlushOutputs ...
// FlushAndClearOutputs flushes output ports and clears flushed ones
// @TODO: hide this method from user
func (c *Component) FlushOutputs() {
func (c *Component) FlushAndClearOutputs() {
c.outputs.Flush(true)
}
8 changes: 4 additions & 4 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestComponent_Description(t *testing.T) {
}
}

func TestComponent_FlushOutputs(t *testing.T) {
func TestComponent_FlushAndClearOutputs(t *testing.T) {
sink := port.New("sink")

componentWithAllOutputsSet := NewComponent("c1").WithOutputs("o1", "o2")
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestComponent_FlushOutputs(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.component.FlushOutputs()
tt.component.FlushAndClearOutputs()
tt.assertions(t, tt.component, tt.destPort)
})
}
Expand Down Expand Up @@ -239,8 +239,8 @@ func TestComponent_WithActivationFunc(t *testing.T) {
assert.Equal(t, err1, err2)

//Compare signals without keys (because they are random)
assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals().AsGroup(), testOutputs2.ByName("out1").Signals().AsGroup())
assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals().AsGroup(), testOutputs2.ByName("out2").Signals().AsGroup())
assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals(), testOutputs2.ByName("out1").Signals())
assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals(), testOutputs2.ByName("out2").Signals())

})
}
Expand Down
11 changes: 8 additions & 3 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,17 @@ func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) {
continue
}

c.FlushInputs()
c.FlushOutputs()
c.FlushAndClearOutputs() // Just flush and clear
c.FlushInputs() // Inputs are a bit trickier

//Check if a component wait for inputs and wants to keep existing input
keepInputs := c.WantsToKeepInputs(activationResult)

if !keepInputs {
c.Inputs().RemoveSignalsByKeys(activationResult.InputKeys())
// Inputs can not be just cleared, instead we remove signals which
// have been used (been set on inputs) during the last activation cycle
// thus not affecting ones the component could have been received from i2i pipes
c.Inputs().DisposeProcessedSignals(activationResult.InputsMetadata())
}

}
Expand Down
18 changes: 15 additions & 3 deletions fmesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,15 +715,27 @@ func TestFMesh_runCycle(t *testing.T) {
component.NewActivationResult("c1").
SetActivated(true).
WithActivationCode(component.ActivationCodeOK).
WithInputKeys([]string{"1"}),
WithInputsMetadata(port.MetadataMap{
"i1": &port.Metadata{
SignalBufferLen: 1,
},
}),
component.NewActivationResult("c2").
SetActivated(true).
WithActivationCode(component.ActivationCodeOK).
WithInputKeys([]string{"1"}),
WithInputsMetadata(port.MetadataMap{
"i1": &port.Metadata{
SignalBufferLen: 1,
},
}),
component.NewActivationResult("c3").
SetActivated(true).
WithActivationCode(component.ActivationCodeOK).
WithInputKeys([]string{"1"}),
WithInputsMetadata(port.MetadataMap{
"i1": &port.Metadata{
SignalBufferLen: 1,
},
}),
),
},
}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/piping/fan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Test_Fan(t *testing.T) {
assert.True(t, fm.Components().ByName("consumer").Outputs().ByName("o1").HasSignals())

//The signal is combined and consist of 3 payloads
resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals().AsGroup()
resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals()
assert.Len(t, resultSignals, 3)

//And they are all different
Expand Down
1 change: 0 additions & 1 deletion integration_tests/piping/piping_from_inputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func Test_PipingFromInput(t *testing.T) {
assert.Len(t, l.Outputs().ByName("log").Signals(), 3)
},
},

{
name: "observing component which waits for inputs",
setupFM: func() *fmesh.FMesh {
Expand Down
33 changes: 21 additions & 12 deletions port/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
// Collection is a port collection with useful methods
type Collection map[string]*Port

// Metadata contains metadata about the port
type Metadata struct {
SignalBufferLen int
}

type MetadataMap map[string]*Metadata

// NewCollection creates empty collection
func NewCollection() Collection {
return make(Collection)
Expand Down Expand Up @@ -79,13 +86,6 @@ func (collection Collection) Flush(clearFlushed bool) {
}
}

func (collection Collection) RemoveSignalsByKeys(signalKeys []string) Collection {
for _, p := range collection {
p.Signals().DeleteKeys(signalKeys)
}
return collection
}

// PipeTo creates pipes from each port in collection
func (collection Collection) PipeTo(toPorts ...*Port) {
for _, p := range collection {
Expand Down Expand Up @@ -113,15 +113,24 @@ func (collection Collection) AddIndexed(prefix string, startIndex int, endIndex
func (collection Collection) AllSignals() signal.Group {
group := signal.NewGroup()
for _, p := range collection {
group = append(group, p.Signals().AsGroup()...)
group = append(group, p.Signals()...)
}
return group
}

func (collection Collection) GetSignalKeys() []string {
keys := make([]string, 0)
// GetPortsMetadata returns info about current length of each port in collection
func (collection Collection) GetPortsMetadata() MetadataMap {
res := make(MetadataMap)
for _, p := range collection {
keys = append(keys, p.Signals().GetKeys()...)
res[p.Name()] = &Metadata{
SignalBufferLen: len(p.Signals()),
}
}
return res
}

func (collection Collection) DisposeProcessedSignals(portsMetadata MetadataMap) {
for pName, meta := range portsMetadata {
collection.ByName(pName).DisposeFirstNSignals(meta.SignalBufferLen)
}
return keys
}
30 changes: 23 additions & 7 deletions port/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestCollection_ByName(t *testing.T) {
args: args{
name: "p1",
},
want: &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}},
want: &Port{name: "p1", pipes: Group{}, signals: signal.Group{}},
},
{
name: "port with signals found",
Expand All @@ -95,7 +95,7 @@ func TestCollection_ByName(t *testing.T) {
},
want: &Port{
name: "p2",
signals: signal.NewCollection().AddPayload(12),
signals: signal.NewGroup().With(signal.New(12)),
pipes: Group{},
},
},
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestCollection_ByNames(t *testing.T) {
"p1": &Port{
name: "p1",
pipes: Group{},
signals: signal.NewCollection(),
signals: signal.Group{},
},
},
},
Expand All @@ -151,8 +151,16 @@ func TestCollection_ByNames(t *testing.T) {
names: []string{"p1", "p2"},
},
want: Collection{
"p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}},
"p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}},
"p1": &Port{
name: "p1",
pipes: Group{},
signals: signal.Group{},
},
"p2": &Port{
name: "p2",
pipes: Group{},
signals: signal.Group{},
},
},
},
{
Expand All @@ -170,8 +178,16 @@ func TestCollection_ByNames(t *testing.T) {
names: []string{"p1", "p2", "p3"},
},
want: Collection{
"p1": &Port{name: "p1", pipes: Group{}, signals: signal.Collection{}},
"p2": &Port{name: "p2", pipes: Group{}, signals: signal.Collection{}},
"p1": &Port{
name: "p1",
pipes: Group{},
signals: signal.Group{},
},
"p2": &Port{
name: "p2",
pipes: Group{},
signals: signal.Group{},
},
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions port/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func NewIndexedGroup(prefix string, startIndex int, endIndex int) Group {
return group
}

// Add adds ports to group
func (group Group) Add(ports ...*Port) Group {
// With adds ports to group
func (group Group) With(ports ...*Port) Group {
for _, port := range ports {
if port == nil {
continue
Expand Down
28 changes: 20 additions & 8 deletions port/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
// Port defines a connectivity point of a component
type Port struct {
name string
signals signal.Collection //Current signals set on the port
pipes Group //Refs to all outbound pipes connected to this port
signals signal.Group //Signal buffer
pipes Group //Outbound pipes
}

// New creates a new port
func New(name string) *Port {
return &Port{
name: name,
pipes: NewGroup(),
signals: signal.NewCollection(),
signals: signal.NewGroup(),
}
}

Expand All @@ -26,14 +26,18 @@ func (p *Port) Name() string {
}

// Signals getter
func (p *Port) Signals() signal.Collection {
func (p *Port) Signals() signal.Group {
return p.signals
}

func (p *Port) setSignals(signals signal.Group) {
p.signals = signals
}

// PutSignals adds signals
// @TODO: rename
func (p *Port) PutSignals(signals ...*signal.Signal) {
p.Signals().Add(signals...)
p.setSignals(p.Signals().With(signals...))
}

// WithSignals adds signals and returns the port
Expand All @@ -44,7 +48,15 @@ func (p *Port) WithSignals(signals ...*signal.Signal) *Port {

// ClearSignals removes all signals from the port
func (p *Port) ClearSignals() {
p.signals = signal.NewCollection()
p.setSignals(signal.NewGroup())
}

func (p *Port) DisposeFirstNSignals(n int) {
if n > len(p.Signals()) {
p.ClearSignals()
return
}
p.setSignals(p.Signals()[n:])
}

// HasSignals says whether port signals is set or not
Expand All @@ -64,7 +76,7 @@ func (p *Port) PipeTo(toPorts ...*Port) {
if toPort == nil {
continue
}
p.pipes = p.pipes.Add(toPort)
p.pipes = p.pipes.With(toPort)
}
}

Expand All @@ -87,5 +99,5 @@ func (p *Port) Flush(clearFlushed bool) bool {

// ForwardSignals puts signals from source port to destination port, without clearing the source port
func ForwardSignals(source *Port, dest *Port) {
dest.PutSignals(source.Signals().AsGroup()...)
dest.PutSignals(source.Signals()...)
}
Loading

0 comments on commit 07770cc

Please sign in to comment.