Skip to content

Commit

Permalink
fix(nodebuilder/header): Cancel subscription when done (#2134)
Browse files Browse the repository at this point in the history
Found while debugging swamp.

We never cancel subscription so when node attempts to shut down, there
are outstanding subs that need to be closed.

---------

Co-authored-by: Hlib Kanunnikov <[email protected]>
  • Loading branch information
renaynay and Wondertan authored Apr 27, 2023
1 parent f8535b1 commit 11fea99
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
2 changes: 2 additions & 0 deletions nodebuilder/header/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func (s *Service) Subscribe(ctx context.Context) (<-chan *header.ExtendedHeader,
headerCh := make(chan *header.ExtendedHeader)
go func() {
defer close(headerCh)
defer subscription.Cancel()

for {
h, err := subscription.NextHeader(ctx)
if err != nil {
Expand Down
57 changes: 57 additions & 0 deletions nodebuilder/tests/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package tests

import (
"context"
"testing"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
)

// TestHeaderSubscription ensures that the header subscription over RPC works
// as intended and gets canceled successfully after rpc context cancellation.
func TestHeaderSubscription(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
t.Cleanup(cancel)

sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))

// start a bridge node
bridge := sw.NewBridgeNode()
err := bridge.Start(ctx)
require.NoError(t, err)

cfg := nodebuilder.DefaultConfig(node.Light)
addrs, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(bridge.Host))
require.NoError(t, err)
cfg.Header.TrustedPeers = append(cfg.Header.TrustedPeers, addrs[0].String())

// start a light node that's connected to the bridge node
light := sw.NewNodeWithConfig(node.Light, cfg)
err = light.Start(ctx)
require.NoError(t, err)

// subscribe to headers via the light node's RPC header subscription
subctx, subcancel := context.WithCancel(ctx)
sub, err := light.HeaderServ.Subscribe(subctx)
require.NoError(t, err)
// listen for 5 headers
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-sub:
}
}
// cancel subscription via context
subcancel()

// stop the light node and expect no outstanding subscription errors
err = light.Stop(ctx)
require.NoError(t, err)
}

0 comments on commit 11fea99

Please sign in to comment.