Skip to content

Commit

Permalink
Mark finalized head subscription as part of alive loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Nov 27, 2024
1 parent a1636a9 commit 656fb32
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 4 deletions.
33 changes: 33 additions & 0 deletions common/client/mock_node_client_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions common/client/mock_rpc_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/client/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
}

defer finalizedHeadsSub.Unsubscribe()
n.rpc.SetAliveLoopFinalizedHeadSub(finalizedHeadsSub.sub)
}

var pollCh <-chan time.Time
Expand Down
7 changes: 7 additions & 0 deletions common/client/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
rpc.On("SetAliveLoopSub", mock.Anything).Once()
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newDialedNode(t, testNodeOpts{
config: testNodeConfig{},
Expand All @@ -467,6 +468,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
ch := make(chan Head)
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
name := "node-" + rand.Str(5)
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{},
Expand Down Expand Up @@ -501,6 +503,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
ch := make(chan Head)
close(ch)
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newSubscribedNode(t, testNodeOpts{
chainConfig: clientMocks.ChainConfig{
Expand All @@ -527,6 +530,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
ch := make(chan Head, 1)
ch <- head{BlockNumber: 10}.ToMockHead(t)
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), newSub(t), nil).Once()
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
lggr, observed := logger.TestObserved(t, zap.DebugLevel)
noNewFinalizedHeadsThreshold := tests.TestInterval
node := newSubscribedNode(t, testNodeOpts{
Expand Down Expand Up @@ -560,6 +564,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
rpc := newMockNodeClient[types.ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once()
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once()
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
lggr, observed := logger.TestObserved(t, zap.DebugLevel)
noNewFinalizedHeadsThreshold := tests.TestInterval
node := newSubscribedNode(t, testNodeOpts{
Expand Down Expand Up @@ -593,6 +598,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
sub.On("Err").Return((<-chan error)(errCh))
sub.On("Unsubscribe").Once()
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(nil), sub, nil).Once()
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newSubscribedNode(t, testNodeOpts{
chainConfig: clientMocks.ChainConfig{
Expand Down Expand Up @@ -1116,6 +1122,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
outOfSyncSubscription.On("Unsubscribe").Once()
ch := make(chan Head)
rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(ch), outOfSyncSubscription, nil).Once()
rpc.On("SetAliveLoopFinalizedHeadSub", mock.Anything).Once()

setupRPCForAliveLoop(t, rpc)

Expand Down
1 change: 1 addition & 0 deletions common/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type NodeClient[
ClientVersion(context.Context) (string, error)
SubscribersCount() int32
SetAliveLoopSub(types.Subscription)
SetAliveLoopFinalizedHeadSub(types.Subscription)
UnsubscribeAllExceptAliveLoop()
IsSyncing(ctx context.Context) (bool, error)
SubscribeToFinalizedHeads(_ context.Context) (<-chan HEAD, types.Subscription, error)
Expand Down
33 changes: 33 additions & 0 deletions core/chains/evm/client/mocks/rpc_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ type rpcClient struct {
subs []ethereum.Subscription

// Need to track the aliveLoop subscription, so we do not cancel it when checking lease on the MultiNode
aliveLoopSub ethereum.Subscription
aliveLoopHeadsSub ethereum.Subscription
aliveLoopFinalizedHeadsSub ethereum.Subscription

// chStopInFlight can be closed to immediately cancel all in-flight requests on
// this rpcClient. Closing and replacing should be serialized through
Expand Down Expand Up @@ -368,11 +369,18 @@ func (r *rpcClient) unsubscribeAll() {
}
r.subs = nil
}
func (r *rpcClient) SetAliveLoopSub(sub commontypes.Subscription) {
func (r *rpcClient) SetAliveLoopSub(headsSub commontypes.Subscription) {
r.stateMu.Lock()
defer r.stateMu.Unlock()

r.aliveLoopSub = sub
r.aliveLoopHeadsSub = headsSub
}

func (r *rpcClient) SetAliveLoopFinalizedHeadSub(finalizedHeads commontypes.Subscription) {
r.stateMu.Lock()
defer r.stateMu.Unlock()

r.aliveLoopFinalizedHeadsSub = finalizedHeads
}

// SubscribersCount returns the number of client subscribed to the node
Expand All @@ -389,7 +397,7 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() {
defer r.stateMu.Unlock()

for _, s := range r.subs {
if s != r.aliveLoopSub {
if s != r.aliveLoopHeadsSub && s != r.aliveLoopFinalizedHeadsSub {
s.Unsubscribe()
}
}
Expand Down
19 changes: 19 additions & 0 deletions core/chains/evm/client/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,25 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
require.NoError(t, err)
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
})
t.Run("UnsubscribeAllExceptAliveLoop should keep finalized heads subscription open", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))

_, sub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t))
require.NoError(t, err)
rpc.SetAliveLoopFinalizedHeadSub(sub)
rpc.UnsubscribeAllExceptAliveLoop()
select {
case <-sub.Err():
t.Fatal("Expected subscription to remain open")
default:
}
checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc)
})
t.Run("Subscription error is properly wrapper", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
Expand Down

0 comments on commit 656fb32

Please sign in to comment.