diff --git a/CHANGELOG.md b/CHANGELOG.md index 62e2923c50..fd94b4959e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,8 @@ Main (unreleased) - Fixed an issue where `loki.process` would sometimes output live debugging entries out-of-order (@thampiotr) +- Fixed a bug where components could be evaluated concurrently without the full context during a config reload (@wildum) + ### Other changes - Upgrading to Prometheus v2.54.1. (@ptodev) diff --git a/internal/runtime/alloy_test.go b/internal/runtime/alloy_test.go index 7eb849746f..01d3d497d4 100644 --- a/internal/runtime/alloy_test.go +++ b/internal/runtime/alloy_test.go @@ -1,11 +1,14 @@ package runtime import ( + "bytes" "context" "os" "path/filepath" + "strings" "testing" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -123,6 +126,60 @@ func TestController_LoadSource_WithModulePathWithoutFileExtension_Evaluation(t * require.Equal(t, filepath.Join("tmp_modulePath_test", "test"), out.(testcomponents.PassthroughExports).Output) } +// This test reloads the config a few times and checks that Alloy does not log errors. +// The ticker's frequency is set to a very short duration ("10ns") to put pressure on the concurrent evaluations happening +// in the runtime while the loader is concurrently reloading the config. +func TestController_ReloadLoaderNoErrorLog(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + ctrl := New(testOptions(t)) + + var testFileFastTick = ` + testcomponents.tick "ticker" { + frequency = "10ns" + } + + testcomponents.passthrough "static" { + input = "hello, world!" 
+ } + + testcomponents.passthrough "ticker" { + input = testcomponents.tick.ticker.tick_time + } + + testcomponents.passthrough "forwarded" { + input = testcomponents.passthrough.ticker.output + } +` + var logsBuffer bytes.Buffer + syncBuff := log.NewSyncWriter(&logsBuffer) + ctrl.log.SetTemporaryWriter(syncBuff) + + f, err := ParseSource(t.Name(), []byte(testFileFastTick)) + require.NoError(t, err) + require.NotNil(t, f) + + err = ctrl.LoadSource(f, nil, "") + require.NoError(t, err) + require.Len(t, ctrl.loader.Components(), 4) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + ctrl.Run(ctx) + close(done) + }() + + for i := 0; i < 5; i++ { + err = ctrl.LoadSource(f, nil, "") + require.NoError(t, err) + } + + cancel() + <-done + + require.False(t, strings.Contains(logsBuffer.String(), "level=error")) +} + func getFields(t *testing.T, g *dag.Graph, nodeID string) (component.Arguments, component.Exports) { t.Helper() diff --git a/internal/runtime/internal/controller/loader.go b/internal/runtime/internal/controller/loader.go index 455c8a32f8..065bfdc223 100644 --- a/internal/runtime/internal/controller/loader.go +++ b/internal/runtime/internal/controller/loader.go @@ -826,10 +826,10 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr var err error switch n := n.(type) { case BlockNode: - ectx := l.cache.GetContext() // RLock before evaluate to prevent Evaluating while the config is being reloaded l.mut.RLock() + ectx := l.cache.GetContext() evalErr := n.Evaluate(ectx) err = l.postEvaluate(l.log, n, evalErr) @@ -842,12 +842,12 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr // Upgrade to write lock to update the module exports. l.mut.RUnlock() l.mut.Lock() - defer l.mut.Unlock() // Check if the update still needed after obtaining the write lock and perform it. 
if l.cache.ExportChangeIndex() != l.moduleExportIndex { l.globals.OnExportsChange(l.cache.CreateModuleExports()) l.moduleExportIndex = l.cache.ExportChangeIndex() } + l.mut.Unlock() } else { // No need to upgrade to write lock, just release the read lock. l.mut.RUnlock()