Skip to content

Commit

Permalink
Move context retrieval under mutex lock in concurrent eval (#2648)
Browse files Browse the repository at this point in the history
* move context retrieval undex mutex lock in concurrent eval

* changelog

* change unlock logic

* add test

* typo
  • Loading branch information
wildum authored Feb 14, 2025
1 parent 4573fa5 commit a454352
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Main (unreleased)

- Fixed an issue where `loki.process` would sometimes output live debugging entries out-of-order (@thampiotr)

- Fixed a bug where components could be evaluated concurrently without the full context during a config reload (@wildum)

### Other changes

- Upgrading to Prometheus v2.54.1. (@ptodev)
Expand Down
57 changes: 57 additions & 0 deletions internal/runtime/alloy_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package runtime

import (
"bytes"
"context"
"os"
"path/filepath"
"strings"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

Expand Down Expand Up @@ -123,6 +126,60 @@ func TestController_LoadSource_WithModulePathWithoutFileExtension_Evaluation(t *
require.Equal(t, filepath.Join("tmp_modulePath_test", "test"), out.(testcomponents.PassthroughExports).Output)
}

// This test reloads the config a few times and checks that Alloy does not log errors.
// The ticker has a very small frequency to put pressure on the concurrent evaluations happening
// in the runtime while the loader is concurrently reloading the config.
func TestController_ReloadLoaderNoErrorLog(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
ctrl := New(testOptions(t))

var testFileFastTick = `
testcomponents.tick "ticker" {
frequency = "10ns"
}
testcomponents.passthrough "static" {
input = "hello, world!"
}
testcomponents.passthrough "ticker" {
input = testcomponents.tick.ticker.tick_time
}
testcomponents.passthrough "forwarded" {
input = testcomponents.passthrough.ticker.output
}
`
var logsBuffer bytes.Buffer
syncBuff := log.NewSyncWriter(&logsBuffer)
ctrl.log.SetTemporaryWriter(syncBuff)

f, err := ParseSource(t.Name(), []byte(testFileFastTick))
require.NoError(t, err)
require.NotNil(t, f)

err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
require.Len(t, ctrl.loader.Components(), 4)

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
ctrl.Run(ctx)
close(done)
}()

for i := 0; i < 5; i++ {
err = ctrl.LoadSource(f, nil, "")
require.NoError(t, err)
}

cancel()
<-done

require.False(t, strings.Contains(logsBuffer.String(), "level=error"))
}

func getFields(t *testing.T, g *dag.Graph, nodeID string) (component.Arguments, component.Exports) {
t.Helper()

Expand Down
4 changes: 2 additions & 2 deletions internal/runtime/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,10 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
var err error
switch n := n.(type) {
case BlockNode:
ectx := l.cache.GetContext()

// RLock before evaluate to prevent Evaluating while the config is being reloaded
l.mut.RLock()
ectx := l.cache.GetContext()
evalErr := n.Evaluate(ectx)

err = l.postEvaluate(l.log, n, evalErr)
Expand All @@ -842,12 +842,12 @@ func (l *Loader) concurrentEvalFn(n dag.Node, spanCtx context.Context, tracer tr
// Upgrade to write lock to update the module exports.
l.mut.RUnlock()
l.mut.Lock()
defer l.mut.Unlock()
// Check if the update still needed after obtaining the write lock and perform it.
if l.cache.ExportChangeIndex() != l.moduleExportIndex {
l.globals.OnExportsChange(l.cache.CreateModuleExports())
l.moduleExportIndex = l.cache.ExportChangeIndex()
}
l.mut.Unlock()
} else {
// No need to upgrade to write lock, just release the read lock.
l.mut.RUnlock()
Expand Down

0 comments on commit a454352

Please sign in to comment.