Skip to content

Commit

Permalink
Merge pull request #117 from hyperledger/custom-dispatchers
Browse files Browse the repository at this point in the history
Support custom dispatchers for event streams
  • Loading branch information
peterbroadhurst authored Jan 16, 2024
2 parents da45c46 + c540322 commit ba21054
Show file tree
Hide file tree
Showing 28 changed files with 571 additions and 240 deletions.
4 changes: 2 additions & 2 deletions examples/ffpubsub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -162,7 +162,7 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
log.L(ctx).Infof("Running on: %s", u)

p := eventstreams.NewEventStreamPersistence[pubSubConfig](sql, dbsql.UUIDValidator)
c := eventstreams.GenerateConfig(ctx)
c := eventstreams.GenerateConfig[pubSubConfig, pubSubMessage](ctx)
ims := &inMemoryStream{
messages: []string{},
newMessages: *sync.NewCond(new(sync.Mutex)),
Expand Down
22 changes: 13 additions & 9 deletions pkg/dbsql/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (

type TestCRUDable struct {
ResourceBase
NS *string `json:"namespace"`
Name *string `json:"name"`
Field1 *string `json:"f1"`
Field2 *fftypes.FFBigInt `json:"f2"`
Field3 *fftypes.JSONAny `json:"f3"`
Field4 *int64 `json:"f4"`
Field5 *bool `json:"f5"`
NS *string `json:"namespace"`
Name *string `json:"name"`
Field1 *string `json:"f1"`
Field2 *fftypes.FFBigInt `json:"f2"`
Field3 *fftypes.JSONAny `json:"f3"`
Field4 *int64 `json:"f4"`
Field5 *bool `json:"f5"`
Field6 *fftypes.FFDuration `json:"f6"`
}

var CRUDableQueryFactory = &ffapi.QueryFields{
Expand All @@ -55,8 +56,8 @@ var CRUDableQueryFactory = &ffapi.QueryFields{
"f2": &ffapi.BigIntField{},
"f3": &ffapi.JSONField{},
"f4": &ffapi.Int64Field{},
"f5": &ffapi.JSONField{},
"f6": &ffapi.BoolField{},
"f5": &ffapi.BoolField{},
"f6": &ffapi.Int64Field{},
}

// TestHistory shows a simple object:
Expand Down Expand Up @@ -177,6 +178,7 @@ func newCRUDCollection(db *Database, ns string) *TestCRUD {
"field3",
"field4",
"field5",
"field6",
},
FilterFieldMap: map[string]string{
"f1": "field1",
Expand Down Expand Up @@ -212,6 +214,8 @@ func newCRUDCollection(db *Database, ns string) *TestCRUD {
return &inst.Field4
case "field5":
return &inst.Field5
case "field6":
return &inst.Field6
}
return nil
},
Expand Down
22 changes: 18 additions & 4 deletions pkg/eventstreams/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,6 +18,7 @@ package eventstreams

import (
"context"
"crypto/tls"

"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/ffresty"
Expand All @@ -26,12 +27,25 @@ import (
"github.com/hyperledger/firefly-common/pkg/retry"
)

type Config struct {
// DispatcherFactory is the interface to plug in a custom dispatcher, for example to provide
// local in-process processing of events (in addition to remote WebSocket/Webhook consumption).
// Generics:
// - CT is the Configuration Type - the custom extensions to the configuration schema
// - DT is the Data Type - the payload type that will be delivered to the application
type DispatcherFactory[CT any, DT any] interface {
Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT]
}

type Config[CT any, DT any] struct {
TLSConfigs map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
Retry *retry.Retry `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
DisablePrivateIPs bool `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
Checkpoints CheckpointsTuningConfig `ffstruct:"EventStreamConfig" json:"checkpoints"`
Defaults EventStreamDefaults `ffstruct:"EventStreamConfig" json:"defaults,omitempty"`

// Allow plugging in additional types (important that the embedding code adds the FFEnum doc entry for the EventStreamType)
AdditionalDispatchers map[EventStreamType]DispatcherFactory[CT, DT] `json:"-"`
}

type CheckpointsTuningConfig struct {
Expand Down Expand Up @@ -121,15 +135,15 @@ func InitConfig(conf config.Section) {

// Optional function to generate config directly from YAML configuration using the config package.
// You can also generate the configuration programmatically
func GenerateConfig(ctx context.Context) *Config {
func GenerateConfig[CT any, DT any](ctx context.Context) *Config[CT, DT] {
httpDefaults, _ := ffresty.GenerateConfig(ctx, WebhookDefaultsConfig)
tlsConfigs := map[string]*fftls.Config{}
for i := 0; i < TLSConfigs.ArraySize(); i++ {
tlsConf := TLSConfigs.ArrayEntry(i)
name := tlsConf.GetString(ConfigTLSConfigName)
tlsConfigs[name] = fftls.GenerateConfig(tlsConf.SubSection("tls"))
}
return &Config{
return &Config[CT, DT]{
TLSConfigs: tlsConfigs,
DisablePrivateIPs: RootConfig.GetBool(ConfigDisablePrivateIPs),
Checkpoints: CheckpointsTuningConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventstreams/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestGenerateConfigTLS(t *testing.T) {
tls0.Set(ConfigTLSConfigName, "tls0")
tls0.SubSection("tls").Set(fftls.HTTPConfTLSCAFile, t.TempDir())

c := GenerateConfig(context.Background())
c := GenerateConfig[testESConfig, testData](context.Background())
assert.True(t, c.TLSConfigs["tls0"].Enabled)

}
35 changes: 14 additions & 21 deletions pkg/eventstreams/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,17 @@ func TestE2E_DeliveryWebSockets(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
BatchSize: ptrTo(10),
Config: &testESConfig{Config1: "1111"},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand Down Expand Up @@ -162,18 +161,17 @@ func TestE2E_DeliveryWebSocketsNack(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
BatchSize: ptrTo(10),
Config: &testESConfig{Config1: "1111"},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand Down Expand Up @@ -208,18 +206,17 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
BatchSize: ptrTo(10),
Config: &testESConfig{Config1: "1111"},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand Down Expand Up @@ -269,7 +266,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

got100 := make(chan struct{})
Expand Down Expand Up @@ -302,7 +299,6 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebhook,
BatchSize: ptrTo(10),
Expand All @@ -315,7 +311,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {
},
},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand All @@ -340,7 +336,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

gotFiveTimes := make(chan struct{})
Expand Down Expand Up @@ -372,7 +368,6 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebhook,
BatchSize: ptrTo(10),
Expand All @@ -385,7 +380,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {
},
},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand All @@ -410,33 +405,31 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
started: make(chan struct{}), // we never start it
}

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create first event stream started
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
Config: &testESConfig{
Config1: "confValue1",
},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

// Create second event stream stopped
es2 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream2"),
TopicFilter: ptrTo("topic2"), // only one of the topics
Type: &EventStreamTypeWebSocket,
Status: &EventStreamStatusStopped,
Config: &testESConfig{
Config1: "confValue2",
},
}
created, err = mgr.UpsertStream(ctx, es2)
created, err = mgr.UpsertStream(ctx, "stream2", es2)
assert.NoError(t, err)
assert.True(t, created)

Expand All @@ -458,7 +451,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) {

// Rename second event stream
es2.Name = ptrTo("stream2a")
created, err = mgr.UpsertStream(ctx, es2)
created, err = mgr.UpsertStream(ctx, "" /* ID is in es2 object */, es2)
assert.NoError(t, err)
assert.False(t, created)

Expand Down
Loading

0 comments on commit ba21054

Please sign in to comment.