Skip to content

Commit

Permalink
Allow to pipe from inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
hovsep committed Sep 18, 2024
1 parent 3078b5c commit 9324f59
Show file tree
Hide file tree
Showing 16 changed files with 460 additions and 109 deletions.
55 changes: 48 additions & 7 deletions component/activation_result.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package component

import "fmt"
import (
"errors"
"fmt"
)

// ActivationResult defines the result (possibly an error) of the activation of given component in given cycle
type ActivationResult struct {
componentName string
activated bool
inputKeys []string //@TODO: check if we can replace this by one int which will show the index of last signal used as input within signals collection (use signal position in the collection as it's unique id)
code ActivationResultCode
err error
}
Expand Down Expand Up @@ -88,32 +92,69 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult {
return ar
}

func (ar *ActivationResult) WithInputKeys(keys []string) *ActivationResult {
ar.inputKeys = keys
return ar
}

func (ar *ActivationResult) InputKeys() []string {
return ar.inputKeys
}

// newActivationResultOK builds a specific activation result
func (c *Component) newActivationResultOK() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodeOK)
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeOK).
WithInputKeys(c.Inputs().GetSignalKeys())

}

// newActivationCodeNoInput builds a specific activation result
func (c *Component) newActivationCodeNoInput() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeNoInput)
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeNoInput)
}

// newActivationCodeNoFunction builds a specific activation result
func (c *Component) newActivationCodeNoFunction() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeNoFunction)
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeNoFunction)
}

// newActivationCodeWaitingForInput builds a specific activation result
func (c *Component) newActivationCodeWaitingForInput() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeWaitingForInput)
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeWaitingForInput)
}

// newActivationCodeReturnedError builds a specific activation result
func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodeReturnedError).WithError(fmt.Errorf("component returned an error: %w", err))
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeReturnedError).
WithError(fmt.Errorf("component returned an error: %w", err)).
WithInputKeys(c.Inputs().GetSignalKeys())
}

// newActivationCodePanicked builds a specific activation result
func (c *Component) newActivationCodePanicked(err error) *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodePanicked).WithError(err)
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodePanicked).
WithError(err).
WithInputKeys(c.Inputs().GetSignalKeys())
}

// isWaitingForInput tells whether component is waiting for specific inputs
func (c *Component) isWaitingForInput(activationResult *ActivationResult) bool {
return activationResult.HasError() && errors.Is(activationResult.Error(), errWaitingForInputs)
}

// WantsToKeepInputs tells whether component wants to keep signals on input ports for the next cycle
func (c *Component) WantsToKeepInputs(activationResult *ActivationResult) bool {
return c.isWaitingForInput(activationResult) && errors.Is(activationResult.Error(), errWaitingForInputsKeep)
}
9 changes: 9 additions & 0 deletions component/activation_result_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,12 @@ func (collection ActivationResultCollection) HasActivatedComponents() bool {
}
return false
}

// ByComponentName returns the activation result of given component
func (collection ActivationResultCollection) ByComponentName(componentName string) *ActivationResult {
if result, ok := collection[componentName]; ok {
return result
}

return nil
}
27 changes: 12 additions & 15 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ func (c *Component) hasActivationFunction() bool {
}

// MaybeActivate tries to run the activation function if all required conditions are met
// @TODO: hide this method from user
func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
defer func() {
if r := recover(); r != nil {
//Clear inputs and exit
c.inputs.ClearSignals()
activationResult = c.newActivationCodePanicked(fmt.Errorf("panicked with: %v", r))
}
}()
Expand All @@ -99,22 +98,15 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
return
}

//Run the computation
err := c.f(c.inputs, c.outputs)
//Invoke the activation func
err := c.f(c.Inputs(), c.Outputs())

if errors.Is(err, errWaitingForInputs) {
activationResult = c.newActivationCodeWaitingForInput()

if !errors.Is(err, errWaitingForInputsKeep) {
c.inputs.ClearSignals()
}

return
}

