Skip to content

Commit

Permalink
Refactor drain logic: clear all inputs and then flush all
Browse files Browse the repository at this point in the history
  • Loading branch information
hovsep committed Nov 10, 2024
1 parent ed29e11 commit d89e9b6
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 130 deletions.
6 changes: 3 additions & 3 deletions component/collection.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package component

import (
"errors"
"fmt"
"github.com/hovsep/fmesh/common"
)

Expand Down Expand Up @@ -31,8 +31,8 @@ func (c *Collection) ByName(name string) *Component {
component, ok := c.components[name]

if !ok {
c.SetErr(errors.New("component not found"))
return nil
c.SetErr(fmt.Errorf("%w, component name: %s", errNotFound, name))
return New("").WithErr(c.Err())
}

return component
Expand Down
5 changes: 2 additions & 3 deletions component/collection_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package component

import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
)
Expand Down Expand Up @@ -29,7 +30,7 @@ func TestCollection_ByName(t *testing.T) {
args: args{
name: "c3",
},
want: nil,
want: New("").WithErr(fmt.Errorf("%w, component name: %s", errNotFound, "c3")),
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -69,7 +70,6 @@ func TestCollection_With(t *testing.T) {
assert.Equal(t, 2, collection.Len())
assert.NotNil(t, collection.ByName("c1"))
assert.NotNil(t, collection.ByName("c2"))
assert.Nil(t, collection.ByName("c999"))
},
},
{
Expand All @@ -84,7 +84,6 @@ func TestCollection_With(t *testing.T) {
assert.NotNil(t, collection.ByName("c2"))
assert.NotNil(t, collection.ByName("c3"))
assert.NotNil(t, collection.ByName("c4"))
assert.Nil(t, collection.ByName("c999"))
},
},
}
Expand Down
1 change: 1 addition & 0 deletions component/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
)

var (
errNotFound = errors.New("component not found")
errWaitingForInputs = errors.New("component is waiting for some inputs")
errWaitingForInputsKeep = fmt.Errorf("%w: do not clear input ports", errWaitingForInputs)
)
Expand Down
7 changes: 7 additions & 0 deletions cycle/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package cycle

import "errors"

var (
errNoCyclesInGroup = errors.New("group has no cycles")
)
9 changes: 9 additions & 0 deletions cycle/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,12 @@ func (g *Group) CyclesOrDefault(defaultCycles Cycles) Cycles {
func (g *Group) Len() int {
return len(g.cycles)
}

// Last returns the latest cycle added to the group
func (g *Group) Last() *Cycle {
if g.Len() == 0 {
return New().WithErr(errNoCyclesInGroup)
}

return g.cycles[g.Len()-1]
}
4 changes: 4 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ var (
ErrHitAPanic = errors.New("f-mesh hit a panic and will be stopped")
ErrUnsupportedErrorHandlingStrategy = errors.New("unsupported error handling strategy")
ErrReachedMaxAllowedCycles = errors.New("reached max allowed cycles")
errFailedToRunCycle = errors.New("failed to run cycle")
errNoComponents = errors.New("no components found")
errFailedToClearInputs = errors.New("failed to clear input ports")
ErrFailedToDrain = errors.New("failed to drain")
)
11 changes: 8 additions & 3 deletions examples/fibonacci.go → examples/fibonacci/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ import (
"github.com/hovsep/fmesh/signal"
)

// This example shows how a component can have a pipe looped into it's input.
// This pattern allows to activate components multiple time using control plane (special output with looped-in pipe)
// For example we can calculate Fibonacci numbers without actually having a code for loop (the loop is implemented by ports and pipes)
// This example demonstrates how a component can have a pipe looped back into its own input,
// enabling a pattern that reactivates the component multiple times.
// By looping the output back into the input, the component can perform repeated calculations
// without explicit looping constructs in the code.
//
// For instance, this approach can be used to calculate Fibonacci numbers without needing
// traditional looping code. Instead, the loop is achieved by configuring ports and pipes,
// where each cycle processes a new Fibonacci term.
func main() {
c1 := component.New("fibonacci number generator").
WithInputs("i_cur", "i_prev").
Expand Down
212 changes: 212 additions & 0 deletions examples/nesting/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package main

import (
"fmt"
"github.com/hovsep/fmesh"
"github.com/hovsep/fmesh/component"
"github.com/hovsep/fmesh/port"
"github.com/hovsep/fmesh/signal"
)

type FactorizedNumber struct {
Num int
Factors []any
}

// This example demonstrates the ability to nest meshes, where a component within a mesh
// can itself be another mesh. This nesting is recursive, allowing for an unlimited depth
// of nested meshes. Each nested mesh behaves as an individual component within the larger
// mesh, enabling complex and hierarchical workflows.
// In this example we implement prime factorization (which is core part of RSA encryption algorithm) as a sub-mesh
func main() {
starter := component.New("starter").
WithDescription("This component just holds numbers we want to factorize").
WithInputs("in"). // Single port is enough, as it can hold any number of signals (as long as they fit into1 memory)
WithOutputs("out").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
// Pure bypass
return port.ForwardSignals(inputs.ByName("in"), outputs.ByName("out"))
})

filter := component.New("filter").
WithDescription("In this component we can do some optional filtering").
WithInputs("in").
WithOutputs("out", "log").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
isValid := func(num int) bool {
return num < 1000
}

for _, sig := range inputs.ByName("in").AllSignalsOrNil() {
if isValid(sig.PayloadOrNil().(int)) {
outputs.ByName("out").PutSignals(sig)
} else {
outputs.ByName("log").PutSignals(sig)
}
}
return nil
})

