diff --git a/client/actor.go b/client/actor.go index 6f6f0f6b..c7099aa4 100644 --- a/client/actor.go +++ b/client/actor.go @@ -74,7 +74,7 @@ func (c *GRPCClient) InvokeActor(ctx context.Context, in *InvokeActorRequest) (o out = &InvokeActorResponse{} if resp != nil { - out.Data = resp.Data + out.Data = resp.GetData() } return out, nil @@ -421,7 +421,7 @@ func (c *GRPCClient) GetActorState(ctx context.Context, in *GetActorStateRequest if err != nil { return nil, fmt.Errorf("error invoking actor get state %s/%s: %w", in.ActorType, in.ActorID, err) } - return &GetActorStateResponse{Data: rsp.Data}, nil + return &GetActorStateResponse{Data: rsp.GetData()}, nil } type ActorStateOperation struct { diff --git a/client/binding.go b/client/binding.go index 3acae37c..f06d7276 100644 --- a/client/binding.go +++ b/client/binding.go @@ -68,8 +68,8 @@ func (c *GRPCClient) InvokeBinding(ctx context.Context, in *InvokeBindingRequest if resp != nil { return &BindingEvent{ - Data: resp.Data, - Metadata: resp.Metadata, + Data: resp.GetData(), + Metadata: resp.GetMetadata(), }, nil } diff --git a/client/client_test.go b/client/client_test.go index 0e7ae310..04d070ec 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -251,7 +251,7 @@ func (s *testDaprServer) UnlockAlpha1(ctx context.Context, req *pb.UnlockRequest } func (s *testDaprServer) InvokeService(ctx context.Context, req *pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) { - if req.Message == nil { + if req.GetMessage() == nil { return &commonv1pb.InvokeResponse{ ContentType: "text/plain", Data: &anypb.Any{ @@ -260,14 +260,14 @@ func (s *testDaprServer) InvokeService(ctx context.Context, req *pb.InvokeServic }, nil } return &commonv1pb.InvokeResponse{ - ContentType: req.Message.ContentType, - Data: req.Message.Data, + ContentType: req.GetMessage().GetContentType(), + Data: req.GetMessage().GetData(), }, nil } func (s *testDaprServer) GetState(ctx context.Context, req *pb.GetStateRequest) (*pb.GetStateResponse, error) { return &pb.GetStateResponse{ - Data: s.state[req.Key], + Data: s.state[req.GetKey()], Etag: "1", }, nil } @@ -290,15 +290,15 @@ func (s *testDaprServer) GetBulkState(ctx context.Context, in *pb.GetBulkStateRe } func (s *testDaprServer) SaveState(ctx context.Context, req *pb.SaveStateRequest) (*empty.Empty, error) { - for _, item := range req.States { - s.state[item.Key] = item.Value + for _, item := range req.GetStates() { + s.state[item.GetKey()] = item.GetValue() } return &empty.Empty{}, nil } func (s *testDaprServer) QueryStateAlpha1(ctx context.Context, req *pb.QueryStateRequest) (*pb.QueryStateResponse, error) { var v map[string]interface{} - if err := json.Unmarshal([]byte(req.Query), &v); err != nil { + if err := json.Unmarshal([]byte(req.GetQuery()), &v); err != nil { return nil, err } @@ -306,19 +306,19 @@ func (s *testDaprServer) QueryStateAlpha1(ctx context.Context, req *pb.QueryStat Results: make([]*pb.QueryStateItem, 0, len(s.state)), } for key, value := range s.state { - ret.Results = append(ret.Results, &pb.QueryStateItem{Key: key, Data: value}) + ret.Results = append(ret.GetResults(), &pb.QueryStateItem{Key: key, Data: value}) } return ret, nil } func (s *testDaprServer) DeleteState(ctx context.Context, req *pb.DeleteStateRequest) (*empty.Empty, error) { - delete(s.state, req.Key) + delete(s.state, req.GetKey()) return &empty.Empty{}, nil } func (s *testDaprServer) DeleteBulkState(ctx context.Context, req *pb.DeleteBulkStateRequest) (*empty.Empty, error) { - for _, item := range req.States { - delete(s.state, item.Key) + for _, item := range req.GetStates() { + delete(s.state, item.GetKey()) } return &empty.Empty{}, nil } @@ -328,9 +328,9 @@ func (s *testDaprServer) ExecuteStateTransaction(ctx context.Context, in *pb.Exe item := op.GetRequest() switch opType := op.GetOperationType(); opType { case "upsert": - s.state[item.Key] = item.Value + s.state[item.GetKey()] = item.GetValue() case "delete": - delete(s.state, item.Key) + delete(s.state, item.GetKey()) default: return &empty.Empty{}, fmt.Errorf("invalid operation type: %s", opType) } @@ -362,14 +362,14 @@ func (s *testDaprServer) PublishEvent(ctx context.Context, req *pb.PublishEventR // It will fail the entire request if an event starts with "failall". func (s *testDaprServer) BulkPublishEventAlpha1(ctx context.Context, req *pb.BulkPublishRequest) (*pb.BulkPublishResponse, error) { failedEntries := make([]*pb.BulkPublishResponseFailedEntry, 0) - for _, entry := range req.Entries { - if bytes.HasPrefix(entry.Event, []byte("failall")) { + for _, entry := range req.GetEntries() { + if bytes.HasPrefix(entry.GetEvent(), []byte("failall")) { // fail the entire request return nil, errors.New("failed to publish events") - } else if bytes.HasPrefix(entry.Event, []byte("fail")) { + } else if bytes.HasPrefix(entry.GetEvent(), []byte("fail")) { // fail this entry failedEntries = append(failedEntries, &pb.BulkPublishResponseFailedEntry{ - EntryId: entry.EntryId, + EntryId: entry.GetEntryId(), Error: "failed to publish events", }) } @@ -378,15 +378,15 @@ func (s *testDaprServer) BulkPublishEventAlpha1(ctx context.Context, req *pb.Bul } func (s *testDaprServer) InvokeBinding(ctx context.Context, req *pb.InvokeBindingRequest) (*pb.InvokeBindingResponse, error) { - if req.Data == nil { + if req.GetData() == nil { return &pb.InvokeBindingResponse{ Data: []byte("test"), Metadata: map[string]string{"k1": "v1", "k2": "v2"}, }, nil } return &pb.InvokeBindingResponse{ - Data: req.Data, - Metadata: req.Metadata, + Data: req.GetData(), + Metadata: req.GetMetadata(), }, nil } @@ -491,12 +491,12 @@ func (s *testDaprServer) SubscribeConfiguration(in *pb.SubscribeConfigurationReq func (s *testDaprServer) UnsubscribeConfiguration(ctx context.Context, in *pb.UnsubscribeConfigurationRequest) (*pb.UnsubscribeConfigurationResponse, error) { s.configurationSubscriptionIDMapLoc.Lock() defer s.configurationSubscriptionIDMapLoc.Unlock() - ch, ok := s.configurationSubscriptionID[in.Id] + ch, ok := s.configurationSubscriptionID[in.GetId()] if !ok { return &pb.UnsubscribeConfigurationResponse{Ok: true}, nil } close(ch) - delete(s.configurationSubscriptionID, in.Id) + delete(s.configurationSubscriptionID, in.GetId()) return &pb.UnsubscribeConfigurationResponse{Ok: true}, nil } diff --git a/client/configuration.go b/client/configuration.go index bbdae219..b5344945 100644 --- a/client/configuration.go +++ b/client/configuration.go @@ -50,11 +50,11 @@ func (c *GRPCClient) GetConfigurationItems(ctx context.Context, storeName string } configItems := make(map[string]*ConfigurationItem) - for k, v := range rsp.Items { + for k, v := range rsp.GetItems() { configItems[k] = &ConfigurationItem{ - Value: v.Value, - Version: v.Version, - Metadata: v.Metadata, + Value: v.GetValue(), + Version: v.GetVersion(), + Metadata: v.GetMetadata(), } } return configItems, nil @@ -88,21 +88,21 @@ func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName } configurationItems := make(map[string]*ConfigurationItem) - for k, v := range rsp.Items { + for k, v := range rsp.GetItems() { configurationItems[k] = &ConfigurationItem{ - Value: v.Value, - Version: v.Version, - Metadata: v.Metadata, + Value: v.GetValue(), + Version: v.GetVersion(), + Metadata: v.GetMetadata(), } } // Get the subscription ID from the first response. if isFirst { - subscribeIDChan <- rsp.Id + subscribeIDChan <- rsp.GetId() isFirst = false } // Do not invoke handler in case there are no items. if len(configurationItems) > 0 { - handler(rsp.Id, configurationItems) + handler(rsp.GetId(), configurationItems) } } }() @@ -119,7 +119,7 @@ func (c *GRPCClient) UnsubscribeConfigurationItems(ctx context.Context, storeNam if err != nil { return fmt.Errorf("unsubscribe failed with error = %w", err) } - if !resp.Ok { + if !resp.GetOk() { return fmt.Errorf("unsubscribe error message = %s", resp.GetMessage()) } return nil diff --git a/client/crypto.go b/client/crypto.go index 9afc459a..41ffcf2d 100644 --- a/client/crypto.go +++ b/client/crypto.go @@ -186,13 +186,13 @@ func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.Cli // Write the data, if any, into the pipe payload = resProto.GetPayload() if payload != nil { - if payload.Seq != expectSeq { - pw.CloseWithError(fmt.Errorf("invalid sequence number in chunk: %d (expected: %d)", payload.Seq, expectSeq)) + if payload.GetSeq() != expectSeq { + pw.CloseWithError(fmt.Errorf("invalid sequence number in chunk: %d (expected: %d)", payload.GetSeq(), expectSeq)) return } expectSeq++ - _, readErr = pw.Write(payload.Data) + _, readErr = pw.Write(payload.GetData()) if readErr != nil { pw.CloseWithError(fmt.Errorf("error writing data: %w", readErr)) return diff --git a/client/crypto_test.go b/client/crypto_test.go index 04b509b0..46956f36 100644 --- a/client/crypto_test.go +++ b/client/crypto_test.go @@ -236,13 +236,13 @@ func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqPro payload := reqProto.GetPayload() if payload != nil { - if payload.Seq != expectSeq { - pw.CloseWithError(fmt.Errorf("invalid sequence number: %d (expected: %d)", payload.Seq, expectSeq)) + if payload.GetSeq() != expectSeq { + pw.CloseWithError(fmt.Errorf("invalid sequence number: %d (expected: %d)", payload.GetSeq(), expectSeq)) return } expectSeq++ - _, err = pw.Write(payload.Data) + _, err = pw.Write(payload.GetData()) if err != nil { pw.CloseWithError(err) return diff --git a/client/invoke.go b/client/invoke.go index fb9ce1c5..1a4fff3a 100644 --- a/client/invoke.go +++ b/client/invoke.go @@ -46,7 +46,7 @@ func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.Invok // allow for service to not return any value if resp != nil && resp.GetData() != nil { - out = resp.GetData().Value + out = resp.GetData().GetValue() return } diff --git a/client/invoke_test.go b/client/invoke_test.go index 20a34ac6..3a8b769b 100644 --- a/client/invoke_test.go +++ b/client/invoke_test.go @@ -114,27 +114,27 @@ func TestVerbParsing(t *testing.T) { t.Run("valid lower case", func(t *testing.T) { v := queryAndVerbToHTTPExtension("", "post") assert.NotNil(t, v) - assert.Equal(t, v1.HTTPExtension_POST, v.Verb) - assert.Len(t, v.Querystring, 0) + assert.Equal(t, v1.HTTPExtension_POST, v.GetVerb()) + assert.Empty(t, v.GetQuerystring()) }) t.Run("valid upper case", func(t *testing.T) { v := queryAndVerbToHTTPExtension("", "GET") assert.NotNil(t, v) - assert.Equal(t, v1.HTTPExtension_GET, v.Verb) + assert.Equal(t, v1.HTTPExtension_GET, v.GetVerb()) }) t.Run("invalid verb", func(t *testing.T) { v := queryAndVerbToHTTPExtension("", "BAD") assert.NotNil(t, v) - assert.Equal(t, v1.HTTPExtension_NONE, v.Verb) + assert.Equal(t, v1.HTTPExtension_NONE, v.GetVerb()) }) t.Run("valid query", func(t *testing.T) { v := queryAndVerbToHTTPExtension("foo=bar&url=http://dapr.io", "post") assert.NotNil(t, v) - assert.Equal(t, v1.HTTPExtension_POST, v.Verb) - assert.Equal(t, "foo=bar&url=http://dapr.io", v.Querystring) + assert.Equal(t, v1.HTTPExtension_POST, v.GetVerb()) + assert.Equal(t, "foo=bar&url=http://dapr.io", v.GetQuerystring()) }) } diff --git a/client/lock.go b/client/lock.go index 26bc226a..26ca8614 100644 --- a/client/lock.go +++ b/client/lock.go @@ -68,7 +68,7 @@ func (c *GRPCClient) TryLockAlpha1(ctx context.Context, storeName string, reques } return &LockResponse{ - Success: resp.Success, + Success: resp.GetSuccess(), }, nil } @@ -94,7 +94,7 @@ func (c *GRPCClient) UnlockAlpha1(ctx context.Context, storeName string, request } return &UnlockResponse{ - StatusCode: int32(resp.Status), - Status: pb.UnlockResponse_Status_name[int32(resp.Status)], + StatusCode: int32(resp.GetStatus()), + Status: pb.UnlockResponse_Status_name[int32(resp.GetStatus())], }, nil } diff --git a/client/metadata.go b/client/metadata.go index 1d1c46e2..151c66ac 100644 --- a/client/metadata.go +++ b/client/metadata.go @@ -57,48 +57,48 @@ func (c *GRPCClient) GetMetadata(ctx context.Context) (metadata *GetMetadataResp return nil, fmt.Errorf("error invoking service: %w", err) } if resp != nil { - activeActorsCount := make([]*MetadataActiveActorsCount, len(resp.ActiveActorsCount)) - for a := range resp.ActiveActorsCount { - activeActorsCount[a] = &MetadataActiveActorsCount{ - Type: resp.ActiveActorsCount[a].Type, - Count: resp.ActiveActorsCount[a].Count, + activeActorsCount := make([]*MetadataActiveActorsCount, len(resp.GetActiveActorsCount())) + for i, a := range resp.GetActiveActorsCount() { + activeActorsCount[i] = &MetadataActiveActorsCount{ + Type: a.GetType(), + Count: a.GetCount(), } } - registeredComponents := make([]*MetadataRegisteredComponents, len(resp.RegisteredComponents)) - for r := range resp.RegisteredComponents { - registeredComponents[r] = &MetadataRegisteredComponents{ - Name: resp.RegisteredComponents[r].Name, - Type: resp.RegisteredComponents[r].Type, - Version: resp.RegisteredComponents[r].Version, - Capabilities: resp.RegisteredComponents[r].Capabilities, + registeredComponents := make([]*MetadataRegisteredComponents, len(resp.GetRegisteredComponents())) + for i, r := range resp.GetRegisteredComponents() { + registeredComponents[i] = &MetadataRegisteredComponents{ + Name: r.GetName(), + Type: r.GetType(), + Version: r.GetVersion(), + Capabilities: r.GetCapabilities(), } } - subscriptions := make([]*MetadataSubscription, len(resp.Subscriptions)) - for s := range resp.Subscriptions { + subscriptions := make([]*MetadataSubscription, len(resp.GetSubscriptions())) + for i, s := range resp.GetSubscriptions() { rules := &PubsubSubscriptionRules{} - for r := range resp.Subscriptions[s].Rules.Rules { + for _, r := range s.GetRules().GetRules() { rules.Rules = append(rules.Rules, &PubsubSubscriptionRule{ - Match: resp.Subscriptions[s].Rules.Rules[r].Match, - Path: resp.Subscriptions[s].Rules.Rules[r].Path, + Match: r.GetMatch(), + Path: r.GetPath(), }) } - subscriptions[s] = &MetadataSubscription{ - PubsubName: resp.Subscriptions[s].PubsubName, - Topic: resp.Subscriptions[s].Topic, - Metadata: resp.Subscriptions[s].Metadata, + subscriptions[i] = &MetadataSubscription{ + PubsubName: s.GetPubsubName(), + Topic: s.GetTopic(), + Metadata: s.GetMetadata(), Rules: rules, - DeadLetterTopic: resp.Subscriptions[s].DeadLetterTopic, + DeadLetterTopic: s.GetDeadLetterTopic(), } } - httpEndpoints := make([]*MetadataHTTPEndpoint, len(resp.HttpEndpoints)) - for e := range resp.HttpEndpoints { - httpEndpoints[e] = &MetadataHTTPEndpoint{ - Name: resp.HttpEndpoints[e].Name, + httpEndpoints := make([]*MetadataHTTPEndpoint, len(resp.GetHttpEndpoints())) + for i, e := range resp.GetHttpEndpoints() { + httpEndpoints[i] = &MetadataHTTPEndpoint{ + Name: e.GetName(), } } metadata = &GetMetadataResponse{ - ID: resp.Id, + ID: resp.GetId(), ActiveActorsCount: activeActorsCount, RegisteredComponents: registeredComponents, ExtendedMetadata: resp.GetExtendedMetadata(), diff --git a/client/pubsub.go b/client/pubsub.go index 6e5a83d2..3996c32e 100644 --- a/client/pubsub.go +++ b/client/pubsub.go @@ -91,7 +91,7 @@ func PublishEventWithMetadata(metadata map[string]string) PublishEventOption { // PublishEventWithRawPayload can be passed as option to PublishEvent to set rawPayload metadata. func PublishEventWithRawPayload() PublishEventOption { return func(e *pb.PublishEventRequest) { - if e.Metadata == nil { + if e.GetMetadata() == nil { e.Metadata = map[string]string{rawPayload: trueValue} } else { e.Metadata[rawPayload] = trueValue @@ -156,7 +156,7 @@ func (c *GRPCClient) PublishEvents(ctx context.Context, pubsubName, topicName st failedEvents = append(failedEvents, event) continue } - eventMap[entry.EntryId] = event + eventMap[entry.GetEntryId()] = event entries = append(entries, entry) } @@ -178,11 +178,11 @@ func (c *GRPCClient) PublishEvents(ctx context.Context, pubsubName, topicName st } } - for _, failedEntry := range res.FailedEntries { - event, ok := eventMap[failedEntry.EntryId] + for _, failedEntry := range res.GetFailedEntries() { + event, ok := eventMap[failedEntry.GetEntryId()] if !ok { // This should never happen. - failedEvents = append(failedEvents, failedEntry.EntryId) + failedEvents = append(failedEvents, failedEntry.GetEntryId()) } failedEvents = append(failedEvents, event) } @@ -224,12 +224,12 @@ func createBulkPublishRequestEntry(data interface{}) (*pb.BulkPublishRequestEntr return &pb.BulkPublishRequestEntry{}, fmt.Errorf("error serializing input struct: %w", err) } - if isCloudEvent(entry.Event) { + if isCloudEvent(entry.GetEvent()) { entry.ContentType = "application/cloudevents+json" } } - if entry.EntryId == "" { + if entry.GetEntryId() == "" { entry.EntryId = uuid.New().String() } @@ -239,7 +239,7 @@ func createBulkPublishRequestEntry(data interface{}) (*pb.BulkPublishRequestEntr // PublishEventsWithContentType can be passed as option to PublishEvents to explicitly set the same Content-Type for all events. func PublishEventsWithContentType(contentType string) PublishEventsOption { return func(r *pb.BulkPublishRequest) { - for _, entry := range r.Entries { + for _, entry := range r.GetEntries() { entry.ContentType = contentType } } @@ -255,7 +255,7 @@ func PublishEventsWithMetadata(metadata map[string]string) PublishEventsOption { // PublishEventsWithRawPayload can be passed as option to PublishEvents to set rawPayload request metadata. func PublishEventsWithRawPayload() PublishEventsOption { return func(r *pb.BulkPublishRequest) { - if r.Metadata == nil { + if r.GetMetadata() == nil { r.Metadata = map[string]string{rawPayload: trueValue} } else { r.Metadata[rawPayload] = trueValue diff --git a/client/pubsub_test.go b/client/pubsub_test.go index c125ab8b..56db1357 100644 --- a/client/pubsub_test.go +++ b/client/pubsub_test.go @@ -280,8 +280,8 @@ func TestCreateBulkPublishRequestEntry(t *testing.T) { require.Error(t, err) } else { require.NoError(t, err) - assert.Equal(t, tc.expectedEvent, entry.Event) - assert.Equal(t, tc.expectedContentType, entry.ContentType) + assert.Equal(t, tc.expectedEvent, entry.GetEvent()) + assert.Equal(t, tc.expectedContentType, entry.GetContentType()) } }) } @@ -295,8 +295,8 @@ func TestCreateBulkPublishRequestEntry(t *testing.T) { Metadata: map[string]string{"key": "value"}, }) require.NoError(t, err) - assert.Equal(t, "123", entry.EntryId) - assert.Equal(t, map[string]string{"key": "value"}, entry.Metadata) + assert.Equal(t, "123", entry.GetEntryId()) + assert.Equal(t, map[string]string{"key": "value"}, entry.GetMetadata()) }) t.Run("should set random uuid as entryID when not provided", func(t *testing.T) { @@ -321,10 +321,10 @@ func TestCreateBulkPublishRequestEntry(t *testing.T) { t.Run(tc.name, func(t *testing.T) { entry, err := createBulkPublishRequestEntry(tc.data) require.NoError(t, err) - assert.NotEmpty(t, entry.EntryId) - assert.Nil(t, entry.Metadata) + assert.NotEmpty(t, entry.GetEntryId()) + assert.Nil(t, entry.GetMetadata()) - _, err = uuid.Parse(entry.EntryId) + _, err = uuid.Parse(entry.GetEntryId()) require.NoError(t, err) }) } diff --git a/client/secret.go b/client/secret.go index dae2c63c..d5a90814 100644 --- a/client/secret.go +++ b/client/secret.go @@ -67,10 +67,10 @@ func (c *GRPCClient) GetBulkSecret(ctx context.Context, storeName string, meta m if resp != nil { data = map[string]map[string]string{} - for secretName, secretResponse := range resp.Data { + for secretName, secretResponse := range resp.GetData() { data[secretName] = map[string]string{} - for k, v := range secretResponse.Secrets { + for k, v := range secretResponse.GetSecrets() { data[secretName][k] = v } } diff --git a/client/state.go b/client/state.go index b2ed44c1..5d142210 100644 --- a/client/state.go +++ b/client/state.go @@ -317,7 +317,7 @@ func (c *GRPCClient) SaveBulkState(ctx context.Context, storeName string, items for _, si := range items { item := toProtoSaveStateItem(si) - req.States = append(req.States, item) + req.States = append(req.GetStates(), item) } _, err := c.protoClient.SaveState(c.withAuthToken(ctx), req) @@ -349,17 +349,17 @@ func (c *GRPCClient) GetBulkState(ctx context.Context, storeName string, keys [] return nil, fmt.Errorf("error getting state: %w", err) } - if results == nil || results.Items == nil { + if results == nil || results.GetItems() == nil { return items, nil } - for _, r := range results.Items { + for _, r := range results.GetItems() { item := &BulkStateItem{ - Key: r.Key, - Etag: r.Etag, - Value: r.Data, - Metadata: r.Metadata, - Error: r.Error, + Key: r.GetKey(), + Etag: r.GetEtag(), + Value: r.GetData(), + Metadata: r.GetMetadata(), + Error: r.GetError(), } items = append(items, item) } @@ -391,10 +391,10 @@ func (c *GRPCClient) GetStateWithConsistency(ctx context.Context, storeName, key } return &StateItem{ - Etag: result.Etag, + Etag: result.GetEtag(), Key: key, - Value: result.Data, - Metadata: result.Metadata, + Value: result.GetData(), + Metadata: result.GetMetadata(), }, nil } @@ -417,15 +417,15 @@ func (c *GRPCClient) QueryStateAlpha1(ctx context.Context, storeName, query stri } ret := &QueryResponse{ - Results: make([]QueryItem, len(resp.Results)), - Token: resp.Token, - Metadata: resp.Metadata, - } - for i, item := range resp.Results { - ret.Results[i].Key = item.Key - ret.Results[i].Value = item.Data - ret.Results[i].Etag = item.Etag - ret.Results[i].Error = item.Error + Results: make([]QueryItem, len(resp.GetResults())), + Token: resp.GetToken(), + Metadata: resp.GetMetadata(), + } + for i, item := range resp.GetResults() { + ret.Results[i].Key = item.GetKey() + ret.Results[i].Value = item.GetData() + ret.Results[i].Etag = item.GetEtag() + ret.Results[i].Error = item.GetError() } return ret, nil diff --git a/client/state_test.go b/client/state_test.go index ce973bef..c99aec6d 100644 --- a/client/state_test.go +++ b/client/state_test.go @@ -43,7 +43,7 @@ func TestDurationConverter(t *testing.T) { d := 10 * time.Second pd := toProtoDuration(d) assert.NotNil(t, pd) - assert.Equal(t, pd.Seconds, int64(10)) + assert.Equal(t, int64(10), pd.GetSeconds()) } func TestStateOptionsConverter(t *testing.T) { @@ -53,8 +53,8 @@ func TestStateOptionsConverter(t *testing.T) { } p := toProtoStateOptions(s) assert.NotNil(t, p) - assert.Equal(t, p.Concurrency, v1.StateOptions_CONCURRENCY_LAST_WRITE) - assert.Equal(t, p.Consistency, v1.StateOptions_CONSISTENCY_STRONG) + assert.Equal(t, v1.StateOptions_CONCURRENCY_LAST_WRITE, p.GetConcurrency()) + assert.Equal(t, v1.StateOptions_CONSISTENCY_STRONG, p.GetConsistency()) } // go test -timeout 30s ./client -count 1 -run ^TestSaveState$ diff --git a/service/grpc/binding.go b/service/grpc/binding.go index a1ae8a84..ae1efecb 100644 --- a/service/grpc/binding.go +++ b/service/grpc/binding.go @@ -54,19 +54,19 @@ func (s *Server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) if in == nil { return nil, errors.New("nil binding event request") } - if fn, ok := s.bindingHandlers[in.Name]; ok { + if fn, ok := s.bindingHandlers[in.GetName()]; ok { e := &common.BindingEvent{ - Data: in.Data, - Metadata: in.Metadata, + Data: in.GetData(), + Metadata: in.GetMetadata(), } data, err := fn(ctx, e) if err != nil { - return nil, fmt.Errorf("error executing %s binding: %w", in.Name, err) + return nil, fmt.Errorf("error executing %s binding: %w", in.GetName(), err) } return &pb.BindingEventResponse{ Data: data, }, nil } - return nil, fmt.Errorf("binding not implemented: %s", in.Name) + return nil, fmt.Errorf("binding not implemented: %s", in.GetName()) } diff --git a/service/grpc/binding_test.go b/service/grpc/binding_test.go index 20adc90e..b87420ec 100644 --- a/service/grpc/binding_test.go +++ b/service/grpc/binding_test.go @@ -43,7 +43,7 @@ func TestListInputBindings(t *testing.T) { resp, err := server.ListInputBindings(context.Background(), &empty.Empty{}) require.NoError(t, err) assert.NotNil(t, resp) - assert.Lenf(t, resp.Bindings, 2, "expected 2 handlers") + assert.Lenf(t, resp.GetBindings(), 2, "expected 2 handlers") } func TestBindingForErrors(t *testing.T) { @@ -92,7 +92,7 @@ func TestBinding(t *testing.T) { out, err := server.OnBindingEvent(ctx, in) require.NoError(t, err) assert.NotNil(t, out) - assert.Equal(t, data, string(out.Data)) + assert.Equal(t, data, string(out.GetData())) }) t.Run("binding event with metadata", func(t *testing.T) { diff --git a/service/grpc/invoke.go b/service/grpc/invoke.go index b34c5a58..82dfa1a6 100644 --- a/service/grpc/invoke.go +++ b/service/grpc/invoke.go @@ -58,18 +58,18 @@ func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.Invo return nil, errors.New("authentication failed. app token key not exist") } } - if fn, ok := s.invokeHandlers[in.Method]; ok { + if fn, ok := s.invokeHandlers[in.GetMethod()]; ok { e := &cc.InvocationEvent{} - e.ContentType = in.ContentType + e.ContentType = in.GetContentType() - if in.Data != nil { - e.Data = in.Data.Value - e.DataTypeURL = in.Data.TypeUrl + if in.GetData() != nil { + e.Data = in.GetData().GetValue() + e.DataTypeURL = in.GetData().GetTypeUrl() } - if in.HttpExtension != nil { - e.Verb = in.HttpExtension.Verb.String() - e.QueryString = in.HttpExtension.Querystring + if in.GetHttpExtension() != nil { + e.Verb = in.GetHttpExtension().GetVerb().String() + e.QueryString = in.GetHttpExtension().GetQuerystring() } ct, er := fn(ctx, e) @@ -89,5 +89,5 @@ func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.Invo }, }, nil } - return nil, fmt.Errorf("method not implemented: %s", in.Method) + return nil, fmt.Errorf("method not implemented: %s", in.GetMethod()) } diff --git a/service/grpc/invoke_test.go b/service/grpc/invoke_test.go index b02187d7..1365172f 100644 --- a/service/grpc/invoke_test.go +++ b/service/grpc/invoke_test.go @@ -130,8 +130,8 @@ func TestInvoke(t *testing.T) { out, err := server.OnInvoke(ctx, in) require.NoError(t, err) assert.NotNil(t, out) - assert.Equal(t, dataContentType, out.ContentType) - assert.Equal(t, data, string(out.Data.Value)) + assert.Equal(t, dataContentType, out.GetContentType()) + assert.Equal(t, data, string(out.GetData().GetValue())) }) t.Run("invoke request with error", func(t *testing.T) { diff --git a/service/grpc/topic.go b/service/grpc/topic.go index da0c093c..f749de52 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -76,13 +76,13 @@ func convertRoutes(routes *internal.TopicRoutes) *runtimev1pb.TopicRoutes { // OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. // Dapr sends published messages in a CloudEvents v1.0 envelope. func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) { - if in == nil || in.Topic == "" || in.PubsubName == "" { + if in == nil || in.GetTopic() == "" || in.GetPubsubName() == "" { // this is really Dapr issue more than the event request format. // since Dapr will not get updated until long after this event expires, just drop it return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_DROP}, errors.New("pub/sub and topic names required") } - key := in.PubsubName + "-" + in.Topic - noValidationKey := in.PubsubName + key := in.GetPubsubName() + "-" + in.GetTopic() + noValidationKey := in.GetPubsubName() var sub *internal.TopicRegistration var ok bool @@ -93,23 +93,23 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq } if ok { - data := interface{}(in.Data) - if len(in.Data) > 0 { - mediaType, _, err := mime.ParseMediaType(in.DataContentType) + data := interface{}(in.GetData()) + if len(in.GetData()) > 0 { + mediaType, _, err := mime.ParseMediaType(in.GetDataContentType()) if err == nil { var v interface{} switch mediaType { case "application/json": - if err := json.Unmarshal(in.Data, &v); err == nil { + if err := json.Unmarshal(in.GetData(), &v); err == nil { data = v } case "text/plain": // Assume UTF-8 encoded string. - data = string(in.Data) + data = string(in.GetData()) default: if strings.HasPrefix(mediaType, "application/") && strings.HasSuffix(mediaType, "+json") { - if err := json.Unmarshal(in.Data, &v); err == nil { + if err := json.Unmarshal(in.GetData(), &v); err == nil { data = v } } @@ -118,26 +118,26 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq } e := &common.TopicEvent{ - ID: in.Id, - Source: in.Source, - Type: in.Type, - SpecVersion: in.SpecVersion, - DataContentType: in.DataContentType, + ID: in.GetId(), + Source: in.GetSource(), + Type: in.GetType(), + SpecVersion: in.GetSpecVersion(), + DataContentType: in.GetDataContentType(), Data: data, - RawData: in.Data, - Topic: in.Topic, - PubsubName: in.PubsubName, + RawData: in.GetData(), + Topic: in.GetTopic(), + PubsubName: in.GetPubsubName(), } h := sub.DefaultHandler - if in.Path != "" { - if pathHandler, ok := sub.RouteHandlers[in.Path]; ok { + if in.GetPath() != "" { + if pathHandler, ok := sub.RouteHandlers[in.GetPath()]; ok { h = pathHandler } } if h == nil { return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_RETRY}, fmt.Errorf( "route %s for pub/sub and topic combination not configured: %s/%s", - in.Path, in.PubsubName, in.Topic, + in.GetPath(), in.GetPubsubName(), in.GetTopic(), ) } retry, err := h(ctx, e) @@ -151,6 +151,6 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq } return &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_RETRY}, fmt.Errorf( "pub/sub and topic combination not configured: %s/%s", - in.PubsubName, in.Topic, + in.GetPubsubName(), in.GetTopic(), ) } diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index 78a78514..3c66c6c0 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -59,11 +59,11 @@ func TestTopicSubscriptionList(t *testing.T) { resp, err := server.ListTopicSubscriptions(context.Background(), &empty.Empty{}) require.NoError(t, err) assert.NotNil(t, resp) - if assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers") { - sub := resp.Subscriptions[0] - assert.Equal(t, "messages", sub.PubsubName) - assert.Equal(t, "test", sub.Topic) - assert.Nil(t, sub.Routes) + if assert.Lenf(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + assert.Nil(t, sub.GetRoutes()) } // Add routing rule. @@ -78,16 +78,16 @@ func TestTopicSubscriptionList(t *testing.T) { resp, err = server.ListTopicSubscriptions(context.Background(), &empty.Empty{}) require.NoError(t, err) assert.NotNil(t, resp) - if assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers") { - sub := resp.Subscriptions[0] - assert.Equal(t, "messages", sub.PubsubName) - assert.Equal(t, "test", sub.Topic) - if assert.NotNil(t, sub.Routes) { - assert.Equal(t, "/test", sub.Routes.Default) - if assert.Len(t, sub.Routes.Rules, 1) { - rule := sub.Routes.Rules[0] - assert.Equal(t, "/other", rule.Path) - assert.Equal(t, `event.type == "other"`, rule.Match) + if assert.Lenf(t, resp.GetSubscriptions(), 1, "expected 1 handlers") { + sub := resp.GetSubscriptions()[0] + assert.Equal(t, "messages", sub.GetPubsubName()) + assert.Equal(t, "test", sub.GetTopic()) + if assert.NotNil(t, sub.GetRoutes()) { + assert.Equal(t, "/test", sub.GetRoutes().GetDefault()) + if assert.Len(t, sub.GetRoutes().GetRules(), 1) { + rule := sub.GetRoutes().GetRules()[0] + assert.Equal(t, "/other", rule.GetPath()) + assert.Equal(t, `event.type == "other"`, rule.GetMatch()) } } }