diff --git a/.chloggen/failover-max-retry-fix.yaml b/.chloggen/failover-max-retry-fix.yaml
new file mode 100644
index 000000000000..10de74cc0762
--- /dev/null
+++ b/.chloggen/failover-max-retry-fix.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: failoverconnector
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Resolves a bug that prevents proper recovery when max retries are disabled
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36587]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/connector/failoverconnector/failover_test.go b/connector/failoverconnector/failover_test.go
index 603bcde86781..6ef406835612 100644
--- a/connector/failoverconnector/failover_test.go
+++ b/connector/failoverconnector/failover_test.go
@@ -202,7 +202,7 @@ func TestFailoverRecovery_MaxRetries(t *testing.T) {
 
 	failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
 	failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
-	failoverConnector.failover.pS.SetRetryCountToMax(0)
+	failoverConnector.failover.pS.SetRetryCountToValue(0, cfg.MaxRetries)
 
 	require.Eventually(t, func() bool {
 		return consumeTracesAndCheckStable(failoverConnector, 2, tr)
@@ -211,11 +211,77 @@ func TestFailoverRecovery_MaxRetries(t *testing.T) {
 	failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)
 	failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)
 
+	// Check that level 0 is skipped because the max retry value has been hit
 	require.Eventually(t, func() bool {
 		return consumeTracesAndCheckStable(failoverConnector, 1, tr)
 	}, 3*time.Second, 5*time.Millisecond)
 }
 
+func TestFailoverRecovery_MaxRetriesDisabled(t *testing.T) {
+	var sinkFirst, sinkSecond, sinkThird, sinkFourth consumertest.TracesSink
+	tracesFirst := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/first")
+	tracesSecond := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/second")
+	tracesThird := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/third")
+	tracesFourth := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/fourth")
+
+	cfg := &Config{
+		PipelinePriority: [][]pipeline.ID{{tracesFirst}, {tracesSecond}, {tracesThird}, {tracesFourth}},
+		RetryInterval:    50 * time.Millisecond,
+		RetryGap:         10 * time.Millisecond,
+		MaxRetries:       0,
+	}
+
+	router := connector.NewTracesRouter(map[pipeline.ID]consumer.Traces{
+		tracesFirst:  &sinkFirst,
+		tracesSecond: &sinkSecond,
+		tracesThird:  &sinkThird,
+		tracesFourth: &sinkFourth,
+	})
+
+	conn, err := NewFactory().CreateTracesToTraces(context.Background(),
+		connectortest.NewNopSettings(), cfg, router.(consumer.Traces))
+
+	require.NoError(t, err)
+
+	failoverConnector := conn.(*tracesFailover)
+
+	tr := sampleTrace()
+
+	defer func() {
+		assert.NoError(t, failoverConnector.Shutdown(context.Background()))
+	}()
+
+	failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
+	failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
+
+	require.Eventually(t, func() bool {
+		return consumeTracesAndCheckStable(failoverConnector, 2, tr)
+	}, 3*time.Second, 5*time.Millisecond)
+
+	failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)
+	failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)
+
+	require.Eventually(t, func() bool {
+		return consumeTracesAndCheckStable(failoverConnector, 0, tr)
+	}, 3*time.Second, 5*time.Millisecond)
+
+	failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
+	failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
+	failoverConnector.failover.pS.SetRetryCountToValue(0, cfg.MaxRetries)
+
+	require.Eventually(t, func() bool {
+		return consumeTracesAndCheckStable(failoverConnector, 2, tr)
+	}, 3*time.Second, 5*time.Millisecond)
+
+	failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)
+	failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)
+
+	// Check that the connector still resets to level 0 even though the max retry value has been hit
+	require.Eventually(t, func() bool {
+		return consumeTracesAndCheckStable(failoverConnector, 0, tr)
+	}, 3*time.Second, 5*time.Millisecond)
+}
+
 func resetConsumers(conn *tracesFailover, consumers ...consumer.Traces) {
 	for i, sink := range consumers {
 		conn.failover.ModifyConsumerAtIndex(i, sink)
diff --git a/connector/failoverconnector/internal/state/pipeline_selector.go b/connector/failoverconnector/internal/state/pipeline_selector.go
index a0f395513b67..08bcedf9bb5f 100644
--- a/connector/failoverconnector/internal/state/pipeline_selector.go
+++ b/connector/failoverconnector/internal/state/pipeline_selector.go
@@ -92,7 +92,7 @@ func (p *PipelineSelector) retryHighPriorityPipelines(ctx context.Context, retry
 	defer ticker.Stop()
 
 	for i := 0; i < len(p.pipelineRetries); i++ {
-		if p.maxRetriesUsed(i) {
+		if p.exceededMaxRetries(i) {
 			continue
 		}
 		select {
@@ -110,7 +110,7 @@ func (p *PipelineSelector) retryHighPriorityPipelines(ctx context.Context, retry
 // checkContinueRetry checks if retry should be suspended if all higher priority levels have exceeded their max retries
 func (p *PipelineSelector) checkContinueRetry(index int) bool {
 	for i := 0; i < index; i++ {
-		if p.loadRetryCount(i) < p.constants.MaxRetries {
+		if p.constants.MaxRetries == 0 || p.loadRetryCount(i) < p.constants.MaxRetries {
 			return true
 		}
 	}
@@ -127,11 +127,6 @@ func (p *PipelineSelector) setToStableIndex(idx int) {
 	p.currentIndex.Store(p.stableIndex.Load())
 }
 
-// MaxRetriesUsed exported access to maxRetriesUsed
-func (p *PipelineSelector) maxRetriesUsed(idx int) bool {
-	return p.loadRetryCount(idx) >= p.constants.MaxRetries
-}
-
 // SetNewStableIndex Update stableIndex to the passed stable index
 func (p *PipelineSelector) setNewStableIndex(idx int) {
 	p.resetRetryCount(idx)
@@ -249,8 +244,8 @@ func (p *PipelineSelector) TestRetryPipelines(ctx context.Context, retryInterval
 	p.enableRetry(ctx, retryInterval, retryGap)
 }
 
-func (p *PipelineSelector) SetRetryCountToMax(idx int) {
-	p.pipelineRetries[idx].Store(int32(p.constants.MaxRetries))
+func (p *PipelineSelector) SetRetryCountToValue(idx int, val int) {
+	p.pipelineRetries[idx].Store(int32(val))
 }
 
 func (p *PipelineSelector) ResetRetryCount(idx int) {
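
Reviewer note: the sketch below is not the connector's code. It is a minimal, self-contained model of the condition this patch changes, assuming (as the new `checkContinueRetry` condition suggests) that `MaxRetries == 0` means retries are never capped; the `retryState` type and its methods are hypothetical stand-ins for `PipelineSelector`. Before the fix, comparing retry counts against a zero `MaxRetries` made every priority level look exhausted, so the connector never retried its way back to level 0.

```go
package main

import "fmt"

// retryState is a simplified, illustrative model of the retry bookkeeping in
// pipeline_selector.go. maxRetries == 0 is treated as "retries disabled", i.e. no cap.
type retryState struct {
	retryCounts []int
	maxRetries  int
}

// exceededMaxRetries reports whether level idx has used up its retries.
// With maxRetries == 0 it always returns false, so no level is skipped.
func (s *retryState) exceededMaxRetries(idx int) bool {
	return s.maxRetries > 0 && s.retryCounts[idx] >= s.maxRetries
}

// checkContinueRetry mirrors the fixed condition: when maxRetries == 0,
// higher-priority levels can always be retried, so retrying continues.
func (s *retryState) checkContinueRetry(index int) bool {
	for i := 0; i < index; i++ {
		if s.maxRetries == 0 || s.retryCounts[i] < s.maxRetries {
			return true
		}
	}
	return false
}

func main() {
	// Max retries disabled: even with a large retry count on level 0,
	// the retry loop keeps considering it and recovery to level 0 stays possible.
	disabled := &retryState{retryCounts: []int{5, 0, 0}, maxRetries: 0}
	fmt.Println(disabled.exceededMaxRetries(0)) // false
	fmt.Println(disabled.checkContinueRetry(2)) // true

	// Max retries enabled and exhausted on level 0: level 0 is skipped.
	capped := &retryState{retryCounts: []int{5, 0, 0}, maxRetries: 5}
	fmt.Println(capped.exceededMaxRetries(0)) // true
	fmt.Println(capped.checkContinueRetry(1)) // false
}
```

The disabled case prints false/true and the capped case prints true/false, which mirrors why TestFailoverRecovery_MaxRetriesDisabled expects recovery back to level 0 when MaxRetries is 0 while TestFailoverRecovery_MaxRetries expects level 0 to be skipped.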