Skip to content

Commit

Permalink
Merge pull request #56 from hovsep/v.0.1.0
Browse files Browse the repository at this point in the history
V.0.1.0
  • Loading branch information
hovsep authored Oct 3, 2024
2 parents 9cb3981 + 8bd7eec commit 2759cfe
Show file tree
Hide file tree
Showing 23 changed files with 852 additions and 1,296 deletions.
41 changes: 0 additions & 41 deletions component/activation_result.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package component

import (
"errors"
"fmt"
)

// ActivationResult defines the result (possibly an error) of the activation of given component in given cycle
type ActivationResult struct {
componentName string
activated bool
stateBefore *StateSnapshot //Contains the info about length of input ports during the activation (required for correct i2i piping)
stateAfter *StateSnapshot
code ActivationResultCode
err error
}
Expand All @@ -28,9 +25,6 @@ const (
// ActivationCodeNoFunction : component activation function is not set, so we can not activate it
ActivationCodeNoFunction

// ActivationCodeWaitingForInput : component is waiting for more inputs on some ports
ActivationCodeWaitingForInput

// ActivationCodeReturnedError : component is activated, but returned an error
ActivationCodeReturnedError

Expand Down Expand Up @@ -94,24 +88,6 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult {
return ar
}

func (ar *ActivationResult) WithStateBefore(snapshot *StateSnapshot) *ActivationResult {
ar.stateBefore = snapshot
return ar
}

func (ar *ActivationResult) StateBefore() *StateSnapshot {
return ar.stateBefore
}

func (ar *ActivationResult) WithStateAfter(snapshot *StateSnapshot) *ActivationResult {
ar.stateAfter = snapshot
return ar
}

func (ar *ActivationResult) StateAfter() *StateSnapshot {
return ar.stateAfter
}

// newActivationResultOK builds a specific activation result
func (c *Component) newActivationResultOK() *ActivationResult {
return NewActivationResult(c.Name()).
Expand All @@ -134,13 +110,6 @@ func (c *Component) newActivationResultNoFunction() *ActivationResult {
WithActivationCode(ActivationCodeNoFunction)
}

// newActivationResultWaitingForInput builds a specific activation result
func (c *Component) newActivationResultWaitingForInput() *ActivationResult {
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeWaitingForInput)
}

// newActivationResultReturnedError builds a specific activation result
func (c *Component) newActivationResultReturnedError(err error) *ActivationResult {
return NewActivationResult(c.Name()).
Expand All @@ -156,13 +125,3 @@ func (c *Component) newActivationResultPanicked(err error) *ActivationResult {
WithActivationCode(ActivationCodePanicked).
WithError(err)
}

// isWaitingForInput tells whether component is waiting for specific inputs
func (c *Component) isWaitingForInput(activationResult *ActivationResult) bool {
return activationResult.HasError() && errors.Is(activationResult.Error(), errWaitingForInputs)
}

// WantsToKeepInputs tells whether component wants to keep signals on input ports for the next cycle
func (c *Component) WantsToKeepInputs(activationResult *ActivationResult) bool {
return c.isWaitingForInput(activationResult) && errors.Is(activationResult.Error(), errWaitingForInputsKeep)
}
2 changes: 1 addition & 1 deletion component/activation_result_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestActivationResultCollection_Add(t *testing.T) {
},
},
{
name: "adding to existing collection",
name: "adding to non-empty collection",
collection: NewActivationResultCollection().Add(
New("c1").newActivationResultOK(),
New("c2").newActivationResultOK(),
Expand Down
2 changes: 1 addition & 1 deletion component/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestCollection_Add(t *testing.T) {
},
},
{
name: "adding to existing collection",
name: "adding to non-empty collection",
collection: NewComponentCollection().Add(New("c1"), New("c2")),
args: args{
components: []*Component{New("c3"), New("c4")},
Expand Down
64 changes: 16 additions & 48 deletions component/component.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package component

import (
"errors"
"fmt"
"github.com/hovsep/fmesh/port"
)
Expand Down Expand Up @@ -34,25 +33,25 @@ func (c *Component) WithDescription(description string) *Component {

// WithInputs ads input ports
func (c *Component) WithInputs(portNames ...string) *Component {
c.inputs = c.Inputs().Add(port.NewGroup(portNames...)...)
c.inputs = c.Inputs().With(port.NewGroup(portNames...)...)
return c
}

// WithOutputs adds output ports
func (c *Component) WithOutputs(portNames ...string) *Component {
c.outputs = c.Outputs().Add(port.NewGroup(portNames...)...)
c.outputs = c.Outputs().With(port.NewGroup(portNames...)...)
return c
}

// WithInputsIndexed creates multiple prefixed ports
func (c *Component) WithInputsIndexed(prefix string, startIndex int, endIndex int) *Component {
c.inputs = c.Inputs().AddIndexed(prefix, startIndex, endIndex)
c.inputs = c.Inputs().WithIndexed(prefix, startIndex, endIndex)
return c
}

// WithOutputsIndexed creates multiple prefixed ports
func (c *Component) WithOutputsIndexed(prefix string, startIndex int, endIndex int) *Component {
c.outputs = c.Outputs().AddIndexed(prefix, startIndex, endIndex)
c.outputs = c.Outputs().WithIndexed(prefix, startIndex, endIndex)
return c
}

Expand Down Expand Up @@ -90,77 +89,46 @@ func (c *Component) hasActivationFunction() bool {
// MaybeActivate tries to run the activation function if all required conditions are met
// @TODO: hide this method from user
func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
stateBeforeActivation := c.getStateSnapshot()
defer func() {
c.Inputs().Clear()
}()

defer func() {
if r := recover(); r != nil {
activationResult = c.newActivationResultPanicked(fmt.Errorf("panicked with: %v", r)).
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())
activationResult = c.newActivationResultPanicked(fmt.Errorf("panicked with: %v", r))
}
}()

if !c.hasActivationFunction() {
//Activation function is not set (maybe useful while the mesh is under development)
activationResult = c.newActivationResultNoFunction().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())
activationResult = c.newActivationResultNoFunction()

return
}

if !c.inputs.AnyHasSignals() {
//No inputs set, stop here
activationResult = c.newActivationResultNoInput().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())
activationResult = c.newActivationResultNoInput()
return
}

//Invoke the activation func
err := c.f(c.Inputs(), c.Outputs())

if errors.Is(err, errWaitingForInputs) {
activationResult = c.newActivationResultWaitingForInput().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())

return
}

if err != nil {
activationResult = c.newActivationResultReturnedError(err).
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())
activationResult = c.newActivationResultReturnedError(err)

return
}

activationResult = c.newActivationResultOK().
WithStateBefore(stateBeforeActivation).
WithStateAfter(c.getStateSnapshot())
activationResult = c.newActivationResultOK()

return
}

// FlushInputs flushes and clears (when needed) input ports
// @TODO: hide this method from user
func (c *Component) FlushInputs(activationResult *ActivationResult, keepInputSignals bool) {
c.Inputs().Flush()
if !keepInputSignals {
// Inputs can not be just cleared, instead we remove signals which
// have been used (been set on inputs) during the last activation cycle
// thus not affecting ones the component could have been received from i2i pipes
for portName, p := range c.Inputs() {
p.DisposeSignals(activationResult.StateBefore().InputPortsMetadata()[portName].SignalBufferLen)
}
}
}

// FlushOutputs flushes output ports and disposes processed signals
// @TODO: hide this method from user
func (c *Component) FlushOutputs(activationResult *ActivationResult) {
for portName, p := range c.Outputs() {
p.FlushAndDispose(activationResult.StateAfter().OutputPortsMetadata()[portName].SignalBufferLen)
// FlushOutputs pushed signals out of the component outputs to pipes and clears outputs
func (c *Component) FlushOutputs() {
for _, out := range c.outputs {
out.Flush()
}
}
Loading

0 comments on commit 2759cfe

Please sign in to comment.