diff --git a/pkg/tbtc/coordination.go b/pkg/tbtc/coordination.go index e868b0e2b2..6d06ff9634 100644 --- a/pkg/tbtc/coordination.go +++ b/pkg/tbtc/coordination.go @@ -7,6 +7,8 @@ import ( "fmt" "math/rand" "sort" + "strings" + "time" "github.com/keep-network/keep-core/pkg/internal/pb" "go.uber.org/zap" @@ -576,13 +578,15 @@ func (ce *coordinationExecutor) executeLeaderRoutine( ) (CoordinationProposal, error) { walletPublicKeyHash := ce.walletPublicKeyHash() - proposal, err := ce.proposalGenerator.Generate( + proposal, err := ce.generateProposal( &CoordinationProposalRequest{ WalletPublicKeyHash: walletPublicKeyHash, WalletOperators: ce.coordinatedWallet.signingGroupOperators, ExecutingOperator: ce.operatorAddress, ActionsChecklist: actionsChecklist, }, + 2, // 2 attempts at most + 1*time.Minute, // 1 minute between attempts ) if err != nil { return nil, fmt.Errorf("failed to generate proposal: [%v]", err) @@ -613,6 +617,40 @@ func (ce *coordinationExecutor) executeLeaderRoutine( return proposal, nil } +// generateProposal generates a proposal for the given coordination request. +// The generator retries the proposal generation if it fails. The number of +// attempts is limited to attemptLimit. The generator waits for retryDelay +// between attempts. +func (ce *coordinationExecutor) generateProposal( + request *CoordinationProposalRequest, + attemptLimit uint, + retryDelay time.Duration, +) (CoordinationProposal, error) { + var attemptErrs []string + + for attempt := uint(1); attempt <= attemptLimit; attempt++ { + if attempt > 1 { + time.Sleep(retryDelay) + } + + proposal, err := ce.proposalGenerator.Generate(request) + if err != nil { + attemptErrs = append( + attemptErrs, + fmt.Sprintf("attempt [%v] error: [%v]", attempt, err), + ) + continue + } + + return proposal, nil + } + + return nil, fmt.Errorf( + "all attempts failed: [%v]", + strings.Join(attemptErrs, "; "), + ) +} + // executeFollowerRoutine executes the follower's routine for the given coordination // window. The routine listens for the coordination message from the leader and // validates it. If the leader's proposal is valid, it returns the received diff --git a/pkg/tbtc/coordination_test.go b/pkg/tbtc/coordination_test.go index 540d22d40e..763964a64b 100644 --- a/pkg/tbtc/coordination_test.go +++ b/pkg/tbtc/coordination_test.go @@ -306,6 +306,7 @@ func TestCoordinationExecutor_Coordinate(t *testing.T) { func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + _ uint, ) (CoordinationProposal, error) { for _, action := range actionsChecklist { if walletPublicKeyHash == publicKeyHash && action == ActionRedemption { @@ -700,6 +701,7 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) { func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + _ uint, ) ( CoordinationProposal, error, @@ -799,6 +801,97 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) { } } +func TestCoordinationExecutor_GenerateProposal(t *testing.T) { + var tests = map[string]struct { + proposalGenerator CoordinationProposalGenerator + expectedProposal CoordinationProposal + expectedError error + }{ + "first attempt success": { + proposalGenerator: newMockCoordinationProposalGenerator( + func( + _ [20]byte, + _ []WalletActionType, + _ uint, + ) (CoordinationProposal, error) { + return &NoopProposal{}, nil + }, + ), + expectedProposal: &NoopProposal{}, + expectedError: nil, + }, + "last attempt success": { + proposalGenerator: newMockCoordinationProposalGenerator( + func( + _ [20]byte, + _ []WalletActionType, + call uint, + ) (CoordinationProposal, error) { + if call == 1 { + return nil, fmt.Errorf("unexpected error") + } else if call == 2 { + return &NoopProposal{}, nil + } else { + panic("unexpected call") + } + }, + ), + expectedProposal: &NoopProposal{}, + expectedError: nil, + }, + "all attempts failed": { + proposalGenerator: newMockCoordinationProposalGenerator( + func( + _ [20]byte, + _ []WalletActionType, + call uint, + ) (CoordinationProposal, error) { + return nil, fmt.Errorf("unexpected error %v", call) + }, + ), + expectedProposal: nil, + expectedError: fmt.Errorf( + "all attempts failed: [attempt [1] error: [unexpected error 1]; attempt [2] error: [unexpected error 2]]", + ), + }, + } + + for testName, test := range tests { + t.Run(testName, func(t *testing.T) { + executor := &coordinationExecutor{ + // Set only relevant fields. + proposalGenerator: test.proposalGenerator, + } + + proposal, err := executor.generateProposal( + &CoordinationProposalRequest{}, // request fields not relevant + 2, + 1*time.Second, + ) + + if !reflect.DeepEqual(test.expectedError, err) { + t.Errorf( + "unexpected error\n"+ + "expected: %v\n"+ + "actual: %v\n", + test.expectedError, + err, + ) + } + + if !reflect.DeepEqual(test.expectedProposal, proposal) { + t.Errorf( + "unexpected proposal\n"+ + "expected: %v\n"+ + "actual: %v\n", + test.expectedProposal, + proposal, + ) + } + }) + } +} + func TestCoordinationExecutor_ExecuteFollowerRoutine(t *testing.T) { // Uncompressed public key corresponding to the 20-byte public key hash: // aa768412ceed10bd423c025542ca90071f9fb62d. @@ -1171,9 +1264,11 @@ func TestCoordinationExecutor_ExecuteFollowerRoutine_WithIdleLeader(t *testing.T } type mockCoordinationProposalGenerator struct { + calls uint delegate func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + call uint, ) (CoordinationProposal, error) } @@ -1181,6 +1276,7 @@ func newMockCoordinationProposalGenerator( delegate func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + call uint, ) (CoordinationProposal, error), ) *mockCoordinationProposalGenerator { return &mockCoordinationProposalGenerator{ @@ -1191,5 +1287,10 @@ func newMockCoordinationProposalGenerator( func (mcpg *mockCoordinationProposalGenerator) Generate( request *CoordinationProposalRequest, ) (CoordinationProposal, error) { - return mcpg.delegate(request.WalletPublicKeyHash, request.ActionsChecklist) + mcpg.calls++ + return mcpg.delegate( + request.WalletPublicKeyHash, + request.ActionsChecklist, + mcpg.calls, + ) }