diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0686bcc7c..ca05a37768 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,8 @@ Main (unreleased)
- Add `pyroscope.relabel` component to modify or filter profiles using Prometheus relabeling rules. (@marcsanmi)
+- (_Experimental_) A new `foreach` block which starts an Alloy pipeline for each item inside a list. (@wildum, @thampiotr, @ptodev)
+
### Enhancements
- Add `go_table_fallback` arg to `pyroscope.ebpf` (@korniltsev)
diff --git a/docs/design/1443-dynamic-pipelines.md b/docs/design/1443-dynamic-pipelines.md
new file mode 100644
index 0000000000..05cf2550f9
--- /dev/null
+++ b/docs/design/1443-dynamic-pipelines.md
@@ -0,0 +1,252 @@
+# Proposal: Dynamic pipelines
+
+* Author: Paulin Todev (@ptodev), Piotr Gwizdala (@thampiotr)
+* Last updated: 2024-08-15
+* Original issue: https://github.com/grafana/alloy/issues/1443
+
+## Abstract
+
+We are proposing a new feature to the [Alloy standard library][stdlib].
+It will be similar to a `map` operation over a collection such as a `list()`.
+Each `map` transformation will be done by a chain of components (a "sub-pipeline") created for this transformation.
+Each item in the collection will be processed by a different "sub-pipeline".
+
+The final solution may differ from a standard `map` operation, since there may be multiple outputs for the same input.
+For example, the sub-pipeline may branch into different `prometheus.relabel` components,
+each of which sends outputs to different components outside of the sub-pipeline.
+
+[stdlib]: https://grafana.com/docs/alloy/latest/reference/stdlib/
+
+## Use cases
+
+### Using discovery components together with prometheus.exporter ones
+
+Discovery components output a list of targets. It's not possible to pass those lists directly to most exporter components.
+
+Suppose we have a list of targets produced by a `discovery` component:
+
+```
+[
+ {"__address__" = "redis-one:9115", "instance" = "one"},
+ {"__address__" = "redis-two:9116", "instance" = "two"},
+]
+```
+
+The [Alloy type][alloy-types] of the list above is `list(map(string))`.
+However, you may want to pipe information from this list of targets to a component which doesn't work with a `list()` or a `map()`.
+For example, you may want to input the `"__address__"` string to a `prometheus.exporter.redis`,
+and you may want to use the `"instance"` string in a `discovery.relabel`.
+
+[alloy-types]: https://grafana.com/docs/alloy/latest/get-started/configuration-syntax/expressions/types_and_values/
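+
+As a rough sketch of the mismatch (illustrative only, not a working configuration), a direct wiring such as the following isn't possible, because `redis_addr` expects a single address string rather than a `list(map(string))`:
+
+```alloy
+discovery.file "default" {
+  files = ["/Users/batman/Desktop/redis_addresses.yaml"]
+}
+
+prometheus.exporter.redis "default" {
+  // This doesn't work: "targets" is a list(map(string)),
+  // while "redis_addr" expects a single string such as "redis-one:9115".
+  redis_addr = discovery.file.default.targets
+}
+```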
+
+## Proposal 1: A foreach block
+
+A `foreach` block may start several sub-pipelines for a `collection` specified in its arguments.
+
+```alloy
+// All components in the sub-pipeline will be scoped under "foreach.default/1/...".
+// Here, "1" is sub-pipeline number 1.
+// This way component names won't clash with other sub-pipelines from the same foreach,
+// and with the names of components outside of the foreach.
+foreach "default" {
+
+ // "collection" is what the for loop will iterate over.
+ collection = discovery.file.default.targets
+
+ // Each item in the collection will be accessible via the "target" variable.
+ // E.g. `target["__address__"]`.
+ var = "target"
+
+ // A sub-pipeline consisting of components which process each target.
+ ...
+}
+```
+
+
+### Example
+
+```alloy
+discovery.file "default" {
+ files = ["/Users/batman/Desktop/redis_addresses.yaml"]
+}
+
+// Every component defined in the "foreach" block will be instantiated for each item in the collection.
+// The instantiated components will be scoped using the name of the foreach block and the index of the
+// item in the collection. For example: /foreach.redis/0/prometheus.exporter.redis.default
+foreach "redis" {
+ collection = discovery.file.default.targets
+ // Here, "target" is a variable whose value is the current item in the collection.
+ var = "target"
+
+ prometheus.exporter.redis "default" {
+ redis_addr = target["__address__"] // we can also do the necessary rewrites before this.
+ }
+
+ discovery.relabel "default" {
+ targets = prometheus.exporter.redis.default.targets
+ // Add a label which comes from the discovery component.
+ rule {
+ target_label = "filepath"
+ // __meta_filepath comes from discovery.file
+ replacement = target["__meta_filepath"]
+ }
+ }
+
+ prometheus.scrape "default" {
+ targets = discovery.relabel.default.targets
+ forward_to = prometheus.remote_write.mimir.receiver
+ }
+}
+
+prometheus.remote_write "mimir" {
+ endpoint {
+ url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push"
+ basic_auth {
+ username = ""
+ password = ""
+ }
+ }
+}
+```
+
+
+
+Pros:
+* The `foreach` name is consistent with other programming languages.
+
+Cons:
+* It looks less like a component than a `declare.dynamic` block.
+ In order to instantiate multiple `foreach` blocks with similar config, you'd have to wrap them in a `declare` block.
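+
+For illustration, a rough sketch of such a wrapper (reusing the `foreach` syntax proposed above; the wrapper name is illustrative) could look like this:
+
+```alloy
+declare "redis_pipelines" {
+  argument "targets" {}
+
+  foreach "redis" {
+    collection = argument.targets.value
+    var = "target"
+
+    prometheus.exporter.redis "default" {
+      redis_addr = target["__address__"]
+    }
+
+    // ... the rest of the sub-pipeline ...
+  }
+}
+
+redis_pipelines "one" {
+  targets = discovery.file.default.targets
+}
+```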
+
+## Proposal 2: A declare.dynamic block
+
+A new `declare.dynamic` block would create a custom component which starts several sub-pipelines internally.
+Users can use `argument` and `export` blocks, just like in a normal `declare` block.
+
+```alloy
+declare.dynamic "ex1" {
+ argument "input_targets" {
+ optional = false
+ comment = "We will create a sub-pipeline for each target in input_targets."
+ }
+
+ argument "output_metrics" {
+ optional = false
+ comment = "All the metrics gathered from all pipelines."
+ }
+
+ // A sub-pipeline consisting of components which process each target.
+ ...
+}
+
+declare.dynamic.ex1 "default" {
+ input_targets = discovery.file.default.targets
+ output_metrics = [prometheus.remote_write.mimir.receiver]
+}
+```
+
+
+### Example
+
+```alloy
+// declare.dynamic "maps" each target to a sub-pipeline.
+// Each sub-pipeline has 1 exporter, 1 relabel, and 1 scraper.
+// Internally, one way this could be done is by serializing the pipeline to a string and then importing it as a module.
+declare.dynamic "redis_exporter" {
+ argument "input_targets" {
+ optional = false
+ comment = "We will create a sub-pipeline for each target in input_targets."
+ }
+
+ argument "output_metrics" {
+ optional = false
+ comment = "All the metrics gathered from all pipelines."
+ }
+
+ // "id" is a special identifier for every "sub-pipeline".
+ // The number of "sub-pipelines" is equal to len(input_targets).
+ prometheus.exporter.redis id {
+ redis_addr = input_targets["__address__"]
+ }
+
+ discovery.relabel id {
+ targets = prometheus.exporter.redis[id].targets
+ // Add a label which comes from the discovery component.
+ rule {
+ target_label = "filepath"
+ // __meta_filepath comes from discovery.file
+ replacement = input_targets["__meta_filepath"]
+ }
+ }
+
+ prometheus.scrape id {
+ targets = prometheus.exporter.redis[id].targets
+ forward_to = output_metrics
+ }
+
+}
+discovery.file "default" {
+ files = ["/Users/batman/Desktop/redis_addresses.yaml"]
+}
+
+declare.dynamic.redis_exporter "default" {
+ input_targets = discovery.file.default.targets
+ output_metrics = [prometheus.remote_write.mimir.receiver]
+}
+
+prometheus.remote_write "mimir" {
+ endpoint {
+ url = "https://prometheus-prod-05-gb-south-0.grafana.net/api/prom/push"
+ basic_auth {
+ username = ""
+ password = ""
+ }
+ }
+}
+```
+
+
+
+Pros:
+* Looks more like a component than a `foreach` block.
+* Flexible number of inputs and outputs.
+
+Cons:
+* A name such as `declare.dynamic` doesn't sound as familiar to most people as `foreach`.
+* It may not be practical to implement this in a way that allows more than one input collection.
+ * How can we limit users to having just one collection?
+* Having another variant of the `declare` block can feel complex.
+ Can we just add this functionality to the normal `declare` block, so that we can avoid having a `declare.dynamic` block?
+
+## Proposal 3: Do nothing
+
+It is customary to include a "do nothing" proposal, in order to evaluate whether the work is really required.
+
+Pros:
+* No effort required.
+* Alloy's syntax is simpler since we're not adding any new types of blocks.
+
+Cons:
+* Not possible to integrate most `prometheus.exporter` components with the `discovery` ones.
+
+## Unknowns
+
+We should find answers to the unknowns below before this proposal is accepted:
+
+* Will the solution only work for `list()`? What about `map()`?
+ * If we go with a `foreach`, we could have a `key` attribute in addition to the `var` one. That way we can also access the key. The `key` attribute could be a no-op if `collection` is a list? See the sketch after this list.
+* What about debug metrics? Should we aggregate the metrics for all "sub-pipelines"?
+ * If there is 1 series for each sub-pipeline, the amount of metrics could be huge.
+ Some service discovery mechanisms may generate a huge number of elements in a list of targets.
+ * If we want to aggregate the metrics, how would we do that? Is it even possible to do within Alloy?
+ * Can we have a configuration parameter which dictates whether the metrics should be aggregated or not?
+* Do we have to recreate the sub-pipelines every time a new collection is received,
+ even if the new collection has the same number of elements?
+* Do we need to have more than one output, of a different type?
+* Do we need to have more than one input, of a different type?
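+
+As a purely illustrative sketch of the hypothetical `key` attribute mentioned above (not part of the proposed syntax), iterating over a map could look like this:
+
+```alloy
+foreach "redis" {
+  collection = {"one" = "redis-one:9115", "two" = "redis-two:9116"}
+  var        = "addr"
+  key        = "name" // hypothetical attribute exposing the current key, e.g. "one"
+
+  prometheus.exporter.redis "default" {
+    redis_addr = addr
+  }
+}
+```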
+
+## Recommended solution
+
+
\ No newline at end of file
diff --git a/docs/sources/reference/config-blocks/foreach.md b/docs/sources/reference/config-blocks/foreach.md
new file mode 100644
index 0000000000..4eee6b99b2
--- /dev/null
+++ b/docs/sources/reference/config-blocks/foreach.md
@@ -0,0 +1,200 @@
+---
+canonical: https://grafana.com/docs/alloy/latest/reference/config-blocks/foreach/
+description: Learn about foreach
+labels:
+ stage: experimental
+menuTitle: foreach
+title: foreach
+---
+
+
+# foreach
+
+{{< docs/shared lookup="stability/experimental_feature.md" source="alloy" version="" >}}
+
+The `foreach` block runs a separate pipeline for each item inside a list.
+
+## Usage
+
+```alloy
+foreach "<LABEL>" {
+ collection = [...]
+ var = "<VAR_NAME>"
+ template {
+ ...
+ }
+}
+```
+
+## Arguments
+
+You can use the following arguments with `foreach`:
+
+Name | Type | Description | Default | Required
+-----------------|-------------|------------------------------------------------------------------------------------------|---------|---------
+`collection` | `list(any)` | A list of items to loop over. | | yes
+`var` | `string` | Name of the variable referring to the current item in the collection. | | yes
+`enable_metrics` | `bool` | Whether to expose debug metrics in the {{< param "PRODUCT_NAME" >}} `/metrics` endpoint. | `false` | no
+
+The items in the `collection` list can be of any [type][types], such as a bool, a string, a list, or a map.
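+
+For example, a minimal sketch (labels and values are illustrative) that iterates over a list of strings could look like this:
+
+```alloy
+foreach "example" {
+  collection = ["redis-one:9115", "redis-two:9116"]
+  var = "address"
+
+  template {
+    // Components defined here can refer to the current item as "address".
+    ...
+  }
+}
+```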
+
+{{< admonition type="warning" >}}
+Setting `enable_metrics` to `true` when `collection` has many elements may cause a large number of metrics to appear on the {{< param "PRODUCT_NAME" >}} `/metrics` endpoint.
+{{< /admonition >}}
+
+[types]: ../../../get-started/configuration-syntax/expressions/types_and_values
+
+## Blocks
+
+You can use the following blocks with `foreach`:
+
+Block | Description | Required
+--------------|------------------------------|---------
+[template][] | A component pipeline to run. | yes
+
+[template]: #template
+
+### `template`
+
+The `template` block contains the definition of {{< param "PRODUCT_NAME" >}} components which will be run for every item in the collection.
+The contents of the block look like a normal {{< param "PRODUCT_NAME" >}} configuration file,
+except that you can use the keyword defined in `var` to refer to the current item in the collection.
+
+Components inside the `template` block can use exports of components defined outside of the `foreach` block.
+However, components outside of the `foreach` cannot use exports from components defined inside the `template` block of a `foreach`.
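+
+For example, the following minimal sketch (component names and values are illustrative) is valid because the `template` block only reads an export defined outside of the `foreach`, while the reverse direction would be rejected:
+
+```alloy
+prometheus.remote_write "default" {
+  endpoint {
+    url = "http://localhost:9090/api/v1/write"
+  }
+}
+
+foreach "example" {
+  collection = [{"__address__" = "localhost:6379"}]
+  var = "item"
+
+  template {
+    // Allowed: refers to an export defined outside of the foreach block.
+    prometheus.scrape "default" {
+      targets    = [item]
+      forward_to = [prometheus.remote_write.default.receiver]
+    }
+  }
+}
+
+// Not allowed: components defined here can't refer to exports of
+// components defined inside the template block above.
+```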
+
+## Example
+
+The following example shows you how to run Prometheus exporters dynamically on service discovery targets.
+
+`prometheus.exporter.*` components often require the address of one particular instance being monitored.
+For example, `prometheus.exporter.redis` has a `redis_addr` attribute for the Redis instance under observation.
+On the other hand, `discovery.*` components such as `discovery.kubernetes` output a list of targets such as this:
+
+{{< collapse title="Example targets output by `discovery.kubernetes`" >}}
+```alloy
+[
+ {
+ __address__ = "10.42.0.16:5432",
+ __meta_kubernetes_namespace = "ns1",
+ __meta_kubernetes_pod_container_id = "containerd://96b77d035d0bbe27bb173d8fc0c56d21965892a50e4e6eab9f6cffdb90b275fb",
+ __meta_kubernetes_pod_container_image = "postgres:bullseye",
+ __meta_kubernetes_pod_container_init = "false",
+ __meta_kubernetes_pod_container_name = "pgcont",
+ __meta_kubernetes_pod_container_port_name = "pg-db",
+ __meta_kubernetes_pod_container_port_number = "5432",
+ __meta_kubernetes_pod_container_port_protocol = "TCP",
+ __meta_kubernetes_pod_controller_kind = "ReplicaSet",
+ __meta_kubernetes_pod_controller_name = "postgres-db-cd54547b9",
+ __meta_kubernetes_pod_host_ip = "172.25.0.2",
+ __meta_kubernetes_pod_ip = "10.42.0.16",
+ __meta_kubernetes_pod_label_name = "postgres-db",
+ __meta_kubernetes_pod_label_pod_template_hash = "cd54547b9",
+ __meta_kubernetes_pod_labelpresent_name = "true",
+ __meta_kubernetes_pod_labelpresent_pod_template_hash = "true",
+ __meta_kubernetes_pod_name = "postgres-db-cd54547b9-4zpds",
+ __meta_kubernetes_pod_node_name = "k3d-asserts-test-server-0",
+ __meta_kubernetes_pod_phase = "Running",
+ __meta_kubernetes_pod_ready = "true",
+ __meta_kubernetes_pod_uid = "7cdcacdc-4a2d-460a-b1fb-6340700c4cac",
+ },
+ {
+ __address__ = "10.42.0.20:6379",
+ __meta_kubernetes_namespace = "ns1",
+ __meta_kubernetes_pod_container_id = "containerd://68f2f0eacd880eb4a141d833aafc1f297f7d9bdf00f4c787f9fcc964a039d278",
+ __meta_kubernetes_pod_container_image = "redis:latest",
+ __meta_kubernetes_pod_container_init = "false",
+ __meta_kubernetes_pod_container_name = "redis-cont",
+ __meta_kubernetes_pod_container_port_name = "redis-db",
+ __meta_kubernetes_pod_container_port_number = "6379",
+ __meta_kubernetes_pod_container_port_protocol = "TCP",
+ __meta_kubernetes_pod_controller_kind = "ReplicaSet",
+ __meta_kubernetes_pod_controller_name = "redis-db-778b66cb7d",
+ __meta_kubernetes_pod_host_ip = "172.25.0.2",
+ __meta_kubernetes_pod_ip = "10.42.0.20",
+ __meta_kubernetes_pod_label_name = "redis-db",
+ __meta_kubernetes_pod_label_pod_template_hash = "778b66cb7d",
+ __meta_kubernetes_pod_labelpresent_name = "true",
+ __meta_kubernetes_pod_labelpresent_pod_template_hash = "true",
+ __meta_kubernetes_pod_name = "redis-db-778b66cb7d-wxmf6",
+ __meta_kubernetes_pod_node_name = "k3d-asserts-test-server-0",
+ __meta_kubernetes_pod_phase = "Running",
+ __meta_kubernetes_pod_ready = "true",
+ __meta_kubernetes_pod_uid = "ae74e400-8eda-4b02-b4c8-669473fb001b",
+ }
+]
+```
+{{< /collapse >}}
+
+You can use a `foreach` to loop over each target and start a separate component pipeline for it.
+The following example configuration shows how a `prometheus.exporter.redis` instance is started for each Redis instance discovered by `discovery.kubernetes`.
+Additional Kubernetes labels from `discovery.kubernetes` are also added to the metrics created by `prometheus.exporter.redis`.
+
+```alloy
+discovery.kubernetes "default" {
+ role = "pod"
+}
+
+discovery.relabel "redis" {
+ targets = discovery.kubernetes.default.targets
+
+ // Remove all targets except the Redis ones.
+ rule {
+ source_labels = ["__meta_kubernetes_pod_container_name"]
+ regex = "redis-cont"
+ action = "keep"
+ }
+}
+
+// Collect metrics for each Redis instance.
+foreach "redis" {
+ collection = discovery.relabel.redis.output
+ var = "each"
+
+ template {
+ prometheus.exporter.redis "default" {
+ // This is the "__address__" label from discovery.kubernetes.
+ redis_addr = each["__address__"]
+ }
+
+ prometheus.scrape "default" {
+ targets = prometheus.exporter.redis.default.targets
+ forward_to = [prometheus.relabel.default.receiver]
+ }
+
+ // Add labels from discovery.kubernetes.
+ prometheus.relabel "default" {
+ rule {
+ replacement = each["__meta_kubernetes_namespace"]
+ target_label = "k8s_namespace"
+ action = "replace"
+ }
+
+ rule {
+ replacement = each["__meta_kubernetes_pod_container_name"]
+ target_label = "k8s_pod_container_name"
+ action = "replace"
+ }
+
+ forward_to = [prometheus.remote_write.mimir.receiver]
+ }
+ }
+}
+
+prometheus.remote_write "mimir" {
+ endpoint {
+ url = "https://prometheus-xxx.grafana.net/api/prom/push"
+
+ basic_auth {
+ username = sys.env("<PROMETHEUS_USERNAME>")
+ password = sys.env("<GRAFANA_CLOUD_API_KEY>")
+ }
+ }
+}
+```
+
+Replace the following:
+
+* _`<PROMETHEUS_USERNAME>`_: Your Prometheus username.
+* _`<GRAFANA_CLOUD_API_KEY>`_: Your Grafana Cloud API key.
+
diff --git a/docs/sources/reference/stdlib/array.md b/docs/sources/reference/stdlib/array.md
index 69cb90bd61..5d5a021a04 100644
--- a/docs/sources/reference/stdlib/array.md
+++ b/docs/sources/reference/stdlib/array.md
@@ -35,9 +35,7 @@ Elements within the list can be any type.
## array.combine_maps
-> **EXPERIMENTAL**: This is an [experimental][] feature.
-> Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement.
-> The `stability.level` flag must be set to `experimental` to use the feature.
+{{< docs/shared lookup="stability/experimental_feature.md" source="alloy" version="" >}}
The `array.combine_maps` function allows you to join two arrays of maps if certain keys have matching values in both maps. It's particularly useful when combining labels of targets coming from different `prometheus.discovery.*` or `prometheus.exporter.*` components.
It takes three arguments:
diff --git a/docs/sources/shared/stability/experimental_feature.md b/docs/sources/shared/stability/experimental_feature.md
new file mode 100644
index 0000000000..ff6f5f008c
--- /dev/null
+++ b/docs/sources/shared/stability/experimental_feature.md
@@ -0,0 +1,11 @@
+---
+canonical: https://grafana.com/docs/alloy/latest/shared/stability/experimental_feature/
+description: Shared content, experimental
+headless: true
+---
+
+> **EXPERIMENTAL**: This is an [experimental][] feature.
+> Experimental features are subject to frequent breaking changes, and may be removed with no equivalent replacement.
+> The `stability.level` flag must be set to `experimental` to use the feature.
+
+[experimental]: https://grafana.com/docs/release-life-cycle/
diff --git a/internal/runtime/alloy.go b/internal/runtime/alloy.go
index 613fc204da..0643e3009e 100644
--- a/internal/runtime/alloy.go
+++ b/internal/runtime/alloy.go
@@ -207,17 +207,24 @@ func newController(o controllerOptions) *Runtime {
OnExportsChange: o.OnExportsChange,
Registerer: o.Reg,
ControllerID: o.ControllerID,
- NewModuleController: func(id string) controller.ModuleController {
+ NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
+ // The module controller registry should take precedence,
+ // because it is tailored to this module.
+ reg := o.Reg
+ if opts.RegOverride != nil {
+ reg = opts.RegOverride
+ }
+
return newModuleController(&moduleControllerOptions{
ComponentRegistry: o.ComponentRegistry,
ModuleRegistry: o.ModuleRegistry,
Logger: log,
Tracer: tracer,
- Reg: o.Reg,
+ Reg: reg,
DataPath: o.DataPath,
MinStability: o.MinStability,
EnableCommunityComps: o.EnableCommunityComps,
- ID: id,
+ ID: opts.Id,
ServiceMap: serviceMap,
WorkerPool: workerPool,
})
@@ -265,6 +272,7 @@ func (f *Runtime) Run(ctx context.Context) {
components = f.loader.Components()
services = f.loader.Services()
imports = f.loader.Imports()
+ forEachs = f.loader.ForEachs()
runnables = make([]controller.RunnableNode, 0, len(components)+len(services)+len(imports))
)
@@ -276,6 +284,10 @@ func (f *Runtime) Run(ctx context.Context) {
runnables = append(runnables, i)
}
+ for _, fe := range forEachs {
+ runnables = append(runnables, fe)
+ }
+
// Only the root controller should run services, since modules share the
// same service instance as the root.
if !f.opts.IsModule {
diff --git a/internal/runtime/foreach_stringer_test.go b/internal/runtime/foreach_stringer_test.go
new file mode 100644
index 0000000000..d900a7d955
--- /dev/null
+++ b/internal/runtime/foreach_stringer_test.go
@@ -0,0 +1,65 @@
+package runtime_test
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/grafana/alloy/internal/featuregate"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestForeachStringer(t *testing.T) {
+ directory := "./testdata/foreach_stringer"
+ for _, file := range getTestFiles(directory, t) {
+ tc := buildTestForEach(t, filepath.Join(directory, file.Name()))
+ t.Run(file.Name(), func(t *testing.T) {
+ if tc.module != "" {
+ defer os.Remove("module.alloy")
+ require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
+ }
+ testConfigForEachStringer(t, tc.main, tc.expectedDebugInfo, tc.expectedDebugInfo2)
+ })
+ }
+}
+
+func testConfigForEachStringer(t *testing.T, config string, expectedDebugInfo *string, expectedDebugInfo2 *string) {
+ defer verifyNoGoroutineLeaks(t)
+ reg := prometheus.NewRegistry()
+ ctrl, f := setup(t, config, reg, featuregate.StabilityExperimental)
+
+ err := ctrl.LoadSource(f, nil, "")
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+ defer func() {
+ cancel()
+ wg.Wait()
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ ctrl.Run(ctx)
+ }()
+
+ if expectedDebugInfo != nil {
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ debugInfo := getDebugInfo[string](t, ctrl, "", "testcomponents.string_receiver.log")
+ require.Equal(t, *expectedDebugInfo, debugInfo)
+ }, 3*time.Second, 10*time.Millisecond)
+ }
+
+ if expectedDebugInfo2 != nil {
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ debugInfo := getDebugInfo[string](t, ctrl, "", "testcomponents.string_receiver.log2")
+ require.Equal(t, *expectedDebugInfo2, debugInfo)
+ }, 3*time.Second, 10*time.Millisecond)
+ }
+}
diff --git a/internal/runtime/foreach_test.go b/internal/runtime/foreach_test.go
new file mode 100644
index 0000000000..e40aa5bcf0
--- /dev/null
+++ b/internal/runtime/foreach_test.go
@@ -0,0 +1,206 @@
+package runtime_test
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/grafana/alloy/internal/component"
+ "github.com/grafana/alloy/internal/featuregate"
+ "github.com/grafana/alloy/internal/runtime"
+ alloy_runtime "github.com/grafana/alloy/internal/runtime"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/stretchr/testify/require"
+ "golang.org/x/tools/txtar"
+)
+
+func TestForeach(t *testing.T) {
+ directory := "./testdata/foreach"
+ for _, file := range getTestFiles(directory, t) {
+ tc := buildTestForEach(t, filepath.Join(directory, file.Name()))
+ t.Run(file.Name(), func(t *testing.T) {
+ if tc.module != "" {
+ defer os.Remove("module.alloy")
+ require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
+ }
+ if tc.update != nil {
+ testConfigForEach(t, tc.main, tc.reloadConfig, func() {
+ require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664))
+ }, nil, nil)
+ } else {
+ testConfigForEach(t, tc.main, tc.reloadConfig, nil, nil, nil)
+ }
+ })
+ }
+}
+
+func TestForeachMetrics(t *testing.T) {
+ directory := "./testdata/foreach_metrics"
+ for _, file := range getTestFiles(directory, t) {
+ tc := buildTestForEach(t, filepath.Join(directory, file.Name()))
+ t.Run(tc.description, func(t *testing.T) {
+ if tc.module != "" {
+ defer os.Remove("module.alloy")
+ require.NoError(t, os.WriteFile("module.alloy", []byte(tc.module), 0664))
+ }
+ if tc.update != nil {
+ testConfigForEach(t, tc.main, tc.reloadConfig, func() {
+ require.NoError(t, os.WriteFile(tc.update.name, []byte(tc.update.updateConfig), 0664))
+ }, tc.expectedMetrics, tc.expectedDurationMetrics)
+ } else {
+ testConfigForEach(t, tc.main, tc.reloadConfig, nil, tc.expectedMetrics, tc.expectedDurationMetrics)
+ }
+ })
+ }
+}
+
+type testForEachFile struct {
+ description string // description at the top of the txtar file
+ main string // root config that the controller should load
+ module string // module imported by the root config
+ reloadConfig string // root config that the controller should apply on reload
+ update *updateFile // update can be used to update the content of a file at runtime
+ expectedMetrics *string // expected prometheus metrics
+ expectedDurationMetrics *int // expected prometheus duration metrics - check those separately as they vary with each test run
+ expectedDebugInfo *string // expected debug info after running the config
+ expectedDebugInfo2 *string // 2nd optional expected debug info after running the config
+}
+
+func buildTestForEach(t *testing.T, filename string) testForEachFile {
+ archive, err := txtar.ParseFile(filename)
+ require.NoError(t, err)
+ var tc testForEachFile
+ tc.description = string(archive.Comment)
+ for _, alloyConfig := range archive.Files {
+ switch alloyConfig.Name {
+ case mainFile:
+ tc.main = string(alloyConfig.Data)
+ case "module.alloy":
+ tc.module = string(alloyConfig.Data)
+ case "update/module.alloy":
+ require.Nil(t, tc.update)
+ tc.update = &updateFile{
+ name: "module.alloy",
+ updateConfig: string(alloyConfig.Data),
+ }
+ case "reload_config.alloy":
+ tc.reloadConfig = string(alloyConfig.Data)
+ case "expected_metrics.prom":
+ expectedMetrics := string(alloyConfig.Data)
+ tc.expectedMetrics = &expectedMetrics
+ case "expected_duration_metrics.prom":
+ expectedDurationMetrics, err := strconv.Atoi(strings.TrimSpace(string((alloyConfig.Data))))
+ require.NoError(t, err)
+ tc.expectedDurationMetrics = &expectedDurationMetrics
+ case "expected_debug_info.txt":
+ expectedDebugInfo := string(alloyConfig.Data)
+ tc.expectedDebugInfo = &expectedDebugInfo
+ case "expected_debug_info2.txt":
+ expectedDebugInfo2 := string(alloyConfig.Data)
+ tc.expectedDebugInfo2 = &expectedDebugInfo2
+ }
+ }
+ return tc
+}
+
+func testConfigForEach(t *testing.T, config string, reloadConfig string, update func(), expectedMetrics *string, expectedDurationMetrics *int) {
+ defer verifyNoGoroutineLeaks(t)
+ reg := prometheus.NewRegistry()
+ ctrl, f := setup(t, config, reg, featuregate.StabilityExperimental)
+
+ err := ctrl.LoadSource(f, nil, "")
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+ defer func() {
+ cancel()
+ wg.Wait()
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ ctrl.Run(ctx)
+ }()
+
+ require.Eventually(t, func() bool {
+ sum := getDebugInfo[int](t, ctrl, "", "testcomponents.summation_receiver.sum")
+ return sum >= 10
+ }, 3*time.Second, 10*time.Millisecond)
+
+ if expectedDurationMetrics != nil {
+ // These metrics have different values in each run.
+ // Hence, we can't compare their values from run to run.
+ // But we can check if the metric exists as a whole, which is good enough.
+ metricsToCheck := []string{
+ "alloy_component_dependencies_wait_seconds",
+ "alloy_component_evaluation_seconds",
+ }
+
+ countedMetrics, err := testutil.GatherAndCount(reg, metricsToCheck...)
+ require.NoError(t, err)
+ require.Equal(t, *expectedDurationMetrics, countedMetrics)
+ }
+
+ if expectedMetrics != nil {
+ // These metrics have fixed values.
+ // Hence, we can compare their values from run to run.
+ metricsToCheck := []string{
+ "alloy_component_controller_evaluating",
+ "alloy_component_controller_running_components",
+ "alloy_component_evaluation_queue_size",
+ "pulse_count",
+ }
+
+ err := testutil.GatherAndCompare(reg, strings.NewReader(*expectedMetrics), metricsToCheck...)
+ require.NoError(t, err)
+ }
+
+ if update != nil {
+ update()
+
+ // Sum should be 30 after update
+ require.Eventually(t, func() bool {
+ sum := getDebugInfo[int](t, ctrl, "", "testcomponents.summation_receiver.sum")
+ return sum >= 30
+ }, 3*time.Second, 10*time.Millisecond)
+ }
+
+ if reloadConfig != "" {
+ f, err = alloy_runtime.ParseSource(t.Name(), []byte(reloadConfig))
+ require.NoError(t, err)
+ require.NotNil(t, f)
+
+ // Reload the controller with the new config.
+ err = ctrl.LoadSource(f, nil, "")
+ require.NoError(t, err)
+
+ // Sum should be 30 after update
+ require.Eventually(t, func() bool {
+ sum := getDebugInfo[int](t, ctrl, "", "testcomponents.summation_receiver.sum")
+ return sum >= 30
+ }, 3*time.Second, 10*time.Millisecond)
+ }
+}
+
+func getDebugInfo[T any](t *testing.T, ctrl *runtime.Runtime, moduleId string, nodeId string) T {
+ t.Helper()
+ info, err := ctrl.GetComponent(component.ID{
+ ModuleID: moduleId,
+ LocalID: nodeId,
+ }, component.InfoOptions{
+ GetHealth: true,
+ GetArguments: true,
+ GetExports: true,
+ GetDebugInfo: true,
+ })
+ require.NoError(t, err)
+ return info.DebugInfo.(T)
+}
diff --git a/internal/runtime/import_git_test.go b/internal/runtime/import_git_test.go
index ceb7e09241..3e81b3befa 100644
--- a/internal/runtime/import_git_test.go
+++ b/internal/runtime/import_git_test.go
@@ -14,6 +14,7 @@ import (
"testing"
"time"
+ "github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/vcs"
"github.com/stretchr/testify/require"
)
@@ -55,7 +56,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test\"")
defer verifyNoGoroutineLeaks(t)
- ctrl, f := setup(t, main)
+ ctrl, f := setup(t, main, nil, featuregate.StabilityPublicPreview)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
@@ -122,7 +123,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test\"")
defer verifyNoGoroutineLeaks(t)
- ctrl, f := setup(t, main)
+ ctrl, f := setup(t, main, nil, featuregate.StabilityPublicPreview)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
@@ -205,7 +206,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test2\"")
defer verifyNoGoroutineLeaks(t)
- ctrl, f := setup(t, main)
+ ctrl, f := setup(t, main, nil, featuregate.StabilityPublicPreview)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
@@ -269,7 +270,7 @@ testImport.add "cc" {
defer verifyNoGoroutineLeaks(t)
- ctrl, f := setup(t, main)
+ ctrl, f := setup(t, main, nil, featuregate.StabilityPublicPreview)
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
@@ -360,7 +361,7 @@ testImport.add "cc" {
runGit(t, testRepo, "commit", "-m \"test\"")
defer verifyNoGoroutineLeaks(t)
- ctrl, f := setup(t, main)
+ ctrl, f := setup(t, main, nil, featuregate.StabilityPublicPreview)
err = ctrl.LoadSource(f, nil, "")
expectedErr := vcs.InvalidRevisionError{
Revision: "nonexistent",
diff --git a/internal/runtime/import_test.go b/internal/runtime/import_test.go
index f6a1ab499b..98389a5cef 100644
--- a/internal/runtime/import_test.go
+++ b/internal/runtime/import_test.go
@@ -16,6 +16,7 @@ import (
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/tools/txtar"
@@ -298,7 +299,7 @@ func TestImportError(t *testing.T) {
func testConfig(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
- ctrl, f := setup(t, config)
+ ctrl, f := setup(t, config, nil, featuregate.StabilityPublicPreview)
err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
@@ -351,7 +352,7 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func())
func testConfigError(t *testing.T, config string, expectedError string) {
defer verifyNoGoroutineLeaks(t)
- ctrl, f := setup(t, config)
+ ctrl, f := setup(t, config, nil, featuregate.StabilityPublicPreview)
err := ctrl.LoadSource(f, nil, "")
require.ErrorContains(t, err, expectedError)
ctx, cancel := context.WithCancel(context.Background())
@@ -368,14 +369,14 @@ func testConfigError(t *testing.T, config string, expectedError string) {
}()
}
-func setup(t *testing.T, config string) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
+func setup(t *testing.T, config string, reg prometheus.Registerer, stability featuregate.Stability) (*alloy_runtime.Runtime, *alloy_runtime.Source) {
s, err := logging.New(os.Stderr, logging.DefaultOptions)
require.NoError(t, err)
ctrl := alloy_runtime.New(alloy_runtime.Options{
Logger: s,
DataPath: t.TempDir(),
- MinStability: featuregate.StabilityPublicPreview,
- Reg: nil,
+ MinStability: stability,
+ Reg: reg,
Services: []service.Service{},
})
f, err := alloy_runtime.ParseSource(t.Name(), []byte(config))
@@ -392,5 +393,19 @@ func getTestFiles(directory string, t *testing.T) []fs.FileInfo {
files, err := dir.Readdir(-1)
require.NoError(t, err)
- return files
+ // Don't use files which start with a dot (".").
+ // This is to prevent the test suite from using files such as ".DS_Store",
+ // which macOS may add.
+ return filterFiles(files, ".")
+}
+
+// Only take into account files which don't have a certain prefix.
+func filterFiles(files []fs.FileInfo, denylistedPrefix string) []fs.FileInfo {
+ res := make([]fs.FileInfo, 0, len(files))
+ for _, file := range files {
+ if !strings.HasPrefix(file.Name(), denylistedPrefix) {
+ res = append(res, file)
+ }
+ }
+ return res
}
diff --git a/internal/runtime/internal/controller/component_references.go b/internal/runtime/internal/controller/component_references.go
index 5754878042..00a9273fa4 100644
--- a/internal/runtime/internal/controller/component_references.go
+++ b/internal/runtime/internal/controller/component_references.go
@@ -64,7 +64,16 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scop
_, scopeMatch := scope.Lookup(t[0].Name)
if !componentRefMatch && !scopeMatch {
- diags = append(diags, resolveDiags...)
+ // The traversal for the foreach node is used at the foreach level to access references from outside of the foreach block.
+ // This is quite handy but not perfect because:
+ // - it fails for references to the var defined by the foreach
+ // - it fails at the root level to link two components that are inside of the template (because they are not evaluated at the root level)
+ // Both cases should be ignored at the linking level; that's why the diags are ignored here.
+ // This is not super clean, but it should not create any problems, since the errors will be caught either during evaluation
+ // or while linking components inside of the foreach.
+ if _, ok := cn.(*ForeachConfigNode); !ok {
+ diags = append(diags, resolveDiags...)
+ }
continue
}
diff --git a/internal/runtime/internal/controller/loader.go b/internal/runtime/internal/controller/loader.go
index 8ba483093a..455c8a32f8 100644
--- a/internal/runtime/internal/controller/loader.go
+++ b/internal/runtime/internal/controller/loader.go
@@ -46,6 +46,7 @@ type Loader struct {
componentNodes []ComponentNode
declareNodes map[string]*DeclareNode
importConfigNodes map[string]*ImportConfigNode
+ forEachNodes map[string]*ForeachConfigNode
serviceNodes []*ServiceNode
cache *valueCache
blocks []*ast.BlockStmt // Most recently loaded blocks, used for writing
@@ -511,7 +512,7 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con
node = exist.(BlockNode)
node.UpdateBlock(block)
} else {
- node, newConfigNodeDiags = NewConfigNode(block, l.globals)
+ node, newConfigNodeDiags = NewConfigNode(block, l.globals, l.componentNodeManager.customComponentReg)
diags = append(diags, newConfigNodeDiags...)
if diags.HasErrors() {
continue
@@ -547,6 +548,7 @@ func (l *Loader) populateConfigBlockNodes(args map[string]any, g *dag.Graph, con
}
l.importConfigNodes = nodeMap.importMap
+ l.forEachNodes = nodeMap.foreachMap
return diags
}
@@ -623,6 +625,8 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {
continue
case *CustomComponentNode:
l.wireCustomComponentNode(g, n)
+ case *ForeachConfigNode:
+ l.wireForEachNode(g, n)
}
// Finally, wire component references.
@@ -654,6 +658,14 @@ func (l *Loader) wireCustomComponentNode(g *dag.Graph, cc *CustomComponentNode)
}
}
+// wireForEachNode adds edges between a foreach node and the declare/import nodes that are used in the foreach pipeline.
+func (l *Loader) wireForEachNode(g *dag.Graph, fn *ForeachConfigNode) {
+ refs := l.findCustomComponentReferences(fn.Block())
+ for ref := range refs {
+ g.AddEdge(dag.Edge{From: fn, To: ref})
+ }
+}
+
// Variables returns the Variables the Loader exposes for other components to
// reference.
func (l *Loader) Variables() map[string]interface{} {
@@ -681,6 +693,13 @@ func (l *Loader) Imports() map[string]*ImportConfigNode {
return l.importConfigNodes
}
+// ForEachs returns the current set of foreach nodes.
+func (l *Loader) ForEachs() map[string]*ForeachConfigNode {
+ l.mut.RLock()
+ defer l.mut.RUnlock()
+ return l.forEachNodes
+}
+
// Graph returns a copy of the DAG managed by the Loader.
func (l *Loader) Graph() *dag.Graph {
l.mut.RLock()
@@ -909,10 +928,10 @@ func (l *Loader) isRootController() bool {
return l.globals.ControllerID == ""
}
-// findCustomComponentReferences returns references to import/declare nodes in a declare block.
-func (l *Loader) findCustomComponentReferences(declare *ast.BlockStmt) map[BlockNode]struct{} {
+// findCustomComponentReferences returns references to import/declare nodes in a block.
+func (l *Loader) findCustomComponentReferences(block *ast.BlockStmt) map[BlockNode]struct{} {
uniqueReferences := make(map[BlockNode]struct{})
- l.collectCustomComponentReferences(declare.Body, uniqueReferences)
+ l.collectCustomComponentReferences(block.Body, uniqueReferences)
return uniqueReferences
}
@@ -932,7 +951,7 @@ func (l *Loader) collectCustomComponentReferences(stmts ast.Body, uniqueReferenc
)
switch {
- case componentName == declareType:
+ case componentName == declareType || componentName == templateType:
l.collectCustomComponentReferences(blockStmt.Body, uniqueReferences)
case foundDeclare:
uniqueReferences[declareNode] = struct{}{}
diff --git a/internal/runtime/internal/controller/loader_test.go b/internal/runtime/internal/controller/loader_test.go
index 2c251d813b..e8ac3ee446 100644
--- a/internal/runtime/internal/controller/loader_test.go
+++ b/internal/runtime/internal/controller/loader_test.go
@@ -83,7 +83,7 @@ func TestLoader(t *testing.T) {
MinStability: stability,
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
- NewModuleController: func(id string) controller.ModuleController {
+ NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
return nil
},
},
@@ -353,6 +353,19 @@ func TestLoader(t *testing.T) {
diags = applyFromContent(t, l, nil, nil, []byte(invalidFile))
require.ErrorContains(t, diags.ErrorOrNil(), `block declare.a already declared at TestLoader/Declare_block_redefined_after_reload:2:4`)
})
+
+ t.Run("Foreach incorrect feature stability", func(t *testing.T) {
+ invalidFile := `
+ foreach "a" {
+ collection = [5]
+ var = "item"
+ template {}
+ }
+ `
+ l := controller.NewLoader(newLoaderOptions())
+ diags := applyFromContent(t, l, nil, []byte(invalidFile), nil)
+ require.ErrorContains(t, diags.ErrorOrNil(), `config block "foreach" is at stability level "experimental", which is below the minimum allowed stability level "public-preview". Use --stability.level command-line flag to enable "experimental"`)
+ })
}
func TestLoader_Services(t *testing.T) {
@@ -382,7 +395,7 @@ func TestLoader_Services(t *testing.T) {
MinStability: stability,
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
- NewModuleController: func(id string) controller.ModuleController {
+ NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
return nil
},
},
@@ -439,7 +452,7 @@ func TestScopeWithFailingComponent(t *testing.T) {
MinStability: featuregate.StabilityPublicPreview,
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
- NewModuleController: func(id string) controller.ModuleController {
+ NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
return fakeModuleController{}
},
},
diff --git a/internal/runtime/internal/controller/node_builtin_component.go b/internal/runtime/internal/controller/node_builtin_component.go
index 43e4893a2f..f1df4ff422 100644
--- a/internal/runtime/internal/controller/node_builtin_component.go
+++ b/internal/runtime/internal/controller/node_builtin_component.go
@@ -60,20 +60,29 @@ func (id ComponentID) Equals(other ComponentID) bool {
// DialFunc is a function to establish a network connection.
type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)
+type ModuleControllerOpts struct {
+ Id string
+
+ // When RegOverride is nil, the registry used in the module will be the usual one.
+ // When RegOverride is not nil, the registry used in the module will be RegOverride.
+ // This can be used to disable metrics for modules by giving them a no-op registry.
+ RegOverride prometheus.Registerer
+}
+
// ComponentGlobals are used by BuiltinComponentNodes to build managed components. All
// BuiltinComponentNodes should use the same ComponentGlobals.
type ComponentGlobals struct {
- Logger *logging.Logger // Logger shared between all managed components.
- TraceProvider trace.TracerProvider // Tracer shared between all managed components.
- DataPath string // Shared directory where component data may be stored
- MinStability featuregate.Stability // Minimum allowed stability level for features
- OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate
- OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
- Registerer prometheus.Registerer // Registerer for serving Alloy and component metrics
- ControllerID string // ID of controller.
- NewModuleController func(id string) ModuleController // Func to generate a module controller.
- GetServiceData func(name string) (interface{}, error) // Get data for a service.
- EnableCommunityComps bool // Enables the use of community components.
+ Logger *logging.Logger // Logger shared between all managed components.
+ TraceProvider trace.TracerProvider // Tracer shared between all managed components.
+ DataPath string // Shared directory where component data may be stored
+ MinStability featuregate.Stability // Minimum allowed stability level for features
+ OnBlockNodeUpdate func(cn BlockNode) // Informs controller that we need to reevaluate
+ OnExportsChange func(exports map[string]any) // Invoked when the managed component updated its exports
+ Registerer prometheus.Registerer // Registerer for serving Alloy and component metrics
+ ControllerID string // ID of controller.
+ NewModuleController func(opts ModuleControllerOpts) ModuleController // Func to generate a module controller.
+ GetServiceData func(name string) (interface{}, error) // Get data for a service.
+ EnableCommunityComps bool // Enables the use of community components.
}
// BuiltinComponentNode is a controller node which manages a builtin component.
@@ -146,7 +155,7 @@ func NewBuiltinComponentNode(globals ComponentGlobals, reg component.Registratio
componentName: strings.Join(b.Name, "."),
reg: reg,
exportsType: getExportsType(reg),
- moduleController: globals.NewModuleController(globalID),
+ moduleController: globals.NewModuleController(ModuleControllerOpts{Id: globalID}),
OnBlockNodeUpdate: globals.OnBlockNodeUpdate,
block: b,
diff --git a/internal/runtime/internal/controller/node_builtin_component_test.go b/internal/runtime/internal/controller/node_builtin_component_test.go
index c38ee0aac5..d1d9ed3ed4 100644
--- a/internal/runtime/internal/controller/node_builtin_component_test.go
+++ b/internal/runtime/internal/controller/node_builtin_component_test.go
@@ -13,7 +13,7 @@ func TestGlobalID(t *testing.T) {
DataPath: "/data/",
MinStability: featuregate.StabilityPublicPreview,
ControllerID: "module.file",
- NewModuleController: func(id string) ModuleController {
+ NewModuleController: func(opts ModuleControllerOpts) ModuleController {
return nil
},
}, &BuiltinComponentNode{
@@ -28,7 +28,7 @@ func TestLocalID(t *testing.T) {
DataPath: "/data/",
MinStability: featuregate.StabilityPublicPreview,
ControllerID: "",
- NewModuleController: func(id string) ModuleController {
+ NewModuleController: func(opts ModuleControllerOpts) ModuleController {
return nil
},
}, &BuiltinComponentNode{
diff --git a/internal/runtime/internal/controller/node_config.go b/internal/runtime/internal/controller/node_config.go
index 1293eac305..56acc3ae98 100644
--- a/internal/runtime/internal/controller/node_config.go
+++ b/internal/runtime/internal/controller/node_config.go
@@ -3,6 +3,7 @@ package controller
import (
"fmt"
+ "github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/internal/importsource"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/diag"
@@ -13,11 +14,29 @@ const (
exportBlockID = "export"
loggingBlockID = "logging"
tracingBlockID = "tracing"
+ foreachID = "foreach"
)
+// Add config blocks that are not GA. Config blocks that are not specified here are considered GA.
+var configBlocksUnstable = map[string]featuregate.Stability{
+ foreachID: featuregate.StabilityExperimental,
+}
+
// NewConfigNode creates a new ConfigNode from an initial ast.BlockStmt.
// The underlying config isn't applied until Evaluate is called.
-func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, diag.Diagnostics) {
+func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals, customReg *CustomComponentRegistry) (BlockNode, diag.Diagnostics) {
+ var diags diag.Diagnostics
+
+ if err := checkFeatureStability(block.GetBlockName(), globals.MinStability); err != nil {
+ diags.Add(diag.Diagnostic{
+ Severity: diag.SeverityLevelError,
+ Message: err.Error(),
+ StartPos: ast.StartPos(block).Position(),
+ EndPos: ast.EndPos(block).Position(),
+ })
+ return nil, diags
+ }
+
switch block.GetBlockName() {
case argumentBlockID:
return NewArgumentConfigNode(block, globals), nil
@@ -29,8 +48,9 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
return NewTracingConfigNode(block, globals), nil
case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP, importsource.BlockImportGit:
return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil
+ case foreachID:
+ return NewForeachConfigNode(block, globals, customReg), nil
default:
- var diags diag.Diagnostics
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Message: fmt.Sprintf("invalid config block type %s while creating new config node", block.GetBlockName()),
@@ -41,6 +61,14 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
}
}
+func checkFeatureStability(blockName string, minStability featuregate.Stability) error {
+ blockStability, exist := configBlocksUnstable[blockName]
+ if exist {
+ return featuregate.CheckAllowed(blockStability, minStability, fmt.Sprintf("config block %q", blockName))
+ }
+ return nil
+}
+
// ConfigNodeMap represents the config BlockNodes in their explicit types.
// This is helpful when validating node conditions specific to config node
// types.
@@ -50,6 +78,7 @@ type ConfigNodeMap struct {
argumentMap map[string]*ArgumentConfigNode
exportMap map[string]*ExportConfigNode
importMap map[string]*ImportConfigNode
+ foreachMap map[string]*ForeachConfigNode
}
// NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called
@@ -61,6 +90,7 @@ func NewConfigNodeMap() *ConfigNodeMap {
argumentMap: map[string]*ArgumentConfigNode{},
exportMap: map[string]*ExportConfigNode{},
importMap: map[string]*ImportConfigNode{},
+ foreachMap: map[string]*ForeachConfigNode{},
}
}
@@ -80,6 +110,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics {
nodeMap.tracing = n
case *ImportConfigNode:
nodeMap.importMap[n.Label()] = n
+ case *ForeachConfigNode:
+ nodeMap.foreachMap[n.Label()] = n
default:
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go
new file mode 100644
index 0000000000..3707d80fae
--- /dev/null
+++ b/internal/runtime/internal/controller/node_config_foreach.go
@@ -0,0 +1,431 @@
+package controller
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/hex"
+ "fmt"
+ "hash/fnv"
+ "path"
+ "strings"
+ "sync"
+ "time"
+ "unicode"
+
+ "github.com/go-kit/log"
+ "github.com/grafana/alloy/internal/component"
+ "github.com/grafana/alloy/internal/runner"
+ "github.com/grafana/alloy/internal/runtime/logging/level"
+ "github.com/grafana/alloy/syntax/ast"
+ "github.com/grafana/alloy/syntax/vm"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+const templateType = "template"
+
+// The ForeachConfigNode will create the pipeline defined in its template block for each entry defined in its collection argument.
+// Each pipeline is managed by a custom component.
+// The custom component has access to the root scope (it can access exports and modules outside of the foreach template).
+// The collection may contain items of any type. Each child is associated with one item from the collection, which can be accessed via the variable named in the var argument.
+// Nesting foreach blocks is allowed.
+type ForeachConfigNode struct {
+ id ComponentID
+ nodeID string
+ label string
+ componentName string
+ moduleController ModuleController
+
+ logger log.Logger
+
+ // customReg is the customComponentRegistry of the current loader.
+ // We pass it so that the foreach children have access to modules.
+ customReg *CustomComponentRegistry
+
+ customComponents map[string]CustomComponent // track the children
+ customComponentHashCounts map[string]int // track the hash to avoid collisions
+
+ forEachChildrenUpdateChan chan struct{} // used to trigger an update of the running children
+ forEachChildrenRunning bool
+
+ mut sync.RWMutex
+ block *ast.BlockStmt
+ args Arguments
+
+ moduleControllerFactory func(opts ModuleControllerOpts) ModuleController
+ moduleControllerOpts ModuleControllerOpts
+
+ healthMut sync.RWMutex
+ evalHealth component.Health // Health of the last evaluate
+ runHealth component.Health // Health of running the component
+}
+
+var _ ComponentNode = (*ForeachConfigNode)(nil)
+
+func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals, customReg *CustomComponentRegistry) *ForeachConfigNode {
+ nodeID := BlockComponentID(block).String()
+ globalID := nodeID
+ if globals.ControllerID != "" {
+ globalID = path.Join(globals.ControllerID, nodeID)
+ }
+
+ return &ForeachConfigNode{
+ nodeID: nodeID,
+ label: block.Label,
+ block: block,
+ componentName: block.GetBlockName(),
+ id: BlockComponentID(block),
+ logger: log.With(globals.Logger, "component_path", globals.ControllerID, "component_id", nodeID),
+ moduleControllerFactory: globals.NewModuleController,
+ moduleControllerOpts: ModuleControllerOpts{Id: globalID},
+ customReg: customReg,
+ forEachChildrenUpdateChan: make(chan struct{}, 1),
+ customComponents: make(map[string]CustomComponent, 0),
+ customComponentHashCounts: make(map[string]int, 0),
+ }
+}
+
+func (fn *ForeachConfigNode) Label() string { return fn.label }
+
+func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID }
+
+func (fn *ForeachConfigNode) Block() *ast.BlockStmt {
+ fn.mut.RLock()
+ defer fn.mut.RUnlock()
+ return fn.block
+}
+
+func (fn *ForeachConfigNode) Arguments() component.Arguments {
+ fn.mut.RLock()
+ defer fn.mut.RUnlock()
+ return fn.args
+}
+
+func (fn *ForeachConfigNode) ModuleIDs() []string {
+ fn.mut.RLock()
+ defer fn.mut.RUnlock()
+ return fn.moduleController.ModuleIDs()
+}
+
+func (fn *ForeachConfigNode) ComponentName() string {
+ return fn.componentName
+}
+
+func (fn *ForeachConfigNode) Exports() component.Exports {
+ return nil
+}
+func (fn *ForeachConfigNode) ID() ComponentID {
+ return fn.id
+}
+
+// Foreach doesn't have the ability to export values.
+// This is something we could implement in the future if there is a need for it.
+type Arguments struct {
+ Collection []any `alloy:"collection,attr"`
+ Var string `alloy:"var,attr"`
+
+ // enable_metrics should be false by default.
+ // That way users are protected from an explosion of debug metrics
+ // if there are many items inside "collection".
+ EnableMetrics bool `alloy:"enable_metrics,attr,optional"`
+}
+
+func (fn *ForeachConfigNode) Evaluate(evalScope *vm.Scope) error {
+ err := fn.evaluate(evalScope)
+
+ switch err {
+ case nil:
+ fn.setEvalHealth(component.HealthTypeHealthy, "foreach evaluated")
+ default:
+ msg := fmt.Sprintf("foreach evaluation failed: %s", err)
+ fn.setEvalHealth(component.HealthTypeUnhealthy, msg)
+ }
+ return err
+}
+
+func (fn *ForeachConfigNode) evaluate(scope *vm.Scope) error {
+ fn.mut.Lock()
+ defer fn.mut.Unlock()
+
+ // Split the template block from the rest of the body because it should not be evaluated.
+ var argsBody ast.Body
+ var template *ast.BlockStmt
+ for _, stmt := range fn.block.Body {
+ if blockStmt, ok := stmt.(*ast.BlockStmt); ok && blockStmt.GetBlockName() == templateType {
+ template = blockStmt
+ continue
+ }
+ argsBody = append(argsBody, stmt)
+ }
+
+ if template == nil {
+ return fmt.Errorf("the template block is missing in the foreach block")
+ }
+
+ eval := vm.New(argsBody)
+
+ var args Arguments
+ if err := eval.Evaluate(scope, &args); err != nil {
+ return fmt.Errorf("decoding configuration: %w", err)
+ }
+
+ fn.args = args
+
+ // By default don't show debug metrics.
+ if args.EnableMetrics {
+ // If metrics should be enabled, just use the regular registry.
+ // There's no need to pass a special registry specific for this module controller.
+ fn.moduleControllerOpts.RegOverride = nil
+ } else {
+ fn.moduleControllerOpts.RegOverride = NoopRegistry{}
+ }
+ fn.moduleController = fn.moduleControllerFactory(fn.moduleControllerOpts)
+
+ // Loop through the items to create the custom components.
+ // On re-evaluation new components are added and existing ones are updated.
+ newCustomComponentIds := make(map[string]bool, len(args.Collection))
+ fn.customComponentHashCounts = make(map[string]int)
+ for i := 0; i < len(args.Collection); i++ {
+
+ // We must create an ID from the collection entries to avoid recreating all components on every update.
+ // We track the hash counts because the collection might contain duplicates ([1, 1, 1] would result in the same IDs),
+ // so we handle it by appending a counter to the end -> [1_1, 1_2, 1_3].
+ customComponentID := fmt.Sprintf("foreach_%s", objectFingerprint(args.Collection[i]))
+ count := fn.customComponentHashCounts[customComponentID] // count = 0 if the key is not found
+ fn.customComponentHashCounts[customComponentID] = count + 1
+ customComponentID += fmt.Sprintf("_%d", count+1)
+
+ cc, err := fn.getOrCreateCustomComponent(customComponentID)
+ if err != nil {
+ return err
+ }
+
+ // Expose the current scope + the collection item that corresponds to this child.
+ vars := deepCopyMap(scope.Variables)
+ vars[args.Var] = args.Collection[i]
+
+ customComponentRegistry := NewCustomComponentRegistry(fn.customReg, vm.NewScope(vars))
+ if err := cc.LoadBody(template.Body, map[string]any{}, customComponentRegistry); err != nil {
+ return fmt.Errorf("updating custom component in foreach: %w", err)
+ }
+ newCustomComponentIds[customComponentID] = true
+ }
+
+ // Delete the custom components that are no longer in the foreach.
+ // The runner pkg will stop them properly.
+ for id := range fn.customComponents {
+ if _, exist := newCustomComponentIds[id]; !exist {
+ delete(fn.customComponents, id)
+ }
+ }
+
+ // Trigger to stop previous children from running and to start running the new ones.
+ if fn.forEachChildrenRunning {
+ select {
+ case fn.forEachChildrenUpdateChan <- struct{}{}: // queued trigger
+ default: // trigger already queued; no-op
+ }
+ }
+ return nil
+}
+
+// Assumes that a lock is held,
+// so that fn.moduleController doesn't change while the function is running.
+func (fn *ForeachConfigNode) getOrCreateCustomComponent(customComponentID string) (CustomComponent, error) {
+ cc, exists := fn.customComponents[customComponentID]
+ if exists {
+ return cc, nil
+ }
+
+ newCC, err := fn.moduleController.NewCustomComponent(customComponentID, func(exports map[string]any) {})
+ if err != nil {
+ return nil, fmt.Errorf("creating custom component: %w", err)
+ }
+ fn.customComponents[customComponentID] = newCC
+ return newCC, nil
+}
+
+func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) {
+ fn.mut.Lock()
+ defer fn.mut.Unlock()
+ fn.block = b
+}
+
+func (fn *ForeachConfigNode) Run(ctx context.Context) error {
+ newCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ runner := runner.New(func(forEachChild *forEachChild) runner.Worker {
+ return &forEachChildRunner{
+ child: forEachChild,
+ }
+ })
+ defer runner.Stop()
+
+ updateTasks := func() error {
+ fn.mut.Lock()
+ defer fn.mut.Unlock()
+ fn.forEachChildrenRunning = true
+ var tasks []*forEachChild
+ for customComponentID, customComponent := range fn.customComponents {
+ tasks = append(tasks, &forEachChild{
+ id: customComponentID,
+ cc: customComponent,
+ logger: log.With(fn.logger, "foreach_path", fn.nodeID, "child_id", customComponentID),
+ healthUpdate: fn.setRunHealth,
+ })
+ }
+ return runner.ApplyTasks(newCtx, tasks)
+ }
+
+ fn.setRunHealth(component.HealthTypeHealthy, "started foreach")
+
+ err := updateTasks()
+ if err != nil {
+ return fmt.Errorf("running foreach children failed: %w", err)
+ }
+
+ err = fn.run(ctx, updateTasks)
+
+ // Note: logging of this error is handled by the scheduler.
+ if err != nil {
+ fn.setRunHealth(component.HealthTypeExited, fmt.Sprintf("foreach node shut down with error: %s", err))
+ } else {
+ fn.setRunHealth(component.HealthTypeExited, "foreach node shut down cleanly")
+ }
+
+ return err
+}
+
+func (fn *ForeachConfigNode) run(ctx context.Context, updateTasks func() error) error {
+ for {
+ select {
+ case <-fn.forEachChildrenUpdateChan:
+ err := updateTasks()
+ if err != nil {
+ level.Error(fn.logger).Log("msg", "error encountered while updating foreach children", "err", err)
+ fn.setRunHealth(component.HealthTypeUnhealthy, fmt.Sprintf("error encountered while updating foreach children: %s", err))
+ // the error is not fatal, the node can still run in unhealthy mode
+ } else {
+ fn.setRunHealth(component.HealthTypeHealthy, "foreach children updated successfully")
+ }
+ case <-ctx.Done():
+ return nil
+ }
+ }
+}
+
+// CurrentHealth returns the current health of the ForeachConfigNode.
+//
+// The health of a ForeachConfigNode is determined by combining:
+//
+// 1. Health from the call to Run().
+// 2. Health from the last call to Evaluate().
+func (fn *ForeachConfigNode) CurrentHealth() component.Health {
+ fn.healthMut.RLock()
+ defer fn.healthMut.RUnlock()
+ return component.LeastHealthy(fn.runHealth, fn.evalHealth)
+}
+
+func (fn *ForeachConfigNode) setEvalHealth(t component.HealthType, msg string) {
+ fn.healthMut.Lock()
+ defer fn.healthMut.Unlock()
+
+ fn.evalHealth = component.Health{
+ Health: t,
+ Message: msg,
+ UpdateTime: time.Now(),
+ }
+}
+
+func (fn *ForeachConfigNode) setRunHealth(t component.HealthType, msg string) {
+ fn.healthMut.Lock()
+ defer fn.healthMut.Unlock()
+
+ fn.runHealth = component.Health{
+ Health: t,
+ Message: msg,
+ UpdateTime: time.Now(),
+ }
+}
+
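+// forEachChildRunner wraps a foreach child so that it can be scheduled as a
+// worker by the runner package.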
+type forEachChildRunner struct {
+ child *forEachChild
+}
+
+type forEachChild struct {
+ cc CustomComponent
+ id string
+ logger log.Logger
+ healthUpdate func(t component.HealthType, msg string)
+}
+
+func (fr *forEachChildRunner) Run(ctx context.Context) {
+ err := fr.child.cc.Run(ctx)
+ if err != nil {
+ level.Error(fr.child.logger).Log("msg", "foreach child stopped running", "err", err)
+ fr.child.healthUpdate(component.HealthTypeUnhealthy, fmt.Sprintf("foreach child stopped running: %s", err))
+ }
+}
+
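+// Hash and Equals are used by the runner package to decide whether a child
+// task is new, unchanged, or removed when the set of tasks is re-applied.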
+func (fi *forEachChild) Hash() uint64 {
+ fnvHash := fnv.New64a()
+ fnvHash.Write([]byte(fi.id))
+ return fnvHash.Sum64()
+}
+
+func (fi *forEachChild) Equals(other runner.Task) bool {
+ return fi.id == other.(*forEachChild).id
+}
+
+// This function uses a 256-bit hash to minimize the risk of collisions between foreach children.
+// If this is ever a performance bottleneck, it should still be safe to switch to a 64-bit hash.
+func computeHash(s string) string {
+ hasher := sha256.New()
+ hasher.Write([]byte(s))
+ return hex.EncodeToString(hasher.Sum(nil))
+}
+
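+// objectFingerprint converts a collection item into a string which can be used
+// inside an Alloy component ID. Simple scalar values are used (almost) as-is,
+// while other values are hashed.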
+func objectFingerprint(obj any) string {
+	// TODO: Test what happens if there is a "true" string and a true bool in the collection.
+ switch v := obj.(type) {
+ case string:
+ return replaceNonAlphaNumeric(v)
+ case int, bool:
+ return fmt.Sprintf("%v", v)
+ case float64:
+ // Dots are not valid characters in Alloy syntax identifiers.
+ // For example, "foreach_3.14_1" should become "foreach_3_14_1".
+ return strings.Replace(fmt.Sprintf("%f", v), ".", "_", -1)
+ default:
+ return computeHash(fmt.Sprintf("%#v", v))
+ }
+}
+
+func replaceNonAlphaNumeric(s string) string {
+ var builder strings.Builder
+ for _, r := range s {
+ if unicode.IsLetter(r) || unicode.IsDigit(r) {
+ builder.WriteRune(r)
+ } else {
+ builder.WriteRune('_')
+ }
+ }
+ return builder.String()
+}
+
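+// NoopRegistry is a prometheus.Registerer which discards all registrations.
+// It is used as the registry override for foreach children when metrics are disabled.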
+type NoopRegistry struct{}
+
+var _ prometheus.Registerer = NoopRegistry{}
+
+// MustRegister implements prometheus.Registerer.
+func (n NoopRegistry) MustRegister(...prometheus.Collector) {}
+
+// Register implements prometheus.Registerer.
+func (n NoopRegistry) Register(prometheus.Collector) error {
+ return nil
+}
+
+// Unregister implements prometheus.Registerer.
+func (n NoopRegistry) Unregister(prometheus.Collector) bool {
+ return true
+}
diff --git a/internal/runtime/internal/controller/node_config_foreach_test.go b/internal/runtime/internal/controller/node_config_foreach_test.go
new file mode 100644
index 0000000000..035744e661
--- /dev/null
+++ b/internal/runtime/internal/controller/node_config_foreach_test.go
@@ -0,0 +1,349 @@
+package controller
+
+import (
+ "context"
+ "os"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/grafana/alloy/internal/component"
+ "github.com/grafana/alloy/internal/featuregate"
+ "github.com/grafana/alloy/internal/runtime/logging"
+ "github.com/grafana/alloy/syntax/ast"
+ "github.com/grafana/alloy/syntax/parser"
+ "github.com/grafana/alloy/syntax/vm"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.opentelemetry.io/otel/trace/noop"
+)
+
+func TestCreateCustomComponents(t *testing.T) {
+ config := `foreach "default" {
+ collection = [1, 2, 3]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})
+ keys := make([]string, 0, len(foreachConfigNode.customComponents))
+ for key := range foreachConfigNode.customComponents {
+ keys = append(keys, key)
+ }
+ require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})
+}
+
+func TestCreateCustomComponentsDuplicatedIds(t *testing.T) {
+ config := `foreach "default" {
+ collection = [1, 2, 1]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"})
+ keys := make([]string, 0, len(foreachConfigNode.customComponents))
+ for key := range foreachConfigNode.customComponents {
+ keys = append(keys, key)
+ }
+ require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"})
+}
+
+func TestCreateCustomComponentsWithUpdate(t *testing.T) {
+ config := `foreach "default" {
+ collection = [1, 2, 3]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})
+ keys := make([]string, 0, len(foreachConfigNode.customComponents))
+ for key := range foreachConfigNode.customComponents {
+ keys = append(keys, key)
+ }
+ require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_3_1"})
+
+ newConfig := `foreach "default" {
+ collection = [2, 1, 1]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode.moduleController.(*ModuleControllerMock).Reset()
+ foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig))
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ customComponentIds = foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+
+	// Only a component for the second "1" item in the collection is created because the other two items already have components.
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_1_2"})
+
+	// "foreach_3_1" was removed, "foreach_1_2" was added.
+ keys = make([]string, 0, len(foreachConfigNode.customComponents))
+ for key := range foreachConfigNode.customComponents {
+ keys = append(keys, key)
+ }
+ require.ElementsMatch(t, keys, []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"})
+}
+
+func TestRunCustomComponents(t *testing.T) {
+ config := `foreach "default" {
+ collection = [1, 2, 3]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ ctx, cancel := context.WithCancel(context.Background())
+ go foreachConfigNode.Run(ctx)
+
+ // check that all custom components are running correctly
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ for _, cc := range foreachConfigNode.customComponents {
+ assert.True(c, cc.(*CustomComponentMock).IsRunning.Load())
+ }
+ }, 1*time.Second, 5*time.Millisecond)
+
+ cancel()
+ // check that all custom components are stopped
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ for _, cc := range foreachConfigNode.customComponents {
+ assert.False(c, cc.(*CustomComponentMock).IsRunning.Load())
+ }
+ }, 1*time.Second, 5*time.Millisecond)
+}
+
+func TestRunCustomComponentsAfterUpdate(t *testing.T) {
+ config := `foreach "default" {
+ collection = [1, 2, 3]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ ctx, cancel := context.WithCancel(context.Background())
+ go foreachConfigNode.Run(ctx)
+
+ // check that all custom components are running correctly
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ for _, cc := range foreachConfigNode.customComponents {
+ assert.True(c, cc.(*CustomComponentMock).IsRunning.Load())
+ }
+ }, 1*time.Second, 5*time.Millisecond)
+
+ newConfig := `foreach "default" {
+ collection = [2, 1, 1]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode.moduleController.(*ModuleControllerMock).Reset()
+ foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig))
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+
+ newComponentIds := []string{"foreach_1_1", "foreach_2_1", "foreach_1_2"}
+ // check that all new custom components are running correctly
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ for id, cc := range foreachConfigNode.customComponents {
+ assert.Contains(c, newComponentIds, id)
+ assert.True(c, cc.(*CustomComponentMock).IsRunning.Load())
+ }
+ }, 1*time.Second, 5*time.Millisecond)
+
+ cancel()
+ // check that all custom components are stopped
+ require.EventuallyWithT(t, func(c *assert.CollectT) {
+ for _, cc := range foreachConfigNode.customComponents {
+ assert.False(c, cc.(*CustomComponentMock).IsRunning.Load())
+ }
+ }, 1*time.Second, 5*time.Millisecond)
+}
+
+func TestCreateCustomComponentsCollectionObjectsWithUpdate(t *testing.T) {
+ config := `foreach "default" {
+ collection = [obj1, obj2]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ vars := map[string]interface{}{
+ "obj1": map[string]string{
+ "label1": "a",
+ "label2": "b",
+ },
+ "obj2": map[string]string{
+ "label3": "c",
+ },
+ }
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(vars)))
+ customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_be19d02a2ccb2cbc2c47e90dcad8446a50459577449624176398d1f2aa6cd23a_1", "foreach_b335d50e2e8490eb8bf5f51b3ca8b1599d811514ca40d28ada5214294d49752d_1"})
+ keys := make([]string, 0, len(foreachConfigNode.customComponents))
+ for key := range foreachConfigNode.customComponents {
+ keys = append(keys, key)
+ }
+ require.ElementsMatch(t, keys, []string{"foreach_be19d02a2ccb2cbc2c47e90dcad8446a50459577449624176398d1f2aa6cd23a_1", "foreach_b335d50e2e8490eb8bf5f51b3ca8b1599d811514ca40d28ada5214294d49752d_1"})
+
+ newConfig := `foreach "default" {
+ collection = [obj1, obj3]
+ var = "num"
+ template {
+ }
+ }`
+ vars2 := map[string]interface{}{
+ "obj1": map[string]string{
+ "label1": "a",
+ "label2": "b",
+ },
+ "obj3": map[string]string{
+ "label3": "d",
+ },
+ }
+ foreachConfigNode.moduleController.(*ModuleControllerMock).Reset()
+ foreachConfigNode.UpdateBlock(getBlockFromConfig(t, newConfig))
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(vars2)))
+ customComponentIds = foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+
+	// Only the custom component for obj3 is created because the one for obj1 was already created.
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_1464766cf9c8fd1095d0f7a22abe0632b6a6d44c3eeae65766086350eef3ac33_1"})
+
+ // "foreach_b335d50e2e8490eb8bf5f51b3ca8b1599d811514ca40d28ada5214294d49752d_1" was removed, "foreach_1464766cf9c8fd1095d0f7a22abe0632b6a6d44c3eeae65766086350eef3ac33_1" was added
+ keys = make([]string, 0, len(foreachConfigNode.customComponents))
+ for key := range foreachConfigNode.customComponents {
+ keys = append(keys, key)
+ }
+ require.ElementsMatch(t, keys, []string{"foreach_be19d02a2ccb2cbc2c47e90dcad8446a50459577449624176398d1f2aa6cd23a_1", "foreach_1464766cf9c8fd1095d0f7a22abe0632b6a6d44c3eeae65766086350eef3ac33_1"})
+}
+
+func TestNonAlphaNumericString(t *testing.T) {
+ config := `foreach "default" {
+ collection = ["123./st%4$"]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_123__st_4__1"})
+}
+
+func TestNonAlphaNumericString2(t *testing.T) {
+ // All non-alphanumeric characters are replaced with "_".
+ // This test uses two different strings that will be normalized to the same string.
+ // Both "123./s4" and "123/.s4" will become "123__s4".
+	// We expect this to be fine: the controller will name one of them "foreach_123__s4_1" and the other "foreach_123__s4_2".
+ config := `foreach "default" {
+ collection = ["123./s4", "123/.s4"]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_123__s4_1", "foreach_123__s4_2"})
+}
+
+func TestNonAlphaNumericString3(t *testing.T) {
+	// The non-alphanumeric string "123./s4" would normally be converted into "foreach_123__s4_1".
+	// However, the literal string "123__s4_1" in the collection produces the same ID.
+	// We expect the controller to avoid such name collisions.
+ config := `foreach "default" {
+ collection = ["123./s4", "123__s4_1"]
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))))
+ customComponentIds := foreachConfigNode.moduleController.(*ModuleControllerMock).CustomComponents
+	// TODO: It's not very clear which item became "foreach_123__s4_1_1".
+ // To avoid confusion, maybe we should log a mapping?
+ require.ElementsMatch(t, customComponentIds, []string{"foreach_123__s4_1", "foreach_123__s4_1_1"})
+}
+
+func TestCollectionNonArrayValue(t *testing.T) {
+ config := `foreach "default" {
+ collection = "aaa"
+ var = "num"
+ template {
+ }
+ }`
+ foreachConfigNode := NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t), nil)
+ require.ErrorContains(t, foreachConfigNode.Evaluate(vm.NewScope(make(map[string]interface{}))), `"aaa" should be array, got string`)
+}
+
+func getBlockFromConfig(t *testing.T, config string) *ast.BlockStmt {
+ file, err := parser.ParseFile("", []byte(config))
+ require.NoError(t, err)
+ return file.Body[0].(*ast.BlockStmt)
+}
+
+func getComponentGlobals(t *testing.T) ComponentGlobals {
+ l, _ := logging.New(os.Stderr, logging.DefaultOptions)
+ return ComponentGlobals{
+ Logger: l,
+ TraceProvider: noop.NewTracerProvider(),
+ DataPath: t.TempDir(),
+ MinStability: featuregate.StabilityGenerallyAvailable,
+ OnBlockNodeUpdate: func(cn BlockNode) { /* no-op */ },
+ Registerer: prometheus.NewRegistry(),
+ NewModuleController: func(opts ModuleControllerOpts) ModuleController {
+ return NewModuleControllerMock()
+ },
+ }
+}
+
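+// ModuleControllerMock records the IDs of the custom components created through it
+// so that tests can assert which children the foreach node instantiated.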
+type ModuleControllerMock struct {
+ CustomComponents []string
+}
+
+func NewModuleControllerMock() ModuleController {
+ return &ModuleControllerMock{
+ CustomComponents: make([]string, 0),
+ }
+}
+
+func (m *ModuleControllerMock) NewModule(id string, export component.ExportFunc) (component.Module, error) {
+ return nil, nil
+}
+
+func (m *ModuleControllerMock) ModuleIDs() []string {
+ return nil
+}
+
+func (m *ModuleControllerMock) NewCustomComponent(id string, export component.ExportFunc) (CustomComponent, error) {
+ m.CustomComponents = append(m.CustomComponents, id)
+ return &CustomComponentMock{}, nil
+}
+
+func (m *ModuleControllerMock) Reset() {
+ m.CustomComponents = make([]string, 0)
+}
+
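+// CustomComponentMock tracks whether the component is currently running.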
+type CustomComponentMock struct {
+ IsRunning atomic.Bool
+}
+
+func (c *CustomComponentMock) LoadBody(body ast.Body, args map[string]any, customComponentRegistry *CustomComponentRegistry) error {
+ return nil
+}
+
+func (c *CustomComponentMock) Run(ctx context.Context) error {
+ c.IsRunning.Store(true)
+ <-ctx.Done()
+ c.IsRunning.Store(false)
+ return nil
+}
diff --git a/internal/runtime/internal/controller/node_custom_component.go b/internal/runtime/internal/controller/node_custom_component.go
index 52fff29bc0..df98803857 100644
--- a/internal/runtime/internal/controller/node_custom_component.go
+++ b/internal/runtime/internal/controller/node_custom_component.go
@@ -114,7 +114,7 @@ func NewCustomComponentNode(globals ComponentGlobals, b *ast.BlockStmt, getConfi
componentName: componentName,
importNamespace: importNamespace,
customComponentName: customComponentName,
- moduleController: globals.NewModuleController(globalID),
+ moduleController: globals.NewModuleController(ModuleControllerOpts{Id: globalID}),
OnBlockNodeUpdate: globals.OnBlockNodeUpdate,
logger: log.With(globals.Logger, "component_path", parent, "component_id", node),
getConfig: getConfig,
diff --git a/internal/runtime/internal/testcomponents/pulse.go b/internal/runtime/internal/testcomponents/pulse.go
new file mode 100644
index 0000000000..20876a2761
--- /dev/null
+++ b/internal/runtime/internal/testcomponents/pulse.go
@@ -0,0 +1,106 @@
+package testcomponents
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/go-kit/log"
+ "github.com/grafana/alloy/internal/component"
+ "github.com/grafana/alloy/internal/featuregate"
+ "github.com/grafana/alloy/internal/runtime/logging/level"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// testcomponents.pulse sends the value 1 at the configured frequency until it has sent it "max" times.
+func init() {
+ component.Register(component.Registration{
+ Name: "testcomponents.pulse",
+ Stability: featuregate.StabilityPublicPreview,
+ Args: PulseConfig{},
+
+ Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+ return NewPulse(opts, args.(PulseConfig))
+ },
+ })
+}
+
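+// PulseConfig configures testcomponents.pulse: "frequency" is how often a value is sent,
+// "max" is the number of values to send (0 means no limit), and "forward_to" lists the receivers.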
+type PulseConfig struct {
+ Frequency time.Duration `alloy:"frequency,attr"`
+ Max int `alloy:"max,attr"`
+ ForwardTo []IntReceiver `alloy:"forward_to,attr,optional"`
+}
+
+type Pulse struct {
+ opts component.Options
+ log log.Logger
+
+ cfgMut sync.Mutex
+ cfg PulseConfig
+ count int
+
+ pulseCount prometheus.Counter
+}
+
+func NewPulse(o component.Options, cfg PulseConfig) (*Pulse, error) {
+ t := &Pulse{
+ opts: o,
+ log: o.Logger,
+ pulseCount: prometheus.NewCounter(prometheus.CounterOpts{Name: "pulse_count"}),
+ }
+
+ err := o.Registerer.Register(t.pulseCount)
+ if err != nil {
+ return nil, err
+ }
+
+ if err := t.Update(cfg); err != nil {
+ return nil, err
+ }
+ return t, nil
+}
+
+var (
+ _ component.Component = (*Pulse)(nil)
+)
+
+func (p *Pulse) Run(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-time.After(p.getNextPulse()):
+ p.cfgMut.Lock()
+ if p.cfg.Max == 0 || p.count < p.cfg.Max {
+ for _, r := range p.cfg.ForwardTo {
+ r.ReceiveInt(1)
+ }
+ p.pulseCount.Inc()
+ p.count++
+ }
+ p.cfgMut.Unlock()
+ }
+ }
+}
+
+func (t *Pulse) getNextPulse() time.Duration {
+ t.cfgMut.Lock()
+ defer t.cfgMut.Unlock()
+ return t.cfg.Frequency
+}
+
+// Update implements Component.
+func (t *Pulse) Update(args component.Arguments) error {
+ t.cfgMut.Lock()
+ defer t.cfgMut.Unlock()
+
+ cfg := args.(PulseConfig)
+ if cfg.Frequency == 0 {
+ return fmt.Errorf("frequency must not be 0")
+ }
+
+ level.Info(t.log).Log("msg", "setting count frequency", "freq", cfg.Frequency)
+ t.cfg = cfg
+ return nil
+}
diff --git a/internal/runtime/internal/testcomponents/string_receiver.go b/internal/runtime/internal/testcomponents/string_receiver.go
new file mode 100644
index 0000000000..67cd2b6f93
--- /dev/null
+++ b/internal/runtime/internal/testcomponents/string_receiver.go
@@ -0,0 +1,91 @@
+package testcomponents
+
+import (
+ "context"
+ "sync"
+
+ "github.com/go-kit/log"
+ "github.com/grafana/alloy/internal/component"
+ "github.com/grafana/alloy/internal/featuregate"
+)
+
+func init() {
+ component.Register(component.Registration{
+ Name: "testcomponents.string_receiver",
+ Stability: featuregate.StabilityPublicPreview,
+ Args: StringReceiverConfig{},
+ Exports: StringReceiverExports{},
+
+ Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+ return NewStringReceiverComp(opts, args.(StringReceiverConfig))
+ },
+ })
+}
+
+type StringReceiverConfig struct {
+}
+
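+// StringReceiver is implemented by anything which can receive strings from other test components.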
+type StringReceiver interface {
+ Receive(string)
+}
+
+type StringReceiverImpl struct {
+ log func(string)
+}
+
+func (r StringReceiverImpl) Receive(s string) {
+ r.log(s)
+}
+
+type StringReceiverExports struct {
+ Receiver StringReceiver `alloy:"receiver,attr"`
+}
+
+type StringReceiverComponent struct {
+ opts component.Options
+ log log.Logger
+
+ mut sync.Mutex
+ recvStr string
+ receiver StringReceiver
+}
+
+// NewStringReceiverComp creates a new testcomponents.string_receiver component.
+func NewStringReceiverComp(o component.Options, cfg StringReceiverConfig) (*StringReceiverComponent, error) {
+ s := &StringReceiverComponent{opts: o, log: o.Logger}
+ s.receiver = StringReceiverImpl{
+ log: func(str string) {
+ s.mut.Lock()
+ defer s.mut.Unlock()
+ s.recvStr += str + "\n"
+ },
+ }
+
+ o.OnStateChange(StringReceiverExports{
+ Receiver: s.receiver,
+ })
+
+ return s, nil
+}
+
+var (
+ _ component.Component = (*StringReceiverComponent)(nil)
+)
+
+// Run implements Component.
+func (s *StringReceiverComponent) Run(ctx context.Context) error {
+ <-ctx.Done()
+ return nil
+}
+
+// DebugInfo returns the received strings as debug info instead of an export to avoid an evaluation loop.
+func (s *StringReceiverComponent) DebugInfo() interface{} {
+ s.mut.Lock()
+ defer s.mut.Unlock()
+ return s.recvStr
+}
+
+// Update implements Component.
+func (s *StringReceiverComponent) Update(args component.Arguments) error {
+ return nil
+}
diff --git a/internal/runtime/internal/testcomponents/stringer.go b/internal/runtime/internal/testcomponents/stringer.go
new file mode 100644
index 0000000000..4e475d5a08
--- /dev/null
+++ b/internal/runtime/internal/testcomponents/stringer.go
@@ -0,0 +1,95 @@
+package testcomponents
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/go-kit/log"
+ "github.com/grafana/alloy/internal/component"
+ "github.com/grafana/alloy/internal/featuregate"
+)
+
+// testcomponents.stringer takes in an Alloy value, converts it to a string, and forwards it to the defined receivers.
+func init() {
+ component.Register(component.Registration{
+ Name: "testcomponents.stringer",
+ Stability: featuregate.StabilityPublicPreview,
+ Args: StringerConfig{},
+
+ Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+ return NewStringer(opts, args.(StringerConfig))
+ },
+ })
+}
+
+type StringerConfig struct {
+ InputString *string `alloy:"input_string,attr,optional"`
+ InputInt *int `alloy:"input_int,attr,optional"`
+ InputFloat *float64 `alloy:"input_float,attr,optional"`
+ InputBool *bool `alloy:"input_bool,attr,optional"`
+ InputMap *map[string]any `alloy:"input_map,attr,optional"`
+ InputArray *[]any `alloy:"input_array,attr,optional"`
+ ForwardTo []StringReceiver `alloy:"forward_to,attr"`
+}
+
+type Stringer struct {
+ opts component.Options
+ log log.Logger
+ cfgUpdate chan StringerConfig
+}
+
+func NewStringer(o component.Options, cfg StringerConfig) (*Stringer, error) {
+ t := &Stringer{
+ opts: o,
+ log: o.Logger,
+ cfgUpdate: make(chan StringerConfig, 10),
+ }
+ return t, nil
+}
+
+var (
+ _ component.Component = (*Stringer)(nil)
+)
+
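+// forward sends the Go-syntax representation of val to each receiver.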
+func forward(val any, to []StringReceiver) {
+ for _, r := range to {
+ str := fmt.Sprintf("%#v", val)
+ r.Receive(str)
+ }
+}
+
+func (s *Stringer) Run(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case cfg := <-s.cfgUpdate:
+ // Send the new values to the receivers
+ if cfg.InputString != nil {
+ forward(*cfg.InputString, cfg.ForwardTo)
+ }
+ if cfg.InputInt != nil {
+ forward(*cfg.InputInt, cfg.ForwardTo)
+ }
+ if cfg.InputFloat != nil {
+ forward(*cfg.InputFloat, cfg.ForwardTo)
+ }
+ if cfg.InputBool != nil {
+ forward(*cfg.InputBool, cfg.ForwardTo)
+ }
+ if cfg.InputArray != nil {
+ forward(*cfg.InputArray, cfg.ForwardTo)
+ }
+ if cfg.InputMap != nil {
+ forward(*cfg.InputMap, cfg.ForwardTo)
+ }
+ }
+ }
+}
+
+// Update implements Component.
+func (s *Stringer) Update(args component.Arguments) error {
+ cfg := args.(StringerConfig)
+ s.cfgUpdate <- cfg
+ return nil
+}
diff --git a/internal/runtime/internal/testcomponents/summation_receiver.go b/internal/runtime/internal/testcomponents/summation_receiver.go
new file mode 100644
index 0000000000..9c853f7a0e
--- /dev/null
+++ b/internal/runtime/internal/testcomponents/summation_receiver.go
@@ -0,0 +1,89 @@
+package testcomponents
+
+import (
+ "context"
+
+ "github.com/go-kit/log"
+ "github.com/grafana/alloy/internal/component"
+ "github.com/grafana/alloy/internal/featuregate"
+ "go.uber.org/atomic"
+)
+
+// testcomponents.summation_receiver sums up the values that it receives via the exported int receiver.
+// The sum is exposed via DebugInfo instead of the Exports to avoid triggering an update loop
+// (the components that use the exported receiver would otherwise be re-evaluated every time the sum changes).
+func init() {
+ component.Register(component.Registration{
+ Name: "testcomponents.summation_receiver",
+ Stability: featuregate.StabilityPublicPreview,
+ Args: SummationReceiverConfig{},
+ Exports: SummationReceiverExports{},
+
+ Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+ return NewSummationReceiver(opts, args.(SummationReceiverConfig))
+ },
+ })
+}
+
+type SummationReceiverConfig struct {
+}
+
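+// IntReceiver is implemented by anything which can receive integers from other test components.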
+type IntReceiver interface {
+ ReceiveInt(int)
+}
+
+type IntReceiverImpl struct {
+ incrementSum func(int)
+}
+
+func (r IntReceiverImpl) ReceiveInt(i int) {
+ r.incrementSum(i)
+}
+
+type SummationReceiverExports struct {
+ Receiver IntReceiver `alloy:"receiver,attr"`
+}
+
+type SummationReceiver struct {
+ opts component.Options
+ log log.Logger
+
+ sum atomic.Int32
+ receiver IntReceiver
+}
+
+// NewSummationReceiver creates a new summation component.
+func NewSummationReceiver(o component.Options, cfg SummationReceiverConfig) (*SummationReceiver, error) {
+ s := &SummationReceiver{opts: o, log: o.Logger}
+ s.receiver = IntReceiverImpl{
+ incrementSum: func(i int) {
+ s.sum.Add(int32(i))
+ },
+ }
+
+ o.OnStateChange(SummationReceiverExports{
+ Receiver: s.receiver,
+ })
+
+ return s, nil
+}
+
+var (
+ _ component.Component = (*SummationReceiver)(nil)
+)
+
+// Run implements Component.
+func (s *SummationReceiver) Run(ctx context.Context) error {
+ <-ctx.Done()
+ return nil
+}
+
+// DebugInfo returns the sum as debug info instead of an export to avoid an evaluation loop.
+func (s *SummationReceiver) DebugInfo() interface{} {
+ return int(s.sum.Load())
+}
+
+// Update implements Component.
+func (s *SummationReceiver) Update(args component.Arguments) error {
+ return nil
+}
diff --git a/internal/runtime/source.go b/internal/runtime/source.go
index a32cdd3f5a..b036f7e127 100644
--- a/internal/runtime/source.go
+++ b/internal/runtime/source.go
@@ -77,7 +77,7 @@ func sourceFromBody(body ast.Body) (*Source, error) {
switch fullName {
case "declare":
declares = append(declares, stmt)
- case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git":
+ case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git", "foreach":
configs = append(configs, stmt)
default:
components = append(components, stmt)
diff --git a/internal/runtime/testdata/foreach/foreach_1.txtar b/internal/runtime/testdata/foreach/foreach_1.txtar
new file mode 100644
index 0000000000..4ca243844e
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_1.txtar
@@ -0,0 +1,19 @@
+Foreach with only one item. The pulse will send "1" to the receiver of the summation component until it reaches 10.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [10]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach/foreach_10.txtar b/internal/runtime/testdata/foreach/foreach_10.txtar
new file mode 100644
index 0000000000..50dc4cc353
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_10.txtar
@@ -0,0 +1,20 @@
+A collection containing arrays.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [[10, 4, 100], [20, 6, 200]]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ // Only ingest the 4 and the 6.
+ max = num[1]
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach/foreach_11.txtar b/internal/runtime/testdata/foreach/foreach_11.txtar
new file mode 100644
index 0000000000..25867a12ac
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_11.txtar
@@ -0,0 +1,19 @@
+A collection containing maps which contain maps.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [{"a" = {"c" = 3}}, {"a" = {"c" = 7}}]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num["a"]["c"]
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach/foreach_2.txtar b/internal/runtime/testdata/foreach/foreach_2.txtar
new file mode 100644
index 0000000000..d7750d3fed
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_2.txtar
@@ -0,0 +1,18 @@
+Foreach with two items. Both pulse components will send "1" until they each reach 5, adding up to 10 in the summation component.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [5, 5]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach/foreach_3.txtar b/internal/runtime/testdata/foreach/foreach_3.txtar
new file mode 100644
index 0000000000..d352462c43
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_3.txtar
@@ -0,0 +1,37 @@
+The config is reloaded to reach a total sum of 30.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [5, 5]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+testcomponents.summation_receiver "sum" {
+}
+
+-- reload_config.alloy --
+foreach "testForeach" {
+  // For this test the reloaded collection must contain numbers different from the previous one because the foreach node caches its children.
+  // Here all children need to be created anew because the pulse components stop sending pulses once they reach their max.
+ collection = [6, 8, 6]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach/foreach_4.txtar b/internal/runtime/testdata/foreach/foreach_4.txtar
new file mode 100644
index 0000000000..5a7191eb19
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_4.txtar
@@ -0,0 +1,30 @@
+Foreach in a module.
+
+-- main.alloy --
+import.file "testImport" {
+ filename = "module.alloy"
+}
+
+testImport.a "cc" {
+ receiver = testcomponents.summation_receiver.sum.receiver
+}
+
+testcomponents.summation_receiver "sum" {
+}
+
+-- module.alloy --
+declare "a" {
+ argument "receiver" {}
+ foreach "testForeach" {
+ collection = [5, 5]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [argument.receiver.value]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/internal/runtime/testdata/foreach/foreach_5.txtar b/internal/runtime/testdata/foreach/foreach_5.txtar
new file mode 100644
index 0000000000..8877b45da9
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_5.txtar
@@ -0,0 +1,47 @@
+Foreach in a module which will be updated to reach a sum of 30.
+
+-- main.alloy --
+import.file "testImport" {
+ filename = "module.alloy"
+}
+
+testImport.a "cc" {
+ receiver = testcomponents.summation_receiver.sum.receiver
+}
+
+testcomponents.summation_receiver "sum" {
+}
+
+-- module.alloy --
+declare "a" {
+ argument "receiver" {}
+ foreach "testForeach" {
+ collection = [5, 5]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [argument.receiver.value]
+ }
+ }
+ }
+}
+
+-- update/module.alloy --
+declare "a" {
+ argument "receiver" {}
+ foreach "testForeach" {
+ collection = [6, 8, 6]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [argument.receiver.value]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/internal/runtime/testdata/foreach/foreach_6.txtar b/internal/runtime/testdata/foreach/foreach_6.txtar
new file mode 100644
index 0000000000..6a9a67b926
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_6.txtar
@@ -0,0 +1,26 @@
+Foreach in a foreach.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [1, 2]
+ var = "num"
+
+ template {
+ foreach "testForeachNested" {
+ collection = [1, 2]
+ var = "nestedNum"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num + nestedNum
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+
+ }
+ }
+}
+
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach/foreach_7.txtar b/internal/runtime/testdata/foreach/foreach_7.txtar
new file mode 100644
index 0000000000..4994d52549
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_7.txtar
@@ -0,0 +1,32 @@
+Module used inside of a foreach.
+
+-- main.alloy --
+import.file "testImport" {
+ filename = "module.alloy"
+}
+
+foreach "testForeach" {
+ collection = [5, 5]
+ var = "num"
+
+ template {
+ testImport.a "cc" {
+ max = num
+ receiver = testcomponents.summation_receiver.sum.receiver
+ }
+ }
+}
+
+testcomponents.summation_receiver "sum" {
+}
+
+-- module.alloy --
+declare "a" {
+ argument "max" {}
+ argument "receiver" {}
+ testcomponents.pulse "pt" {
+ max = argument.max.value
+ frequency = "10ms"
+ forward_to = [argument.receiver.value]
+ }
+}
\ No newline at end of file
diff --git a/internal/runtime/testdata/foreach/foreach_8.txtar b/internal/runtime/testdata/foreach/foreach_8.txtar
new file mode 100644
index 0000000000..763c04e3cd
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_8.txtar
@@ -0,0 +1,23 @@
+Foreach with two components linked together.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [10]
+ var = "num"
+
+ template {
+ testcomponents.passthrough "pt" {
+ input = num
+ lag = "1ms"
+ }
+ testcomponents.pulse "p" {
+ max = testcomponents.passthrough.pt.output
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach/foreach_9.txtar b/internal/runtime/testdata/foreach/foreach_9.txtar
new file mode 100644
index 0000000000..11c371a4f5
--- /dev/null
+++ b/internal/runtime/testdata/foreach/foreach_9.txtar
@@ -0,0 +1,19 @@
+A collection containing maps.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [{"a" = 4}, {"a" = 6}]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num["a"]
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
diff --git a/internal/runtime/testdata/foreach_metrics/foreach_1.txtar b/internal/runtime/testdata/foreach_metrics/foreach_1.txtar
new file mode 100644
index 0000000000..67bd58b01d
--- /dev/null
+++ b/internal/runtime/testdata/foreach_metrics/foreach_1.txtar
@@ -0,0 +1,36 @@
+Test to make sure debug metrics are disabled by default for foreach.
+Use a foreach with only one item. The pulse will send "1" to the receiver of the summation component until it reaches 10.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [10]
+ var = "num"
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
+
+-- expected_metrics.prom --
+
+# HELP alloy_component_controller_evaluating Tracks if the controller is currently in the middle of a graph evaluation
+# TYPE alloy_component_controller_evaluating gauge
+alloy_component_controller_evaluating{controller_id="",controller_path="/"} 0
+# HELP alloy_component_controller_running_components Total number of running components.
+# TYPE alloy_component_controller_running_components gauge
+alloy_component_controller_running_components{controller_id="",controller_path="/",health_type="healthy"} 2
+# HELP alloy_component_evaluation_queue_size Tracks the number of components waiting to be evaluated in the worker pool
+# TYPE alloy_component_evaluation_queue_size gauge
+alloy_component_evaluation_queue_size{controller_id="",controller_path="/"} 1
+
+-- expected_duration_metrics.prom --
+
+2
\ No newline at end of file
diff --git a/internal/runtime/testdata/foreach_metrics/foreach_2.txtar b/internal/runtime/testdata/foreach_metrics/foreach_2.txtar
new file mode 100644
index 0000000000..b8894cc32c
--- /dev/null
+++ b/internal/runtime/testdata/foreach_metrics/foreach_2.txtar
@@ -0,0 +1,43 @@
+Test to make sure debug metrics for foreach are shown when they are enabled explicitly.
+Foreach with only one item. The pulse will send "1" to the receiver of the summation component until it reaches 10.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [10]
+ var = "num"
+ enable_metrics = true
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
+
+-- expected_metrics.prom --
+
+# HELP alloy_component_controller_evaluating Tracks if the controller is currently in the middle of a graph evaluation
+# TYPE alloy_component_controller_evaluating gauge
+alloy_component_controller_evaluating{controller_id="",controller_path="/"} 0
+alloy_component_controller_evaluating{controller_id="foreach_10_1",controller_path="/foreach.testForeach"} 0
+# HELP alloy_component_controller_running_components Total number of running components.
+# TYPE alloy_component_controller_running_components gauge
+alloy_component_controller_running_components{controller_id="",controller_path="/",health_type="healthy"} 2
+alloy_component_controller_running_components{controller_id="foreach_10_1",controller_path="/foreach.testForeach",health_type="healthy"} 1
+# HELP alloy_component_evaluation_queue_size Tracks the number of components waiting to be evaluated in the worker pool
+# TYPE alloy_component_evaluation_queue_size gauge
+alloy_component_evaluation_queue_size{controller_id="",controller_path="/"} 1
+alloy_component_evaluation_queue_size{controller_id="foreach_10_1",controller_path="/foreach.testForeach"} 0
+# HELP pulse_count
+# TYPE pulse_count counter
+pulse_count{component_id="testcomponents.pulse.pt",component_path="/foreach.testForeach/foreach_10_1"} 10
+
+-- expected_duration_metrics.prom --
+
+4
\ No newline at end of file
diff --git a/internal/runtime/testdata/foreach_metrics/foreach_3.txtar b/internal/runtime/testdata/foreach_metrics/foreach_3.txtar
new file mode 100644
index 0000000000..06809a3dd7
--- /dev/null
+++ b/internal/runtime/testdata/foreach_metrics/foreach_3.txtar
@@ -0,0 +1,46 @@
+This test uses two different items in the collection.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [6,4]
+ var = "num"
+ enable_metrics = true
+
+ template {
+ testcomponents.pulse "pt" {
+ max = num
+ frequency = "10ms"
+ forward_to = [testcomponents.summation_receiver.sum.receiver]
+ }
+ }
+}
+
+// Similar to testcomponents.summation, but with a "receiver" export
+testcomponents.summation_receiver "sum" {
+}
+
+-- expected_metrics.prom --
+
+# HELP alloy_component_controller_evaluating Tracks if the controller is currently in the middle of a graph evaluation
+# TYPE alloy_component_controller_evaluating gauge
+alloy_component_controller_evaluating{controller_id="",controller_path="/"} 0
+alloy_component_controller_evaluating{controller_id="foreach_4_1",controller_path="/foreach.testForeach"} 0
+alloy_component_controller_evaluating{controller_id="foreach_6_1",controller_path="/foreach.testForeach"} 0
+# HELP alloy_component_controller_running_components Total number of running components.
+# TYPE alloy_component_controller_running_components gauge
+alloy_component_controller_running_components{controller_id="",controller_path="/",health_type="healthy"} 2
+alloy_component_controller_running_components{controller_id="foreach_4_1",controller_path="/foreach.testForeach",health_type="healthy"} 1
+alloy_component_controller_running_components{controller_id="foreach_6_1",controller_path="/foreach.testForeach",health_type="healthy"} 1
+# HELP alloy_component_evaluation_queue_size Tracks the number of components waiting to be evaluated in the worker pool
+# TYPE alloy_component_evaluation_queue_size gauge
+alloy_component_evaluation_queue_size{controller_id="",controller_path="/"} 1
+alloy_component_evaluation_queue_size{controller_id="foreach_4_1",controller_path="/foreach.testForeach"} 0
+alloy_component_evaluation_queue_size{controller_id="foreach_6_1",controller_path="/foreach.testForeach"} 0
+# HELP pulse_count
+# TYPE pulse_count counter
+pulse_count{component_id="testcomponents.pulse.pt",component_path="/foreach.testForeach/foreach_4_1"} 4
+pulse_count{component_id="testcomponents.pulse.pt",component_path="/foreach.testForeach/foreach_6_1"} 6
+
+-- expected_duration_metrics.prom --
+
+6
\ No newline at end of file
diff --git a/internal/runtime/testdata/foreach_stringer/foreach_1.txtar b/internal/runtime/testdata/foreach_stringer/foreach_1.txtar
new file mode 100644
index 0000000000..47c4de7aa9
--- /dev/null
+++ b/internal/runtime/testdata/foreach_stringer/foreach_1.txtar
@@ -0,0 +1,22 @@
+A collection containing an int.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [1]
+ var = "item"
+
+ template {
+ testcomponents.stringer "st" {
+ input_int = item
+ forward_to = [testcomponents.string_receiver.log.receiver]
+ }
+ }
+}
+
+// Receive strings and append them to a log,
+// separated by a new line.
+testcomponents.string_receiver "log" {
+}
+
+-- expected_debug_info.txt --
+1
diff --git a/internal/runtime/testdata/foreach_stringer/foreach_2.txtar b/internal/runtime/testdata/foreach_stringer/foreach_2.txtar
new file mode 100644
index 0000000000..804139d297
--- /dev/null
+++ b/internal/runtime/testdata/foreach_stringer/foreach_2.txtar
@@ -0,0 +1,22 @@
+A collection containing a map.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [{"a" = 1, "b" = "c"}]
+ var = "item"
+
+ template {
+ testcomponents.stringer "st" {
+ input_map = item
+ forward_to = [testcomponents.string_receiver.log.receiver]
+ }
+ }
+}
+
+// Receive strings and append them to a log,
+// separated by a new line.
+testcomponents.string_receiver "log" {
+}
+
+-- expected_debug_info.txt --
+map[string]interface {}{"a":1, "b":"c"}
diff --git a/internal/runtime/testdata/foreach_stringer/foreach_3.txtar b/internal/runtime/testdata/foreach_stringer/foreach_3.txtar
new file mode 100644
index 0000000000..634412f888
--- /dev/null
+++ b/internal/runtime/testdata/foreach_stringer/foreach_3.txtar
@@ -0,0 +1,22 @@
+A collection containing an array of bools.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [[true, false]]
+ var = "item"
+
+ template {
+ testcomponents.stringer "st" {
+ input_array = item
+ forward_to = [testcomponents.string_receiver.log.receiver]
+ }
+ }
+}
+
+// Receive strings and append them to a log,
+// separated by a new line.
+testcomponents.string_receiver "log" {
+}
+
+-- expected_debug_info.txt --
+[]interface {}{true, false}
diff --git a/internal/runtime/testdata/foreach_stringer/foreach_4.txtar b/internal/runtime/testdata/foreach_stringer/foreach_4.txtar
new file mode 100644
index 0000000000..20682c7fcf
--- /dev/null
+++ b/internal/runtime/testdata/foreach_stringer/foreach_4.txtar
@@ -0,0 +1,22 @@
+A collection containing a bool.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [true]
+ var = "item"
+
+ template {
+ testcomponents.stringer "st" {
+ input_bool = item
+ forward_to = [testcomponents.string_receiver.log.receiver]
+ }
+ }
+}
+
+// Receive strings and append them to a log,
+// separated by a new line.
+testcomponents.string_receiver "log" {
+}
+
+-- expected_debug_info.txt --
+true
diff --git a/internal/runtime/testdata/foreach_stringer/foreach_5.txtar b/internal/runtime/testdata/foreach_stringer/foreach_5.txtar
new file mode 100644
index 0000000000..5b0cd3830d
--- /dev/null
+++ b/internal/runtime/testdata/foreach_stringer/foreach_5.txtar
@@ -0,0 +1,22 @@
+A collection containing a float.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [3.14]
+ var = "item"
+
+ template {
+ testcomponents.stringer "st" {
+ input_float = item
+ forward_to = [testcomponents.string_receiver.log.receiver]
+ }
+ }
+}
+
+// Receive strings and append them to a log,
+// separated by a new line.
+testcomponents.string_receiver "log" {
+}
+
+-- expected_debug_info.txt --
+3.14
diff --git a/internal/runtime/testdata/foreach_stringer/foreach_6.txtar b/internal/runtime/testdata/foreach_stringer/foreach_6.txtar
new file mode 100644
index 0000000000..5ce546b9ad
--- /dev/null
+++ b/internal/runtime/testdata/foreach_stringer/foreach_6.txtar
@@ -0,0 +1,22 @@
+A collection containing a string with a dot.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = ["aaa.bbb"]
+ var = "item"
+
+ template {
+ testcomponents.stringer "st" {
+ input_string = item
+ forward_to = [testcomponents.string_receiver.log.receiver]
+ }
+ }
+}
+
+// Receive strings and append them to a log,
+// separated by a new line.
+testcomponents.string_receiver "log" {
+}
+
+-- expected_debug_info.txt --
+"aaa.bbb"
\ No newline at end of file
diff --git a/internal/runtime/testdata/foreach_stringer/foreach_7.txtar b/internal/runtime/testdata/foreach_stringer/foreach_7.txtar
new file mode 100644
index 0000000000..5f2295226a
--- /dev/null
+++ b/internal/runtime/testdata/foreach_stringer/foreach_7.txtar
@@ -0,0 +1,27 @@
+A collection containing capsules.
+
+-- main.alloy --
+foreach "testForeach" {
+ collection = [testcomponents.string_receiver.log.receiver, testcomponents.string_receiver.log2.receiver]
+ var = "item"
+
+ template {
+ testcomponents.stringer "st" {
+ input_string = "gragent"
+ forward_to = [item]
+ }
+ }
+}
+
+// Receive strings and append them to a log,
+// separated by a new line.
+testcomponents.string_receiver "log" {
+}
+
+testcomponents.string_receiver "log2" {
+}
+
+-- expected_debug_info.txt --
+"gragent"
+-- expected_debug_info2.txt --
+"gragent"
\ No newline at end of file
diff --git a/internal/web/ui/src/features/component/ComponentList.tsx b/internal/web/ui/src/features/component/ComponentList.tsx
index b5224583e0..b2faba5b00 100644
--- a/internal/web/ui/src/features/component/ComponentList.tsx
+++ b/internal/web/ui/src/features/component/ComponentList.tsx
@@ -9,30 +9,38 @@ import styles from './ComponentList.module.css';
interface ComponentListProps {
components: ComponentInfo[];
- moduleID?: string;
+ overrideModuleID?: string;
useRemotecfg: boolean;
handleSorting?: (sortField: string, sortOrder: SortOrder) => void;
}
const TABLEHEADERS = ['Health', 'ID'];
-const ComponentList = ({ components, moduleID, useRemotecfg, handleSorting }: ComponentListProps) => {
+// overrideModuleID is a workaround for the remote config page: the remotecfg component has the moduleID of its controller,
+// so it should not be fetched as a module.
+const ComponentList = ({ components, overrideModuleID, useRemotecfg, handleSorting }: ComponentListProps) => {
const tableStyles = { width: '130px' };
const urlPrefix = useRemotecfg ? '/remotecfg' : '';
- const pathPrefix = moduleID ? moduleID + '/' : '';
-
/**
* Custom renderer for table data
*/
const renderTableData = () => {
- return components.map(({ health, localID: id }) => (
+ return components.map(({ health, localID: id, moduleID }) => (
{id}
-
+
View
diff --git a/internal/web/ui/src/features/component/ComponentView.tsx b/internal/web/ui/src/features/component/ComponentView.tsx
index 5aa29c1375..733674a2f3 100644
--- a/internal/web/ui/src/features/component/ComponentView.tsx
+++ b/internal/web/ui/src/features/component/ComponentView.tsx
@@ -144,7 +144,7 @@ export const ComponentView: FC = (props) => {
)}
@@ -153,7 +153,7 @@ export const ComponentView: FC = (props) => {
)}
@@ -162,11 +162,7 @@ export const ComponentView: FC = (props) => {
)}
diff --git a/internal/web/ui/src/pages/PageRemoteComponentList.tsx b/internal/web/ui/src/pages/PageRemoteComponentList.tsx
index dd86ced561..8543ffab5a 100644
--- a/internal/web/ui/src/pages/PageRemoteComponentList.tsx
+++ b/internal/web/ui/src/pages/PageRemoteComponentList.tsx
@@ -38,7 +38,7 @@ function PageRemoteComponentList() {
return (
-
+
);
}