Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make event encoding configurable over grpc #673

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ const PreviewnetHost = "access.previewnet.nodes.onflow.org:9000"
type ClientOption func(*options)

type options struct {
dialOptions []grpc.DialOption
jsonOptions []jsoncdc.Option
dialOptions []grpc.DialOption
jsonOptions []jsoncdc.Option
eventEncoding flow.EventEncodingVersion
}

func DefaultClientOptions() *options {
Expand All @@ -61,6 +62,7 @@ func DefaultClientOptions() *options {
jsonOptions: []jsoncdc.Option{
jsoncdc.WithAllowUnstructuredStaticTypes(true),
},
eventEncoding: flow.EventEncodingVersionCCF,
}
}

Expand All @@ -78,6 +80,13 @@ func WithJSONOptions(jsonOpts ...jsoncdc.Option) ClientOption {
}
}

// WithEventEncoding sets the default event encoding to use when requesting events from the API
func WithEventEncoding(version flow.EventEncodingVersion) ClientOption {
return func(opts *options) {
opts.eventEncoding = version
}
}

// NewClient creates an gRPC client exposing all the common access APIs.
// Client will use provided host for connection.
func NewClient(host string, opts ...ClientOption) (*Client, error) {
Expand All @@ -92,6 +101,7 @@ func NewClient(host string, opts ...ClientOption) (*Client, error) {
}

client.SetJSONOptions(cfg.jsonOptions)
client.SetEventEncoding(cfg.eventEncoding)

return &Client{grpc: client}, nil
}
Expand All @@ -103,6 +113,11 @@ type Client struct {
grpc *BaseClient
}

// RPCClient returns the underlying gRPC client.
func (c *Client) RPCClient() RPCClient {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't strictly needed here, but it provides some flexibility for users in the future to break out into a raw grpc request when the sdk doesn't expose some needed/new functionality

return c.grpc.RPCClient()
}

func (c *Client) Ping(ctx context.Context) error {
return c.grpc.Ping(ctx)
}
Expand Down
37 changes: 24 additions & 13 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/onflow/cadence"
"github.com/onflow/cadence/encoding/json"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
"github.com/onflow/flow/protobuf/go/flow/executiondata"

"github.com/onflow/flow-go-sdk"
Expand Down Expand Up @@ -81,6 +80,7 @@ type BaseClient struct {
executionDataClient ExecutionDataRPCClient
close func() error
jsonOptions []json.Option
eventEncoding flow.EventEncodingVersion
}

// NewBaseClient creates a new gRPC handler for network communication.
Expand All @@ -99,14 +99,16 @@ func NewBaseClient(url string, opts ...grpc.DialOption) (*BaseClient, error) {
executionDataClient: execDataClient,
close: func() error { return conn.Close() },
jsonOptions: []json.Option{json.WithAllowUnstructuredStaticTypes(true)},
eventEncoding: flow.EventEncodingVersionCCF,
}, nil
}

// NewFromRPCClient initializes a Flow client using a pre-configured gRPC provider.
func NewFromRPCClient(rpcClient RPCClient) *BaseClient {
return &BaseClient{
rpcClient: rpcClient,
close: func() error { return nil },
rpcClient: rpcClient,
close: func() error { return nil },
eventEncoding: flow.EventEncodingVersionCCF,
}
}

Expand All @@ -115,13 +117,22 @@ func NewFromExecutionDataRPCClient(rpcClient ExecutionDataRPCClient) *BaseClient
return &BaseClient{
executionDataClient: rpcClient,
close: func() error { return nil },
eventEncoding: flow.EventEncodingVersionCCF,
}
}

func (c *BaseClient) SetJSONOptions(options []json.Option) {
c.jsonOptions = options
}

func (c *BaseClient) SetEventEncoding(version flow.EventEncodingVersion) {
c.eventEncoding = version
}

func (c *BaseClient) RPCClient() RPCClient {
return c.rpcClient
}

