Skip to content

Commit

Permalink
Add logger
Browse files Browse the repository at this point in the history
  • Loading branch information
hovsep committed Jan 26, 2025
1 parent b0c4fa5 commit 0bae3d2
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 136 deletions.
27 changes: 23 additions & 4 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"fmt"
"github.com/hovsep/fmesh/common"
"github.com/hovsep/fmesh/port"
"log"
)

type ActivationFunc func(inputs *port.Collection, outputs *port.Collection) error
type ActivationFunc func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error

// Component defines a main building block of FMesh
type Component struct {
Expand All @@ -18,6 +19,7 @@ type Component struct {
inputs *port.Collection
outputs *port.Collection
f ActivationFunc
logger *log.Logger
}

// New creates initialized component
Expand Down Expand Up @@ -218,8 +220,6 @@ func (c *Component) propagateChainErrors() {
}

// MaybeActivate tries to run the activation function if all required conditions are met
// @TODO: hide this method from user
// @TODO: can we remove named return ?
func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
c.propagateChainErrors()

Expand Down Expand Up @@ -247,7 +247,7 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
}

//Invoke the activation func
err := c.f(c.Inputs(), c.Outputs())
err := c.f(c.Inputs(), c.Outputs(), c.Logger())

if errors.Is(err, errWaitingForInputs) {
activationResult = c.newActivationResultWaitingForInputs(err)
Expand Down Expand Up @@ -297,3 +297,22 @@ func (c *Component) WithErr(err error) *Component {
c.SetErr(err)
return c
}

// WithPrefixedLogger creates a new logger prefixed with component name
func (c *Component) WithPrefixedLogger(logger *log.Logger) *Component {
if c.HasErr() {
return c
}

if logger == nil {
return c
}

prefix := fmt.Sprintf("%s : ", c.Name())
c.logger = log.New(logger.Writer(), prefix, logger.Flags())
return c
}

func (c *Component) Logger() *log.Logger {
return c.logger
}
76 changes: 65 additions & 11 deletions component/component_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package component

import (
"bytes"
"errors"
"github.com/hovsep/fmesh/common"
"github.com/hovsep/fmesh/port"
"github.com/hovsep/fmesh/signal"
"github.com/stretchr/testify/assert"
"log"
"testing"
)

Expand Down Expand Up @@ -180,7 +182,7 @@ func TestComponent_WithActivationFunc(t *testing.T) {
name: "happy path",
component: New("c1"),
args: args{
f: func(inputs *port.Collection, outputs *port.Collection) error {
f: func(inputs *port.Collection, outputs *port.Collection, logger *log.Logger) error {
outputs.ByName("out1").PutSignals(signal.New(23))
return nil
},
Expand All @@ -190,14 +192,15 @@ func TestComponent_WithActivationFunc(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
componentAfter := tt.component.WithActivationFunc(tt.args.f)
logger := log.Default()

//Compare activation functions by they result and error
testInputs1 := port.NewCollection().With(port.NewGroup("in1", "in2").PortsOrNil()...)
testInputs2 := port.NewCollection().With(port.NewGroup("in1", "in2").PortsOrNil()...)
testOutputs1 := port.NewCollection().With(port.NewGroup("out1", "out2").PortsOrNil()...)
testOutputs2 := port.NewCollection().With(port.NewGroup("out1", "out2").PortsOrNil()...)
err1 := componentAfter.f(testInputs1, testOutputs1)
err2 := tt.args.f(testInputs2, testOutputs2)
err1 := componentAfter.f(testInputs1, testOutputs1, logger)
err2 := tt.args.f(testInputs2, testOutputs2, logger)
assert.Equal(t, err1, err2)

//Compare signals without keys (because they are random)
Expand Down Expand Up @@ -367,6 +370,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
name string
getComponent func() *Component
wantActivationResult *ActivationResult
loggerAssertions func(t *testing.T, output []byte)
}{
{
name: "component with no activation function and no inputs",
Expand All @@ -392,7 +396,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
c := New("c1").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1"))
})
return c
Expand All @@ -406,7 +410,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
getComponent: func() *Component {
c := New("c1").
WithInputs("i1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
return errors.New("test error")
})
//Only one input set
Expand All @@ -424,7 +428,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
c := New("c1").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
return port.ForwardSignals(inputs.ByName("i1"), outputs.ByName("o1"))
})
//Only one input set
Expand All @@ -441,7 +445,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
c := New("c1").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
panic(errors.New("oh shrimps"))
return nil
})
Expand All @@ -460,7 +464,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
c := New("c1").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
panic("oh shrimps")
return nil
})
Expand All @@ -479,7 +483,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
c1 := New("c1").
WithInputs("i1", "i2").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
if !inputs.ByNames("i1", "i2").AllHaveSignals() {
return NewErrWaitForInputs(false)
}
Expand All @@ -504,7 +508,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
c1 := New("c1").
WithInputs("i1", "i2").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
if !inputs.ByNames("i1", "i2").AllHaveSignals() {
return NewErrWaitForInputs(true)
}
Expand Down Expand Up @@ -545,10 +549,56 @@ func TestComponent_MaybeActivate(t *testing.T) {
WithActivationCode(ActivationCodeUndefined).
WithErr(errors.New("some error")),
},
{
name: "component not activated, logger must be empty",
getComponent: func() *Component {
c := New("c1").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
log.Println("This must not be logged, as component must not activate")
return nil
})
return c
},

