From 88cdb61da8d1ab1dddd6df3c11faf40c533cb278 Mon Sep 17 00:00:00 2001 From: Lukasz Zimnoch Date: Thu, 11 Apr 2024 13:07:38 +0200 Subject: [PATCH] Retry upon coordination proposal failure Here we attempt to harden proposal generation by retrying the process in case of failure. Proposal generation relies heavily on third-party services such as Ethereum RPC and Electrum servers. A temporary disruption of those services can easily break proposal generation and cause a missed coordination window. Retrying proposal generation after a short delay should help overcome temporary problems and increase the chance that a coordination window succeeds. --- pkg/tbtc/coordination.go | 40 ++++++++++++- pkg/tbtc/coordination_test.go | 103 +++++++++++++++++++++++++++++++++- 2 files changed, 141 insertions(+), 2 deletions(-) diff --git a/pkg/tbtc/coordination.go b/pkg/tbtc/coordination.go index e868b0e2b2..6d06ff9634 100644 --- a/pkg/tbtc/coordination.go +++ b/pkg/tbtc/coordination.go @@ -7,6 +7,8 @@ import ( "fmt" "math/rand" "sort" + "strings" + "time" "github.com/keep-network/keep-core/pkg/internal/pb" "go.uber.org/zap" @@ -576,13 +578,15 @@ func (ce *coordinationExecutor) executeLeaderRoutine( ) (CoordinationProposal, error) { walletPublicKeyHash := ce.walletPublicKeyHash() - proposal, err := ce.proposalGenerator.Generate( + proposal, err := ce.generateProposal( &CoordinationProposalRequest{ WalletPublicKeyHash: walletPublicKeyHash, WalletOperators: ce.coordinatedWallet.signingGroupOperators, ExecutingOperator: ce.operatorAddress, ActionsChecklist: actionsChecklist, }, + 2, // 2 attempts at most + 1*time.Minute, // 1 minute between attempts ) if err != nil { return nil, fmt.Errorf("failed to generate proposal: [%v]", err) @@ -613,6 +617,40 @@ func (ce *coordinationExecutor) executeLeaderRoutine( return proposal, nil } +// generateProposal generates a proposal for the given coordination request. +// The generator retries the proposal generation if it fails. 
The number of +// attempts is limited to attemptLimit. The generator waits for retryDelay +// between attempts. +func (ce *coordinationExecutor) generateProposal( + request *CoordinationProposalRequest, + attemptLimit uint, + retryDelay time.Duration, +) (CoordinationProposal, error) { + var attemptErrs []string + + for attempt := uint(1); attempt <= attemptLimit; attempt++ { + if attempt > 1 { + time.Sleep(retryDelay) + } + + proposal, err := ce.proposalGenerator.Generate(request) + if err != nil { + attemptErrs = append( + attemptErrs, + fmt.Sprintf("attempt [%v] error: [%v]", attempt, err), + ) + continue + } + + return proposal, nil + } + + return nil, fmt.Errorf( + "all attempts failed: [%v]", + strings.Join(attemptErrs, "; "), + ) +} + // executeFollowerRoutine executes the follower's routine for the given coordination // window. The routine listens for the coordination message from the leader and // validates it. If the leader's proposal is valid, it returns the received diff --git a/pkg/tbtc/coordination_test.go b/pkg/tbtc/coordination_test.go index 540d22d40e..763964a64b 100644 --- a/pkg/tbtc/coordination_test.go +++ b/pkg/tbtc/coordination_test.go @@ -306,6 +306,7 @@ func TestCoordinationExecutor_Coordinate(t *testing.T) { func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + _ uint, ) (CoordinationProposal, error) { for _, action := range actionsChecklist { if walletPublicKeyHash == publicKeyHash && action == ActionRedemption { @@ -700,6 +701,7 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) { func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + _ uint, ) ( CoordinationProposal, error, @@ -799,6 +801,97 @@ func TestCoordinationExecutor_ExecuteLeaderRoutine(t *testing.T) { } } +func TestCoordinationExecutor_GenerateProposal(t *testing.T) { + var tests = map[string]struct { + proposalGenerator CoordinationProposalGenerator + expectedProposal CoordinationProposal + expectedError error + }{ + 
"first attempt success": { + proposalGenerator: newMockCoordinationProposalGenerator( + func( + _ [20]byte, + _ []WalletActionType, + _ uint, + ) (CoordinationProposal, error) { + return &NoopProposal{}, nil + }, + ), + expectedProposal: &NoopProposal{}, + expectedError: nil, + }, + "last attempt success": { + proposalGenerator: newMockCoordinationProposalGenerator( + func( + _ [20]byte, + _ []WalletActionType, + call uint, + ) (CoordinationProposal, error) { + if call == 1 { + return nil, fmt.Errorf("unexpected error") + } else if call == 2 { + return &NoopProposal{}, nil + } else { + panic("unexpected call") + } + }, + ), + expectedProposal: &NoopProposal{}, + expectedError: nil, + }, + "all attempts failed": { + proposalGenerator: newMockCoordinationProposalGenerator( + func( + _ [20]byte, + _ []WalletActionType, + call uint, + ) (CoordinationProposal, error) { + return nil, fmt.Errorf("unexpected error %v", call) + }, + ), + expectedProposal: nil, + expectedError: fmt.Errorf( + "all attempts failed: [attempt [1] error: [unexpected error 1]; attempt [2] error: [unexpected error 2]]", + ), + }, + } + + for testName, test := range tests { + t.Run(testName, func(t *testing.T) { + executor := &coordinationExecutor{ + // Set only relevant fields. 
+ proposalGenerator: test.proposalGenerator, + } + + proposal, err := executor.generateProposal( + &CoordinationProposalRequest{}, // request fields not relevant + 2, + 1*time.Second, + ) + + if !reflect.DeepEqual(test.expectedError, err) { + t.Errorf( + "unexpected error\n"+ + "expected: %v\n"+ + "actual: %v\n", + test.expectedError, + err, + ) + } + + if !reflect.DeepEqual(test.expectedProposal, proposal) { + t.Errorf( + "unexpected proposal\n"+ + "expected: %v\n"+ + "actual: %v\n", + test.expectedProposal, + proposal, + ) + } + }) + } +} + func TestCoordinationExecutor_ExecuteFollowerRoutine(t *testing.T) { // Uncompressed public key corresponding to the 20-byte public key hash: // aa768412ceed10bd423c025542ca90071f9fb62d. @@ -1171,9 +1264,11 @@ func TestCoordinationExecutor_ExecuteFollowerRoutine_WithIdleLeader(t *testing.T } type mockCoordinationProposalGenerator struct { + calls uint delegate func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + call uint, ) (CoordinationProposal, error) } @@ -1181,6 +1276,7 @@ func newMockCoordinationProposalGenerator( delegate func( walletPublicKeyHash [20]byte, actionsChecklist []WalletActionType, + call uint, ) (CoordinationProposal, error), ) *mockCoordinationProposalGenerator { return &mockCoordinationProposalGenerator{ @@ -1191,5 +1287,10 @@ func newMockCoordinationProposalGenerator( func (mcpg *mockCoordinationProposalGenerator) Generate( request *CoordinationProposalRequest, ) (CoordinationProposal, error) { - return mcpg.delegate(request.WalletPublicKeyHash, request.ActionsChecklist) + mcpg.calls++ + return mcpg.delegate( + request.WalletPublicKeyHash, + request.ActionsChecklist, + mcpg.calls, + ) }