-
-
Notifications
You must be signed in to change notification settings - Fork 0
5. Component
Components are the primary building blocks in F-Mesh, encapsulating a unit of behavior in a single function known as the Activation Function. This design ensures that components remain lightweight and transient, activating only when needed and completing execution promptly. Unlike traditional FBP systems, F-Mesh components are not long-running processes like goroutines; instead, they execute their logic and return control to the scheduler after each cycle.
F-Mesh components are generally stateless by default, but they now have the capability to maintain state between activations. Each component contains its own State—a thread-safe key-value store that persists data across cycles. This allows for more complex and persistent behavior without sacrificing the modularity, predictability, and reusability of the design. The activation function is invoked every time F-Mesh schedules the component. If the component fails to complete its execution promptly, it can cause the entire mesh to hang, so it's critical to design components to handle small, focused tasks: read input signals, process them, and produce output signals, all within the designated cycle.
-
Name
Must be unique within a single mesh.
Used for identification and debugging. -
Inputs and Outputs
A collection of output ports where processed signals are sent.
Components can have an unlimited number of output ports. -
Activation Function
Defines the logic of the component.
Executes when the component is scheduled. -
State
Simple key-value storage which can be used to persist data between activation cycles.
You can initialize a component state calling WithInitialState when building it. -
Logger
Each component is instrumented with a simple *log.Logger from standard library. Later we can switch to something fancy like Uber Zap. The logger has set the component name as a prefix.
You can optionally provide a description for a component. This description can be useful for visualization when exporting or documenting a mesh.
Note
While components can have many ports, you are not required to use all of them in every activation.
The usage of ports depends entirely on your component's design and logic.
To explore the full capabilities of signals, refer to the component package API.
The Activation Function is the heart of every component, defining its core behavior. To design elegant and maintainable systems, always strive to keep your activation functions simple and concise.
All activation functions in F-Mesh share the same signature:
func(this *component.Components) error
This signature allows you to access everything you need, like:
- this.Inputs(): Collection of input ports.
- this.Outputs(): Collection of output ports.
- this.State(): Current state.
- this.Logger(): Logger.
The function returns an error to indicate if the activation encountered any issues.
Both inputs and outputs are of type port.Collection, allowing you to use the same API to interact with ports. Typically, you will:
- Read signals from input ports.
- Process these signals.
- Optionally read from or write to state.
- Write results as signals to output ports.
When used properly the pattern resembles the concept of pure functions in functional programming, promoting clean and predictable behavior.
Example: Summing Input Signals
Here’s a straightforward activation function that demonstrates how to use input and output ports:
func(this *component.Component) error {
sum := 0
// Access all signals from input port "i1"
for _, sig := range this.InputByName("i1").AllSignalsOrNil() {
sum += sig.PayloadOrNil().(int) // Extract and type-cast the payload
}
// Create a new signal with the sum and put it on output port "o1"
this.OutputByName("o1").PutSignals(signal.New(sum))
return nil // Return nil to indicate success
}
Explanation:
- The function calculates the sum of all integer payloads received on input port i1.
- A new signal with the computed sum is sent to output port o1.
- The loop iterates over all signals in the input port's buffer, allowing flexible handling of multiple signals.
As it is mentioned earlier components are stateless by default, but when you need you can make them stateful which means you can preserve some state between activation function invocations. Check the State type for all available methods.
Here is an example of stateful counter:
counter := component.New("stateful_counter").
WithDescription("counts all observed signals and bypasses them down the stream").
WithInputs("bypass_in").
WithOutputs("bypass_out").
WithInitialState(func(state component.State) {
// Explicitly initialize state with the key we want to use (state is just wrapper around map[string]any)
state.Set("observed_signals_count", 0)
}).
WithActivationFunc(func(this *component.Component) error {
// Read the current value from state
count := this.State().Get("observed_signals_count").(int)
defer func() {
// Once we finished activation we want to write back (persist) our state
this.State().Set("observed_signals_count", count)
}()
count += this.InputByName("bypass_in").Buffer().Len()
this.Logger().Println("so far signals observed ", count)
_ = port.ForwardSignals(this.InputByName("bypass_in"), this.OutputByName("bypass_out"))
return nil
})
In most cases, your activation function will complete successfully, and you can simply return nil. However, if an issue arises, returning an error is the proper way to communicate it to F-Mesh. Here's how error handling works:
-
Error Propagation: When your activation function returns an error, it notifies F-Mesh of the problem.
-
Mesh Behavior: Returning an error does not halt the entire mesh unless the error-handling strategy is explicitly set to StopOnFirstErrorOrPanic. This allows your mesh to continue processing other components even if one encounters an issue.
-
Error Handling Strategy: If you expect components to occasionally fail and want the mesh to proceed regardless, choose an error-handling strategy that tolerates errors when creating the mesh. Examples of such strategies include logging errors or collecting them for later inspection without stopping execution.
-
Signal Flushing: Errors do not affect how signals are drained from output ports. Any signals that were added to the output ports before the error occurred will still be flushed as usual.
-
Inspecting errors after execution: When you call fm.Run(), it provides detailed information about errors encountered during execution. The first return value contains the completed cycles, while the second return value is an error with running fmesh itself. This allows you to review the specific components activation results and understand the nature of the errors for debugging or reporting purposes. Here is the struct you will get per each component activation:
type ActivationResult struct {
*common.Chainable
componentName string //The name of the component
activated bool // Did it activate? (e.g. if component was not "scheduled" in given cycle you will see false here)
code ActivationResultCode // The code describing what happened with the component from FMesh point of view, see codes below
activationError error //Error returned from component activation function
}
And here is the description of activation result codes:
-
ActivationCodeUndefined
: The component's state is not defined. -
ActivationCodeOK
: The activation function executed successfully. -
ActivationCodeNoInput
: The component does not have any input signals. -
ActivationCodeNoFunction
: No activation function is assigned to the component. -
ActivationCodeReturnedError
: The activation function encountered an error and returned it. -
ActivationCodePanicked
: The activation function caused a panic during execution. -
ActivationCodeWaitingForInputsClear
: The component is waiting for input signals on particular ports and decided to clear its current inputs. -
ActivationCodeWaitingForInputsKeep
: The component is waiting for input signals on particular ports and decided to keep all current inputs till the next cycle.
Here’s a simple example that demonstrates returning an error:
func(this *component.Component) error {
// This signal will be successfully transferred, as it is put before any error is returned
this.OutputByName("log").PutSignals(signal.New("component activated"))
firstPayload := this.InputByName("i1").FirstSignalPayloadOrNil()
if firstPayload == nil {
return fmt.Errorf("no signals received on input port 'i1'")
}
number, ok := firstPayload.(int)
if !ok {
return fmt.Errorf("expected integer payload on 'i1', but got %T", firstPayload)
}
this.OutputByName("o1").PutSignals(signal.New(number * 2))
return nil // Success
}
Explanation:
- If no signals are received on the input port i1, an error is returned with a descriptive message.
- If the payload type is not an integer, an error is returned indicating the type mismatch.
- If everything is fine, the function processes the input and sends a signal to the output port o1 with the doubled value.
Key Considerations
- Use meaningful error messages to help diagnose issues during execution.
- Ensure your mesh's error-handling strategy aligns with your system's requirements.
- Errors are a tool to maintain clarity and predictability in your system without disrupting the entire flow unnecessarily.
In some cases, you may need to delay activation of a component until specific signals appear on one or more of its ports. F-Mesh provides a basic synchronization mechanism for such scenarios, allowing you to return a special error from the activation function to signal that the component should wait.
Let’s examine the following mesh setup:
Here’s how we initialize the mesh with input signals:
// Put one signal into each chain to start them in the same cycle
fm.Components().ByName("d1").InputByName("i1").PutSignals(signal.New(1))
fm.Components().ByName("d4").InputByName("i1").PutSignals(signal.New(2))
This configuration starts execution at the topmost components (d1 and d4) and progresses downward in parallel. The activation cycles will look like this:
- d4, d1
- d5, d2
- sum, d3
- sum
Suppose the sum component needs to compute the sum of the signals from both vertical chains. A problem arises at cycle #3 because the left chain is shorter, causing its signal to arrive at sum earlier. To resolve this, we can instruct F-Mesh to wait until both input ports of sum have signals before activating it.
Here’s how to implement this behavior:
s := component.New("sum").WithDescription("This component just sums 2 inputs").
WithInputs("i1", "i2").
WithOutputs("o1").
WithActivationFunc(func(this *component.Component) error {
// Wait until both input ports have signals
if !this.Inputs().ByNames("i1", "i2").AllHaveSignals() {
return component.NewErrWaitForInputs(true)
}
inputNum1 := this.InputByName("i1").FirstSignalPayloadOrDefault(0)
inputNum2 := this.InputByName("i2").FirstSignalPayloadOrDefault(0)
this.OutputByName("o1").PutSignals(signal.New(inputNum1.(int) + inputNum2.(int)))
return nil
})
The critical part of the implementation is the use of:
component.NewErrWaitForInputs(true)
The boolean flag passed here determines whether to preserve or clear the input ports' buffers while waiting:
- true: Keeps all signals in the input buffers untouched. This is ideal when every signal is important, allowing you to collect multiple signals on each port (remember, ports can buffer an unlimited number of signals).
- false: Clears the input buffers while waiting. This mode is suitable when the presence of signals on specific ports matters more than the actual content of the signals.
By using this mechanism, you can control when a component should activate and ensure proper synchronization in your mesh.