diff --git a/rpc/ws/accountSubscribe.go b/rpc/ws/accountSubscribe.go index 8c54229a..65bd59e7 100644 --- a/rpc/ws/accountSubscribe.go +++ b/rpc/ws/accountSubscribe.go @@ -85,7 +85,10 @@ type AccountSubscription struct { func (sw *AccountSubscription) Recv() (*AccountResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*AccountResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/blockSubscribe.go b/rpc/ws/blockSubscribe.go index e1530f11..d392eb97 100644 --- a/rpc/ws/blockSubscribe.go +++ b/rpc/ws/blockSubscribe.go @@ -150,7 +150,10 @@ type BlockSubscription struct { func (sw *BlockSubscription) Recv() (*BlockResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*BlockResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/client.go b/rpc/ws/client.go index 51770be7..0f574b81 100644 --- a/rpc/ws/client.go +++ b/rpc/ws/client.go @@ -19,6 +19,7 @@ package ws import ( "context" + "errors" "fmt" "io" "net/http" @@ -32,6 +33,8 @@ import ( "go.uber.org/zap" ) +var ErrSubscriptionClosed = errors.New("subscription closed") + type result interface{} type Client struct { diff --git a/rpc/ws/logsSubscribe.go b/rpc/ws/logsSubscribe.go index 1a9b5e03..98a4160d 100644 --- a/rpc/ws/logsSubscribe.go +++ b/rpc/ws/logsSubscribe.go @@ -109,7 +109,10 @@ type LogSubscription struct { func (sw *LogSubscription) Recv() (*LogResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*LogResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/programSubscribe.go b/rpc/ws/programSubscribe.go index fb94491a..3a4f127e 100644 --- a/rpc/ws/programSubscribe.go +++ b/rpc/ws/programSubscribe.go @@ -88,7 +88,10 @@ type ProgramSubscription struct { func (sw *ProgramSubscription) Recv() (*ProgramResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*ProgramResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/rootSubscribe.go b/rpc/ws/rootSubscribe.go index f54f30f5..46ce656b 100644 --- a/rpc/ws/rootSubscribe.go +++ b/rpc/ws/rootSubscribe.go @@ -44,7 +44,10 @@ type RootSubscription struct { func (sw *RootSubscription) Recv() (*RootResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*RootResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/signatureSubscribe.go b/rpc/ws/signatureSubscribe.go index 134d244f..d0f08025 100644 --- a/rpc/ws/signatureSubscribe.go +++ b/rpc/ws/signatureSubscribe.go @@ -69,7 +69,10 @@ type SignatureSubscription struct { func (sw *SignatureSubscription) Recv() (*SignatureResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*SignatureResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/slotSubscribe.go b/rpc/ws/slotSubscribe.go index 05096bad..c49ac9dc 100644 --- a/rpc/ws/slotSubscribe.go +++ b/rpc/ws/slotSubscribe.go @@ -47,7 +47,10 @@ type SlotSubscription struct { func (sw *SlotSubscription) Recv() (*SlotResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*SlotResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/slotsUpdatesSubscribe.go b/rpc/ws/slotsUpdatesSubscribe.go index 45712434..36fb564b 100644 --- a/rpc/ws/slotsUpdatesSubscribe.go +++ b/rpc/ws/slotsUpdatesSubscribe.go @@ -79,7 +79,10 @@ type SlotsUpdatesSubscription struct { func (sw *SlotsUpdatesSubscription) Recv() (*SlotsUpdatesResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*SlotsUpdatesResult), nil case err := <-sw.sub.err: return nil, err diff --git a/rpc/ws/voteSubscribe.go b/rpc/ws/voteSubscribe.go index 1d4aebc6..b0e56694 100644 --- a/rpc/ws/voteSubscribe.go +++ b/rpc/ws/voteSubscribe.go @@ -61,7 +61,10 @@ type VoteSubscription struct { func (sw *VoteSubscription) Recv() (*VoteResult, error) { select { - case d := <-sw.sub.stream: + case d, ok := <-sw.sub.stream: + if !ok { + return nil, ErrSubscriptionClosed + } return d.(*VoteResult), nil case err := <-sw.sub.err: return nil, err