logger := component.New("logger").
WithDescription("Simple logger").
WithInputs("in").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
log := func(line string) {
fmt.Printf("LOG: %s", line)
}

for _, sig := range inputs.ByName("in").AllSignalsOrNil() {
if logLine := sig.PayloadOrNil(); logLine != nil {
log(logLine.(string))
}
}
return nil
})

factorizer := component.New("factorizer").
WithDescription("Prime factorization implemented as separate f-mesh").
WithInputs("in").
WithOutputs("out").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
//This activation function has no implementation of factorization algorithm,
//it only runs another f-mesh to get results

//Get nested mesh or meshes
factorization := getPrimeFactorizationMesh()

// As nested f-mesh processes 1 signal per run we run it in the loop per each number
for _, numSig := range inputs.ByName("in").AllSignalsOrNil() {
//Set init data to nested mesh (pass signals from outer mesh to inner one)
factorization.Components().ByName("starter").InputByName("in").PutSignals(numSig)

//Run nested mesh
_, err := factorization.Run()

if err != nil {
return fmt.Errorf("inner mesh failed: %w", err)
}

// Get results from nested mesh
factors, err := factorization.Components().ByName("results").OutputByName("factors").AllSignalsPayloads()
if err != nil {
return fmt.Errorf("failed to get factors: %w", err)
}

//Pass results to outer mesh
number := numSig.PayloadOrNil().(int)
outputs.ByName("out").PutSignals(signal.New(FactorizedNumber{
Num: number,
Factors: factors,
}))
}

return nil
})

//Setup pipes
starter.OutputByName("out").PipeTo(filter.InputByName("in"))
filter.OutputByName("log").PipeTo(logger.InputByName("in"))
filter.OutputByName("out").PipeTo(factorizer.InputByName("in"))

// Build the mesh
outerMesh := fmesh.New("outer").WithComponents(starter, filter, logger, factorizer)

//Set init data
outerMesh.Components().
ByName("starter").
InputByName("in").
PutSignals(signal.NewGroup(315).SignalsOrNil()...)

//Run outer mesh
_, err := outerMesh.Run()

if err != nil {
fmt.Println(fmt.Errorf("outer mesh failed with error: %w", err))
}

