Failover connector max retry bug fix (open-telemetry#36605)
akats7 authored Nov 30, 2024
1 parent c961731 commit 4b45285
Showing 3 changed files with 98 additions and 10 deletions.
27 changes: 27 additions & 0 deletions .chloggen/failover-max-retry-fix.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: failoverconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Resolves a bug that prevents proper recovery when disabling max retries

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36587]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
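
The fix treats a configured MaxRetries of 0 as "retry cap disabled", so a priority level is never marked as permanently exhausted. Below is a minimal, self-contained Go sketch of that guard; the checkContinueRetry logic mirrors the pipeline_selector.go hunk further down, while the exceededMaxRetries body is an assumption about how the zero value is handled and is not copied from this commit.

package main

import "fmt"

// selectorSketch is a simplified stand-in for the connector's PipelineSelector:
// it keeps only a retry cap and per-level retry counts.
type selectorSketch struct {
	maxRetries  int   // 0 means the retry cap is disabled
	retryCounts []int // retries attempted per priority level
}

// exceededMaxRetries reports whether a level has used up its retries.
// With the cap disabled (0), a level is never considered exhausted.
func (s *selectorSketch) exceededMaxRetries(idx int) bool {
	return s.maxRetries > 0 && s.retryCounts[idx] >= s.maxRetries
}

// checkContinueRetry mirrors the fixed check: retrying continues as long as
// some higher-priority level still has retries left or the cap is disabled.
func (s *selectorSketch) checkContinueRetry(index int) bool {
	for i := 0; i < index; i++ {
		if s.maxRetries == 0 || s.retryCounts[i] < s.maxRetries {
			return true
		}
	}
	return false
}

func main() {
	s := &selectorSketch{maxRetries: 0, retryCounts: []int{5, 3}}
	fmt.Println(s.exceededMaxRetries(0)) // false: cap disabled, level 0 can still recover
	fmt.Println(s.checkContinueRetry(2)) // true: retries keep running for levels 0 and 1
}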
68 changes: 67 additions & 1 deletion connector/failoverconnector/failover_test.go
@@ -202,7 +202,7 @@ func TestFailoverRecovery_MaxRetries(t *testing.T) {

failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.pS.SetRetryCountToMax(0)
failoverConnector.failover.pS.SetRetryCountToValue(0, cfg.MaxRetries)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 2, tr)
@@ -211,11 +211,77 @@ func TestFailoverRecovery_MaxRetries(t *testing.T) {
failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)
failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)

// Check that level 0 is skipped because max retry value is hit
require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 1, tr)
}, 3*time.Second, 5*time.Millisecond)
}

func TestFailoverRecovery_MaxRetriesDisabled(t *testing.T) {
var sinkFirst, sinkSecond, sinkThird, sinkFourth consumertest.TracesSink
tracesFirst := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/first")
tracesSecond := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/second")
tracesThird := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/third")
tracesFourth := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/fourth")

cfg := &Config{
PipelinePriority: [][]pipeline.ID{{tracesFirst}, {tracesSecond}, {tracesThird}, {tracesFourth}},
RetryInterval: 50 * time.Millisecond,
RetryGap: 10 * time.Millisecond,
MaxRetries: 0,
}

router := connector.NewTracesRouter(map[pipeline.ID]consumer.Traces{
tracesFirst: &sinkFirst,
tracesSecond: &sinkSecond,
tracesThird: &sinkThird,
tracesFourth: &sinkFourth,
})

conn, err := NewFactory().CreateTracesToTraces(context.Background(),
connectortest.NewNopSettings(), cfg, router.(consumer.Traces))

require.NoError(t, err)

failoverConnector := conn.(*tracesFailover)

tr := sampleTrace()

defer func() {
assert.NoError(t, failoverConnector.Shutdown(context.Background()))
}()

failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 2, tr)
}, 3*time.Second, 5*time.Millisecond)

failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)
failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 0, tr)
}, 3*time.Second, 5*time.Millisecond)

failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.pS.SetRetryCountToValue(0, cfg.MaxRetries)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 2, tr)
}, 3*time.Second, 5*time.Millisecond)

failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)
failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)

// Check that still resets to level 0 even though max retry value is hit
require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 0, tr)
}, 3*time.Second, 5*time.Millisecond)
}

func resetConsumers(conn *tracesFailover, consumers ...consumer.Traces) {
for i, sink := range consumers {
conn.failover.ModifyConsumerAtIndex(i, sink)
13 changes: 4 additions & 9 deletions connector/failoverconnector/internal/state/pipeline_selector.go
@@ -92,7 +92,7 @@ func (p *PipelineSelector) retryHighPriorityPipelines(ctx context.Context, retry
defer ticker.Stop()

for i := 0; i < len(p.pipelineRetries); i++ {
if p.maxRetriesUsed(i) {
if p.exceededMaxRetries(i) {
continue
}
select {
@@ -110,7 +110,7 @@
// checkContinueRetry checks if retry should be suspended if all higher priority levels have exceeded their max retries
func (p *PipelineSelector) checkContinueRetry(index int) bool {
for i := 0; i < index; i++ {
if p.loadRetryCount(i) < p.constants.MaxRetries {
if p.constants.MaxRetries == 0 || p.loadRetryCount(i) < p.constants.MaxRetries {
return true
}
}
@@ -127,11 +127,6 @@ func (p *PipelineSelector) setToStableIndex(idx int) {
p.currentIndex.Store(p.stableIndex.Load())
}

// MaxRetriesUsed exported access to maxRetriesUsed
func (p *PipelineSelector) maxRetriesUsed(idx int) bool {
return p.loadRetryCount(idx) >= p.constants.MaxRetries
}

// SetNewStableIndex Update stableIndex to the passed stable index
func (p *PipelineSelector) setNewStableIndex(idx int) {
p.resetRetryCount(idx)
@@ -249,8 +244,8 @@ func (p *PipelineSelector) TestRetryPipelines(ctx context.Context, retryInterval
p.enableRetry(ctx, retryInterval, retryGap)
}

func (p *PipelineSelector) SetRetryCountToMax(idx int) {
p.pipelineRetries[idx].Store(int32(p.constants.MaxRetries))
func (p *PipelineSelector) SetRetryCountToValue(idx int, val int) {
p.pipelineRetries[idx].Store(int32(val))
}

func (p *PipelineSelector) ResetRetryCount(idx int) {
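
For end users, the practical effect is that the retry cap can be switched off so failed-over pipelines always keep probing higher-priority levels and can fully recover. The configuration sketch below is illustrative only: it assumes the YAML keys documented in the component README at the time (priority_levels, retry_interval, retry_gap, max_retries), and should be checked against the README for your collector version.

connectors:
  failover:
    priority_levels:
      - [traces/first]
      - [traces/second]
      - [traces/third]
    retry_interval: 5m
    retry_gap: 30s
    max_retries: 0   # assumed meaning: 0 disables the retry cap, per this fix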
