diff --git a/examples/async_input/main.go b/examples/async_input/main.go new file mode 100644 index 0000000..c7743ec --- /dev/null +++ b/examples/async_input/main.go @@ -0,0 +1,155 @@ +package main + +import ( + "fmt" + "github.com/hovsep/fmesh" + "github.com/hovsep/fmesh/component" + "github.com/hovsep/fmesh/port" + "github.com/hovsep/fmesh/signal" + "net/http" + "time" +) + +// This example processes 1 url every 3 seconds +// NOTE: urls are not crawled concurrently, because fm has only 1 worker (crawler component) +func main() { + fm := getMesh() + + urls := []string{ + "http://fffff.com", + "https://google.com", + "http://habr.com", + "http://localhost:80", + "https://postman-echo.com/delay/1", + "https://postman-echo.com/delay/3", + "https://postman-echo.com/delay/5", + "https://postman-echo.com/delay/10", + } + + ticker := time.NewTicker(3 * time.Second) + resultsChan := make(chan []any) + doneChan := make(chan struct{}) // Signals when all urls are processed + + //Producer goroutine + go func() { + for { + select { + case <-ticker.C: + if len(urls) == 0 { + close(resultsChan) + return + } + //Pop an url + url := urls[0] + urls = urls[1:] + + fmt.Println("produce:", url) + + fm.Components().ByName("web crawler").InputByName("url").PutSignals(signal.New(url)) + _, err := fm.Run() + if err != nil { + fmt.Println("fmesh returned error ", err) + } + + if fm.Components().ByName("web crawler").OutputByName("headers").HasSignals() { + results, err := fm.Components().ByName("web crawler").OutputByName("headers").AllSignalsPayloads() + if err != nil { + fmt.Println("Failed to get results ", err) + } + fm.Components().ByName("web crawler").OutputByName("headers").Clear() //@TODO maybe we can add fm.Reset() for cases when FMesh is reused (instead of cleaning ports explicitly) + resultsChan <- results + } + } + } + }() + + //Consumer goroutine + go func() { + for { + select { + case r, ok := <-resultsChan: + if !ok { + fmt.Println("results chan is closed. shutting down the reader") + doneChan <- struct{}{} + return + } + fmt.Println(fmt.Sprintf("consume: %v", r)) + } + } + }() + + <-doneChan +} + +func getMesh() *fmesh.FMesh { + //Setup dependencies + client := &http.Client{} + + //Define components + crawler := component.New("web crawler"). + WithDescription("gets http headers from given url"). + WithInputs("url"). + WithOutputs("errors", "headers").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + if !inputs.ByName("url").HasSignals() { + return component.NewErrWaitForInputs(false) + } + + allUrls, err := inputs.ByName("url").AllSignalsPayloads() + if err != nil { + return err + } + + for _, urlVal := range allUrls { + + url := urlVal.(string) + //All urls will be crawled sequentially + // in order to call them concurrently we need run each request in separate goroutine and handle synchronization (e.g. waitgroup) + response, err := client.Get(url) + if err != nil { + outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("got error: %w from url: %s", err, url))) + continue + } + + if len(response.Header) == 0 { + outputs.ByName("errors").PutSignals(signal.New(fmt.Errorf("no headers for url %s", url))) + continue + } + + outputs.ByName("headers").PutSignals(signal.New(map[string]http.Header{ + url: response.Header, + })) + } + + return nil + }) + + logger := component.New("error logger"). + WithDescription("logs http errors"). + WithInputs("error").WithActivationFunc(func(inputs *port.Collection, outputs *port.Collection) error { + if !inputs.ByName("error").HasSignals() { + return component.NewErrWaitForInputs(false) + } + + allErrors, err := inputs.ByName("error").AllSignalsPayloads() + if err != nil { + return err + } + + for _, errVal := range allErrors { + e := errVal.(error) + if e != nil { + fmt.Println("Error logger says:", e) + } + } + + return nil + }) + + //Define pipes + crawler.OutputByName("errors").PipeTo(logger.InputByName("error")) + + return fmesh.New("web scraper").WithConfig(fmesh.Config{ + ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic, + }).WithComponents(crawler, logger) + +} diff --git a/experiments/11_nested_meshes/component.go b/experiments/11_nested_meshes/component.go deleted file mode 100644 index 563c713..0000000 --- a/experiments/11_nested_meshes/component.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "errors" - "fmt" - "runtime/debug" -) - -// @TODO: add a builder pattern implementation -type Component struct { - name string - description string - inputs Ports - outputs Ports - handler func(inputs Ports, outputs Ports) error -} - -func (c *Component) activate() (aRes ActivationResult) { - defer func() { - if r := recover(); r != nil { - aRes = ActivationResult{ - activated: true, - componentName: c.name, - err: fmt.Errorf("panicked with %w, stacktrace: %s", r, debug.Stack()), - } - } - }() - - //@TODO:: https://github.com/hovsep/fmesh/issues/15 - if !c.inputs.anyHasValue() { - //No inputs set, stop here - - aRes = ActivationResult{ - activated: false, - componentName: c.name, - err: nil, - } - - return - } - - //Run the computation - err := c.handler(c.inputs, c.outputs) - - if isWaitingForInput(err) { - aRes = ActivationResult{ - activated: false, - componentName: c.name, - err: nil, - } - - if !errors.Is(err, errWaitingForInputKeepInputs) { - c.inputs.clearAll() - } - - return - } - - //Clear inputs - c.inputs.clearAll() - - if err != nil { - aRes = ActivationResult{ - activated: true, - componentName: c.name, - err: fmt.Errorf("failed to activate component: %w", err), - } - - return - } - - aRes = ActivationResult{ - activated: true, - componentName: c.name, - err: nil, - } - - return -} - -func (c *Component) flushOutputs() { - for _, out := range c.outputs { - if !out.hasSignal() || len(out.pipes) == 0 { - continue - } - - for _, pipe := range out.pipes { - //Multiplexing - pipe.flush() - } - out.clearSignal() - } -} diff --git a/experiments/11_nested_meshes/components.go b/experiments/11_nested_meshes/components.go deleted file mode 100644 index 95a556e..0000000 --- a/experiments/11_nested_meshes/components.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -type Components []*Component - -func (components Components) byName(name string) *Component { - for _, c := range components { - if c.name == name { - return c - } - } - return nil -} diff --git a/experiments/11_nested_meshes/errors.go b/experiments/11_nested_meshes/errors.go deleted file mode 100644 index 50b99c0..0000000 --- a/experiments/11_nested_meshes/errors.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -import ( - "errors" - "sync" -) - -type ErrorHandlingStrategy int - -const ( - StopOnFirstError ErrorHandlingStrategy = iota - IgnoreAll -) - -var ( - errWaitingForInputResetInputs = errors.New("component is not ready (waiting for one or more inputs). All inputs will be reset") - errWaitingForInputKeepInputs = errors.New("component is not ready (waiting for one or more inputs). All inputs will be kept") -) - -// ActivationResult defines the result (possibly an error) of the activation of given component -type ActivationResult struct { - activated bool - componentName string - err error -} - -// HopResult describes the outcome of every single component activation in single hop -type HopResult struct { - sync.Mutex - activationResults map[string]error -} - -func (r *HopResult) hasErrors() bool { - for _, err := range r.activationResults { - if err != nil { - return true - } - } - return false -} - -func isWaitingForInput(err error) bool { - return errors.Is(err, errWaitingForInputResetInputs) || errors.Is(err, errWaitingForInputKeepInputs) -} diff --git a/experiments/11_nested_meshes/fmesh.go b/experiments/11_nested_meshes/fmesh.go deleted file mode 100644 index 9439f9a..0000000 --- a/experiments/11_nested_meshes/fmesh.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "sync" -) - -type FMesh struct { - Name string - Description string - Components Components - ErrorHandlingStrategy -} - -func (fm *FMesh) activateComponents() *HopResult { - hop := &HopResult{ - activationResults: make(map[string]error), - } - activationResultsChan := make(chan ActivationResult) - doneChan := make(chan struct{}) - - var wg sync.WaitGroup - - go func() { - for { - select { - case aRes := <-activationResultsChan: - if aRes.activated { - hop.Lock() - hop.activationResults[aRes.componentName] = aRes.err - hop.Unlock() - } - case <-doneChan: - return - } - } - }() - - for _, c := range fm.Components { - wg.Add(1) - c := c - go func() { - defer wg.Done() - activationResultsChan <- c.activate() - }() - } - - wg.Wait() - doneChan <- struct{}{} - return hop -} - -func (fm *FMesh) flushPipes() { - for _, c := range fm.Components { - c.flushOutputs() - } -} - -func (fm *FMesh) run() ([]*HopResult, error) { - hops := make([]*HopResult, 0) - for { - hopReport := fm.activateComponents() - hops = append(hops, hopReport) - - if fm.ErrorHandlingStrategy == StopOnFirstError && hopReport.hasErrors() { - return hops, fmt.Errorf("Hop #%d finished with errors. Stopping fmesh. Report: %v", len(hops), hopReport.activationResults) - } - - if len(hopReport.activationResults) == 0 { - return hops, nil - } - fm.flushPipes() - } -} diff --git a/experiments/11_nested_meshes/go.mod b/experiments/11_nested_meshes/go.mod deleted file mode 100644 index f8658fe..0000000 --- a/experiments/11_nested_meshes/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/hovsep/fmesh/experiments/11_nested_meshes - -go 1.23 diff --git a/experiments/11_nested_meshes/main.go b/experiments/11_nested_meshes/main.go deleted file mode 100644 index 1fc89fd..0000000 --- a/experiments/11_nested_meshes/main.go +++ /dev/null @@ -1,153 +0,0 @@ -package main - -import ( - "fmt" -) - -// This example demonstrates the ability of meshes to be nested (a component of mesh can be a mesh itself and nesting depth is unlimited) -func main() { - //Define components - c1 := &Component{ - name: "math", - description: "a * b + c", - inputs: Ports{ - "a": &Port{}, - "b": &Port{}, - "c": &Port{}, - }, - outputs: Ports{ - "out": &Port{}, - }, - handler: func(inputs Ports, outputs Ports) error { - if !inputs.manyByName("a", "b", "c").allHaveValue() { - return errWaitingForInputKeepInputs - } - - //This component is using a nested mesh for multiplication - multiplierWithLogger := getSubMesh() - - //Pass inputs - forwardSignal(inputs.byName("a"), multiplierWithLogger.Components.byName("Multiplier").inputs.byName("a")) - forwardSignal(inputs.byName("b"), multiplierWithLogger.Components.byName("Multiplier").inputs.byName("b")) - - //Run submesh inside a component - multiplierWithLogger.run() - - //Read the multiplication result - multiplicationResult := multiplierWithLogger.Components.byName("Multiplier").outputs.byName("result").getSignal().GetValue().(int) - - //Do the rest of calculation - res := multiplicationResult + inputs.byName("c").getSignal().GetValue().(int) - - outputs.byName("out").putSignal(newSignal(res)) - return nil - }, - } - - c2 := &Component{ - name: "add constant", - description: "a + 35", - inputs: Ports{ - "a": &Port{}, - }, - outputs: Ports{ - "out": &Port{}, - }, - handler: func(inputs Ports, outputs Ports) error { - if inputs.byName("a").hasSignal() { - a := inputs.byName("a").getSignal().GetValue().(int) - outputs.byName("out").putSignal(newSignal(a + 35)) - } - return nil - }, - } - - //Define pipes - c1.outputs.byName("out").CreatePipesTo(c2.inputs.byName("a")) - - //Build mesh - fm := &FMesh{ - Components: Components{c1, c2}, - ErrorHandlingStrategy: StopOnFirstError, - } - - //Set inputs - - c1.inputs.byName("a").putSignal(newSignal(2)) - c1.inputs.byName("b").putSignal(newSignal(3)) - c1.inputs.byName("c").putSignal(newSignal(4)) - - //Run the mesh - hops, err := fm.run() - if err != nil { - fmt.Println(err) - } - _ = hops - - res := c2.outputs.byName("out").getSignal().GetValue() - - fmt.Printf("outter fmesh result %v", res) -} - -func getSubMesh() *FMesh { - multiplier := &Component{ - name: "Multiplier", - description: "This component multiplies numbers on it's inputs", - inputs: Ports{ - "a": &Port{}, - "b": &Port{}, - }, - outputs: Ports{ - "bypass_a": &Port{}, - "bypass_b": &Port{}, - - "result": &Port{}, - }, - handler: func(inputs Ports, outputs Ports) error { - //@TODO: simplify waiting API - if !inputs.manyByName("a", "b").allHaveValue() { - return errWaitingForInputKeepInputs - } - - //Bypass input signals, so logger can get them - forwardSignal(inputs.byName("a"), outputs.byName("bypass_a")) - forwardSignal(inputs.byName("b"), outputs.byName("bypass_b")) - - a, b := inputs.byName("a").getSignal().GetValue().(int), inputs.byName("b").getSignal().GetValue().(int) - - outputs.byName("result").putSignal(newSignal(a * b)) - return nil - }, - } - - logger := &Component{ - name: "Logger", - description: "This component logs inputs of multiplier", - inputs: Ports{ - "a": &Port{}, - "b": &Port{}, - }, - outputs: nil, //No output - handler: func(inputs Ports, outputs Ports) error { - if inputs.byName("a").hasSignal() { - fmt.Println(fmt.Sprintf("Inner logger says: a is %v", inputs.byName("a").getSignal().GetValue())) - } - - if inputs.byName("b").hasSignal() { - fmt.Println(fmt.Sprintf("Inner logger says: b is %v", inputs.byName("b").getSignal().GetValue())) - } - - return nil - }, - } - - multiplier.outputs.byName("bypass_a").CreatePipesTo(logger.inputs.byName("a")) - multiplier.outputs.byName("bypass_b").CreatePipesTo(logger.inputs.byName("b")) - - return &FMesh{ - Name: "Logged multiplicator", - Description: "multiply 2 numbers and log inputs into std out", - Components: Components{multiplier, logger}, - ErrorHandlingStrategy: StopOnFirstError, - } -} diff --git a/experiments/11_nested_meshes/pipe.go b/experiments/11_nested_meshes/pipe.go deleted file mode 100644 index 8737a62..0000000 --- a/experiments/11_nested_meshes/pipe.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -type Pipe struct { - From *Port - To *Port -} - -type Pipes []*Pipe - -func (p *Pipe) flush() { - forwardSignal(p.From, p.To) -} diff --git a/experiments/11_nested_meshes/port.go b/experiments/11_nested_meshes/port.go deleted file mode 100644 index 7350a2c..0000000 --- a/experiments/11_nested_meshes/port.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -type Port struct { - signal Signal - pipes Pipes //Refs to pipes connected to that port (no in\out semantics) -} - -func (p *Port) getSignal() Signal { - if p.signal.IsAggregate() { - return p.signal.(*AggregateSignal) - } - return p.signal.(*SingleSignal) -} - -func (p *Port) putSignal(sig Signal) { - if p.hasSignal() { - //Aggregate signal - var resValues []*SingleSignal - - //Extract existing signal(s) - if p.signal.IsSingle() { - resValues = append(resValues, p.signal.(*SingleSignal)) - } else if p.signal.IsAggregate() { - resValues = p.signal.(*AggregateSignal).val - } - - //Add new signal(s) - if sig.IsSingle() { - resValues = append(resValues, sig.(*SingleSignal)) - } else if sig.IsAggregate() { - resValues = append(resValues, sig.(*AggregateSignal).val...) - } - - p.signal = &AggregateSignal{ - val: resValues, - } - return - } - - //Single signal - p.signal = sig -} - -func (p *Port) clearSignal() { - p.signal = nil -} - -func (p *Port) hasSignal() bool { - return p.signal != nil -} - -// Adds pipe reference to port, so all pipes of the port are easily iterable (no in\out semantics) -func (p *Port) addPipeRef(pipe *Pipe) { - p.pipes = append(p.pipes, pipe) -} - -// CreatePipeTo must be used to explicitly set pipe direction -func (p *Port) CreatePipesTo(toPorts ...*Port) { - for _, toPort := range toPorts { - newPipe := &Pipe{ - From: p, - To: toPort, - } - p.addPipeRef(newPipe) - toPort.addPipeRef(newPipe) - } - -} diff --git a/experiments/11_nested_meshes/ports.go b/experiments/11_nested_meshes/ports.go deleted file mode 100644 index b53e6d2..0000000 --- a/experiments/11_nested_meshes/ports.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -// @TODO: this type must have good tooling for working with collection -// like adding new ports, filtering and so on -type Ports map[string]*Port - -// @TODO: add error handling (e.g. when port does not exist) -func (ports Ports) byName(name string) *Port { - return ports[name] -} - -func (ports Ports) manyByName(names ...string) Ports { - selectedPorts := make(Ports) - - for _, name := range names { - if p, ok := ports[name]; ok { - selectedPorts[name] = p - } - } - - return selectedPorts -} - -func (ports Ports) anyHasValue() bool { - for _, p := range ports { - if p.hasSignal() { - return true - } - } - - return false -} - -func (ports Ports) allHaveValue() bool { - for _, p := range ports { - if !p.hasSignal() { - return false - } - } - - return true -} - -func (ports Ports) setAll(val Signal) { - for _, p := range ports { - p.putSignal(val) - } -} - -func (ports Ports) clearAll() { - for _, p := range ports { - p.clearSignal() - } -} diff --git a/experiments/11_nested_meshes/signal.go b/experiments/11_nested_meshes/signal.go deleted file mode 100644 index 17d7a9c..0000000 --- a/experiments/11_nested_meshes/signal.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -type Signal interface { - IsAggregate() bool - IsSingle() bool - GetValue() any -} - -// @TODO: enhance naming -type SingleSignal struct { - val any -} - -type AggregateSignal struct { - val []*SingleSignal -} - -func (s SingleSignal) IsAggregate() bool { - return false -} - -func (s SingleSignal) IsSingle() bool { - return !s.IsAggregate() -} - -func (s AggregateSignal) IsAggregate() bool { - return true -} - -func (s AggregateSignal) IsSingle() bool { - return !s.IsAggregate() -} - -func (s AggregateSignal) GetValue() any { - return s.val -} - -func (s SingleSignal) GetValue() any { - return s.val -} - -func newSignal(val any) *SingleSignal { - return &SingleSignal{val: val} -} - -func forwardSignal(source *Port, dest *Port) { - dest.putSignal(source.getSignal()) -} diff --git a/experiments/12_async_input/component.go b/experiments/12_async_input/component.go deleted file mode 100644 index 563c713..0000000 --- a/experiments/12_async_input/component.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "errors" - "fmt" - "runtime/debug" -) - -// @TODO: add a builder pattern implementation -type Component struct { - name string - description string - inputs Ports - outputs Ports - handler func(inputs Ports, outputs Ports) error -} - -func (c *Component) activate() (aRes ActivationResult) { - defer func() { - if r := recover(); r != nil { - aRes = ActivationResult{ - activated: true, - componentName: c.name, - err: fmt.Errorf("panicked with %w, stacktrace: %s", r, debug.Stack()), - } - } - }() - - //@TODO:: https://github.com/hovsep/fmesh/issues/15 - if !c.inputs.anyHasValue() { - //No inputs set, stop here - - aRes = ActivationResult{ - activated: false, - componentName: c.name, - err: nil, - } - - return - } - - //Run the computation - err := c.handler(c.inputs, c.outputs) - - if isWaitingForInput(err) { - aRes = ActivationResult{ - activated: false, - componentName: c.name, - err: nil, - } - - if !errors.Is(err, errWaitingForInputKeepInputs) { - c.inputs.clearAll() - } - - return - } - - //Clear inputs - c.inputs.clearAll() - - if err != nil { - aRes = ActivationResult{ - activated: true, - componentName: c.name, - err: fmt.Errorf("failed to activate component: %w", err), - } - - return - } - - aRes = ActivationResult{ - activated: true, - componentName: c.name, - err: nil, - } - - return -} - -func (c *Component) flushOutputs() { - for _, out := range c.outputs { - if !out.hasSignal() || len(out.pipes) == 0 { - continue - } - - for _, pipe := range out.pipes { - //Multiplexing - pipe.flush() - } - out.clearSignal() - } -} diff --git a/experiments/12_async_input/components.go b/experiments/12_async_input/components.go deleted file mode 100644 index 95a556e..0000000 --- a/experiments/12_async_input/components.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -type Components []*Component - -func (components Components) byName(name string) *Component { - for _, c := range components { - if c.name == name { - return c - } - } - return nil -} diff --git a/experiments/12_async_input/errors.go b/experiments/12_async_input/errors.go deleted file mode 100644 index 50b99c0..0000000 --- a/experiments/12_async_input/errors.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -import ( - "errors" - "sync" -) - -type ErrorHandlingStrategy int - -const ( - StopOnFirstError ErrorHandlingStrategy = iota - IgnoreAll -) - -var ( - errWaitingForInputResetInputs = errors.New("component is not ready (waiting for one or more inputs). All inputs will be reset") - errWaitingForInputKeepInputs = errors.New("component is not ready (waiting for one or more inputs). All inputs will be kept") -) - -// ActivationResult defines the result (possibly an error) of the activation of given component -type ActivationResult struct { - activated bool - componentName string - err error -} - -// HopResult describes the outcome of every single component activation in single hop -type HopResult struct { - sync.Mutex - activationResults map[string]error -} - -func (r *HopResult) hasErrors() bool { - for _, err := range r.activationResults { - if err != nil { - return true - } - } - return false -} - -func isWaitingForInput(err error) bool { - return errors.Is(err, errWaitingForInputResetInputs) || errors.Is(err, errWaitingForInputKeepInputs) -} diff --git a/experiments/12_async_input/fmesh.go b/experiments/12_async_input/fmesh.go deleted file mode 100644 index 9439f9a..0000000 --- a/experiments/12_async_input/fmesh.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "sync" -) - -type FMesh struct { - Name string - Description string - Components Components - ErrorHandlingStrategy -} - -func (fm *FMesh) activateComponents() *HopResult { - hop := &HopResult{ - activationResults: make(map[string]error), - } - activationResultsChan := make(chan ActivationResult) - doneChan := make(chan struct{}) - - var wg sync.WaitGroup - - go func() { - for { - select { - case aRes := <-activationResultsChan: - if aRes.activated { - hop.Lock() - hop.activationResults[aRes.componentName] = aRes.err - hop.Unlock() - } - case <-doneChan: - return - } - } - }() - - for _, c := range fm.Components { - wg.Add(1) - c := c - go func() { - defer wg.Done() - activationResultsChan <- c.activate() - }() - } - - wg.Wait() - doneChan <- struct{}{} - return hop -} - -func (fm *FMesh) flushPipes() { - for _, c := range fm.Components { - c.flushOutputs() - } -} - -func (fm *FMesh) run() ([]*HopResult, error) { - hops := make([]*HopResult, 0) - for { - hopReport := fm.activateComponents() - hops = append(hops, hopReport) - - if fm.ErrorHandlingStrategy == StopOnFirstError && hopReport.hasErrors() { - return hops, fmt.Errorf("Hop #%d finished with errors. Stopping fmesh. Report: %v", len(hops), hopReport.activationResults) - } - - if len(hopReport.activationResults) == 0 { - return hops, nil - } - fm.flushPipes() - } -} diff --git a/experiments/12_async_input/go.mod b/experiments/12_async_input/go.mod deleted file mode 100644 index ba77271..0000000 --- a/experiments/12_async_input/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/hovsep/fmesh/experiments/12_async_input - -go 1.23 diff --git a/experiments/12_async_input/main.go b/experiments/12_async_input/main.go deleted file mode 100644 index ca108ec..0000000 --- a/experiments/12_async_input/main.go +++ /dev/null @@ -1,187 +0,0 @@ -package main - -import ( - "fmt" - "net/http" - "time" -) - -// This example demonstrates how fmesh can be fed input asynchronously -func main() { - //exampleFeedBatches() - exampleFeedSequentially() -} - -// This example processes 1 url every 3 seconds -// NOTE: urls are not crawled concurrently, because fm has only 1 worker (crawler component) -func exampleFeedSequentially() { - fm := getMesh() - - urls := []string{ - "http://fffff.com", - "https://google.com", - "http://habr.com", - "http://localhost:80", - "https://postman-echo.com/delay/1", - "https://postman-echo.com/delay/3", - "https://postman-echo.com/delay/5", - "https://postman-echo.com/delay/10", - } - - ticker := time.NewTicker(3 * time.Second) - resultsChan := make(chan []any) - doneChan := make(chan struct{}) // Signals when all urls are processed - - //Feeder routine - go func() { - for { - select { - case <-ticker.C: - if len(urls) == 0 { - close(resultsChan) - return - } - //Pop an url - url := urls[0] - urls = urls[1:] - - fmt.Println("feed this url:", url) - - fm.Components.byName("web crawler").inputs.byName("url").putSignal(newSignal(url)) - _, err := fm.run() - if err != nil { - fmt.Println("fmesh returned error ", err) - } - - if fm.Components.byName("web crawler").outputs.byName("headers").hasSignal() { - results := fm.Components.byName("web crawler").outputs.byName("headers").getSignal().AllValues() - fm.Components.byName("web crawler").outputs.byName("headers").clearSignal() //@TODO maybe we can add fm.Reset() for cases when FMesh is reused (instead of cleaning ports explicitly) - resultsChan <- results - } - } - } - }() - - //Result reader routine - go func() { - for { - select { - case r, ok := <-resultsChan: - if !ok { - fmt.Println("results chan is closed. shutting down the reader") - doneChan <- struct{}{} - return - } - fmt.Println(fmt.Sprintf("got results from channel: %v", r)) - } - } - }() - - <-doneChan -} - -// This example leverages signal aggregation, so urls are pushed into fmesh all at once -// so we wait for all urls to be processed and only them we can read results -func exampleFeedBatches() { - batch := []string{ - "http://fffff.com", - "https://google.com", - "http://habr.com", - "http://localhost:80", - } - - fm := getMesh() - - for _, url := range batch { - fm.Components.byName("web crawler").inputs.byName("url").putSignal(newSignal(url)) - } - - _, err := fm.run() - if err != nil { - fmt.Println("fmesh returned error ", err) - } - - if fm.Components.byName("web crawler").outputs.byName("headers").hasSignal() { - results := fm.Components.byName("web crawler").outputs.byName("headers").getSignal().AllValues() - fmt.Printf("results: %v", results) - } -} - -func getMesh() *FMesh { - //Setup dependencies - client := &http.Client{} - - //Define components - crawler := &Component{ - name: "web crawler", - description: "gets http headers from given url", - inputs: Ports{ - "url": &Port{}, - }, - outputs: Ports{ - "errors": &Port{}, - "headers": &Port{}, - }, - handler: func(inputs Ports, outputs Ports) error { - if !inputs.byName("url").hasSignal() { - return errWaitingForInputResetInputs - } - - for _, sigVal := range inputs.byName("url").getSignal().AllValues() { - - url := sigVal.(string) - //All urls incoming as aggregatet signal will be crawled sequentially - // in order to call them concurrently we need run each request in separate goroutine and handle synchronization (e.g. waitgroup) - response, err := client.Get(url) - if err != nil { - outputs.byName("errors").putSignal(newSignal(fmt.Errorf("got error: %w from url: %s", err, url))) - continue - } - - if len(response.Header) == 0 { - outputs.byName("errors").putSignal(newSignal(fmt.Errorf("no headers for url %s", url))) - continue - } - - outputs.byName("headers").putSignal(newSignal(map[string]http.Header{ - url: response.Header, - })) - } - - return nil - }, - } - - logger := &Component{ - name: "error logger", - description: "logs http errors", - inputs: Ports{ - "error": &Port{}, - }, - outputs: nil, - handler: func(inputs Ports, outputs Ports) error { - if !inputs.byName("error").hasSignal() { - return errWaitingForInputResetInputs - } - - for _, sigVal := range inputs.byName("error").getSignal().AllValues() { - err := sigVal.(error) - if err != nil { - fmt.Println("Error logger says:", err) - } - } - - return nil - }, - } - - //Define pipes - crawler.outputs.byName("errors").CreatePipesTo(logger.inputs.byName("error")) - - //Build mesh - return &FMesh{ - Components: Components{crawler, logger}, - ErrorHandlingStrategy: StopOnFirstError, - } - -} diff --git a/experiments/12_async_input/pipe.go b/experiments/12_async_input/pipe.go deleted file mode 100644 index 8737a62..0000000 --- a/experiments/12_async_input/pipe.go +++ /dev/null @@ -1,12 +0,0 @@ -package main - -type Pipe struct { - From *Port - To *Port -} - -type Pipes []*Pipe - -func (p *Pipe) flush() { - forwardSignal(p.From, p.To) -} diff --git a/experiments/12_async_input/port.go b/experiments/12_async_input/port.go deleted file mode 100644 index 97c1358..0000000 --- a/experiments/12_async_input/port.go +++ /dev/null @@ -1,76 +0,0 @@ -package main - -type Port struct { - signal Signal - pipes Pipes //Refs to pipes connected to that port (no in\out semantics) -} - -func (p *Port) getSignal() Signal { - if p == nil { - panic("invalid port") - } - - if !p.hasSignal() { - return nil - } - - if p.signal.IsAggregate() { - return p.signal.(*AggregateSignal) - } - return p.signal.(*SingleSignal) -} - -func (p *Port) putSignal(sig Signal) { - if p.hasSignal() { - //Aggregate signal - var resValues []*SingleSignal - - //Extract existing signal(s) - if p.signal.IsSingle() { - resValues = append(resValues, p.signal.(*SingleSignal)) - } else if p.signal.IsAggregate() { - resValues = p.signal.(*AggregateSignal).val - } - - //Add new signal(s) - if sig.IsSingle() { - resValues = append(resValues, sig.(*SingleSignal)) - } else if sig.IsAggregate() { - resValues = append(resValues, sig.(*AggregateSignal).val...) - } - - p.signal = &AggregateSignal{ - val: resValues, - } - return - } - - //Single signal - p.signal = sig -} - -func (p *Port) clearSignal() { - p.signal = nil -} - -func (p *Port) hasSignal() bool { - return p.signal != nil -} - -// Adds pipe reference to port, so all pipes of the port are easily iterable (no in\out semantics) -func (p *Port) addPipeRef(pipe *Pipe) { - p.pipes = append(p.pipes, pipe) -} - -// CreatePipeTo must be used to explicitly set pipe direction -func (p *Port) CreatePipesTo(toPorts ...*Port) { - for _, toPort := range toPorts { - newPipe := &Pipe{ - From: p, - To: toPort, - } - p.addPipeRef(newPipe) - toPort.addPipeRef(newPipe) - } - -} diff --git a/experiments/12_async_input/ports.go b/experiments/12_async_input/ports.go deleted file mode 100644 index b53e6d2..0000000 --- a/experiments/12_async_input/ports.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -// @TODO: this type must have good tooling for working with collection -// like adding new ports, filtering and so on -type Ports map[string]*Port - -// @TODO: add error handling (e.g. when port does not exist) -func (ports Ports) byName(name string) *Port { - return ports[name] -} - -func (ports Ports) manyByName(names ...string) Ports { - selectedPorts := make(Ports) - - for _, name := range names { - if p, ok := ports[name]; ok { - selectedPorts[name] = p - } - } - - return selectedPorts -} - -func (ports Ports) anyHasValue() bool { - for _, p := range ports { - if p.hasSignal() { - return true - } - } - - return false -} - -func (ports Ports) allHaveValue() bool { - for _, p := range ports { - if !p.hasSignal() { - return false - } - } - - return true -} - -func (ports Ports) setAll(val Signal) { - for _, p := range ports { - p.putSignal(val) - } -} - -func (ports Ports) clearAll() { - for _, p := range ports { - p.clearSignal() - } -} diff --git a/experiments/12_async_input/signal.go b/experiments/12_async_input/signal.go deleted file mode 100644 index 55346b9..0000000 --- a/experiments/12_async_input/signal.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -type Signal interface { - IsAggregate() bool - IsSingle() bool - GetValue() any - AllValues() []any //@TODO: refactor with true iterator -} - -// @TODO: enhance naming -type SingleSignal struct { - val any -} - -type AggregateSignal struct { - val []*SingleSignal -} - -func (s SingleSignal) IsAggregate() bool { - return false -} - -func (s SingleSignal) IsSingle() bool { - return !s.IsAggregate() -} - -func (s AggregateSignal) IsAggregate() bool { - return true -} - -func (s AggregateSignal) IsSingle() bool { - return !s.IsAggregate() -} - -func (s AggregateSignal) GetValue() any { - return s.val -} - -func (s SingleSignal) GetValue() any { - return s.val -} - -func (s SingleSignal) AllValues() []any { - return []any{s.val} -} - -func (s AggregateSignal) AllValues() []any { - all := make([]any, 0) - for _, sig := range s.val { - all = append(all, sig.GetValue()) - } - return all -} - -func newSignal(val any) *SingleSignal { - return &SingleSignal{val: val} -} - -func forwardSignal(source *Port, dest *Port) { - dest.putSignal(source.getSignal()) -} diff --git a/experiments/README.md b/experiments/README.md deleted file mode 100644 index ee4f01e..0000000 --- a/experiments/README.md +++ /dev/null @@ -1 +0,0 @@ -This directory contains experiments made to reach a proof of concept of FMesh. Each next experiment is started as a copy of previous one. \ No newline at end of file