//Read results
for _, resSig := range outerMesh.Components().ByName("factorizer").OutputByName("out").AllSignalsOrNil() {
result := resSig.PayloadOrNil().(FactorizedNumber)
fmt.Println(fmt.Sprintf("Factors of number %d : %v", result.Num, result.Factors))
}
}

func getPrimeFactorizationMesh() *fmesh.FMesh {
starter := component.New("starter").
WithDescription("Load the number to be factorized").
WithInputs("in").
WithOutputs("out").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
//For simplicity this f-mesh processes only one signal per run, so ignore all except first
outputs.ByName("out").PutSignals(inputs.ByName("in").Buffer().First())
return nil
})

d2 := component.New("d2").
WithDescription("Divide by smallest prime (2) to handle even factors").
WithInputs("in").
WithOutputs("out", "factor").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
number := inputs.ByName("in").FirstSignalPayloadOrNil().(int)

for number%2 == 0 {
outputs.ByName("factor").PutSignals(signal.New(2))
number /= 2
}

outputs.ByName("out").PutSignals(signal.New(number))
return nil
})

dodd := component.New("dodd").
WithDescription("Divide by odd primes starting from 3").
WithInputs("in").
WithOutputs("out", "factor").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
number := inputs.ByName("in").FirstSignalPayloadOrNil().(int)
divisor := 3
for number > 1 && divisor*divisor <= number {
for number%divisor == 0 {
outputs.ByName("factor").PutSignals(signal.New(divisor))
number /= divisor
}
divisor += 2
}
outputs.ByName("out").PutSignals(signal.New(number))
return nil
})

finalPrime := component.New("final_prime").
WithDescription("Store the last remaining prime factor, if any").
WithInputs("in").
WithOutputs("factor").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
number := inputs.ByName("in").FirstSignalPayloadOrNil().(int)
if number > 1 {
outputs.ByName("factor").PutSignals(signal.New(number))
}
return nil
})

results := component.New("results").
WithDescription("factors holder").
WithInputs("factor").
WithOutputs("factors").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
return port.ForwardSignals(inputs.ByName("factor"), outputs.ByName("factors"))
})

//Main pipeline starter->d2->dodd->finalPrime
starter.OutputByName("out").PipeTo(d2.InputByName("in"))
d2.OutputByName("out").PipeTo(dodd.InputByName("in"))
dodd.OutputByName("out").PipeTo(finalPrime.InputByName("in"))

//All found factors are accumulated in results
d2.OutputByName("factor").PipeTo(results.InputByName("factor"))
dodd.OutputByName("factor").PipeTo(results.InputByName("factor"))
finalPrime.OutputByName("factor").PipeTo(results.InputByName("factor"))

return fmesh.New("prime factors algo").
WithDescription("Pass single signal to starter").
WithComponents(starter, d2, dodd, finalPrime, results)
}
4 changes: 2 additions & 2 deletions export/dot/dot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (d *dotExporter) ExportWithCycles(fm *fmesh.FMesh, activationCycles cycle.C
buf := new(bytes.Buffer)
graphForCycle.Write(buf)

results[activationCycle.Number()] = buf.Bytes()
results[activationCycle.Number()-1] = buf.Bytes()
}

return results, nil
Expand Down Expand Up @@ -143,7 +143,7 @@ func (d *dotExporter) addPipes(graph *dot.Graph, components fmeshcomponent.Compo
return fmt.Errorf("failed to add pipe to port: %s : %w", destPort.Name(), err)
}
// Delete label, as it is not needed anymore
destPort.DeleteLabel(nodeIDLabel)
//destPort.DeleteLabel(nodeIDLabel)

// Any source port in any pipe is always output port, so we can build its node ID
srcPortNode := graph.FindNodeByID(getPortID(c.Name(), port.DirectionOut, srcPort.Name()))
Expand Down
Loading

0 comments on commit d89e9b6

Please sign in to comment.