Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V.0.1.0 #47

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions component/activation_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package component
import (
"errors"
"fmt"
"github.com/hovsep/fmesh/port"
)

// ActivationResult defines the result (possibly an error) of the activation of given component in given cycle
type ActivationResult struct {
componentName string
activated bool
inputKeys []string //@TODO: check if we can replace this by one int which will show the index of last signal used as input within signals collection (use signal position in the collection as it's unique id)
code ActivationResultCode
err error
componentName string
activated bool
inputsMetadata port.MetadataMap //Contains the info about length of input ports during the activation (required for correct i2i piping)
code ActivationResultCode
err error
}

// ActivationResultCode denotes a specific info about how a component been activated or why not activated at all
Expand All @@ -38,6 +39,7 @@ const (
)

// NewActivationResult creates a new activation result for given component
// @TODO Hide this from user
func NewActivationResult(componentName string) *ActivationResult {
return &ActivationResult{
componentName: componentName,
Expand Down Expand Up @@ -92,21 +94,21 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult {
return ar
}

func (ar *ActivationResult) WithInputKeys(keys []string) *ActivationResult {
ar.inputKeys = keys
func (ar *ActivationResult) WithInputsMetadata(meta port.MetadataMap) *ActivationResult {
ar.inputsMetadata = meta
return ar
}

func (ar *ActivationResult) InputKeys() []string {
return ar.inputKeys
func (ar *ActivationResult) InputsMetadata() port.MetadataMap {
return ar.inputsMetadata
}

// newActivationResultOK builds a specific activation result
func (c *Component) newActivationResultOK() *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeOK).
WithInputKeys(c.Inputs().GetSignalKeys())
WithInputsMetadata(c.Inputs().GetPortsMetadata())

}

Expand Down Expand Up @@ -137,7 +139,7 @@ func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult
SetActivated(true).
WithActivationCode(ActivationCodeReturnedError).
WithError(fmt.Errorf("component returned an error: %w", err)).
WithInputKeys(c.Inputs().GetSignalKeys())
WithInputsMetadata(c.Inputs().GetPortsMetadata())
}

// newActivationCodePanicked builds a specific activation result
Expand All @@ -146,7 +148,7 @@ func (c *Component) newActivationCodePanicked(err error) *ActivationResult {
SetActivated(true).
WithActivationCode(ActivationCodePanicked).
WithError(err).
WithInputKeys(c.Inputs().GetSignalKeys())
WithInputsMetadata(c.Inputs().GetPortsMetadata())
}

// isWaitingForInput tells whether component is waiting for specific inputs
Expand Down
20 changes: 16 additions & 4 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,25 @@ func (c *Component) WithDescription(description string) *Component {

// WithInputs ads input ports
func (c *Component) WithInputs(portNames ...string) *Component {
c.inputs = c.inputs.Add(port.NewGroup(portNames...)...)
c.inputs = c.Inputs().Add(port.NewGroup(portNames...)...)
return c
}

// WithOutputs adds output ports
func (c *Component) WithOutputs(portNames ...string) *Component {
c.outputs = c.outputs.Add(port.NewGroup(portNames...)...)
c.outputs = c.Outputs().Add(port.NewGroup(portNames...)...)
return c
}

// WithInputsIndexed creates multiple prefixed ports
func (c *Component) WithInputsIndexed(prefix string, startIndex int, endIndex int) *Component {
c.inputs = c.Inputs().AddIndexed(prefix, startIndex, endIndex)
return c
}

// WithOutputsIndexed creates multiple prefixed ports
func (c *Component) WithOutputsIndexed(prefix string, startIndex int, endIndex int) *Component {
c.outputs = c.Outputs().AddIndexed(prefix, startIndex, endIndex)
return c
}

Expand Down Expand Up @@ -124,8 +136,8 @@ func (c *Component) FlushInputs() {
c.inputs.Flush(false)
}

// FlushOutputs ...
// FlushAndClearOutputs flushes output ports and clears flushed ones
// @TODO: hide this method from user
func (c *Component) FlushOutputs() {
func (c *Component) FlushAndClearOutputs() {
c.outputs.Flush(true)
}
8 changes: 4 additions & 4 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestComponent_Description(t *testing.T) {
}
}

func TestComponent_FlushOutputs(t *testing.T) {
func TestComponent_FlushAndClearOutputs(t *testing.T) {
sink := port.New("sink")

componentWithAllOutputsSet := NewComponent("c1").WithOutputs("o1", "o2")
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestComponent_FlushOutputs(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.component.FlushOutputs()
tt.component.FlushAndClearOutputs()
tt.assertions(t, tt.component, tt.destPort)
})
}
Expand Down Expand Up @@ -239,8 +239,8 @@ func TestComponent_WithActivationFunc(t *testing.T) {
assert.Equal(t, err1, err2)

//Compare signals without keys (because they are random)
assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals().AsGroup(), testOutputs2.ByName("out1").Signals().AsGroup())
assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals().AsGroup(), testOutputs2.ByName("out2").Signals().AsGroup())
assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals(), testOutputs2.ByName("out1").Signals())
assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals(), testOutputs2.ByName("out2").Signals())

})
}
Expand Down
11 changes: 8 additions & 3 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,17 @@ func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) {
continue
}

c.FlushInputs()
c.FlushOutputs()
c.FlushAndClearOutputs() // Just flush and clear
c.FlushInputs() // Inputs are a bit trickier

//Check if a component wait for inputs and wants to keep existing input
keepInputs := c.WantsToKeepInputs(activationResult)

