Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom dispatchers for event streams #117

Merged
merged 17 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/ffpubsub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -162,7 +162,7 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
log.L(ctx).Infof("Running on: %s", u)

p := eventstreams.NewEventStreamPersistence[pubSubConfig](sql, dbsql.UUIDValidator)
c := eventstreams.GenerateConfig(ctx)
c := eventstreams.GenerateConfig[pubSubConfig, pubSubMessage](ctx)
ims := &inMemoryStream{
messages: []string{},
newMessages: *sync.NewCond(new(sync.Mutex)),
Expand Down
22 changes: 13 additions & 9 deletions pkg/dbsql/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (

type TestCRUDable struct {
ResourceBase
NS *string `json:"namespace"`
Name *string `json:"name"`
Field1 *string `json:"f1"`
Field2 *fftypes.FFBigInt `json:"f2"`
Field3 *fftypes.JSONAny `json:"f3"`
Field4 *int64 `json:"f4"`
Field5 *bool `json:"f5"`
NS *string `json:"namespace"`
Name *string `json:"name"`
Field1 *string `json:"f1"`
Field2 *fftypes.FFBigInt `json:"f2"`
Field3 *fftypes.JSONAny `json:"f3"`
Field4 *int64 `json:"f4"`
Field5 *bool `json:"f5"`
Field6 *fftypes.FFDuration `json:"f6"`
}

var CRUDableQueryFactory = &ffapi.QueryFields{
Expand All @@ -55,8 +56,8 @@ var CRUDableQueryFactory = &ffapi.QueryFields{
"f2": &ffapi.BigIntField{},
"f3": &ffapi.JSONField{},
"f4": &ffapi.Int64Field{},
"f5": &ffapi.JSONField{},
"f6": &ffapi.BoolField{},
"f5": &ffapi.BoolField{},
"f6": &ffapi.Int64Field{},
}

// TestHistory shows a simple object:
Expand Down Expand Up @@ -177,6 +178,7 @@ func newCRUDCollection(db *Database, ns string) *TestCRUD {
"field3",
"field4",
"field5",
"field6",
},
FilterFieldMap: map[string]string{
"f1": "field1",
Expand Down Expand Up @@ -212,6 +214,8 @@ func newCRUDCollection(db *Database, ns string) *TestCRUD {
return &inst.Field4
case "field5":
return &inst.Field5
case "field6":
return &inst.Field6
}
return nil
},
Expand Down
17 changes: 13 additions & 4 deletions pkg/eventstreams/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,6 +18,7 @@ package eventstreams

import (
"context"
"crypto/tls"

"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/ffresty"
Expand All @@ -26,12 +27,20 @@ import (
"github.com/hyperledger/firefly-common/pkg/retry"
)

type Config struct {
type DispatcherFactory[CT any, DT any] interface {
peterbroadhurst marked this conversation as resolved.
Show resolved Hide resolved
Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT]
}

type Config[CT any, DT any] struct {
TLSConfigs map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
Retry *retry.Retry `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
DisablePrivateIPs bool `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
Checkpoints CheckpointsTuningConfig `ffstruct:"EventStreamConfig" json:"checkpoints"`
Defaults EventStreamDefaults `ffstruct:"EventStreamConfig" json:"defaults,omitempty"`

// Allow plugging in additional types (important that the embedding code adds the FFEnum doc entry for the EventStreamType)
AdditionalDispatchers map[EventStreamType]DispatcherFactory[CT, DT] `json:"-"`
}

type CheckpointsTuningConfig struct {
Expand Down Expand Up @@ -121,15 +130,15 @@ func InitConfig(conf config.Section) {

// Optional function to generate config directly from YAML configuration using the config package.
// You can also generate the configuration programmatically
func GenerateConfig(ctx context.Context) *Config {
func GenerateConfig[CT any, DT any](ctx context.Context) *Config[CT, DT] {
httpDefaults, _ := ffresty.GenerateConfig(ctx, WebhookDefaultsConfig)
tlsConfigs := map[string]*fftls.Config{}
for i := 0; i < TLSConfigs.ArraySize(); i++ {
tlsConf := TLSConfigs.ArrayEntry(i)
name := tlsConf.GetString(ConfigTLSConfigName)
tlsConfigs[name] = fftls.GenerateConfig(tlsConf.SubSection("tls"))
}
return &Config{
return &Config[CT, DT]{
TLSConfigs: tlsConfigs,
DisablePrivateIPs: RootConfig.GetBool(ConfigDisablePrivateIPs),
Checkpoints: CheckpointsTuningConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventstreams/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestGenerateConfigTLS(t *testing.T) {
tls0.Set(ConfigTLSConfigName, "tls0")
tls0.SubSection("tls").Set(fftls.HTTPConfTLSCAFile, t.TempDir())

c := GenerateConfig(context.Background())
c := GenerateConfig[testESConfig, testData](context.Background())
assert.True(t, c.TLSConfigs["tls0"].Enabled)

}
35 changes: 14 additions & 21 deletions pkg/eventstreams/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,17 @@ func TestE2E_DeliveryWebSockets(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
BatchSize: ptrTo(10),
Config: &testESConfig{Config1: "1111"},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand Down Expand Up @@ -162,18 +161,17 @@ func TestE2E_DeliveryWebSocketsNack(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
BatchSize: ptrTo(10),
Config: &testESConfig{Config1: "1111"},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand Down Expand Up @@ -208,18 +206,17 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
BatchSize: ptrTo(10),
Config: &testESConfig{Config1: "1111"},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand Down Expand Up @@ -269,7 +266,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

got100 := make(chan struct{})
Expand Down Expand Up @@ -302,7 +299,6 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebhook,
BatchSize: ptrTo(10),
Expand All @@ -315,7 +311,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {
},
},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand All @@ -340,7 +336,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {
ts := &testSource{started: make(chan struct{})}
close(ts.started) // start delivery immediately - will block as no WS connected

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

gotFiveTimes := make(chan struct{})
Expand Down Expand Up @@ -372,7 +368,6 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {

// Create a stream to sub-select one topic
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic_1"), // only one of the topics
Type: &EventStreamTypeWebhook,
BatchSize: ptrTo(10),
Expand All @@ -385,7 +380,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {
},
},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

Expand All @@ -410,33 +405,31 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
started: make(chan struct{}), // we never start it
}

mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts)
mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
assert.NoError(t, err)

// Create first event stream started
es1 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream1"),
TopicFilter: ptrTo("topic1"), // only one of the topics
Type: &EventStreamTypeWebSocket,
Config: &testESConfig{
Config1: "confValue1",
},
}
created, err := mgr.UpsertStream(ctx, es1)
created, err := mgr.UpsertStream(ctx, "stream1", es1)
assert.NoError(t, err)
assert.True(t, created)

// Create second event stream stopped
es2 := &EventStreamSpec[testESConfig]{
Name: ptrTo("stream2"),
TopicFilter: ptrTo("topic2"), // only one of the topics
Type: &EventStreamTypeWebSocket,
Status: &EventStreamStatusStopped,
Config: &testESConfig{
Config1: "confValue2",
},
}
created, err = mgr.UpsertStream(ctx, es2)
created, err = mgr.UpsertStream(ctx, "stream2", es2)
assert.NoError(t, err)
assert.True(t, created)

Expand All @@ -458,7 +451,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) {

// Rename second event stream
es2.Name = ptrTo("stream2a")
created, err = mgr.UpsertStream(ctx, es2)
created, err = mgr.UpsertStream(ctx, "" /* ID is in es2 object */, es2)
assert.NoError(t, err)
assert.False(t, created)

Expand Down
Loading