//Clear inputs
c.inputs.ClearSignals()

if err != nil {
activationResult = c.newActivationCodeReturnedError(err)

Expand All @@ -126,9 +118,14 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
return
}

// FlushOutputs pushed signals out of the component outputs to pipes and clears outputs
// FlushInputs ...
// @TODO: hide this method from user
func (c *Component) FlushInputs() {
c.inputs.Flush(false)
}

// FlushOutputs ...
// @TODO: hide this method from user
func (c *Component) FlushOutputs() {
for _, out := range c.outputs {
out.Flush()
}
c.outputs.Flush(true)
}
7 changes: 5 additions & 2 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ func TestComponent_WithActivationFunc(t *testing.T) {
name string
component *Component
args args
want *Component
}{
{
name: "happy path",
Expand All @@ -238,7 +237,11 @@ func TestComponent_WithActivationFunc(t *testing.T) {
err1 := componentAfter.f(testInputs1, testOutputs1)
err2 := tt.args.f(testInputs2, testOutputs2)
assert.Equal(t, err1, err2)
assert.Equal(t, testOutputs1, testOutputs2)

//Compare signals without keys (because they are random)
assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals().AsGroup(), testOutputs2.ByName("out1").Signals().AsGroup())
assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals().AsGroup(), testOutputs2.ByName("out2").Signals().AsGroup())

})
}
}
Expand Down
9 changes: 7 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ import (
type ErrorHandlingStrategy int

const (
StopOnFirstError ErrorHandlingStrategy = iota
// StopOnFirstErrorOrPanic stops the f-mesh on first error or panic
StopOnFirstErrorOrPanic ErrorHandlingStrategy = iota

// StopOnFirstPanic ignores errors, but stops the f-mesh on first panic
StopOnFirstPanic

// IgnoreAll allows to continue running the f-mesh regardless of how components finish their activation functions
IgnoreAll
)

var (
ErrHitAnError = errors.New("f-mesh hit an error and will be stopped")
ErrHitAnErrorOrPanic = errors.New("f-mesh hit an error or panic and will be stopped")
ErrHitAPanic = errors.New("f-mesh hit a panic and will be stopped")
ErrUnsupportedErrorHandlingStrategy = errors.New("unsupported error handling strategy")
)
23 changes: 18 additions & 5 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,23 @@ func (fm *FMesh) runCycle() *cycle.Cycle {
return newCycle
}

// DrainComponents drains the data from all components outputs
func (fm *FMesh) drainComponents() {
// DrainComponents drains the data from activated components
func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) {
for _, c := range fm.components {
activationResult := cycle.ActivationResults().ByComponentName(c.Name())

if !activationResult.Activated() {
continue
}

c.FlushInputs()
c.FlushOutputs()

keepInputs := c.WantsToKeepInputs(activationResult)
if !keepInputs {
c.Inputs().RemoveSignalsByKeys(activationResult.InputKeys())
}

}
}

Expand All @@ -98,7 +111,7 @@ func (fm *FMesh) Run() (cycle.Collection, error) {
return allCycles, err
}

fm.drainComponents()
fm.drainComponentsAfterCycle(cycleResult)
}
}

Expand All @@ -110,8 +123,8 @@ func (fm *FMesh) mustStop(cycleResult *cycle.Cycle) (bool, error) {

//Check if mesh must stop because of configured error handling strategy
switch fm.errorHandlingStrategy {
case StopOnFirstError:
return cycleResult.HasErrors(), ErrHitAnError
case StopOnFirstErrorOrPanic:
return cycleResult.HasErrors() || cycleResult.HasPanics(), ErrHitAnErrorOrPanic
case StopOnFirstPanic:
return cycleResult.HasPanics(), ErrHitAPanic
case IgnoreAll:
Expand Down
Loading

0 comments on commit 9324f59

Please sign in to comment.