Skip to content

Commit

Permalink
fix: check if the channel is closed before returning ws.result (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhughdo authored Oct 10, 2024
1 parent 28f36b6 commit 6beb7f8
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 9 deletions.
5 changes: 4 additions & 1 deletion rpc/ws/accountSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ type AccountSubscription struct {

func (sw *AccountSubscription) Recv() (*AccountResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*AccountResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/blockSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ type BlockSubscription struct {

func (sw *BlockSubscription) Recv() (*BlockResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*BlockResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions rpc/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ws

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -32,6 +33,8 @@ import (
"go.uber.org/zap"
)

var ErrSubscriptionClosed = errors.New("subscription closed")

type result interface{}

type Client struct {
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/logsSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ type LogSubscription struct {

func (sw *LogSubscription) Recv() (*LogResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*LogResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/programSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ type ProgramSubscription struct {

func (sw *ProgramSubscription) Recv() (*ProgramResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*ProgramResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/rootSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ type RootSubscription struct {

func (sw *RootSubscription) Recv() (*RootResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*RootResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/signatureSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ type SignatureSubscription struct {

func (sw *SignatureSubscription) Recv() (*SignatureResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*SignatureResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/slotSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ type SlotSubscription struct {

func (sw *SlotSubscription) Recv() (*SlotResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*SlotResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/slotsUpdatesSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ type SlotsUpdatesSubscription struct {

func (sw *SlotsUpdatesSubscription) Recv() (*SlotsUpdatesResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*SlotsUpdatesResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion rpc/ws/voteSubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ type VoteSubscription struct {

func (sw *VoteSubscription) Recv() (*VoteResult, error) {
select {
case d := <-sw.sub.stream:
case d, ok := <-sw.sub.stream:
if !ok {
return nil, ErrSubscriptionClosed
}
return d.(*VoteResult), nil
case err := <-sw.sub.err:
return nil, err
Expand Down

0 comments on commit 6beb7f8

Please sign in to comment.