if !keepInputs {
c.Inputs().RemoveSignalsByKeys(activationResult.InputKeys())
// Inputs can not be just cleared, instead we remove signals which
// have been used (been set on inputs) during the last activation cycle
// thus not affecting ones the component could have been received from i2i pipes
c.Inputs().DisposeProcessedSignals(activationResult.InputsMetadata())
}

}
Expand Down
18 changes: 15 additions & 3 deletions fmesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,15 +715,27 @@ func TestFMesh_runCycle(t *testing.T) {
component.NewActivationResult("c1").
SetActivated(true).
WithActivationCode(component.ActivationCodeOK).
WithInputKeys([]string{"1"}),
WithInputsMetadata(port.MetadataMap{
"i1": &port.Metadata{
SignalBufferLen: 1,
},
}),
component.NewActivationResult("c2").
SetActivated(true).
WithActivationCode(component.ActivationCodeOK).
WithInputKeys([]string{"1"}),
WithInputsMetadata(port.MetadataMap{
"i1": &port.Metadata{
SignalBufferLen: 1,
},
}),
component.NewActivationResult("c3").
SetActivated(true).
WithActivationCode(component.ActivationCodeOK).
WithInputKeys([]string{"1"}),
WithInputsMetadata(port.MetadataMap{
"i1": &port.Metadata{
SignalBufferLen: 1,
},
}),
),
},
}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/piping/fan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Test_Fan(t *testing.T) {
assert.True(t, fm.Components().ByName("consumer").Outputs().ByName("o1").HasSignals())

//The signal is combined and consist of 3 payloads
resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals().AsGroup()
resultSignals := fm.Components().ByName("consumer").Outputs().ByName("o1").Signals()
assert.Len(t, resultSignals, 3)

//And they are all different
Expand Down
92 changes: 90 additions & 2 deletions integration_tests/piping/piping_from_inputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_PipingFromInput(t *testing.T) {
setupFM: func() *fmesh.FMesh {
adder := component.NewComponent("adder").
WithDescription("adds i1 and i2").
WithInputs("i1", "i2").
WithInputsIndexed("i", 1, 2).
WithOutputs("out").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
i1, i2 := inputs.ByName("i1").Signals().FirstPayload().(int), inputs.ByName("i2").Signals().FirstPayload().(int)
Expand All @@ -47,7 +47,6 @@ func Test_PipingFromInput(t *testing.T) {
WithOutputs("log").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
for _, sig := range inputs.ByName("in").Signals() {
fmt.Println(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload()))
outputs.ByName("log").PutSignals(signal.New(fmt.Sprintf("LOGGED SIGNAL: %v", sig.Payload())))
}
return nil
Expand Down Expand Up @@ -80,6 +79,95 @@ func Test_PipingFromInput(t *testing.T) {
assert.Len(t, l.Outputs().ByName("log").Signals(), 3)
},
},
{
name: "observing component which waits for inputs",
setupFM: func() *fmesh.FMesh {
starter := component.NewComponent("starter").
WithDescription("This component just starts the whole f-mesh").
WithInputs("start").
WithOutputsIndexed("o", 1, 2).
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
//Activate downstream components
outputs.PutSignals(inputs.AllSignals()[0])
return nil
})

incr1 := component.NewComponent("incr1").
WithDescription("Increments the input").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
outputs.PutSignals(signal.New(1 + inputs.AllSignals().FirstPayload().(int)))
return nil
})

incr2 := component.NewComponent("incr2").
WithDescription("Increments the input").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
outputs.PutSignals(signal.New(1 + inputs.AllSignals().FirstPayload().(int)))
return nil
})

doubler := component.NewComponent("doubler").
WithDescription("Doubles the input").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
outputs.PutSignals(signal.New(2 * inputs.AllSignals().FirstPayload().(int)))
return nil
})

agg := component.NewComponent("result_aggregator").
WithDescription("Adds 2 inputs (only when both are available)").
WithInputsIndexed("i", 1, 2).
WithOutputs("result").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
if !inputs.ByNames("i1", "i2").AllHaveSignals() {
return component.NewErrWaitForInputs(true)
}
i1 := inputs.ByName("i1").Signals().FirstPayload().(int)
i2 := inputs.ByName("i2").Signals().FirstPayload().(int)
outputs.PutSignals(signal.New(i1 + i2))
return nil
})

observer := component.NewComponent("obsrv").
WithDescription("Observes inputs of result aggregator").
WithInputsIndexed("i", 1, 2).
WithOutputs("log").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
outputs.ByName("log").PutSignals(inputs.AllSignals()...)
return nil
})

fm := fmesh.New("observer").WithComponents(starter, incr1, incr2, doubler, agg, observer)

starter.Outputs().ByName("o1").PipeTo(incr1.Inputs().ByName("i1"))
starter.Outputs().ByName("o2").PipeTo(incr2.Inputs().ByName("i1"))
incr1.Outputs().ByName("o1").PipeTo(doubler.Inputs().ByName("i1"))
doubler.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i1"))
incr2.Outputs().ByName("o1").PipeTo(agg.Inputs().ByName("i2"))
agg.Inputs().ByName("i1").PipeTo(observer.Inputs().ByName("i1"))
agg.Inputs().ByName("i2").PipeTo(observer.Inputs().ByName("i2"))
return fm
},
setInputs: func(fm *fmesh.FMesh) {
fm.Components().ByName("starter").Inputs().PutSignals(signal.New(10))
},
assertions: func(t *testing.T, fm *fmesh.FMesh, cycles cycle.Collection, err error) {
assert.NoError(t, err)

//Multiplier result
assert.Equal(t, 33, fm.Components().ByName("result_aggregator").Outputs().ByName("result").Signals().FirstPayload())

//Observed signals
assert.Len(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals(), 2)
assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 11)
assert.Contains(t, fm.Components().ByName("obsrv").Outputs().ByName("log").Signals().AllPayloads(), 22)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading
Loading