Skip to content

Commit

Permalink
Add PubSub event metadata (#490)
Browse files Browse the repository at this point in the history
* Map event metadata

Signed-off-by: Joni Collinge <[email protected]>

* Feedback

Signed-off-by: Joni Collinge <[email protected]>

* Lint

Signed-off-by: Joni Collinge <[email protected]>

---------

Signed-off-by: Joni Collinge <[email protected]>
  • Loading branch information
jjcollinge authored Jan 8, 2024
1 parent 61158e8 commit c8f3533
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 17 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
Expand Down
2 changes: 2 additions & 0 deletions service/common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type TopicEvent struct {
Topic string `json:"topic"`
// PubsubName is name of the pub/sub this message came from
PubsubName string `json:"pubsubname"`
// Metadata is the custom metadata attached to the event.
Metadata map[string]string `json:"metadata,omitempty"`
}

func (e *TopicEvent) Struct(target interface{}) error {
Expand Down
15 changes: 15 additions & 0 deletions service/grpc/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/metadata"

runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
Expand Down Expand Up @@ -127,6 +128,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
RawData: in.GetData(),
Topic: in.GetTopic(),
PubsubName: in.GetPubsubName(),
Metadata: getCustomMetadataFromContext(ctx),
}
h := sub.DefaultHandler
if in.GetPath() != "" {
Expand Down Expand Up @@ -154,3 +156,16 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
in.GetPubsubName(), in.GetTopic(),
)
}

func getCustomMetadataFromContext(ctx context.Context) map[string]string {
md := make(map[string]string)
meta, ok := metadata.FromIncomingContext(ctx)
if ok {
for k, v := range meta {
if strings.HasPrefix(strings.ToLower(k), "metadata.") {
md[k[9:]] = v[0]
}
}
}
return md
}
27 changes: 27 additions & 0 deletions service/grpc/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

"github.com/golang/protobuf/ptypes/empty"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -136,6 +137,32 @@ func TestTopic(t *testing.T) {
require.NoError(t, err)
})

t.Run("topic event for valid topic with metadata", func(t *testing.T) {
sub2 := &common.Subscription{
PubsubName: "messages",
Topic: "test2",
}
err := server.AddTopicEventHandler(sub2, func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
assert.Equal(t, "value1", e.Metadata["key1"])
return false, nil
})
require.NoError(t, err)

in := &runtime.TopicEventRequest{
Id: "a123",
Source: "test",
Type: "test",
SpecVersion: "v1.0",
DataContentType: "text/plain",
Data: []byte("test"),
Topic: sub2.Topic,
PubsubName: sub2.PubsubName,
}
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"Metadata.key1": "value1"}))
_, err = server.OnTopicEvent(ctx, in)
require.NoError(t, err)
})

stopTestServer(t, server)
}

Expand Down
12 changes: 12 additions & 0 deletions service/http/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"io"
"net/http"
"strings"

"github.com/go-chi/chi/v5"

Expand Down Expand Up @@ -278,6 +279,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
Subject: in.Subject,
PubsubName: in.PubsubName,
Topic: in.Topic,
Metadata: getCustomMetdataFromHeaders(r),
}

w.Header().Add("Content-Type", "application/json")
Expand All @@ -301,6 +303,16 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
return nil
}

func getCustomMetdataFromHeaders(r *http.Request) map[string]string {
md := make(map[string]string)
for k, v := range r.Header {
if strings.HasPrefix(strings.ToLower(k), "metadata.") {
md[k[9:]] = v[0]
}
}
return md
}