// Close closes the client connection.
func (c *BaseClient) Close() error {
return c.close()
Expand Down Expand Up @@ -380,7 +391,7 @@ func (c *BaseClient) GetTransactionResult(
) (*flow.TransactionResult, error) {
req := &access.GetTransactionRequest{
Id: txID.Bytes(),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetTransactionResult(ctx, req, opts...)
Expand All @@ -406,7 +417,7 @@ func (c *BaseClient) GetTransactionResultByIndex(
req := &access.GetTransactionByIndexRequest{
BlockId: blockID.Bytes(),
Index: index,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetTransactionResultByIndex(ctx, req, opts...)
Expand All @@ -429,7 +440,7 @@ func (c *BaseClient) GetTransactionResultsByBlockID(

req := &access.GetTransactionsByBlockIDRequest{
BlockId: blockID.Bytes(),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetTransactionResultsByBlockID(ctx, req, opts...)
Expand Down Expand Up @@ -607,7 +618,7 @@ func (c *BaseClient) GetEventsForHeightRange(
Type: query.Type,
StartHeight: query.StartHeight,
EndHeight: query.EndHeight,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetEventsForHeightRange(ctx, req, opts...)
Expand All @@ -627,7 +638,7 @@ func (c *BaseClient) GetEventsForBlockIDs(
req := &access.GetEventsForBlockIDsRequest{
Type: eventType,
BlockIds: identifiersToMessages(blockIDs),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}

res, err := c.rpcClient.GetEventsForBlockIDs(ctx, req, opts...)
Expand Down Expand Up @@ -724,7 +735,7 @@ func (c *BaseClient) GetExecutionDataByBlockID(

ed, err := c.executionDataClient.GetExecutionDataByBlockID(ctx, &executiondata.GetExecutionDataByBlockIDRequest{
BlockId: identifierToMessage(blockID),
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}, opts...)
if err != nil {
return nil, newRPCError(err)
Expand All @@ -741,7 +752,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockID(
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
req := executiondata.SubscribeExecutionDataRequest{
StartBlockId: startBlockID[:],
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeExecutionData(ctx, &req, opts...)
}
Expand All @@ -753,7 +764,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockHeight(
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
req := executiondata.SubscribeExecutionDataRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeExecutionData(ctx, &req, opts...)
}
Expand Down Expand Up @@ -824,7 +835,7 @@ func (c *BaseClient) SubscribeEventsByBlockID(
) (<-chan flow.BlockEvents, <-chan error, error) {
req := executiondata.SubscribeEventsRequest{
StartBlockId: startBlockID[:],
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeEvents(ctx, &req, filter, opts...)
}
Expand All @@ -837,7 +848,7 @@ func (c *BaseClient) SubscribeEventsByBlockHeight(
) (<-chan flow.BlockEvents, <-chan error, error) {
req := executiondata.SubscribeEventsRequest{
StartBlockHeight: startHeight,
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
EventEncodingVersion: c.eventEncoding,
}
return c.subscribeEvents(ctx, &req, filter, opts...)
}
Expand Down
84 changes: 81 additions & 3 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"testing"

"github.com/onflow/cadence/encoding/ccf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -927,7 +928,8 @@ func TestClient_GetEventsForHeightRange(t *testing.T) {

func TestClient_GetEventsForBlockIDs(t *testing.T) {
ids := test.IdentifierGenerator()
events := test.EventGenerator()
ccfEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionCCF)
jsonEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionJSONCDC)

t.Run(
"Empty result",
Expand All @@ -948,10 +950,85 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) {
)

t.Run(
"Non-empty result",
"Non-empty result with ccf encoding",
clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
blockIDA, blockIDB := ids.New(), ids.New()
eventA, eventB, eventC, eventD := events.New(), events.New(), events.New(), events.New()
eventA, eventB, eventC, eventD := ccfEvents.New(), ccfEvents.New(), ccfEvents.New(), ccfEvents.New()

eventAMsg, _ := eventToMessage(eventA)
eventBMsg, _ := eventToMessage(eventB)
eventCMsg, _ := eventToMessage(eventC)
eventDMsg, _ := eventToMessage(eventD)

var err error
eventAMsg.Payload, err = ccf.Encode(eventA.Value)
require.NoError(t, err)

eventBMsg.Payload, err = ccf.Encode(eventB.Value)
require.NoError(t, err)

eventCMsg.Payload, err = ccf.Encode(eventC.Value)
require.NoError(t, err)

eventDMsg.Payload, err = ccf.Encode(eventD.Value)
require.NoError(t, err)

response := &access.EventsResponse{
Results: []*access.EventsResponse_Result{
{
BlockId: blockIDA.Bytes(),
BlockHeight: 1,
BlockTimestamp: timestamppb.Now(),
Events: []*entities.Event{
eventAMsg,
eventBMsg,
},
},
{
BlockId: blockIDB.Bytes(),
BlockHeight: 2,
BlockTimestamp: timestamppb.Now(),
Events: []*entities.Event{
eventCMsg,
eventDMsg,
},
},
},
}

rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil)

blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB})
require.NoError(t, err)

// Force evaluation of type ID, which is cached in type.
// Necessary for equality checks below
for _, block := range blocks {
for _, event := range block.Events {
_ = event.Value.Type().ID()
}
}

assert.Len(t, blocks, len(response.Results))

assert.Equal(t, response.Results[0].BlockId, blocks[0].BlockID.Bytes())
assert.Equal(t, response.Results[0].BlockHeight, blocks[0].Height)

assert.Equal(t, response.Results[1].BlockId, blocks[1].BlockID.Bytes())
assert.Equal(t, response.Results[1].BlockHeight, blocks[1].Height)

assert.Equal(t, eventA, blocks[0].Events[0])
assert.Equal(t, eventB, blocks[0].Events[1])
assert.Equal(t, eventC, blocks[1].Events[0])
assert.Equal(t, eventD, blocks[1].Events[1])
}),
)

t.Run(
"Non-empty result with json encoding",
clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
blockIDA, blockIDB := ids.New(), ids.New()
eventA, eventB, eventC, eventD := jsonEvents.New(), jsonEvents.New(), jsonEvents.New(), jsonEvents.New()

eventAMsg, _ := eventToMessage(eventA)
eventBMsg, _ := eventToMessage(eventB)
Expand Down Expand Up @@ -983,6 +1060,7 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) {

rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil)

c.SetEventEncoding(flow.EventEncodingVersionJSONCDC)
blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB})
require.NoError(t, err)

Expand Down
8 changes: 8 additions & 0 deletions decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,12 @@ import (
// as it registers the type ID decoder for the Flow types,
// e.g. `flow.AccountCreated`
_ "github.com/onflow/cadence/runtime/stdlib"
"github.com/onflow/flow/protobuf/go/flow/entities"
)

type EventEncodingVersion = entities.EventEncodingVersion

const (
EventEncodingVersionCCF = entities.EventEncodingVersion_CCF_V0
EventEncodingVersionJSONCDC = entities.EventEncodingVersion_JSON_CDC_V0
)
8 changes: 4 additions & 4 deletions test/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ func EventGenerator() *Events {

func (g *Events) WithEncoding(encoding entities.EventEncodingVersion) *Events {
switch encoding {
case entities.EventEncodingVersion_CCF_V0:
case flow.EventEncodingVersionCCF:
g.encoding = encoding
case entities.EventEncodingVersion_JSON_CDC_V0:
case flow.EventEncodingVersionJSONCDC:
g.encoding = encoding
default:
panic(fmt.Errorf("unsupported event encoding: %v", encoding))
Expand Down Expand Up @@ -297,7 +297,7 @@ func (g *Events) New() flow.Event {

var payload []byte
var err error
if g.encoding == entities.EventEncodingVersion_CCF_V0 {
if g.encoding == flow.EventEncodingVersionCCF {
payload, err = ccf.Encode(testEvent)
} else {
payload, err = jsoncdc.Encode(testEvent)
Expand Down Expand Up @@ -490,7 +490,7 @@ func ChunkExecutionDataGenerator() *ChunkExecutionDatas {
return &ChunkExecutionDatas{
ids: IdentifierGenerator(),
txs: TransactionGenerator(),
events: EventGenerator().WithEncoding(entities.EventEncodingVersion_CCF_V0),
events: EventGenerator().WithEncoding(flow.EventEncodingVersionCCF),
trieUpdates: TrieUpdateGenerator(),
results: LightTransactionResultGenerator(),
}
Expand Down
Loading