diff --git a/component/component.go b/component/component.go index 7828040..a822c6a 100644 --- a/component/component.go +++ b/component/component.go @@ -5,9 +5,10 @@ import ( "fmt" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/port" + "log" ) -type ActivationFunc func(inputs *port.Collection, outputs *port.Collection) error +type ActivationFunc func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error // Component defines a main building block of FMesh type Component struct { @@ -18,6 +19,7 @@ type Component struct { inputs *port.Collection outputs *port.Collection f ActivationFunc + logger *log.Logger } // New creates initialized component @@ -218,8 +220,6 @@ func (c *Component) propagateChainErrors() { } // MaybeActivate tries to run the activation function if all required conditions are met -// @TODO: hide this method from user -// @TODO: can we remove named return ? func (c *Component) MaybeActivate() (activationResult *ActivationResult) { c.propagateChainErrors() @@ -247,7 +247,7 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) { } //Invoke the activation func - err := c.f(c.Inputs(), c.Outputs()) + err := c.f(c.Inputs(), c.Outputs(), c.Logger()) if errors.Is(err, errWaitingForInputs) { activationResult = c.newActivationResultWaitingForInputs(err) @@ -297,3 +297,22 @@ func (c *Component) WithErr(err error) *Component { c.SetErr(err) return c } + +// WithPrefixedLogger creates a new logger prefixed with component name +func (c *Component) WithPrefixedLogger(logger *log.Logger) *Component { + if c.HasErr() { + return c + } + + if logger == nil { + return c + } + + prefix := fmt.Sprintf("%s : ", c.Name()) + c.logger = log.New(logger.Writer(), prefix, logger.Flags()) + return c +} + +func (c *Component) Logger() *log.Logger { + return c.logger +} diff --git a/component/component_test.go b/component/component_test.go index b7c4e22..ac84269 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -1,11 +1,13 @@ package component import ( + "bytes" "errors" "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" + "log" "testing" ) @@ -180,7 +182,7 @@ func TestComponent_WithActivationFunc(t *testing.T) { name: "happy path", component: New("c1"), args: args{ - f: func(inputs *port.Collection, outputs *port.Collection) error { + f: func(inputs *port.Collection, outputs *port.Collection, logger *log.Logger) error { outputs.ByName("out1").PutSignals(signal.New(23)) return nil }, @@ -190,14 +192,15 @@ func TestComponent_WithActivationFunc(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { componentAfter := tt.component.WithActivationFunc(tt.args.f) + logger := log.Default() //Compare activation functions by they result and error testInputs1 := port.NewCollection().With(port.NewGroup("in1", "in2").PortsOrNil()...) testInputs2 := port.NewCollection().With(port.NewGroup("in1", "in2").PortsOrNil()...) testOutputs1 := port.NewCollection().With(port.NewGroup("out1", "out2").PortsOrNil()...) testOutputs2 := port.NewCollection().With(port.NewGroup("out1", "out2").PortsOrNil()...) - err1 := componentAfter.f(testInputs1, testOutputs1) - err2 := tt.args.f(testInputs2, testOutputs2) + err1 := componentAfter.f(testInputs1, testOutputs1, logger) + err2 := tt.args.f(testInputs2, testOutputs2, logger) assert.Equal(t, err1, err2) //Compare signals without keys (because they are random) @@ -367,6 +370,7 @@ func TestComponent_MaybeActivate(t *testing.T) { name string getComponent func() *Component wantActivationResult *ActivationResult + loggerAssertions func(t *testing.T, output []byte) }{ { name: "component with no activation function and no inputs", @@ -392,7 +396,7 @@ func TestComponent_MaybeActivate(t *testing.T) { c := New("c1"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) }) return c @@ -406,7 +410,7 @@ func TestComponent_MaybeActivate(t *testing.T) { getComponent: func() *Component { c := New("c1"). WithInputs("i1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return errors.New("test error") }) //Only one input set @@ -424,7 +428,7 @@ func TestComponent_MaybeActivate(t *testing.T) { c := New("c1"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) }) //Only one input set @@ -441,7 +445,7 @@ func TestComponent_MaybeActivate(t *testing.T) { c := New("c1"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { panic(errors.New("oh shrimps")) return nil }) @@ -460,7 +464,7 @@ func TestComponent_MaybeActivate(t *testing.T) { c := New("c1"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { panic("oh shrimps") return nil }) @@ -479,7 +483,7 @@ func TestComponent_MaybeActivate(t *testing.T) { c1 := New("c1"). WithInputs("i1", "i2"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { if !inputs.ByNames("i1", "i2").AllHaveSignals() { return NewErrWaitForInputs(false) } @@ -504,7 +508,7 @@ func TestComponent_MaybeActivate(t *testing.T) { c1 := New("c1"). WithInputs("i1", "i2"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { if !inputs.ByNames("i1", "i2").AllHaveSignals() { return NewErrWaitForInputs(true) } @@ -545,10 +549,56 @@ func TestComponent_MaybeActivate(t *testing.T) { WithActivationCode(ActivationCodeUndefined). WithErr(errors.New("some error")), }, + { + name: "component not activated, logger must be empty", + getComponent: func() *Component { + c := New("c1"). + WithInputs("i1"). + WithOutputs("o1"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { + log.Println("This must not be logged, as component must not activate") + return nil + }) + return c + }, + + wantActivationResult: NewActivationResult("c1"). + SetActivated(false). + WithActivationCode(ActivationCodeNoInput), + loggerAssertions: func(t *testing.T, output []byte) { + assert.Len(t, output, 0) + }, + }, + { + name: "activated with error, with logging", + getComponent: func() *Component { + c := New("c1"). + WithInputs("i1"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { + log.Println("This line must be logged") + return errors.New("test error") + }) + //Only one input set + c.InputByName("i1").PutSignals(signal.New(123)) + return c + }, + wantActivationResult: NewActivationResult("c1"). + SetActivated(true). + WithActivationCode(ActivationCodeReturnedError). + WithActivationError(errors.New("component returned an error: test error")), + loggerAssertions: func(t *testing.T, output []byte) { + assert.Len(t, output, 2+3+21+24) //lengths of component name, prefix, flags and logged message + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotActivationResult := tt.getComponent().MaybeActivate() + logger := log.Default() + + var loggerOutput bytes.Buffer + logger.SetOutput(&loggerOutput) + + gotActivationResult := tt.getComponent().WithPrefixedLogger(logger).MaybeActivate() assert.Equal(t, tt.wantActivationResult.Activated(), gotActivationResult.Activated()) assert.Equal(t, tt.wantActivationResult.ComponentName(), gotActivationResult.ComponentName()) assert.Equal(t, tt.wantActivationResult.Code(), gotActivationResult.Code()) @@ -558,6 +608,10 @@ func TestComponent_MaybeActivate(t *testing.T) { assert.False(t, gotActivationResult.IsError()) } + if tt.loggerAssertions != nil { + tt.loggerAssertions(t, loggerOutput.Bytes()) + } + }) } } diff --git a/examples/async_input/main.go b/examples/async_input/main.go index c7743ec..ee06499 100644 --- a/examples/async_input/main.go +++ b/examples/async_input/main.go @@ -6,6 +6,7 @@ import ( "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" + "log" "net/http" "time" ) @@ -89,61 +90,63 @@ func getMesh() *fmesh.FMesh { crawler := component.New("web crawler"). WithDescription("gets http headers from given url"). WithInputs("url"). - WithOutputs("errors", "headers").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { - if !inputs.ByName("url").HasSignals() { - return component.NewErrWaitForInputs(false) - } - - allUrls, err := inputs.ByName("url").AllSignalsPayloads() - if err != nil { - return err - } - - for _, urlVal := range allUrls { + WithOutputs("errors", "headers"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { + if !inputs.ByName("url").HasSignals() { + return component.NewErrWaitForInputs(false) + } - url := urlVal.(string) - //All urls will be crawled sequentially - // in order to call them concurrently we need run each request in separate goroutine and handle synchronization (e.g. waitgroup) - response, err := client.Get(url) + allUrls, err := inputs.ByName("url").AllSignalsPayloads() if err != nil { - outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("got error: %w from url: %s", err, url))) - continue + return err } - if len(response.Header) == 0 { - outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("no headers for url %s", url))) - continue - } + for _, urlVal := range allUrls { - outputs.ByName("headers").PutSignals(signal.New(map[string]http.Header{ - url: response.Header, - })) - } + url := urlVal.(string) + //All urls will be crawled sequentially + // in order to call them concurrently we need run each request in separate goroutine and handle synchronization (e.g. waitgroup) + response, err := client.Get(url) + if err != nil { + outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("got error: %w from url: %s", err, url))) + continue + } + + if len(response.Header) == 0 { + outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("no headers for url %s", url))) + continue + } + + outputs.ByName("headers").PutSignals(signal.New(map[string]http.Header{ + url: response.Header, + })) + } - return nil - }) + return nil + }) logger := component.New("error logger"). WithDescription("logs http errors"). - WithInputs("error").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { - if !inputs.ByName("error").HasSignals() { - return component.NewErrWaitForInputs(false) - } + WithInputs("error"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { + if !inputs.ByName("error").HasSignals() { + return component.NewErrWaitForInputs(false) + } - allErrors, err := inputs.ByName("error").AllSignalsPayloads() - if err != nil { - return err - } + allErrors, err := inputs.ByName("error").AllSignalsPayloads() + if err != nil { + return err + } - for _, errVal := range allErrors { - e := errVal.(error) - if e != nil { - fmt.Println("Error logger says:", e) + for _, errVal := range allErrors { + e := errVal.(error) + if e != nil { + fmt.Println("Error logger says:", e) + } } - } - return nil - }) + return nil + }) //Define pipes crawler.OutputByName("errors").PipeTo(logger.InputByName("error")) diff --git a/examples/battery_and_lightbulb/main.go b/examples/battery_and_lightbulb/main.go index 451c9ec..fa165cd 100644 --- a/examples/battery_and_lightbulb/main.go +++ b/examples/battery_and_lightbulb/main.go @@ -6,6 +6,7 @@ import ( "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" + "log" ) const ( @@ -24,25 +25,24 @@ func main() { WithDescription("electric battery with initial charge level"). WithInputs("power_demand"). WithOutputs("power_supply"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { - //@TODO add component level loggers - fmt.Println("battery:activated, level= ", batteryLevel) + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { + log.Println("activated, level= ", batteryLevel) //Power demand/supply cycle if inputs.ByName("power_demand").HasSignals() { demandedCurrent := inputs.ByName("power_demand").FirstSignalPayloadOrDefault(0).(int) - fmt.Println("battery:consumption = ", demandedCurrent) + log.Println("consumption = ", demandedCurrent) //Emit current represented as a number suppliedCurrent := min(batteryLevel, demandedCurrent) if suppliedCurrent > 0 { outputs.ByName("power_supply").PutSignals(signal.New(suppliedCurrent)) - fmt.Println("battery:emiting power ", suppliedCurrent) + log.Println("emiting power ", suppliedCurrent) //Discharge batteryLevel = max(0, batteryLevel-suppliedCurrent) - fmt.Println("battery:discharged to", batteryLevel) + log.Println("discharged to", batteryLevel) } else { - fmt.Println("battery:LOW POWER") + log.Println("LOW POWER") } } @@ -53,23 +53,23 @@ func main() { WithDescription("electric lightbulb"). WithInputs("power_supply", "start_power_demand"). WithOutputs("light_supply", "power_demand"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { - fmt.Println("bulb:activated") + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { + log.Println("activated") //Power consumption cycle (at constant rate) inputPower := inputs.ByName("power_supply").FirstSignalPayloadOrDefault(0).(int) - fmt.Println("bulb:got power: ", inputPower) + log.Println("got power: ", inputPower) if inputPower >= lightBulbPowerConsumption { //Emit light outputs.ByName("light_supply").PutSignals(signal.New(lightBulbLuminousFlux)) - fmt.Println("bulb:emited light: ", lightBulbLuminousFlux) + log.Println("emited light: ", lightBulbLuminousFlux) } else { - fmt.Println("bulb:LOW POWER") + log.Println("LOW POWER") } //Always continue demanding power outputs.ByName("power_demand").PutSignals(signal.New(lightBulbPowerConsumption)) - fmt.Println("bulb:demanded power: ", lightBulbPowerConsumption) + log.Println("demanded power: ", lightBulbPowerConsumption) return nil }) @@ -78,7 +78,10 @@ func main() { fm := fmesh.New("battery_and_lightbulb"). WithDescription("simple electric simulation"). - WithComponents(battery, lightbulb) + WithComponents(battery, lightbulb). + WithConfig(fmesh.Config{ + ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic, + }) // Turn on the lightbulb (yes you can init an output port) lightbulb.InputByName("start_power_demand").PutSignals(signal.New("start")) diff --git a/examples/fibonacci/main.go b/examples/fibonacci/main.go index 08de21b..e9edaa0 100644 --- a/examples/fibonacci/main.go +++ b/examples/fibonacci/main.go @@ -6,6 +6,7 @@ import ( "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" + "log" ) // This example demonstrates how a component can have a pipe looped back into its own input, @@ -20,7 +21,7 @@ func main() { c1 := component.New("fibonacci number generator"). WithInputs("i_cur", "i_prev"). WithOutputs("o_cur", "o_prev"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { cur := inputs.ByName("i_cur").FirstSignalPayloadOrDefault(0).(int) prev := inputs.ByName("i_prev").FirstSignalPayloadOrDefault(0).(int) diff --git a/examples/nesting/main.go b/examples/nesting/main.go index 7523fa5..137beb2 100644 --- a/examples/nesting/main.go +++ b/examples/nesting/main.go @@ -6,6 +6,7 @@ import ( "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" + "log" ) type FactorizedNumber struct { @@ -23,7 +24,7 @@ func main() { WithDescription("This component just holds numbers we want to factorize"). WithInputs("in"). // Single port is enough, as it can hold any number of signals (as long as they fit into1 memory) WithOutputs("out"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { // Pure bypass return port.ForwardSignals(inputs.ByName("in"), outputs.ByName("out")) }) @@ -32,7 +33,7 @@ func main() { WithDescription("In this component we can do some optional filtering"). WithInputs("in"). WithOutputs("out", "log"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { isValid := func(num int) bool { return num < 1000 } @@ -50,13 +51,9 @@ func main() { logger := component.New("logger"). WithDescription("Simple logger"). WithInputs("in"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { - log := func(data any) { - fmt.Printf("LOG: %v", data) - } - + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { for _, sig := range inputs.ByName("in").AllSignalsOrNil() { - log(sig.PayloadOrNil()) + log.Println(sig.PayloadOrNil()) } return nil }) @@ -65,7 +62,7 @@ func main() { WithDescription("Prime factorization implemented as separate f-mesh"). WithInputs("in"). WithOutputs("out"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //This activation function has no implementation of factorization algorithm, //it only runs another f-mesh to get results @@ -134,7 +131,7 @@ func getPrimeFactorizationMesh() *fmesh.FMesh { WithDescription("Load the number to be factorized"). WithInputs("in"). WithOutputs("out"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //For simplicity this f-mesh processes only one signal per run, so ignore all except first outputs.ByName("out").PutSignals(inputs.ByName("in").Buffer().First()) return nil @@ -144,7 +141,7 @@ func getPrimeFactorizationMesh() *fmesh.FMesh { WithDescription("Divide by smallest prime (2) to handle even factors"). WithInputs("in"). WithOutputs("out", "factor"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { number := inputs.ByName("in").FirstSignalPayloadOrNil().(int) for number%2 == 0 { @@ -160,7 +157,7 @@ func getPrimeFactorizationMesh() *fmesh.FMesh { WithDescription("Divide by odd primes starting from 3"). WithInputs("in"). WithOutputs("out", "factor"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { number := inputs.ByName("in").FirstSignalPayloadOrNil().(int) divisor := 3 for number > 1 && divisor*divisor <= number { @@ -178,7 +175,7 @@ func getPrimeFactorizationMesh() *fmesh.FMesh { WithDescription("Store the last remaining prime factor, if any"). WithInputs("in"). WithOutputs("factor"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { number := inputs.ByName("in").FirstSignalPayloadOrNil().(int) if number > 1 { outputs.ByName("factor").PutSignals(signal.New(number)) @@ -190,7 +187,7 @@ func getPrimeFactorizationMesh() *fmesh.FMesh { WithDescription("factors holder"). WithInputs("factor"). WithOutputs("factors"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return port.ForwardSignals(inputs.ByName("factor"), outputs.ByName("factors")) }) diff --git a/examples/strings_processing/main.go b/examples/strings_processing/main.go index 539a112..ead19ae 100644 --- a/examples/strings_processing/main.go +++ b/examples/strings_processing/main.go @@ -6,6 +6,7 @@ import ( "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" + "log" "os" "strings" ) @@ -17,7 +18,7 @@ func main() { component.New("concat"). WithInputs("i1", "i2"). WithOutputs("res"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { word1 := inputs.ByName("i1").FirstSignalPayloadOrDefault("").(string) word2 := inputs.ByName("i2").FirstSignalPayloadOrDefault("").(string) @@ -27,7 +28,7 @@ func main() { component.New("case"). WithInputs("i1"). WithOutputs("res"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { inputString := inputs.ByName("i1").FirstSignalPayloadOrDefault("").(string) outputs.ByName("res").PutSignals(signal.New(strings.ToTitle(inputString))) diff --git a/export/dot/dot_test.go b/export/dot/dot_test.go index 9cf92dd..ec0bd4e 100644 --- a/export/dot/dot_test.go +++ b/export/dot/dot_test.go @@ -6,6 +6,7 @@ import ( "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" + "log" "testing" ) @@ -36,7 +37,7 @@ func Test_dotExporter_Export(t *testing.T) { WithDescription("This component adds 2 numbers"). WithInputs("num1", "num2"). WithOutputs("result"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //The activation func can be even empty, does not affect export return nil }) @@ -45,7 +46,7 @@ func Test_dotExporter_Export(t *testing.T) { WithDescription("This component multiplies number by 3"). WithInputs("num"). WithOutputs("result"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //The activation func can be even empty, does not affect export return nil }) @@ -93,7 +94,7 @@ func Test_dotExporter_ExportWithCycles(t *testing.T) { WithDescription("This component adds 2 numbers"). WithInputs("num1", "num2"). WithOutputs("result"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { num1, err := inputs.ByName("num1").FirstSignalPayload() if err != nil { return err @@ -112,7 +113,7 @@ func Test_dotExporter_ExportWithCycles(t *testing.T) { WithDescription("This component multiplies number by 3"). WithInputs("num"). WithOutputs("result"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { num, err := inputs.ByName("num").FirstSignalPayload() if err != nil { return err diff --git a/fmesh.go b/fmesh.go index 02345e0..cc3b45e 100644 --- a/fmesh.go +++ b/fmesh.go @@ -6,6 +6,8 @@ import ( "github.com/hovsep/fmesh/common" "github.com/hovsep/fmesh/component" "github.com/hovsep/fmesh/cycle" + "log" + "os" "sync" ) @@ -31,6 +33,7 @@ type FMesh struct { components *component.Collection cycles *cycle.Group config Config + logger *log.Logger } // New creates a new f-mesh @@ -42,6 +45,7 @@ func New(name string) *FMesh { components: component.NewCollection(), cycles: cycle.NewGroup(), config: defaultConfig, + logger: getDefaultLogger(), } } @@ -72,7 +76,7 @@ func (fm *FMesh) WithComponents(components ...*component.Component) *FMesh { } for _, c := range components { - fm.components = fm.components.With(c) + fm.components = fm.components.With(c.WithPrefixedLogger(fm.Logger())) if c.HasErr() { return fm.WithErr(c.Err()) } @@ -287,3 +291,23 @@ func (fm *FMesh) WithErr(err error) *FMesh { fm.SetErr(err) return fm } + +func (fm *FMesh) WithLogger(logger *log.Logger) *FMesh { + if fm.HasErr() { + return fm + } + + fm.logger = logger + return fm +} + +func (fm *FMesh) Logger() *log.Logger { + return fm.logger +} + +func getDefaultLogger() *log.Logger { + logger := log.Default() + logger.SetOutput(os.Stdout) + logger.SetFlags(log.LstdFlags | log.Lmsgprefix) + return logger +} diff --git a/fmesh_test.go b/fmesh_test.go index c97e21e..beec2dd 100644 --- a/fmesh_test.go +++ b/fmesh_test.go @@ -9,6 +9,7 @@ import ( "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" + "log" "testing" ) @@ -33,6 +34,7 @@ func TestNew(t *testing.T) { components: component.NewCollection(), cycles: cycle.NewGroup(), config: defaultConfig, + logger: getDefaultLogger(), }, }, { @@ -47,6 +49,7 @@ func TestNew(t *testing.T) { components: component.NewCollection(), cycles: cycle.NewGroup(), config: defaultConfig, + logger: getDefaultLogger(), }, }, } @@ -80,6 +83,7 @@ func TestFMesh_WithDescription(t *testing.T) { components: component.NewCollection(), cycles: cycle.NewGroup(), config: defaultConfig, + logger: getDefaultLogger(), }, }, { @@ -95,6 +99,7 @@ func TestFMesh_WithDescription(t *testing.T) { components: component.NewCollection(), cycles: cycle.NewGroup(), config: defaultConfig, + logger: getDefaultLogger(), }, }, } @@ -134,6 +139,7 @@ func TestFMesh_WithConfig(t *testing.T) { ErrorHandlingStrategy: IgnoreAll, CyclesLimit: 9999, }, + logger: getDefaultLogger(), }, }, } @@ -247,7 +253,7 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component simply puts a constant on o1"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { outputs.ByName("o1").PutSignals(signal.New(77)) return nil }), @@ -274,7 +280,7 @@ func TestFMesh_Run(t *testing.T) { component.New("c1"). WithDescription("This component just returns an unexpected error"). WithInputs("i1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return errors.New("boom") })), initFM: func(fm *FMesh) { @@ -302,7 +308,7 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component just sends a number to c2"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { outputs.ByName("o1").PutSignals(signal.New(10)) return nil }), @@ -310,7 +316,7 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component receives a number from c1 and passes it to c4"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil }), @@ -318,14 +324,14 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component returns an error, but the mesh is configured to ignore errors"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return errors.New("boom") }), component.New("c4"). WithDescription("This component receives a number from c2 and panics"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { panic("no way") return nil }), @@ -402,7 +408,7 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component just sends a number to c2"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { outputs.ByName("o1").PutSignals(signal.New(10)) return nil }), @@ -410,7 +416,7 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component receives a number from c1 and passes it to c4"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil }), @@ -418,14 +424,14 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component returns an error, but the mesh is configured to ignore errors"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return errors.New("boom") }), component.New("c4"). WithDescription("This component receives a number from c2 and panics, but the mesh is configured to ignore even panics"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) // Even component panicked, it managed to set some data on output "o1" @@ -437,7 +443,7 @@ func TestFMesh_Run(t *testing.T) { WithDescription("This component receives a number from c4"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil }), @@ -609,7 +615,7 @@ func TestFMesh_runCycle(t *testing.T) { component.New("c1"). WithDescription(""). WithInputs("i1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { // No output return nil }), @@ -617,7 +623,7 @@ func TestFMesh_runCycle(t *testing.T) { WithDescription(""). WithInputs("i1"). WithOutputs("o1", "o2"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { // Sets output outputs.ByName("o1").PutSignals(signal.New(1)) @@ -627,7 +633,7 @@ func TestFMesh_runCycle(t *testing.T) { component.New("c3"). WithDescription(""). WithInputs("i1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { // No output return nil }), @@ -822,14 +828,14 @@ func TO_BE_REWRITTEN_FMesh_drainComponents(t *testing.T) { c1 := component.New("c1"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return nil }) c2 := component.New("c2"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return nil }) @@ -869,14 +875,14 @@ func TO_BE_REWRITTEN_FMesh_drainComponents(t *testing.T) { c1 := component.New("c1"). WithInputs("i1", "i2"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return nil }) c2 := component.New("c2"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return nil }) @@ -921,14 +927,14 @@ func TO_BE_REWRITTEN_FMesh_drainComponents(t *testing.T) { c1 := component.New("c1"). WithInputs("i1", "i2"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return nil }) c2 := component.New("c2"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { return nil }) diff --git a/integration_tests/computation/basic_test.go b/integration_tests/computation/basic_test.go index e675b1b..b6b5606 100644 --- a/integration_tests/computation/basic_test.go +++ b/integration_tests/computation/basic_test.go @@ -8,6 +8,7 @@ import ( "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" + "log" "os" "strings" "testing" @@ -27,7 +28,7 @@ func Test_Math(t *testing.T) { WithDescription("adds 2 to the input"). WithInputs("num"). WithOutputs("res"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { num := inputs.ByName("num").FirstSignalPayloadOrNil() outputs.ByName("res").PutSignals(signal.New(num.(int) + 2)) return nil @@ -37,7 +38,7 @@ func Test_Math(t *testing.T) { WithDescription("multiplies by 3"). WithInputs("num"). WithOutputs("res"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { num := inputs.ByName("num").FirstSignalPayloadOrDefault(0) outputs.ByName("res").PutSignals(signal.New(num.(int) * 3)) return nil @@ -81,7 +82,7 @@ func Test_Readme(t *testing.T) { component.New("concat"). WithInputs("i1", "i2"). WithOutputs("res"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { word1 := inputs.ByName("i1").FirstSignalPayloadOrDefault("").(string) word2 := inputs.ByName("i2").FirstSignalPayloadOrDefault("").(string) @@ -91,7 +92,7 @@ func Test_Readme(t *testing.T) { component.New("case"). WithInputs("i1"). WithOutputs("res"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { inputString := inputs.ByName("i1").FirstSignalPayloadOrDefault("").(string) outputs.ByName("res").PutSignals(signal.New(strings.ToTitle(inputString))) diff --git a/integration_tests/error_handling/chainable_api_test.go b/integration_tests/error_handling/chainable_api_test.go index 64b4179..498c1e9 100644 --- a/integration_tests/error_handling/chainable_api_test.go +++ b/integration_tests/error_handling/chainable_api_test.go @@ -7,6 +7,7 @@ import ( "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" + "log" "testing" ) @@ -60,12 +61,13 @@ func Test_FMesh(t *testing.T) { test: func(t *testing.T) { fm := fmesh.New("test").WithComponents( component.New("c1").WithInputs("num1", "num2"). - WithOutputs("sum").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { - num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) - num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) - outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) - return nil - }), + WithOutputs("sum"). + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { + num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) + num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) + outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) + return nil + }), ) fm.Components().ByName("c1").InputByName("num1").PutSignals(signal.New(10)) @@ -83,7 +85,7 @@ func Test_FMesh(t *testing.T) { component.New("c1"). WithInputs("num1", "num2"). WithOutputs("sum"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) @@ -108,7 +110,7 @@ func Test_FMesh(t *testing.T) { component.New("c1"). WithInputs("num1", "num2"). WithOutputs("sum"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) @@ -131,7 +133,7 @@ func Test_FMesh(t *testing.T) { test: func(t *testing.T) { fm := fmesh.New("test").WithComponents( component.New("c1").WithInputs("num1", "num2"). - WithOutputs("sum").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithOutputs("sum").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { num1 := inputs.ByName("num1").FirstSignalPayloadOrDefault(0).(int) num2 := inputs.ByName("num2").FirstSignalPayloadOrDefault(0).(int) outputs.ByName("sum").PutSignals(signal.New(num1 + num2)) diff --git a/integration_tests/piping/fan_test.go b/integration_tests/piping/fan_test.go index 6ad6126..86ecd39 100644 --- a/integration_tests/piping/fan_test.go +++ b/integration_tests/piping/fan_test.go @@ -7,6 +7,7 @@ import ( "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" + "log" "math/rand" "testing" "time" @@ -26,7 +27,7 @@ func Test_Fan(t *testing.T) { component.New("producer"). WithInputs("start"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { outputs.ByName("o1").PutSignals(signal.New(time.Now())) return nil }), @@ -34,7 +35,7 @@ func Test_Fan(t *testing.T) { component.New("consumer1"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //Bypass received signal to output port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil @@ -43,7 +44,7 @@ func Test_Fan(t *testing.T) { component.New("consumer2"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //Bypass received signal to output port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil @@ -52,7 +53,7 @@ func Test_Fan(t *testing.T) { component.New("consumer3"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //Bypass received signal to output port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil @@ -94,7 +95,7 @@ func Test_Fan(t *testing.T) { producer1 := component.New("producer1"). WithInputs("start"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { outputs.ByName("o1").PutSignals(signal.New(rand.Int())) return nil }) @@ -102,7 +103,7 @@ func Test_Fan(t *testing.T) { producer2 := component.New("producer2"). WithInputs("start"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { outputs.ByName("o1").PutSignals(signal.New(rand.Int())) return nil }) @@ -110,14 +111,14 @@ func Test_Fan(t *testing.T) { producer3 := component.New("producer3"). WithInputs("start"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { outputs.ByName("o1").PutSignals(signal.New(rand.Int())) return nil }) consumer := component.New("consumer"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { //Bypass port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1")) return nil diff --git a/integration_tests/ports/waiting_for_inputs_test.go b/integration_tests/ports/waiting_for_inputs_test.go index 424428d..69484ee 100644 --- a/integration_tests/ports/waiting_for_inputs_test.go +++ b/integration_tests/ports/waiting_for_inputs_test.go @@ -7,6 +7,7 @@ import ( "github.com/hovsep/fmesh/port" "github.com/hovsep/fmesh/signal" "github.com/stretchr/testify/assert" + "log" "testing" ) @@ -25,7 +26,7 @@ func Test_WaitingForInputs(t *testing.T) { WithDescription("This component just doubles the input"). WithInputs("i1"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { inputNum := inputs.ByName("i1").FirstSignalPayloadOrDefault(0) outputs.ByName("o1").PutSignals(signal.New(inputNum.(int) * 2)) @@ -43,7 +44,7 @@ func Test_WaitingForInputs(t *testing.T) { WithDescription("This component just sums 2 inputs"). WithInputs("i1", "i2"). WithOutputs("o1"). - WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error { if !inputs.ByNames("i1", "i2").AllHaveSignals() { return component.NewErrWaitForInputs(true) }