func writeStatus(w http.ResponseWriter, s string) {
status := &common.SubscriptionResponse{Status: s}
if err := json.NewEncoder(w).Encode(status); err != nil {
Expand Down
106 changes: 98 additions & 8 deletions service/http/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func TestEventHandler(t *testing.T) {

func TestEventDataHandling(t *testing.T) {
tests := map[string]struct {
data string
result interface{}
data string
expectedData interface{}
}{
"JSON nested": {
data: `{
Expand All @@ -166,7 +166,7 @@ func TestEventDataHandling(t *testing.T) {
"message":"hello"
}
}`,
result: map[string]interface{}{
expectedData: map[string]interface{}{
"message": "hello",
},
},
Expand All @@ -183,7 +183,7 @@ func TestEventDataHandling(t *testing.T) {
"datacontenttype" : "application/json",
"data" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ=="
}`,
result: map[string]interface{}{
expectedData: map[string]interface{}{
"message": "hello",
},
},
Expand All @@ -200,7 +200,7 @@ func TestEventDataHandling(t *testing.T) {
"datacontenttype" : "application/json",
"data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ=="
}`,
result: map[string]interface{}{
expectedData: map[string]interface{}{
"message": "hello",
},
},
Expand All @@ -217,7 +217,7 @@ func TestEventDataHandling(t *testing.T) {
"datacontenttype" : "application/octet-stream",
"data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ=="
}`,
result: []byte(`{"message":"hello"}`),
expectedData: []byte(`{"message":"hello"}`),
},
"JSON string escaped": {
data: `{
Expand All @@ -232,7 +232,7 @@ func TestEventDataHandling(t *testing.T) {
"datacontenttype" : "application/json",
"data" : "{\"message\":\"hello\"}"
}`,
result: map[string]interface{}{
expectedData: map[string]interface{}{
"message": "hello",
},
},
Expand Down Expand Up @@ -264,7 +264,85 @@ func TestEventDataHandling(t *testing.T) {
t.Run(name, func(t *testing.T) {
makeEventRequest(t, s, "/test", tt.data, http.StatusOK)
<-recv
assert.Equal(t, tt.result, topicEvent.Data)
assert.Equal(t, tt.expectedData, topicEvent.Data)
})
}
}

func TestEventMetadataHandling(t *testing.T) {
tests := map[string]struct {
metadata map[string]string
expectedMetadata map[string]string
}{
"single key-value pair with prefix": {
metadata: map[string]string{
"metadata.key1": "value1",
},
expectedMetadata: map[string]string{
"key1": "value1",
},
},
"multiple key-value pairs with prefix": {
metadata: map[string]string{
"metadata.key1": "value1",
"metadata.key2": "value2",
},
expectedMetadata: map[string]string{
"key1": "value1",
"key2": "value2",
},
},
"some keys with prefix and some without": {
metadata: map[string]string{
"metadata.key1": "value1",
"key2": "value2",
},
expectedMetadata: map[string]string{
"key1": "value1",
},
},
}

s := newServer("", nil)

sub := &common.Subscription{
PubsubName: "messages",
Topic: "test",
Route: "/test",
Metadata: map[string]string{},
}

recv := make(chan struct{}, 1)
var topicEvent *common.TopicEvent
handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
topicEvent = e
recv <- struct{}{}

return false, nil
}
err := s.AddTopicEventHandler(sub, handler)
require.NoErrorf(t, err, "error adding event handler")

s.registerBaseHandler()

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
makeEventRequestWithMetadata(t, s, "/test", `{
"specversion" : "1.0",
"type" : "com.github.pull.create",
"source" : "https://github.com/cloudevents/spec/pull",
"subject" : "123",
"id" : "A234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"comexampleextension1" : "value",
"comexampleothervalue" : 5,
"datacontenttype" : "application/json",
"data" : {
"message":"hello"
}
}`, http.StatusOK, tt.metadata)
<-recv
assert.Equal(t, tt.expectedMetadata, topicEvent.Metadata)
})
}
}
Expand Down Expand Up @@ -357,6 +435,18 @@ func makeEventRequest(t *testing.T, s *Server, route, data string, expectedStatu
testRequest(t, s, req, expectedStatusCode)
}

func makeEventRequestWithMetadata(t *testing.T, s *Server, route, data string, expectedStatusCode int, metadata map[string]string) {
t.Helper()

req, err := http.NewRequest(http.MethodPost, route, strings.NewReader(data))
require.NoErrorf(t, err, "error creating request: %s", data)
req.Header.Set("Content-Type", "application/json")
for k, v := range metadata {
req.Header.Set(k, v)
}
testRequest(t, s, req, expectedStatusCode)
}

func TestAddingInvalidEventHandlers(t *testing.T) {
s := newServer("", nil)
err := s.AddTopicEventHandler(nil, testTopicFunc)
Expand Down

0 comments on commit c8f3533

Please sign in to comment.