Skip to content

Commit

Permalink
additional API method to support getting events of different types in…
Browse files Browse the repository at this point in the history
… index order
  • Loading branch information
ettec committed Dec 6, 2024
1 parent b6684ee commit 082f5f1
Show file tree
Hide file tree
Showing 10 changed files with 1,623 additions and 543 deletions.
1,181 changes: 758 additions & 423 deletions pkg/loop/internal/pb/contract_reader.pb.go

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions pkg/loop/internal/pb/contract_reader.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ service ContractReader {
rpc GetLatestValueWithHeadData (GetLatestValueRequest) returns (GetLatestValueWithHeadDataReply) {}
rpc BatchGetLatestValues (BatchGetLatestValuesRequest) returns (BatchGetLatestValuesReply) {}
rpc QueryKey(QueryKeyRequest) returns (QueryKeyReply) {}
rpc QueryKeys(QueryKeysRequest) returns (QueryKeysReply) {}
rpc Bind(BindRequest) returns (google.protobuf.Empty) {}
rpc Unbind(UnbindRequest) returns (google.protobuf.Empty) {}
}
Expand All @@ -37,6 +38,18 @@ message QueryKeyRequest {
bool as_value_type = 4;
}

// QueryKeysRequest has arguments for [github.com/smartcontractkit/chainlink-common/pkg/types.ContractReader.QueryKeys].
message QueryKeysRequest {
repeated ContractKeyFilter filters = 1;
LimitAndSort limit_and_sort = 2;
}

message ContractKeyFilter {
BoundContract contract = 1;
QueryKeyFilter filter = 2;
bool as_value_type = 4;
}

