Skip to content

Commit

Permalink
introduce a new subscribe function for ordered responses
Browse files Browse the repository at this point in the history
  • Loading branch information
illia-malachyn committed Oct 22, 2024
1 parent 5ba2321 commit 92a96e3
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
78 changes: 75 additions & 3 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
return convert.MessageToAccountStatus(response)
}

return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse)
return subscribeContinuouslyIndexed(ctx, subscribeClient.Recv, convertAccountStatusResponse)
}

func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID(
Expand All @@ -1454,7 +1454,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID(
return convert.MessageToAccountStatus(response)
}

return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse)
return subscribeContinuouslyIndexed(ctx, subscribeClient.Recv, convertAccountStatusResponse)
}

func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock(
Expand All @@ -1479,7 +1479,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock(
return convert.MessageToAccountStatus(response)
}

return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse)
return subscribeContinuouslyIndexed(ctx, subscribeClient.Recv, convertAccountStatusResponse)
}

func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID(
Expand Down Expand Up @@ -1564,6 +1564,13 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest(
return subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse)
}

// subscribe sets up a generic subscription that continuously receives and processes messages
// from a data source. It does not enforce any message ordering or indexing. The function takes
// three parameters: a receive() function for getting the next message, a convertResponse() function
// for transforming the message into the desired response type, and a context for cancellation.
// It returns two channels: one for the converted responses and another for errors. The function
// runs in a separate goroutine and handles errors gracefully, signaling completion when the
// context is canceled or an error occurs.
func subscribe[Response any, ClientResponse any](
ctx context.Context,
receive func() (*ClientResponse, error),
Expand Down Expand Up @@ -1610,3 +1617,68 @@ func subscribe[Response any, ClientResponse any](

return subChan, errChan, nil
}

type IndexedMessage interface {
GetMessageIndex() uint64
}

// subscribeContinuouslyIndexed is a specialized version of the subscription function for cases
// where messages contain an index to ensure order. The Response type must implement the
// IndexedMessage interface, providing a GetMessageIndex method. The function checks that each
// received message's index matches the expected sequence, starting from zero and incrementing
// by one. If a message arrives out of order, an error is sent. This function helps clients
// detect any missed messages and ensures consistent message processing.
func subscribeContinuouslyIndexed[Response IndexedMessage, ClientResponse any](
ctx context.Context,
receive func() (*ClientResponse, error),
convertResponse func(*ClientResponse) (Response, error),
) (<-chan Response, <-chan error, error) {
subChan := make(chan Response)
errChan := make(chan error)

sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

go func() {
defer close(subChan)
defer close(errChan)

var nextExpectedMessageIndex uint64

for {
resp, err := receive()
if err != nil {
if err == io.EOF {
return
}

sendErr(fmt.Errorf("error receiving %s: %w", reflect.TypeOf(resp).Name(), err))
return
}

response, err := convertResponse(resp)
if err != nil {
sendErr(fmt.Errorf("error converting %s: %w", reflect.TypeOf(resp).Name(), err))
return
}

if response.GetMessageIndex() != nextExpectedMessageIndex {
sendErr(fmt.Errorf("message received out of order"))
return
}
nextExpectedMessageIndex += 1

select {
case <-ctx.Done():
return
case subChan <- response:
}
}
}()

return subChan, errChan, nil
}
4 changes: 4 additions & 0 deletions account.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ type AccountStatus struct {
Results []*AccountStatusResult
}

func (a AccountStatus) GetMessageIndex() uint64 {
return a.MessageIndex
}

type AccountStatusResult struct {
Address Address
Events []Event
Expand Down

0 comments on commit 92a96e3

Please sign in to comment.