From c9f41141e3200628b6d901e4ec71ee524e843dd7 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Mon, 19 Feb 2024 23:31:46 +0100 Subject: [PATCH] fix: race conditions due to share variables --- .../relay/evm/functions/coordinator_v2.go | 6 +- .../evm/functions/logpoller_wrapper_test.go | 185 +++++++++--------- 2 files changed, 98 insertions(+), 93 deletions(-) diff --git a/core/services/relay/evm/functions/coordinator_v2.go b/core/services/relay/evm/functions/coordinator_v2.go index 04c02570dc2..4189e575b2a 100644 --- a/core/services/relay/evm/functions/coordinator_v2.go +++ b/core/services/relay/evm/functions/coordinator_v2.go @@ -93,12 +93,12 @@ func (c *CoordinatorV2) LogsToRequests(requestLogs []logpoller.Log) ([]evmRelayT oracleRequest.Commitment.Client, oracleRequest.Commitment.SubscriptionId, oracleRequest.Commitment.CallbackGasLimit, - oracleRequest.Commitment.AdminFee, - oracleRequest.Commitment.DonFee, + oracleRequest.Commitment.AdminFeeJuels, + oracleRequest.Commitment.DonFeeJuels, oracleRequest.Commitment.GasOverheadBeforeCallback, oracleRequest.Commitment.GasOverheadAfterCallback, oracleRequest.Commitment.TimeoutTimestamp, - oracleRequest.Commitment.OperationFee, + oracleRequest.Commitment.OperationFeeJuels, ) if err != nil { c.lggr.Errorw("LogsToRequests: failed to pack Coordinator v2 commitment bytes, skipping", err) diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index b2f196c5da6..9087356c65b 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -37,11 +37,6 @@ const ( OracleRequestV200 = `[{"constant":true,"inputs":[{"indexed":true,"internalType":"bytes32","name":"requestId","type":"bytes32"},{"indexed":true,"internalType":"address","name":"requestingContract","type":"address"},{"indexed":false,"internalType":"address","name":"requestInitiator","type":"address"},{"indexed":false,"internalType":"uint64","name":"subscriptionId","type":"uint64"},{"indexed":false,"internalType":"address","name":"subscriptionOwner","type":"address"},{"indexed":false,"internalType":"bytes","name":"data","type":"bytes"},{"indexed":false,"internalType":"uint16","name":"dataVersion","type":"uint16"},{"indexed":false,"internalType":"bytes32","name":"flags","type":"bytes32"},{"indexed":false,"internalType":"uint64","name":"callbackGasLimit","type":"uint64"},{"components":[{"internalType":"bytes32","name":"requestId","type":"bytes32"},{"internalType":"address","name":"coordinator","type":"address"},{"internalType":"uint96","name":"estimatedTotalCostJuels","type":"uint96"},{"internalType":"address","name":"client","type":"address"},{"internalType":"uint64","name":"subscriptionId","type":"uint64"},{"internalType":"uint32","name":"callbackGasLimit","type":"uint32"},{"internalType":"uint72","name":"adminFee","type":"uint72"},{"internalType":"uint72","name":"donFee","type":"uint72"},{"internalType":"uint40","name":"gasOverheadBeforeCallback","type":"uint40"},{"internalType":"uint40","name":"gasOverheadAfterCallback","type":"uint40"},{"internalType":"uint32","name":"timeoutTimestamp","type":"uint32"},{"internalType":"uint72","name":"operationFee","type":"uint72"}],"indexed":false,"internalType":"structFunctionsResponse.Commitment","name":"commitment","type":"tuple"}],"name":"OracleRequest","type":"function"}]` ) -var routerAddressBytes []byte -var routerAddressHex common.Address -var coordinatorAddressBytes []byte -var coordinatorAddressHex common.Address - func (s *subscriber) UpdateRoutes(activeCoordinator common.Address, proposedCoordinator common.Address) error { if s.expectedCalls == 0 { panic("unexpected call to UpdateRoutes") @@ -68,24 +63,34 @@ func addr(lastByte string) ([]byte, error) { return contractAddr, nil } -func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.LogPollerWrapper, *evmclimocks.Client) { +type setupResponse struct { + LogPoller *lpmocks.LogPoller + LogPollerWrapper types.LogPollerWrapper + Client *evmclimocks.Client + RouterAddress common.Address + CoordinatorAddress common.Address + CoordinatorAddressBytes []byte +} + +func setUp(t *testing.T, updateFrequencySec uint32) setupResponse { + s := setupResponse{} lggr := logger.TestLogger(t) - client := evmclimocks.NewClient(t) - lp := lpmocks.NewLogPoller(t) + s.Client = evmclimocks.NewClient(t) + s.LogPoller = lpmocks.NewLogPoller(t) config := config.PluginConfig{ ContractUpdateCheckFrequencySec: updateFrequencySec, ContractVersion: 1, } routerAddressBytes, err := addr("01") require.NoError(t, err) - routerAddressHex = common.BytesToAddress(routerAddressBytes) - coordinatorAddressBytes, err = addr("02") + s.RouterAddress = common.BytesToAddress(routerAddressBytes) + s.CoordinatorAddressBytes, err = addr("02") require.NoError(t, err) - coordinatorAddressHex = common.BytesToAddress(coordinatorAddressBytes) - lpWrapper, err := NewLogPollerWrapper(routerAddressHex, config, client, lp, lggr) + s.CoordinatorAddress = common.BytesToAddress(s.CoordinatorAddressBytes) + s.LogPollerWrapper, err = NewLogPollerWrapper(s.RouterAddress, config, s.Client, s.LogPoller, lggr) require.NoError(t, err) - return lp, lpWrapper, client + return s } func getMockedRequestLogV1(t *testing.T) logpoller.Log { @@ -126,31 +131,31 @@ func getMockedRequestLogV2(t *testing.T) logpoller.Log { func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - reqs, resps, err := lpWrapper.LatestEvents() + reqs, resps, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) require.Equal(t, 0, len(reqs)) require.Equal(t, 0, len(resps)) @@ -158,31 +163,31 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV1(t *testing.T func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - reqs, resps, err := lpWrapper.LatestEvents() + reqs, resps, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) require.Equal(t, 0, len(reqs)) require.Equal(t, 0, len(resps)) @@ -190,48 +195,48 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents_CoordinatorV2(t *testing.T func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) // check only once - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup := setUp(t, 100_000) // check only once + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr("00")) + setup.Client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr("00")) - servicetest.Run(t, lpWrapper) - _, _, err := lpWrapper.LatestEvents() + servicetest.Run(t, setup.LogPollerWrapper) + _, _, err := setup.LogPollerWrapper.LatestEvents() require.Error(t, err) } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup := setUp(t, 100_000) + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV100) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) mockedLog := getMockedRequestLogV1(t) // All logPoller queries for responses return none - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) // On the first logPoller query for requests, the request log appears - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() // On the 2nd query, the request log disappears - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator_1_1_0.FunctionsCoordinator110OracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() // On the 3rd query, the original request log appears again - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) done := make(chan struct{}) go func() { @@ -247,74 +252,74 @@ func TestLogPollerWrapper_LatestEvents_ReorgHandlingV1(t *testing.T) { t.FailNow() } - oracleRequests, _, err := lpWrapper.LatestEvents() + oracleRequests, _, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 1, len(oracleRequests)) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) require.NoError(t, err) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) } func TestLogPollerWrapper_LatestEvents_ReorgHandlingV2(t *testing.T) { t.Parallel() - lp, lpWrapper, client := setUp(t, 100_000) - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById - To: &routerAddressHex, + setup := setUp(t, 100_000) + setup.LogPoller.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getContractById + To: &setup.RouterAddress, Data: []uint8{0xa9, 0xc9, 0xa9, 0x18, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, - }, mock.Anything).Return(coordinatorAddressBytes, nil) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById - To: &routerAddressHex, + }, mock.Anything).Return(setup.CoordinatorAddressBytes, nil) + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // getProposedContractById + To: &setup.RouterAddress, Data: []uint8{0x6a, 0x22, 0x15, 0xde, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, }, mock.Anything).Return(addr("00")) typeAndVersionResponse, err := encodeTypeAndVersion(CoordinatorContractV200) require.NoError(t, err) - client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion - To: &coordinatorAddressHex, + setup.Client.On("CallContract", mock.Anything, ethereum.CallMsg{ // typeAndVersion + To: &setup.CoordinatorAddress, Data: hexutil.MustDecode("0x181f5a77"), }, mock.Anything).Return(typeAndVersionResponse, nil) - lp.On("RegisterFilter", mock.Anything).Return(nil) + setup.LogPoller.On("RegisterFilter", mock.Anything).Return(nil) subscriber := newSubscriber(1) - lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + setup.LogPollerWrapper.SubscribeToUpdates("mock_subscriber", subscriber) mockedLog := getMockedRequestLogV2(t) // All logPoller queries for responses return none - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) // On the first logPoller query for requests, the request log appears - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() // On the 2nd query, the request log disappears - lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() // On the 3rd query, the original request log appears again - lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + setup.LogPoller.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() - servicetest.Run(t, lpWrapper) + servicetest.Run(t, setup.LogPollerWrapper) subscriber.updates.Wait() - oracleRequests, _, err := lpWrapper.LatestEvents() + oracleRequests, _, err := setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 1, len(oracleRequests)) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) require.NoError(t, err) - oracleRequests, _, err = lpWrapper.LatestEvents() + oracleRequests, _, err = setup.LogPollerWrapper.LatestEvents() require.NoError(t, err) assert.Equal(t, 0, len(oracleRequests)) } func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testing.T) { t.Parallel() - _, lpWrapper, _ := setUp(t, 100_000) + setup := setUp(t, 100_000) inputLogs := make([]logpoller.Log, maxLogsToProcess+100) for i := 0; i < 1100; i++ { inputLogs[i] = getMockedRequestLogV1(t) } - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") @@ -325,12 +330,12 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testin func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *testing.T) { t.Parallel() - _, lpWrapper, _ := setUp(t, 100_000) + setup := setUp(t, 100_000) inputLogs := []logpoller.Log{getMockedRequestLogV1(t)} inputLogs[0].Topics = [][]byte{[]byte("invalid topic")} mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") assert.Equal(t, 0, len(outputLogs)) @@ -340,7 +345,7 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *test func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetectedEvent(t *testing.T) { t.Parallel() - _, lpWrapper, _ := setUp(t, 100_000) + setup := setUp(t, 100_000) mockedRequestLog := getMockedRequestLogV1(t) inputLogs := []logpoller.Log{mockedRequestLog} var mockedRequestId [32]byte @@ -356,7 +361,7 @@ func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetect timeDetected: time.Now().Add(-time.Second * time.Duration(logPollerCacheDurationSecDefault+1)), } - functionsLpWrapper := lpWrapper.(*logPollerWrapper) + functionsLpWrapper := setup.LogPollerWrapper.(*logPollerWrapper) outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") assert.Equal(t, 0, len(outputLogs))