wantActivationResult: NewActivationResult("c1").
SetActivated(false).
WithActivationCode(ActivationCodeNoInput),
loggerAssertions: func(t *testing.T, output []byte) {
assert.Len(t, output, 0)
},
},
{
name: "activated with error, with logging",
getComponent: func() *Component {
c := New("c1").
WithInputs("i1").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
log.Println("This line must be logged")
return errors.New("test error")
})
//Only one input set
c.InputByName("i1").PutSignals(signal.New(123))
return c
},
wantActivationResult: NewActivationResult("c1").
SetActivated(true).
WithActivationCode(ActivationCodeReturnedError).
WithActivationError(errors.New("component returned an error: test error")),
loggerAssertions: func(t *testing.T, output []byte) {
assert.Len(t, output, 2+3+21+24) //lengths of component name, prefix, flags and logged message
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotActivationResult := tt.getComponent().MaybeActivate()
logger := log.Default()

var loggerOutput bytes.Buffer
logger.SetOutput(&loggerOutput)

gotActivationResult := tt.getComponent().WithPrefixedLogger(logger).MaybeActivate()
assert.Equal(t, tt.wantActivationResult.Activated(), gotActivationResult.Activated())
assert.Equal(t, tt.wantActivationResult.ComponentName(), gotActivationResult.ComponentName())
assert.Equal(t, tt.wantActivationResult.Code(), gotActivationResult.Code())
Expand All @@ -558,6 +608,10 @@ func TestComponent_MaybeActivate(t *testing.T) {
assert.False(t, gotActivationResult.IsError())
}

if tt.loggerAssertions != nil {
tt.loggerAssertions(t, loggerOutput.Bytes())
}

})
}
}
Expand Down
87 changes: 45 additions & 42 deletions examples/async_input/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/hovsep/fmesh/component"
"github.com/hovsep/fmesh/port"
"github.com/hovsep/fmesh/signal"
"log"
"net/http"
"time"
)
Expand Down Expand Up @@ -89,61 +90,63 @@ func getMesh() *fmesh.FMesh {
crawler := component.New("web crawler").
WithDescription("gets http headers from given url").
WithInputs("url").
WithOutputs("errors", "headers").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
if !inputs.ByName("url").HasSignals() {
return component.NewErrWaitForInputs(false)
}

allUrls, err := inputs.ByName("url").AllSignalsPayloads()
if err != nil {
return err
}

for _, urlVal := range allUrls {
WithOutputs("errors", "headers").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
if !inputs.ByName("url").HasSignals() {
return component.NewErrWaitForInputs(false)
}

url := urlVal.(string)
//All urls will be crawled sequentially
// in order to call them concurrently we need run each request in separate goroutine and handle synchronization (e.g. waitgroup)
response, err := client.Get(url)
allUrls, err := inputs.ByName("url").AllSignalsPayloads()
if err != nil {
outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("got error: %w from url: %s", err, url)))
continue
return err
}

if len(response.Header) == 0 {
outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("no headers for url %s", url)))
continue
}
for _, urlVal := range allUrls {

outputs.ByName("headers").PutSignals(signal.New(map[string]http.Header{
url: response.Header,
}))
}
url := urlVal.(string)
//All urls will be crawled sequentially
// in order to call them concurrently we need run each request in separate goroutine and handle synchronization (e.g. waitgroup)
response, err := client.Get(url)
if err != nil {
outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("got error: %w from url: %s", err, url)))
continue
}

if len(response.Header) == 0 {
outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("no headers for url %s", url)))
continue
}

outputs.ByName("headers").PutSignals(signal.New(map[string]http.Header{
url: response.Header,
}))
}

return nil
})
return nil
})

logger := component.New("error logger").
WithDescription("logs http errors").
WithInputs("error").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error {
if !inputs.ByName("error").HasSignals() {
return component.NewErrWaitForInputs(false)
}
WithInputs("error").
WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection, log *log.Logger) error {
if !inputs.ByName("error").HasSignals() {
return component.NewErrWaitForInputs(false)
}

allErrors, err := inputs.ByName("error").AllSignalsPayloads()
if err != nil {
return err
}
allErrors, err := inputs.ByName("error").AllSignalsPayloads()
if err != nil {
return err
}

for _, errVal := range allErrors {
e := errVal.(error)
if e != nil {
fmt.Println("Error logger says:", e)
for _, errVal := range allErrors {
e := errVal.(error)
if e != nil {
fmt.Println("Error logger says:", e)
}
}
}

return nil
})
return nil
})

//Define pipes
crawler.OutputByName("errors").PipeTo(logger.InputByName("error"))
Expand Down
Loading

0 comments on commit 0bae3d2

Please sign in to comment.