// BindRequest has arguments for [github.com/smartcontractkit/chainlink-common/pkg/types.ContractReader.Bind].
message BindRequest {
repeated BoundContract bindings = 1;
Expand Down Expand Up @@ -69,6 +82,11 @@ message QueryKeyReply {
repeated Sequence sequences = 1;
}

// QueryKeysReply has return arguments for [github.com/smartcontractkit/chainlink-common/pkg/types.ContractReader.QueryKeys].
message QueryKeysReply {
repeated SequenceWithKey sequences = 1;
}

// ContractBatch is gRPC adapter for the BatchGetLatestValuesRequest struct map value [github.com/smartcontractkit/chainlink-common/pkg/types.ContractReader.BatchGetLatestValuesRequest].
message ContractBatch {
BoundContract contract = 1;
Expand Down Expand Up @@ -109,6 +127,13 @@ message Sequence {
VersionedBytes data = 3;
}

message SequenceWithKey {
string sequence_cursor = 1;
Head head = 2;
VersionedBytes data = 3;
string key = 4;
}

// BoundContract represents a [github.com/smartcontractkit/chainlink-common/pkg/types.BoundContract].
message BoundContract {
string address = 1;
Expand Down
37 changes: 37 additions & 0 deletions pkg/loop/internal/pb/contract_reader_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"iter"
"reflect"

"github.com/fxamacker/cbor/v2"
Expand Down Expand Up @@ -280,6 +281,43 @@ func (c *Client) QueryKey(ctx context.Context, contract types.BoundContract, fil
return convertSequencesFromProto(reply.Sequences, sequenceDataType)
}

func (c *Client) QueryKeys(ctx context.Context, keyQueries []types.ContractKeyFilter, limitAndSort query.LimitAndSort) (iter.Seq2[string, types.Sequence], error) {
var filters []*pb.ContractKeyFilter
for _, keyQuery := range keyQueries {
_, asValueType := keyQuery.SequenceDataType.(*values.Value)
contract := convertBoundContractToProto(keyQuery.Contract)

pbQueryFilter, err := convertQueryFilterToProto(keyQuery.KeyFilter, c.encodeWith)
if err != nil {
return nil, err
}

filters = append(filters, &pb.ContractKeyFilter{
Contract: contract,
Filter: pbQueryFilter,
AsValueType: asValueType,
})
}

pbLimitAndSort, err := convertLimitAndSortToProto(limitAndSort)
if err != nil {
return nil, err
}

reply, err := c.grpc.QueryKeys(
ctx,
&pb.QueryKeysRequest{
Filters: filters,
LimitAndSort: pbLimitAndSort,
},
)
if err != nil {
return nil, net.WrapRPCErr(err)
}

return convertSequencesWithKeyFromProto(reply.Sequences, keyQueries)
}

func (c *Client) Bind(ctx context.Context, bindings []types.BoundContract) error {
pbBindings := make([]*pb.BoundContract, len(bindings))
for i, b := range bindings {
Expand Down Expand Up @@ -462,7 +500,7 @@ func (c *Server) BatchGetLatestValues(ctx context.Context, pbRequest *pb.BatchGe
func (c *Server) QueryKey(ctx context.Context, request *pb.QueryKeyRequest) (*pb.QueryKeyReply, error) {
contract := convertBoundContractFromProto(request.Contract)

queryFilter, err := convertQueryFiltersFromProto(request, contract, c.impl)
queryFilter, err := convertQueryFiltersFromProto(request.Filter, contract, c.impl)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -495,6 +533,46 @@ func (c *Server) QueryKey(ctx context.Context, request *pb.QueryKeyRequest) (*pb
return &pb.QueryKeyReply{Sequences: pbSequences}, nil
}

func (c *Server) QueryKeys(ctx context.Context, request *pb.QueryKeysRequest) (*pb.QueryKeysReply, error) {
var filters []types.ContractKeyFilter
for _, keyQuery := range request.Filters {
contract := convertBoundContractFromProto(keyQuery.Contract)

queryFilter, err := convertQueryFiltersFromProto(keyQuery.Filter, contract, c.impl)
if err != nil {
return nil, err
}

sequenceDataType, err := getContractEncodedType(contract.ReadIdentifier(queryFilter.Key), c.impl, false)
if err != nil {
return nil, err
}

filters = append(filters, types.ContractKeyFilter{
Contract: contract,
KeyFilter: queryFilter,
SequenceDataType: sequenceDataType,
})
}

limitAndSort, err := convertLimitAndSortFromProto(request.GetLimitAndSort())
if err != nil {
return nil, err
}

sequences, err := c.impl.QueryKeys(ctx, filters, limitAndSort)
if err != nil {
return nil, err
}

pbSequences, err := convertSequencesWithKeyToVersionedBytesProto(sequences, request.Filters, c.encodeWith)
if err != nil {
return nil, err
}

return &pb.QueryKeysReply{Sequences: pbSequences}, nil
}

func (c *Server) Bind(ctx context.Context, bindings *pb.BindRequest) (*emptypb.Empty, error) {
tBindings := make([]types.BoundContract, len(bindings.Bindings))
for i, b := range bindings.Bindings {
Expand Down Expand Up @@ -758,6 +836,42 @@ func convertSequencesToVersionedBytesProto(sequences []types.Sequence, version E
return pbSequences, nil
}

func convertSequencesWithKeyToVersionedBytesProto(sequences iter.Seq2[string, types.Sequence], filters []*pb.ContractKeyFilter, encodeWith EncodingVersion) ([]*pb.SequenceWithKey, error) {
keyToEncodingVersion := make(map[string]EncodingVersion)
for _, filter := range filters {
if filter.AsValueType {
keyToEncodingVersion[filter.Filter.Key] = ValuesEncodingVersion
} else {
keyToEncodingVersion[filter.Filter.Key] = encodeWith
}
}

var pbSequences []*pb.SequenceWithKey
for key, sequence := range sequences {
version, ok := keyToEncodingVersion[key]
if !ok {
return nil, fmt.Errorf("missing encoding version for key %s", key)
}

versionedSequenceDataType, err := EncodeVersionedBytes(sequence.Data, version)
if err != nil {
return nil, err
}
pbSequence := &pb.SequenceWithKey{
Key: key,
SequenceCursor: sequence.Cursor,
Head: &pb.Head{
Height: sequence.Height,
Hash: sequence.Hash,
Timestamp: sequence.Timestamp,
},
Data: versionedSequenceDataType,
}
pbSequences = append(pbSequences, pbSequence)
}
return pbSequences, nil
}

func parseBatchGetLatestValuesReply(request types.BatchGetLatestValuesRequest, reply *pb.BatchGetLatestValuesReply) (types.BatchGetLatestValuesResult, error) {
if reply == nil {
return nil, fmt.Errorf("received nil reply from grpc BatchGetLatestValues")
Expand Down Expand Up @@ -844,8 +958,7 @@ func convertBoundContractFromProto(contract *pb.BoundContract) types.BoundContra
}
}

func convertQueryFiltersFromProto(request *pb.QueryKeyRequest, contract types.BoundContract, impl types.ContractReader) (query.KeyFilter, error) {
pbQueryFilters := request.Filter
func convertQueryFiltersFromProto(pbQueryFilters *pb.QueryKeyFilter, contract types.BoundContract, impl types.ContractReader) (query.KeyFilter, error) {
queryFilter := query.KeyFilter{Key: pbQueryFilters.Key}
for _, pbQueryFilter := range pbQueryFilters.Expression {
expression, err := convertExpressionFromProto(pbQueryFilter, contract, queryFilter.Key, impl)
Expand Down Expand Up @@ -949,16 +1062,9 @@ func convertLimitAndSortFromProto(limitAndSort *pb.LimitAndSort) (query.LimitAnd
func convertSequencesFromProto(pbSequences []*pb.Sequence, sequenceDataType any) ([]types.Sequence, error) {
sequences := make([]types.Sequence, len(pbSequences))

seqTypeOf := reflect.TypeOf(sequenceDataType)

// get the non-pointer data type for the sequence data
nonPointerType := seqTypeOf
if seqTypeOf.Kind() == reflect.Pointer {
nonPointerType = seqTypeOf.Elem()
}

if nonPointerType.Kind() == reflect.Pointer {
return nil, fmt.Errorf("%w: sequenceDataType does not support pointers to pointers", types.ErrInvalidType)
seqTypeOf, nonPointerType, err := getSequenceTypeInformation(sequenceDataType)
if err != nil {
return nil, err
}

for idx, pbSequence := range pbSequences {
Expand Down Expand Up @@ -986,6 +1092,78 @@ func convertSequencesFromProto(pbSequences []*pb.Sequence, sequenceDataType any)
return sequences, nil
}

func getSequenceTypeInformation(sequenceDataType any) (reflect.Type, reflect.Type, error) {
seqTypeOf := reflect.TypeOf(sequenceDataType)

// get the non-pointer data type for the sequence data
nonPointerType := seqTypeOf
if seqTypeOf.Kind() == reflect.Pointer {
nonPointerType = seqTypeOf.Elem()
}

if nonPointerType.Kind() == reflect.Pointer {
return nil, nil, fmt.Errorf("%w: sequenceDataType does not support pointers to pointers", types.ErrInvalidType)
}
return seqTypeOf, nonPointerType, nil
}

func convertSequencesWithKeyFromProto(pbSequences []*pb.SequenceWithKey, keyQueries []types.ContractKeyFilter) (iter.Seq2[string, types.Sequence], error) {
type sequenceWithKey struct {
Key string
Sequence types.Sequence
}

sequencesWithKey := make([]sequenceWithKey, len(pbSequences))

keyToSeqTypeOf := make(map[string]reflect.Type)
keyToNonPointerType := make(map[string]reflect.Type)

for _, keyQuery := range keyQueries {
seqTypeOf, nonPointerType, err := getSequenceTypeInformation(keyQuery.SequenceDataType)
if err != nil {
return nil, err
}

keyToSeqTypeOf[keyQuery.Key] = seqTypeOf
keyToNonPointerType[keyQuery.Key] = nonPointerType
}

for idx, pbSequence := range pbSequences {
seqTypeOf, nonPointerType := keyToSeqTypeOf[pbSequence.Key], keyToNonPointerType[pbSequence.Key]

cpy := reflect.New(nonPointerType).Interface()
if err := DecodeVersionedBytes(cpy, pbSequence.Data); err != nil {
return nil, err
}

// match provided data type either as pointer or non-pointer
if seqTypeOf.Kind() != reflect.Pointer {
cpy = reflect.Indirect(reflect.ValueOf(cpy)).Interface()
}
pbSeq := pbSequences[idx]
sequencesWithKey[idx] = sequenceWithKey{
Key: pbSeq.Key,
Sequence: types.Sequence{
Cursor: pbSeq.SequenceCursor,
Head: types.Head{
Height: pbSeq.Head.Height,
Hash: pbSeq.Head.Hash,
Timestamp: pbSeq.Head.Timestamp,
},
Data: cpy,
},
}
}

return func(yield func(string, types.Sequence) bool) {
for _, s := range sequencesWithKey {
if !yield(s.Key, s.Sequence) {
return
}
}
}, nil
}

func RegisterContractReaderService(s *grpc.Server, contractReader types.ContractReader) {
service := goplugin.ServiceServer{Srv: contractReader}
pb.RegisterServiceServer(s, &service)
Expand Down
Loading

0 comments on commit 082f5f1

Please sign in to comment.