Skip to content

Commit

Permalink
Rework Event.SetData, require data content encoding (#376)
Browse files Browse the repository at this point in the history
* Force data to always be ready to be sent as it rests inside of Event.

Signed-off-by: Scott Nichols <[email protected]>

* Passing tests. Going to look at deleting the binary array.

Signed-off-by: Scott Nichols <[email protected]>

* Dropped Event.DataBinary as a byte field and back to a bool.

Signed-off-by: Scott Nichols <[email protected]>

* switching contenttype and data order in SetData

Signed-off-by: Scott Nichols <[email protected]>
  • Loading branch information
n3wscott authored Mar 12, 2020
1 parent ab17940 commit 7b09386
Show file tree
Hide file tree
Showing 42 changed files with 447 additions and 493 deletions.
3 changes: 2 additions & 1 deletion alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (

ApplicationXML = event.ApplicationXML
ApplicationJSON = event.ApplicationJSON
TextPlain = event.TextPlain
ApplicationCloudEventsJSON = event.ApplicationCloudEventsJSON
ApplicationCloudEventsBatchJSON = event.ApplicationCloudEventsBatchJSON
Base64 = event.Base64
Expand All @@ -64,6 +65,7 @@ var (

StringOfApplicationJSON = event.StringOfApplicationJSON
StringOfApplicationXML = event.StringOfApplicationXML
StringOfTextPlain = event.StringOfTextPlain
StringOfApplicationCloudEventsJSON = event.StringOfApplicationCloudEventsJSON
StringOfApplicationCloudEventsBatchJSON = event.StringOfApplicationCloudEventsBatchJSON
StringOfBase64 = event.StringOfBase64
Expand All @@ -78,7 +80,6 @@ var (
WithEventDefaulter = client.WithEventDefaulter
WithUUIDs = client.WithUUIDs
WithTimeNow = client.WithTimeNow
WithDataContentType = client.WithDataContentType
WithoutTracePropagation = client.WithoutTracePropagation

// Event Creation
Expand Down
8 changes: 5 additions & 3 deletions cmd/samples/amqp/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/client"
"github.com/cloudevents/sdk-go/pkg/event"
ceamqp "github.com/cloudevents/sdk-go/pkg/transport/amqp"
Expand Down Expand Up @@ -58,11 +59,12 @@ type Example struct {
func (d *Demo) Send(eventContext event.EventContext, i int) error {
e := event.Event{
Context: eventContext,
Data: &Example{
}
_ = e.SetData(cloudevents.ApplicationJSON,
&Example{
Sequence: i,
Message: d.Message,
},
}
})
return d.Client.Send(context.Background(), e)
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/samples/complex/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ type Example struct {
func (d *Demo) Send() error {
e := cloudevents.Event{
Context: d.context(),
Data: &Example{
Sequence: seq,
Message: d.Message,
},
}
_ = e.SetData(cloudevents.ApplicationJSON, &Example{
Sequence: seq,
Message: d.Message,
})
seq++
return d.Client.Send(context.Background(), e)
}
Expand Down
15 changes: 7 additions & 8 deletions cmd/samples/http/requester-with-custom-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,15 @@ func _main(args []string, env envConfig) int {
for i := 0; i < count; i++ {
event := cloudevents.Event{
Context: cloudevents.EventContextV03{
ID: uuid.New().String(),
Type: "com.cloudevents.sample.sent",
Source: cloudevents.URIRef{URL: *source},
DataContentType: &dataContentType,
ID: uuid.New().String(),
Type: "com.cloudevents.sample.sent",
Source: cloudevents.URIRef{URL: *source},
}.AsV03(),
Data: &Example{
Sequence: i,
Message: message,
},
}
_ = event.SetData(dataContentType, &Example{
Sequence: i,
Message: message,
})

if resp, err := c.Request(context.Background(), event); err != nil {
log.Printf("failed to send: %v", err)
Expand Down
17 changes: 8 additions & 9 deletions cmd/samples/http/requester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func _main(args []string, env envConfig) int {
}

seq := 0
for _, contentType := range []string{"application/json", "application/xml"} {
for _, contentType := range []string{"application/json", "application/xml", "text/plain"} {
for _, encoding := range []cloudevents.HTTPEncoding{cloudevents.HTTPBinaryEncoding, cloudevents.HTTPStructuredEncoding} {

p, err := cloudevents.NewHTTPProtocol(cloudevents.WithTarget(env.Target))
Expand Down Expand Up @@ -75,16 +75,15 @@ func _main(args []string, env envConfig) int {
for i := 0; i < count; i++ {
event := cloudevents.Event{
Context: cloudevents.EventContextV1{
ID: uuid.New().String(),
Type: "com.cloudevents.sample.sent",
Source: cloudevents.URIRef{URL: *source},
DataContentType: &contentType,
ID: uuid.New().String(),
Type: "com.cloudevents.sample.sent",
Source: cloudevents.URIRef{URL: *source},
}.AsV1(),
Data: &Example{
Sequence: i,
Message: message,
},
}
_ = event.SetData(contentType, &Example{
Sequence: i,
Message: message,
})

if resp, err := c.Request(context.Background(), event); err != nil {
log.Printf("failed to request: %v", err)
Expand Down
8 changes: 4 additions & 4 deletions cmd/samples/http/responder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func gotEvent(ctx context.Context, event cloudevents.Event) (*cloudevents.Event,
Source: *cloudevents.ParseURIRef("/mod3"),
Type: "samples.http.mod3",
}.AsV1(),
Data: Example{
Sequence: data.Sequence,
Message: "mod 3!",
},
}
_ = r.SetData(cloudevents.ApplicationJSON, Example{
Sequence: data.Sequence,
Message: "mod 3!",
})
return &r, nil
}

Expand Down
6 changes: 2 additions & 4 deletions cmd/samples/httpb/requester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"log"

"github.com/cloudevents/sdk-go/pkg/event"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/transport/http"
)
Expand All @@ -23,7 +21,7 @@ func main() {
log.Fatalf("failed to create transport, %v", err)
}

c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs(), cloudevents.WithDataContentType(event.ApplicationJSON))
c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
Expand All @@ -32,7 +30,7 @@ func main() {
e := cloudevents.NewEvent()
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/cmd/samples/httpb/requester")
_ = e.SetData(map[string]interface{}{
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": i,
"message": "Hello, World!",
})
Expand Down
6 changes: 2 additions & 4 deletions cmd/samples/httpb/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"log"

"github.com/cloudevents/sdk-go/pkg/event"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/transport/http"
)
Expand All @@ -23,7 +21,7 @@ func main() {
log.Fatalf("failed to create transport, %v", err)
}

c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs(), cloudevents.WithDataContentType(event.ApplicationJSON))
c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
Expand All @@ -32,7 +30,7 @@ func main() {
e := cloudevents.NewEvent()
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/cmd/samples/httpb/sender")
_ = e.SetData(map[string]interface{}{
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": i,
"message": "Hello, World!",
})
Expand Down
13 changes: 6 additions & 7 deletions cmd/samples/nats/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
cloudevents "github.com/cloudevents/sdk-go"
"log"
"net/url"
"os"
Expand Down Expand Up @@ -53,13 +54,11 @@ type Example struct {
}

func (d *Demo) Send(eventContext event.EventContext, i int) error {
e := event.Event{
Context: eventContext,
Data: &Example{
Sequence: i,
Message: d.Message,
},
}
e := event.Event{Context: eventContext}
_ = e.SetData(cloudevents.ApplicationJSON, &Example{
Sequence: i,
Message: d.Message,
})
return d.Client.Send(context.Background(), e)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/samples/simple/http/requester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
event := cloudevents.NewEvent(version)
event.SetType("com.cloudevents.sample.sent")
event.SetSource("https://github.com/cloudevents/sdk-go/cmd/samples/sender")
event.Data = data
_ = event.SetData(cloudevents.ApplicationJSON, data)

if resp, err := c.Request(ctx, event); err != nil {
log.Printf("failed to send: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/samples/stats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opencensus.io/trace"
"go.opencensus.io/zpages"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/client"
cecontext "github.com/cloudevents/sdk-go/pkg/context"
"github.com/cloudevents/sdk-go/pkg/event"
Expand Down Expand Up @@ -77,8 +78,8 @@ func mainSender() {
Type: "com.cloudevents.sample.sent",
Source: *source,
}.AsV1(),
Data: data,
}
_ = e.SetData(cloudevents.ApplicationJSON, data)

if resp, err := c.Request(ctx, e); err != nil {
log.Printf("failed to send: %v", err)
Expand Down
6 changes: 2 additions & 4 deletions pkg/binding/buffering/acks_before_finish_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (

func TestWithAcksBeforeFinish(t *testing.T) {
var testEvent = event.Event{
Data: []byte(`"data"`),
DataEncoded: true,
DataEncoded: []byte(`"data"`),
Context: event.EventContextV1{
DataContentType: event.StringOfApplicationJSON(),
Source: types.URIRef{URL: url.URL{Path: "source"}},
Expand Down Expand Up @@ -48,8 +47,7 @@ func TestWithAcksBeforeFinish(t *testing.T) {

func TestCopyAndWithAcksBeforeFinish(t *testing.T) {
var testEvent = event.Event{
Data: []byte(`"data"`),
DataEncoded: true,
DataEncoded: []byte(`"data"`),
Context: event.EventContextV1{
DataContentType: event.StringOfApplicationJSON(),
Source: types.URIRef{URL: url.URL{Path: "source"}},
Expand Down
5 changes: 1 addition & 4 deletions pkg/binding/event_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ func (m *EventMessage) ReadBinary(ctx context.Context, b BinaryWriter) (err erro
return err
}
// Pass the body
body, err := (*event.Event)(m).DataBytes()
if err != nil {
return err
}
body := (*event.Event)(m).Data()
if len(body) > 0 {
err = b.SetData(bytes.NewReader(body))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/binding/example_using_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func runSender(w io.Writer) error {
e.SetType("example.com/event")
e.SetSource("example.com/source")
e.SetID(strconv.Itoa(i))
if err := e.SetData(fmt.Sprintf("hello %d", i)); err != nil {
if err := e.SetData(event.TextJSON, fmt.Sprintf("hello %d", i)); err != nil {
return err
}
if err := c.Send(context.TODO(), e); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/binding/finish_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import (

func TestWithFinish(t *testing.T) {
var testEvent = event.Event{
Data: []byte(`"data"`),
DataEncoded: true,
DataEncoded: []byte(`"data"`),
Context: event.EventContextV1{
DataContentType: event.StringOfApplicationJSON(),
Source: types.URIRef{URL: url.URL{Path: "source"}},
Expand Down
19 changes: 11 additions & 8 deletions pkg/binding/format/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ func TestJSON(t *testing.T) {
}.AsV03(),
}
e.SetExtension("ex", "val")
assert.NoError(e.SetData("foo"))
assert.NoError(e.SetData(event.ApplicationJSON, "foo"))
b, err := format.JSON.Marshal(&e)
assert.NoError(err)
assert.Equal(`{"data":"foo","ex":"val","id":"id","source":"source","specversion":"0.3","type":"type"}`, string(b))
assert.Equal(`{"data":"foo","datacontenttype":"application/json","ex":"val","id":"id","source":"source","specversion":"0.3","type":"type"}`, string(b))

var e2 event.Event
assert.NoError(format.JSON.Unmarshal(b, &e2))
Expand All @@ -48,10 +48,10 @@ func TestMarshalUnmarshal(t *testing.T) {
Source: *types.ParseURIRef("source"),
}.AsV03(),
}
assert.NoError(e.SetData("foo"))
assert.NoError(e.SetData(event.ApplicationJSON, "foo"))
b, err := format.Marshal(format.JSON.MediaType(), &e)
assert.NoError(err)
assert.Equal(`{"data":"foo","id":"id","source":"source","specversion":"0.3","type":"type"}`, string(b))
assert.Equal(`{"data":"foo","datacontenttype":"application/json","id":"id","source":"source","specversion":"0.3","type":"type"}`, string(b))

var e2 event.Event
assert.NoError(format.Unmarshal(format.JSON.MediaType(), b, &e2))
Expand All @@ -65,9 +65,12 @@ func TestMarshalUnmarshal(t *testing.T) {

type dummyFormat struct{}

func (dummyFormat) MediaType() string { return "dummy" }
func (dummyFormat) Marshal(*event.Event) ([]byte, error) { return []byte("dummy!"), nil }
func (dummyFormat) Unmarshal(b []byte, e *event.Event) error { e.Data = "undummy!"; return nil }
func (dummyFormat) MediaType() string { return "dummy" }
func (dummyFormat) Marshal(*event.Event) ([]byte, error) { return []byte("dummy!"), nil }
func (dummyFormat) Unmarshal(b []byte, e *event.Event) error {
e.DataEncoded = []byte("undummy!")
return nil
}

func TestAdd(t *testing.T) {
assert := assert.New(t)
Expand All @@ -80,5 +83,5 @@ func TestAdd(t *testing.T) {
assert.Equal("dummy!", string(b))
err = format.Unmarshal("dummy", b, &e)
assert.NoError(err)
assert.Equal("undummy!", e.Data)
assert.Equal([]byte("undummy!"), e.Data())
}
15 changes: 7 additions & 8 deletions pkg/binding/test/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ var (
func FullEvent() event.Event {
e := event.Event{
Context: event.EventContextV1{
Type: "com.example.FullEvent",
Source: Source,
ID: "full-event",
Time: &Timestamp,
DataSchema: &Schema,
DataContentType: strptr("text/json"),
Subject: strptr("topic"),
Type: "com.example.FullEvent",
Source: Source,
ID: "full-event",
Time: &Timestamp,
DataSchema: &Schema,
Subject: strptr("topic"),
}.AsV1(),
}

Expand All @@ -41,7 +40,7 @@ func FullEvent() event.Event {
e.SetExtension("exurl", Source)
e.SetExtension("extime", Timestamp)

if err := e.SetData("hello"); err != nil {
if err := e.SetData("text/json", "hello"); err != nil {
panic(err)
}
return e
Expand Down
6 changes: 1 addition & 5 deletions pkg/binding/test/mock_binary_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ func MustCreateMockBinaryMessage(e event.Event) binding.Message {
m.Extensions[k] = v
}

var err error
m.Body, err = e.DataBytes()
if err != nil {
panic(err)
}
m.Body = e.Data()

return &m
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/binding/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ func AssertEventContextEquals(t *testing.T, want event.EventContext, have event.
// Assert two event.Event are equals
func AssertEventEquals(t *testing.T, want event.Event, have event.Event) {
AssertEventContextEquals(t, want.Context, have.Context)
wantPayload, err := want.DataBytes()
assert.NoError(t, err)
havePayload, err := have.DataBytes()
assert.NoError(t, err)
wantPayload := want.Data()
havePayload := have.Data()
assert.Equal(t, wantPayload, havePayload)
}

Expand Down
Loading

0 comments on commit 7b09386

Please sign in to comment.