diff --git a/examples/ffpubsub.go b/examples/ffpubsub.go index 31a6be2..d5d2514 100644 --- a/examples/ffpubsub.go +++ b/examples/ffpubsub.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -162,7 +162,7 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) { log.L(ctx).Infof("Running on: %s", u) p := eventstreams.NewEventStreamPersistence[pubSubConfig](sql, dbsql.UUIDValidator) - c := eventstreams.GenerateConfig(ctx) + c := eventstreams.GenerateConfig[pubSubConfig, pubSubMessage](ctx) ims := &inMemoryStream{ messages: []string{}, newMessages: *sync.NewCond(new(sync.Mutex)), diff --git a/pkg/dbsql/crud_test.go b/pkg/dbsql/crud_test.go index 3e83be7..8ef8662 100644 --- a/pkg/dbsql/crud_test.go +++ b/pkg/dbsql/crud_test.go @@ -36,13 +36,14 @@ import ( type TestCRUDable struct { ResourceBase - NS *string `json:"namespace"` - Name *string `json:"name"` - Field1 *string `json:"f1"` - Field2 *fftypes.FFBigInt `json:"f2"` - Field3 *fftypes.JSONAny `json:"f3"` - Field4 *int64 `json:"f4"` - Field5 *bool `json:"f5"` + NS *string `json:"namespace"` + Name *string `json:"name"` + Field1 *string `json:"f1"` + Field2 *fftypes.FFBigInt `json:"f2"` + Field3 *fftypes.JSONAny `json:"f3"` + Field4 *int64 `json:"f4"` + Field5 *bool `json:"f5"` + Field6 *fftypes.FFDuration `json:"f6"` } var CRUDableQueryFactory = &ffapi.QueryFields{ @@ -55,8 +56,8 @@ var CRUDableQueryFactory = &ffapi.QueryFields{ "f2": &ffapi.BigIntField{}, "f3": &ffapi.JSONField{}, "f4": &ffapi.Int64Field{}, - "f5": &ffapi.JSONField{}, - "f6": &ffapi.BoolField{}, + "f5": &ffapi.BoolField{}, + "f6": &ffapi.Int64Field{}, } // TestHistory shows a simple object: @@ -177,6 +178,7 @@ func newCRUDCollection(db *Database, ns string) *TestCRUD { "field3", "field4", "field5", + "field6", }, FilterFieldMap: map[string]string{ "f1": "field1", @@ -212,6 +214,8 @@ func newCRUDCollection(db *Database, ns string) *TestCRUD { return &inst.Field4 case "field5": return &inst.Field5 + case "field6": + return &inst.Field6 } return nil }, diff --git a/pkg/eventstreams/config.go b/pkg/eventstreams/config.go index 0acc30c..cdc4b85 100644 --- a/pkg/eventstreams/config.go +++ b/pkg/eventstreams/config.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,6 +18,7 @@ package eventstreams import ( "context" + "crypto/tls" "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/ffresty" @@ -26,12 +27,25 @@ import ( "github.com/hyperledger/firefly-common/pkg/retry" ) -type Config struct { +// DispatcherFactory is the interface to plug in a custom dispatcher, for example to provide +// local in-process processing of events (in addition to remote WebSocket/Webhook consumption). +// Generics: +// - CT is the Configuration Type - the custom extensions to the configuration schema +// - DT is the Data Type - the payload type that will be delivered to the application +type DispatcherFactory[CT any, DT any] interface { + Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error + NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT] +} + +type Config[CT any, DT any] struct { TLSConfigs map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"` Retry *retry.Retry `ffstruct:"EventStreamConfig" json:"retry,omitempty"` DisablePrivateIPs bool `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"` Checkpoints CheckpointsTuningConfig `ffstruct:"EventStreamConfig" json:"checkpoints"` Defaults EventStreamDefaults `ffstruct:"EventStreamConfig" json:"defaults,omitempty"` + + // Allow plugging in additional types (important that the embedding code adds the FFEnum doc entry for the EventStreamType) + AdditionalDispatchers map[EventStreamType]DispatcherFactory[CT, DT] `json:"-"` } type CheckpointsTuningConfig struct { @@ -121,7 +135,7 @@ func InitConfig(conf config.Section) { // Optional function to generate config directly from YAML configuration using the config package. // You can also generate the configuration programmatically -func GenerateConfig(ctx context.Context) *Config { +func GenerateConfig[CT any, DT any](ctx context.Context) *Config[CT, DT] { httpDefaults, _ := ffresty.GenerateConfig(ctx, WebhookDefaultsConfig) tlsConfigs := map[string]*fftls.Config{} for i := 0; i < TLSConfigs.ArraySize(); i++ { @@ -129,7 +143,7 @@ func GenerateConfig(ctx context.Context) *Config { name := tlsConf.GetString(ConfigTLSConfigName) tlsConfigs[name] = fftls.GenerateConfig(tlsConf.SubSection("tls")) } - return &Config{ + return &Config[CT, DT]{ TLSConfigs: tlsConfigs, DisablePrivateIPs: RootConfig.GetBool(ConfigDisablePrivateIPs), Checkpoints: CheckpointsTuningConfig{ diff --git a/pkg/eventstreams/config_test.go b/pkg/eventstreams/config_test.go index b14247f..1f44c88 100644 --- a/pkg/eventstreams/config_test.go +++ b/pkg/eventstreams/config_test.go @@ -34,7 +34,7 @@ func TestGenerateConfigTLS(t *testing.T) { tls0.Set(ConfigTLSConfigName, "tls0") tls0.SubSection("tls").Set(fftls.HTTPConfTLSCAFile, t.TempDir()) - c := GenerateConfig(context.Background()) + c := GenerateConfig[testESConfig, testData](context.Background()) assert.True(t, c.TLSConfigs["tls0"].Enabled) } diff --git a/pkg/eventstreams/e2e_test.go b/pkg/eventstreams/e2e_test.go index 56805d1..31aa145 100644 --- a/pkg/eventstreams/e2e_test.go +++ b/pkg/eventstreams/e2e_test.go @@ -113,18 +113,17 @@ func TestE2E_DeliveryWebSockets(t *testing.T) { ts := &testSource{started: make(chan struct{})} close(ts.started) // start delivery immediately - will block as no WS connected - mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts) + mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts) assert.NoError(t, err) // Create a stream to sub-select one topic es1 := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), TopicFilter: ptrTo("topic_1"), // only one of the topics Type: &EventStreamTypeWebSocket, BatchSize: ptrTo(10), Config: &testESConfig{Config1: "1111"}, } - created, err := mgr.UpsertStream(ctx, es1) + created, err := mgr.UpsertStream(ctx, "stream1", es1) assert.NoError(t, err) assert.True(t, created) @@ -162,18 +161,17 @@ func TestE2E_DeliveryWebSocketsNack(t *testing.T) { ts := &testSource{started: make(chan struct{})} close(ts.started) // start delivery immediately - will block as no WS connected - mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts) + mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts) assert.NoError(t, err) // Create a stream to sub-select one topic es1 := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), TopicFilter: ptrTo("topic_1"), // only one of the topics Type: &EventStreamTypeWebSocket, BatchSize: ptrTo(10), Config: &testESConfig{Config1: "1111"}, } - created, err := mgr.UpsertStream(ctx, es1) + created, err := mgr.UpsertStream(ctx, "stream1", es1) assert.NoError(t, err) assert.True(t, created) @@ -208,18 +206,17 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) { ts := &testSource{started: make(chan struct{})} close(ts.started) // start delivery immediately - will block as no WS connected - mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts) + mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts) assert.NoError(t, err) // Create a stream to sub-select one topic es1 := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), TopicFilter: ptrTo("topic_1"), // only one of the topics Type: &EventStreamTypeWebSocket, BatchSize: ptrTo(10), Config: &testESConfig{Config1: "1111"}, } - created, err := mgr.UpsertStream(ctx, es1) + created, err := mgr.UpsertStream(ctx, "stream1", es1) assert.NoError(t, err) assert.True(t, created) @@ -269,7 +266,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) { ts := &testSource{started: make(chan struct{})} close(ts.started) // start delivery immediately - will block as no WS connected - mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts) + mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts) assert.NoError(t, err) got100 := make(chan struct{}) @@ -302,7 +299,6 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) { // Create a stream to sub-select one topic es1 := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), TopicFilter: ptrTo("topic_1"), // only one of the topics Type: &EventStreamTypeWebhook, BatchSize: ptrTo(10), @@ -315,7 +311,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) { }, }, } - created, err := mgr.UpsertStream(ctx, es1) + created, err := mgr.UpsertStream(ctx, "stream1", es1) assert.NoError(t, err) assert.True(t, created) @@ -340,7 +336,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) { ts := &testSource{started: make(chan struct{})} close(ts.started) // start delivery immediately - will block as no WS connected - mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts) + mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts) assert.NoError(t, err) gotFiveTimes := make(chan struct{}) @@ -372,7 +368,6 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) { // Create a stream to sub-select one topic es1 := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), TopicFilter: ptrTo("topic_1"), // only one of the topics Type: &EventStreamTypeWebhook, BatchSize: ptrTo(10), @@ -385,7 +380,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) { }, }, } - created, err := mgr.UpsertStream(ctx, es1) + created, err := mgr.UpsertStream(ctx, "stream1", es1) assert.NoError(t, err) assert.True(t, created) @@ -410,25 +405,23 @@ func TestE2E_CRUDLifecycle(t *testing.T) { started: make(chan struct{}), // we never start it } - mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), p, wss, ts) + mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts) assert.NoError(t, err) // Create first event stream started es1 := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), TopicFilter: ptrTo("topic1"), // only one of the topics Type: &EventStreamTypeWebSocket, Config: &testESConfig{ Config1: "confValue1", }, } - created, err := mgr.UpsertStream(ctx, es1) + created, err := mgr.UpsertStream(ctx, "stream1", es1) assert.NoError(t, err) assert.True(t, created) // Create second event stream stopped es2 := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream2"), TopicFilter: ptrTo("topic2"), // only one of the topics Type: &EventStreamTypeWebSocket, Status: &EventStreamStatusStopped, @@ -436,7 +429,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) { Config1: "confValue2", }, } - created, err = mgr.UpsertStream(ctx, es2) + created, err = mgr.UpsertStream(ctx, "stream2", es2) assert.NoError(t, err) assert.True(t, created) @@ -458,7 +451,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) { // Rename second event stream es2.Name = ptrTo("stream2a") - created, err = mgr.UpsertStream(ctx, es2) + created, err = mgr.UpsertStream(ctx, "" /* ID is in es2 object */, es2) assert.NoError(t, err) assert.False(t, created) diff --git a/pkg/eventstreams/eventstreams.go b/pkg/eventstreams/eventstreams.go index b5a820b..9278063 100644 --- a/pkg/eventstreams/eventstreams.go +++ b/pkg/eventstreams/eventstreams.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,7 +18,6 @@ package eventstreams import ( "context" - "crypto/tls" "database/sql" "database/sql/driver" "regexp" @@ -65,6 +64,13 @@ var ( EventStreamStatusUnknown = fftypes.FFEnumValue("esstatus", "unknown") // not persisted ) +type LifecyclePhase int + +const ( + LifecyclePhasePreInsertValidation LifecyclePhase = iota // on user-supplied context, prior to inserting to DB + LifecyclePhaseStarting // while initializing for startup (so all defaults should be resolved) +) + // Let's us check that the config serializes type DBSerializable interface { sql.Scanner @@ -78,8 +84,9 @@ type EventStreamSpec[CT any] struct { Name *string `ffstruct:"eventstream" json:"name,omitempty"` Status *EventStreamStatus `ffstruct:"eventstream" json:"status,omitempty"` Type *EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"` - InitialSequenceID *string `ffstruct:"eventstream" json:"initialSequenceID,omitempty" ffenum:"estype"` - TopicFilter *string `ffstruct:"eventstream" json:"topicFilter,omitempty" ffenum:"estype"` + InitialSequenceID *string `ffstruct:"eventstream" json:"initialSequenceID,omitempty"` + TopicFilter *string `ffstruct:"eventstream" json:"topicFilter,omitempty"` + Identity *string `ffstruct:"eventstream" json:"identity,omitempty"` Config *CT `ffstruct:"eventstream" json:"config,omitempty"` ErrorHandling *ErrorHandlingType `ffstruct:"eventstream" json:"errorHandling"` @@ -149,7 +156,7 @@ func (esc *EventStreamCheckpoint) SetUpdated(t *fftypes.FFTime) { esc.Updated = t } -type EventBatchDispatcher[DT any] interface { +type Dispatcher[DT any] interface { AttemptDispatch(ctx context.Context, attempt int, events *EventBatch[DT]) error } @@ -174,20 +181,22 @@ func checkSet[T any](ctx context.Context, storeDefaults bool, fieldName string, // Optionally it stores the defaults back on the structure, to ensure no nil fields. // - When using at runtime: true, so later code doesn't need to worry about nil checks / defaults // - When storing to the DB: false, so defaults can be applied dynamically -func (esc *EventStreamSpec[CT]) validate(ctx context.Context, tlsConfigs map[string]*tls.Config, defaults *EventStreamDefaults, validateConf func(context.Context, *CT) error, setDefaults bool) (err error) { +func (esm *esManager[CT, DT]) validateStream(ctx context.Context, esc *EventStreamSpec[CT], phase LifecyclePhase) (factory DispatcherFactory[CT, DT], err error) { if esc.Name == nil { - return i18n.NewError(ctx, i18n.MsgMissingRequiredField, "name") + return nil, i18n.NewError(ctx, i18n.MsgMissingRequiredField, "name") } if esc.TopicFilter != nil { fullMatchFilter := `^` + *esc.TopicFilter + `$` if esc.topicFilterRegexp, err = regexp.Compile(fullMatchFilter); err != nil { - return i18n.NewError(ctx, i18n.MsgESInvalidTopicFilterRegexp, fullMatchFilter, err) + return nil, i18n.NewError(ctx, i18n.MsgESInvalidTopicFilterRegexp, fullMatchFilter, err) } } - if err := validateConf(ctx, esc.Config); err != nil { - return err + defaults := esm.config.Defaults + if err := esm.runtime.Validate(ctx, esc.Config); err != nil { + return nil, err } err = fftypes.ValidateFFNameField(ctx, *esc.Name, "name") + setDefaults := phase == LifecyclePhaseStarting if err == nil { err = checkSet(ctx, setDefaults, "status", &esc.Status, EventStreamStatusStarted, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "esstatus", v) }) } @@ -210,25 +219,17 @@ func (esc *EventStreamSpec[CT]) validate(ctx context.Context, tlsConfigs map[str err = checkSet(ctx, true /* type always applied */, "type", &esc.Type, EventStreamTypeWebSocket, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "estype", v) }) } if err != nil { - return err + return nil, err } - switch *esc.Type { - case EventStreamTypeWebSocket: - if esc.WebSocket == nil { - esc.WebSocket = &WebSocketConfig{} - } - if err := esc.WebSocket.validate(ctx, &defaults.WebSocketDefaults, setDefaults); err != nil { - return err - } - case EventStreamTypeWebhook: - if esc.Webhook == nil { - esc.Webhook = &WebhookConfig{} - } - if err := esc.Webhook.validate(ctx, tlsConfigs); err != nil { - return err - } + factory = esm.dispatchers[*esc.Type] + if factory == nil { + return nil, i18n.NewError(ctx, i18n.MsgESInvalidType, *esc.Type) } - return nil + err = factory.Validate(ctx, &esm.config, esc, esm.tlsConfigs, phase) + if err != nil { + return nil, err + } + return factory, nil } type eventStream[CT any, DT any] struct { @@ -236,7 +237,7 @@ type eventStream[CT any, DT any] struct { esm *esManager[CT, DT] spec *EventStreamSpec[CT] mux sync.Mutex - action EventBatchDispatcher[DT] + action Dispatcher[DT] activeState *activeStream[CT, DT] retry *retry.Retry persistence Persistence[CT] @@ -254,7 +255,8 @@ func (esm *esManager[CT, DT]) initEventStream( spec *EventStreamSpec[CT], ) (es *eventStream[CT, DT], err error) { // Validate - if err := esm.validateStream(bgCtx, spec, true); err != nil { + factory, err := esm.validateStream(bgCtx, spec, LifecyclePhaseStarting) + if err != nil { return nil, err } @@ -266,12 +268,7 @@ func (esm *esManager[CT, DT]) initEventStream( retry: esm.config.Retry, } - switch *es.spec.Type { - case EventStreamTypeWebhook: - es.action = esm.newWebhookAction(es.bgCtx, spec.Webhook) - case EventStreamTypeWebSocket: - es.action = newWebSocketAction[DT](esm.wsChannels, spec.WebSocket, *spec.Name) - } + es.action = factory.NewDispatcher(es.bgCtx, &esm.config, spec) log.L(es.bgCtx).Infof("Initialized Event Stream") if *spec.Status == EventStreamStatusStarted { @@ -281,10 +278,6 @@ func (esm *esManager[CT, DT]) initEventStream( return es, nil } -func (esm *esManager[CT, DT]) validateStream(ctx context.Context, esSpec *EventStreamSpec[CT], setDefaults bool) error { - return esSpec.validate(ctx, esm.tlsConfigs, &esm.config.Defaults, esm.runtime.Validate, setDefaults) -} - func (es *eventStream[CT, DT]) requestStop(ctx context.Context) chan struct{} { es.mux.Lock() defer es.mux.Unlock() diff --git a/pkg/eventstreams/eventstreams_test.go b/pkg/eventstreams/eventstreams_test.go index bdefa6a..8e284a5 100644 --- a/pkg/eventstreams/eventstreams_test.go +++ b/pkg/eventstreams/eventstreams_test.go @@ -124,42 +124,51 @@ func TestValidate(t *testing.T) { done() es.spec = &EventStreamSpec[testESConfig]{} - err := es.esm.validateStream(ctx, es.spec, false) + _, err := es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "FF00112", err) es.spec.Name = ptrTo("name1") - err = es.esm.validateStream(ctx, es.spec, false) + _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.NoError(t, err) es.esm.runtime.(*mockEventSource).validate = func(ctx context.Context, conf *testESConfig) error { return fmt.Errorf("pop") } - err = es.esm.validateStream(ctx, es.spec, false) + _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "pop", err) es.esm.runtime.(*mockEventSource).validate = func(ctx context.Context, conf *testESConfig) error { return nil } es.spec.TopicFilter = ptrTo("((((!Bad Regexp[") - err = es.esm.validateStream(ctx, es.spec, false) + _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "FF00235", err) es.spec.TopicFilter = nil es.spec.Type = ptrTo(fftypes.FFEnum("wrong")) - err = es.esm.validateStream(ctx, es.spec, false) + _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "FF00234", err) es.spec.Type = ptrTo(EventStreamTypeWebSocket) es.spec.WebSocket = &WebSocketConfig{ DistributionMode: ptrTo(fftypes.FFEnum("wrong")), } - err = es.esm.validateStream(ctx, es.spec, false) + _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "FF00234", err) es.spec.Type = ptrTo(EventStreamTypeWebhook) - err = es.esm.validateStream(ctx, es.spec, false) + _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "FF00216", err) _, err = es.esm.initEventStream(ctx, es.spec) assert.Regexp(t, "FF00216", err) + + customType := fftypes.FFEnumValue("estype", "custom1") + es.spec.Type = &customType + _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) + assert.Regexp(t, "FF00217", err) + + _, err = es.esm.initEventStream(ctx, es.spec) + assert.Regexp(t, "FF00217", err) + } func TestRequestStopAlreadyStopping(t *testing.T) { @@ -293,3 +302,7 @@ func TestGetIDNil(t *testing.T) { assert.Empty(t, (&EventStreamSpec[testESConfig]{}).GetID()) assert.Empty(t, (&EventStreamCheckpoint{}).GetID()) } + +func TestCheckDocs(t *testing.T) { + ffapi.CheckObjectDocumented(&EventStreamWithStatus[struct{}]{}) +} diff --git a/pkg/eventstreams/manager.go b/pkg/eventstreams/manager.go index df58905..bf114d4 100644 --- a/pkg/eventstreams/manager.go +++ b/pkg/eventstreams/manager.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -31,13 +31,14 @@ import ( ) type Manager[CT any] interface { - UpsertStream(ctx context.Context, esSpec *EventStreamSpec[CT]) (bool, error) + UpsertStream(ctx context.Context, nameOrID string, esSpec *EventStreamSpec[CT]) (bool, error) GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (*EventStreamWithStatus[CT], error) + GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (*EventStreamWithStatus[CT], error) ListStreams(ctx context.Context, filter ffapi.Filter) ([]*EventStreamWithStatus[CT], *ffapi.FilterResult, error) - StopStream(ctx context.Context, id string) error - StartStream(ctx context.Context, id string) error - ResetStream(ctx context.Context, id string, sequenceID string) error - DeleteStream(ctx context.Context, id string) error + StopStream(ctx context.Context, nameOrID string) error + StartStream(ctx context.Context, nameOrID string) error + ResetStream(ctx context.Context, nameOrID string, sequenceID string) error + DeleteStream(ctx context.Context, nameOrID string) error Close(ctx context.Context) } @@ -51,6 +52,9 @@ const ( type Deliver[DT any] func(events []*Event[DT]) SourceInstruction // Runtime is the required implementation extension for the EventStream common utility +// Generics: +// - ConfigType is the Configuration Type - the custom extensions to the configuration schema +// - DataType is the Data Type - the payload type that will be delivered to the application type Runtime[ConfigType any, DataType any] interface { // Generate a new unique resource ID (such as a UUID) NewID() string @@ -70,16 +74,17 @@ type Runtime[ConfigType any, DataType any] interface { } type esManager[CT any, DT any] struct { - config Config + config Config[CT, DT] mux sync.Mutex streams map[string]*eventStream[CT, DT] tlsConfigs map[string]*tls.Config wsChannels wsserver.WebSocketChannels persistence Persistence[CT] runtime Runtime[CT, DT] + dispatchers map[EventStreamType]DispatcherFactory[CT, DT] } -func NewEventStreamManager[CT any, DT any](ctx context.Context, config *Config, p Persistence[CT], wsChannels wsserver.WebSocketChannels, source Runtime[CT, DT]) (es Manager[CT], err error) { +func NewEventStreamManager[CT any, DT any](ctx context.Context, config *Config[CT, DT], p Persistence[CT], wsChannels wsserver.WebSocketChannels, source Runtime[CT, DT]) (es Manager[CT], err error) { var confExample interface{} = new(CT) if _, isDBSerializable := (confExample).(DBSerializable); !isDBSerializable { @@ -104,7 +109,13 @@ func NewEventStreamManager[CT any, DT any](ctx context.Context, config *Config, persistence: p, wsChannels: wsChannels, streams: map[string]*eventStream[CT, DT]{}, + dispatchers: config.AdditionalDispatchers, } + if esm.dispatchers == nil { + esm.dispatchers = make(map[EventStreamType]DispatcherFactory[CT, DT]) + } + esm.dispatchers[EventStreamTypeWebSocket] = &webSocketDispatcherFactory[CT, DT]{esm: esm} + esm.dispatchers[EventStreamTypeWebhook] = &webhookDispatcherFactory[CT, DT]{} if err = esm.initialize(ctx); err != nil { return nil, err } @@ -124,6 +135,18 @@ func (esm *esManager[CT, DT]) getStream(id string) *eventStream[CT, DT] { return esm.streams[id] } +func (esm *esManager[CT, DT]) getStreamByNameOrID(ctx context.Context, nameOrID string) (*eventStream[CT, DT], error) { + stream, err := esm.GetStreamByNameOrID(ctx, nameOrID) + if err != nil { + return nil, err + } + es := esm.getStream(*stream.ID) + if es == nil { + return nil, i18n.NewError(ctx, i18n.Msg404NoResult) + } + return es, nil +} + func (esm *esManager[CT, DT]) removeStream(id string) { esm.mux.Lock() defer esm.mux.Unlock() @@ -160,8 +183,28 @@ func (esm *esManager[CT, DT]) initialize(ctx context.Context) error { return nil } -func (esm *esManager[CT, DT]) UpsertStream(ctx context.Context, esSpec *EventStreamSpec[CT]) (bool, error) { +func (esm *esManager[CT, DT]) UpsertStream(ctx context.Context, nameOrID string, esSpec *EventStreamSpec[CT]) (bool, error) { + + validID := nameOrID != "" && esm.persistence.IDValidator()(ctx, nameOrID) == nil var existing *eventStream[CT, DT] + if validID { + // Updating by ID + esSpec.ID = &nameOrID + } else if nameOrID != "" { + // Upserting by name + existingNamed, err := esm.persistence.EventStreams().GetByName(ctx, nameOrID) + if err != nil { + return false, err + } + // If it exists, then we're updating by ID now + if existingNamed != nil { + esSpec.ID = existingNamed.ID + } + } + if (esSpec.Name == nil || len(*esSpec.Name) == 0) && len(nameOrID) > 0 { + esSpec.Name = &nameOrID + } + if esSpec.ID == nil || len(*esSpec.ID) == 0 { esSpec.ID = ptrTo(esm.runtime.NewID()) } else { @@ -179,7 +222,7 @@ func (esm *esManager[CT, DT]) UpsertStream(ctx context.Context, esSpec *EventStr // Do a validation that does NOT update the defaults into the structure, so that // the defaults are not persisted into the DB. This means that if the defaults are // changed then any streams with nil fields will pick up the new defaults. - if err := esm.validateStream(ctx, esSpec, false); err != nil { + if _, err := esm.validateStream(ctx, esSpec, LifecyclePhasePreInsertValidation); err != nil { return false, err } @@ -208,47 +251,47 @@ func (esm *esManager[CT, DT]) reInit(ctx context.Context, esSpec *EventStreamSpe return nil } -func (esm *esManager[CT, DT]) DeleteStream(ctx context.Context, id string) error { - es := esm.getStream(id) - if es == nil { - return i18n.NewError(ctx, i18n.Msg404NoResult) +func (esm *esManager[CT, DT]) DeleteStream(ctx context.Context, nameOrID string) error { + es, err := esm.getStreamByNameOrID(ctx, nameOrID) + if err != nil { + return err } if err := es.delete(ctx); err != nil { return err } // Now we can delete it fully from the DB - if err := esm.persistence.EventStreams().Delete(ctx, id); err != nil { + if err := esm.persistence.EventStreams().Delete(ctx, nameOrID); err != nil { return err } - esm.removeStream(id) + esm.removeStream(*es.spec.ID) return nil } -func (esm *esManager[CT, DT]) StopStream(ctx context.Context, id string) error { - es := esm.getStream(id) - if es == nil { - return i18n.NewError(ctx, i18n.Msg404NoResult) +func (esm *esManager[CT, DT]) StopStream(ctx context.Context, nameOrID string) error { + es, err := esm.getStreamByNameOrID(ctx, nameOrID) + if err != nil { + return err } return es.stop(ctx) } -func (esm *esManager[CT, DT]) ResetStream(ctx context.Context, id string, sequenceID string) error { - es := esm.getStream(id) - if es == nil { - return i18n.NewError(ctx, i18n.Msg404NoResult) +func (esm *esManager[CT, DT]) ResetStream(ctx context.Context, nameOrID string, sequenceID string) error { + es, err := esm.getStreamByNameOrID(ctx, nameOrID) + if err != nil { + return err } // suspend any active stream if err := es.suspend(ctx); err != nil { return err } // delete any existing checkpoint - if err := esm.persistence.Checkpoints().DeleteMany(ctx, CheckpointFilters.NewFilter(ctx).Eq("id", id)); err != nil { + if err := esm.persistence.Checkpoints().DeleteMany(ctx, CheckpointFilters.NewFilter(ctx).Eq("id", *es.spec.ID)); err != nil { return err } // store the initial_sequence_id back to the object, and update our in-memory record es.spec.InitialSequenceID = &sequenceID if err := esm.persistence.EventStreams().UpdateSparse(ctx, &EventStreamSpec[CT]{ - ID: &id, + ID: es.spec.ID, InitialSequenceID: &sequenceID, }); err != nil { return err @@ -260,10 +303,10 @@ func (esm *esManager[CT, DT]) ResetStream(ctx context.Context, id string, sequen return nil } -func (esm *esManager[CT, DT]) StartStream(ctx context.Context, id string) error { - es := esm.getStream(id) - if es == nil { - return i18n.NewError(ctx, i18n.Msg404NoResult) +func (esm *esManager[CT, DT]) StartStream(ctx context.Context, nameOrID string) error { + es, err := esm.getStreamByNameOrID(ctx, nameOrID) + if err != nil { + return err } return es.start(ctx) } @@ -294,12 +337,20 @@ func (esm *esManager[CT, DT]) ListStreams(ctx context.Context, filter ffapi.Filt return enriched, fr, err } -func (esm *esManager[CT, DT]) GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (*EventStreamWithStatus[CT], error) { +func (esm *esManager[CT, DT]) GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (es *EventStreamWithStatus[CT], err error) { esSpec, err := esm.persistence.EventStreams().GetByID(ctx, id, opts...) - if err != nil { - return nil, err + if err == nil { + es = esm.enrichGetStream(ctx, esSpec) + } + return +} + +func (esm *esManager[CT, DT]) GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (es *EventStreamWithStatus[CT], err error) { + esSpec, err := esm.persistence.EventStreams().GetByUUIDOrName(ctx, nameOrID, opts...) + if err == nil { + es = esm.enrichGetStream(ctx, esSpec) } - return esm.enrichGetStream(ctx, esSpec), nil + return } func (esm *esManager[CT, DT]) Close(ctx context.Context) { diff --git a/pkg/eventstreams/manager_test.go b/pkg/eventstreams/manager_test.go index 7be41bc..162e167 100644 --- a/pkg/eventstreams/manager_test.go +++ b/pkg/eventstreams/manager_test.go @@ -69,6 +69,9 @@ func (mp *mockPersistence) EventStreams() dbsql.CRUD[*EventStreamSpec[testESConf func (mp *mockPersistence) Checkpoints() dbsql.CRUD[*EventStreamCheckpoint] { return mp.checkpoints } +func (mp *mockPersistence) IDValidator() IDValidator { + return dbsql.UUIDValidator +} func (mp *mockPersistence) Close() {} func newMockESManager(t *testing.T, extraSetup ...func(mp *mockPersistence)) (context.Context, *esManager[testESConfig, testData], *mockEventSource, func()) { @@ -99,7 +102,7 @@ func newMockESManager(t *testing.T, extraSetup ...func(mp *mockPersistence)) (co return nil }, } - mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), mp, nil, mes) + mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, mes) assert.NoError(t, err) return ctx, mgr.(*esManager[testESConfig, testData]), mes, func() { @@ -109,7 +112,7 @@ func newMockESManager(t *testing.T, extraSetup ...func(mp *mockPersistence)) (co } func TestNewManagerFailBadTLS(t *testing.T) { - _, err := NewEventStreamManager[testESConfig, testData](context.Background(), &Config{ + _, err := NewEventStreamManager[testESConfig, testData](context.Background(), &Config[testESConfig, testData]{ Retry: &retry.Retry{}, TLSConfigs: map[string]*fftls.Config{ "tls0": { @@ -124,7 +127,7 @@ func TestNewManagerFailBadTLS(t *testing.T) { func TestNewManagerBadConfStruct(t *testing.T) { assert.Panics(t, func() { - _, _ = NewEventStreamManager[string /* must be DBSerializable */, testData](context.Background(), &Config{ + _, _ = NewEventStreamManager[string /* must be DBSerializable */, testData](context.Background(), &Config[string, testData]{ Retry: &retry.Retry{}, TLSConfigs: map[string]*fftls.Config{ "tls0": { @@ -137,7 +140,7 @@ func TestNewManagerBadConfStruct(t *testing.T) { } func TestNewManagerBadConfState(t *testing.T) { - _, err := NewEventStreamManager[testESConfig, testData](context.Background(), &Config{}, nil, nil, &mockEventSource{}) + _, err := NewEventStreamManager[testESConfig, testData](context.Background(), &Config[testESConfig, testData]{}, nil, nil, &mockEventSource{}) assert.Regexp(t, "FF00237", err) } @@ -149,7 +152,7 @@ func TestInitFail(t *testing.T) { mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, fmt.Errorf("pop")) ctx := context.Background() InitConfig(config.RootSection("ut")) - _, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), mp, nil, nil) + _, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, nil) assert.Regexp(t, "pop", err) } @@ -182,7 +185,7 @@ func TestInitWithStreamsCleanupFail(t *testing.T) { } mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("Delete", mock.Anything, es.GetID()).Return(fmt.Errorf("pop")) - _, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), mp, nil, nil) + _, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, nil) assert.Regexp(t, "pop", err) } @@ -199,7 +202,7 @@ func TestInitWithStreamsInitFail(t *testing.T) { Status: ptrTo(EventStreamStatusStarted), } mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - _, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig(ctx), mp, nil, &mockEventSource{ + _, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, &mockEventSource{ validate: func(ctx context.Context, conf *testESConfig) error { return fmt.Errorf("pop") }, @@ -207,28 +210,66 @@ func TestInitWithStreamsInitFail(t *testing.T) { assert.Regexp(t, "pop", err) } -func TestUpsertStreamDeleted(t *testing.T) { +func TestUpsertStreamByNameDeleted(t *testing.T) { es := &EventStreamSpec[testESConfig]{ + Name: ptrTo("stream1"), ID: ptrTo(fftypes.NewUUID().String()), + Status: ptrTo(EventStreamStatusStopped), + } + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByName", mock.Anything, "stream1").Return(es, nil) + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + }) + defer done() + + esm.getStream(es.GetID()).spec.Status = ptrTo(EventStreamStatusDeleted) + _, err := esm.UpsertStream(ctx, "stream1", es) + assert.Regexp(t, "FF00236", err) + +} + +func TestUpsertStreamByNameFailLookup(t *testing.T) { + es := &EventStreamSpec[testESConfig]{ Name: ptrTo("stream1"), + ID: ptrTo(fftypes.NewUUID().String()), Status: ptrTo(EventStreamStatusStopped), } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByName", mock.Anything, "stream1").Return((*EventStreamSpec[testESConfig])(nil), fmt.Errorf("pop")) mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() }) defer done() esm.getStream(es.GetID()).spec.Status = ptrTo(EventStreamStatusDeleted) - _, err := esm.UpsertStream(ctx, es) + _, err := esm.UpsertStream(ctx, "stream1", es) + assert.Regexp(t, "pop", err) + +} + +func TestUpsertStreamByIDDeleted(t *testing.T) { + es := &EventStreamSpec[testESConfig]{ + Name: ptrTo("stream1"), + ID: ptrTo(fftypes.NewUUID().String()), + Status: ptrTo(EventStreamStatusStopped), + } + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + }) + defer done() + + esm.getStream(es.GetID()).spec.Status = ptrTo(EventStreamStatusDeleted) + _, err := esm.UpsertStream(ctx, *es.ID, es) assert.Regexp(t, "FF00236", err) } func TestUpsertStreamBadUpdate(t *testing.T) { es := &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), Name: ptrTo("stream1"), + ID: ptrTo(fftypes.NewUUID().String()), Status: ptrTo(EventStreamStatusStopped), } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { @@ -239,15 +280,15 @@ func TestUpsertStreamBadUpdate(t *testing.T) { newES := *es newES.Name = nil - _, err := esm.UpsertStream(ctx, &newES) + _, err := esm.UpsertStream(ctx, "", &newES) assert.Regexp(t, "FF00112", err) } func TestUpsertStreamUpsertFail(t *testing.T) { es := &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), Name: ptrTo("stream1"), + ID: ptrTo(fftypes.NewUUID().String()), Status: ptrTo(EventStreamStatusStopped), } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { @@ -257,7 +298,7 @@ func TestUpsertStreamUpsertFail(t *testing.T) { }) defer done() - _, err := esm.UpsertStream(ctx, es) + _, err := esm.UpsertStream(ctx, "", es) assert.Regexp(t, "pop", err) } @@ -301,6 +342,9 @@ func TestUpsertReInitExistingFailInit(t *testing.T) { func TestDeleteStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ + ID: ptrTo("does not exist"), + }, nil).Once() }) defer done() @@ -309,9 +353,24 @@ func TestDeleteStreamNotKnown(t *testing.T) { } +func TestDeleteStreamNotFound(t *testing.T) { + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return((*EventStreamSpec[testESConfig])(nil), fmt.Errorf("not found")).Once() + }) + defer done() + + err := esm.DeleteStream(ctx, fftypes.NewUUID().String()) + assert.Regexp(t, "not found", err) + +} + func TestResetStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ + ID: ptrTo("does not exist"), + }, nil).Once() }) defer done() @@ -323,6 +382,9 @@ func TestResetStreamNotKnown(t *testing.T) { func TestStopStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ + ID: ptrTo("does not exist"), + }, nil).Once() }) defer done() @@ -334,6 +396,9 @@ func TestStopStreamNotKnown(t *testing.T) { func TestStartStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ + ID: ptrTo("does not exist"), + }, nil).Once() }) defer done() @@ -363,6 +428,7 @@ func TestDeleteStreamFail(t *testing.T) { Status: ptrTo(EventStreamStatusStopped), } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(es, nil).Once() mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() @@ -381,6 +447,7 @@ func TestDeleteStreamFailDelete(t *testing.T) { Status: ptrTo(EventStreamStatusStopped), } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(es, nil).Once() mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() @@ -394,11 +461,6 @@ func TestDeleteStreamFailDelete(t *testing.T) { } func TestResetStreamStopFailTimeout(t *testing.T) { - ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - }) - done() - existing := &eventStream[testESConfig, testData]{ activeState: &activeStream[testESConfig, testData]{}, stopping: make(chan struct{}), @@ -408,6 +470,13 @@ func TestResetStreamStopFailTimeout(t *testing.T) { Status: ptrTo(EventStreamStatusStopped), }, } + + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + }) + done() + esm.addStream(ctx, existing) err := esm.ResetStream(ctx, existing.spec.GetID(), "") assert.Regexp(t, "FF00229", err) @@ -415,12 +484,6 @@ func TestResetStreamStopFailTimeout(t *testing.T) { } func TestResetStreamStopFailDeleteCheckpoint(t *testing.T) { - ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() - }) - done() - existing := &eventStream[testESConfig, testData]{ spec: &EventStreamSpec[testESConfig]{ ID: ptrTo(fftypes.NewUUID().String()), @@ -428,6 +491,13 @@ func TestResetStreamStopFailDeleteCheckpoint(t *testing.T) { Status: ptrTo(EventStreamStatusStopped), }, } + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() + }) + done() + esm.addStream(ctx, existing) err := esm.ResetStream(ctx, existing.spec.GetID(), "") assert.Regexp(t, "pop", err) @@ -435,13 +505,6 @@ func TestResetStreamStopFailDeleteCheckpoint(t *testing.T) { } func TestResetStreamStopFailUpdateSequence(t *testing.T) { - ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(nil).Once() - mp.eventStreams.On("UpdateSparse", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() - }) - done() - existing := &eventStream[testESConfig, testData]{ spec: &EventStreamSpec[testESConfig]{ ID: ptrTo(fftypes.NewUUID().String()), @@ -449,6 +512,14 @@ func TestResetStreamStopFailUpdateSequence(t *testing.T) { Status: ptrTo(EventStreamStatusStopped), }, } + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(nil).Once() + mp.eventStreams.On("UpdateSparse", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() + }) + done() + esm.addStream(ctx, existing) err := esm.ResetStream(ctx, existing.spec.GetID(), "12345") assert.Regexp(t, "pop", err) @@ -456,13 +527,6 @@ func TestResetStreamStopFailUpdateSequence(t *testing.T) { } func TestResetStreamNoOp(t *testing.T) { - ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(nil).Once() - mp.eventStreams.On("UpdateSparse", mock.Anything, mock.Anything).Return(nil).Once() - }) - done() - existing := &eventStream[testESConfig, testData]{ spec: &EventStreamSpec[testESConfig]{ ID: ptrTo(fftypes.NewUUID().String()), @@ -470,6 +534,14 @@ func TestResetStreamNoOp(t *testing.T) { Status: ptrTo(EventStreamStatusStopped), }, } + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(nil).Once() + mp.eventStreams.On("UpdateSparse", mock.Anything, mock.Anything).Return(nil).Once() + }) + done() + esm.addStream(ctx, existing) err := esm.ResetStream(ctx, existing.spec.GetID(), "12345") assert.NoError(t, err) @@ -500,6 +572,18 @@ func TestGetStreamByIDFail(t *testing.T) { } +func TestGetStreamByNameOrID(t *testing.T) { + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{}, nil).Once() + }) + defer done() + + _, err := esm.GetStreamByNameOrID(ctx, "stream1") + assert.NoError(t, err) + +} + func TestCloseSuspendFail(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() diff --git a/pkg/eventstreams/persistence.go b/pkg/eventstreams/persistence.go index 66f8639..49a1e4e 100644 --- a/pkg/eventstreams/persistence.go +++ b/pkg/eventstreams/persistence.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -27,6 +27,7 @@ import ( type Persistence[CT any] interface { EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] Checkpoints() dbsql.CRUD[*EventStreamCheckpoint] + IDValidator() IDValidator Close() } @@ -38,6 +39,8 @@ var EventStreamFilters = &ffapi.QueryFields{ "status": &ffapi.StringField{}, "type": &ffapi.StringField{}, "topicfilter": &ffapi.StringField{}, + "identity": &ffapi.StringField{}, + "config": &ffapi.JSONField{}, } var CheckpointFilters = &ffapi.QueryFields{ @@ -61,6 +64,10 @@ type esPersistence[CT any] struct { idValidator IDValidator } +func (p *esPersistence[CT]) IDValidator() IDValidator { + return p.idValidator +} + func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] { return &dbsql.CrudBase[*EventStreamSpec[CT]]{ DB: p.db, @@ -74,6 +81,7 @@ func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] { "type", "initial_sequence_id", "topic_filter", + "identity", "config", "error_handling", "batch_size", @@ -111,6 +119,8 @@ func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] { return &inst.InitialSequenceID case "topic_filter": return &inst.TopicFilter + case "identity": + return &inst.Identity case "config": return &inst.Config case "error_handling": diff --git a/pkg/eventstreams/webhooks.go b/pkg/eventstreams/webhooks.go index 3a57329..7323e6a 100644 --- a/pkg/eventstreams/webhooks.go +++ b/pkg/eventstreams/webhooks.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -51,43 +51,47 @@ func (wc *WebhookConfig) Value() (driver.Value, error) { return fftypes.JSONValue(wc) } +type webhookDispatcherFactory[CT any, DT any] struct{} + // validate initializes the config ready for use -func (wc *WebhookConfig) validate(ctx context.Context, tlsConfigs map[string]*tls.Config) error { - if wc.URL == nil || *wc.URL == "" { +func (wdf *webhookDispatcherFactory[CT, DT]) Validate(ctx context.Context, _ *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, _ LifecyclePhase) error { + if spec.Webhook == nil { + spec.Webhook = &WebhookConfig{} + } + whc := spec.Webhook + if whc.URL == nil || *whc.URL == "" { return i18n.NewError(ctx, i18n.MsgMissingWebhookURL) } - if wc.TLSConfigName != nil && *wc.TLSConfigName != "" { - tlsConfig, ok := tlsConfigs[*wc.TLSConfigName] + if whc.TLSConfigName != nil && *whc.TLSConfigName != "" { + tlsConfig, ok := tlsConfigs[*whc.TLSConfigName] if !ok { - return i18n.NewError(ctx, i18n.MsgUnknownTLSConfiguration, *wc.TLSConfigName) + return i18n.NewError(ctx, i18n.MsgUnknownTLSConfiguration, *whc.TLSConfigName) } - wc.tlsConfig = tlsConfig + whc.tlsConfig = tlsConfig } - wc.validated = true + whc.validated = true return nil } type webhookAction[CT any, DT any] struct { - esm *esManager[CT, DT] disablePrivateIPs bool spec *WebhookConfig client *resty.Client } -func (esm *esManager[CT, DT]) newWebhookAction(ctx context.Context, spec *WebhookConfig) *webhookAction[CT, DT] { - conf := spec.HTTP - if conf == nil { - conf = &esm.config.Defaults.WebhookDefaults.HTTPConfig +func (wdf *webhookDispatcherFactory[CT, DT]) NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT] { + httpConf := spec.Webhook.HTTP + if httpConf == nil { + httpConf = &conf.Defaults.WebhookDefaults.HTTPConfig } - conf.TLSClientConfig = spec.tlsConfig + httpConf.TLSClientConfig = spec.Webhook.tlsConfig client := ffresty.NewWithConfig(ctx, ffresty.Config{ - URL: *spec.URL, - HTTPConfig: *conf, + URL: *spec.Webhook.URL, + HTTPConfig: *httpConf, }) return &webhookAction[CT, DT]{ - esm: esm, - spec: spec, - disablePrivateIPs: esm.config.DisablePrivateIPs, + spec: spec.Webhook, + disablePrivateIPs: conf.DisablePrivateIPs, client: client, } } diff --git a/pkg/eventstreams/webhooks_test.go b/pkg/eventstreams/webhooks_test.go index 10270f3..8dcbb18 100644 --- a/pkg/eventstreams/webhooks_test.go +++ b/pkg/eventstreams/webhooks_test.go @@ -18,6 +18,7 @@ package eventstreams import ( "context" + "crypto/tls" "encoding/json" "fmt" "net/http" @@ -44,26 +45,32 @@ func newTestWebhooks(t *testing.T, whc *WebhookConfig, tweaks ...func()) *webhoo }) done() - assert.NoError(t, whc.validate(ctx, mgr.tlsConfigs)) + whf := &webhookDispatcherFactory[testESConfig, testData]{} + spec := &EventStreamSpec[testESConfig]{ + Name: ptrTo("stream1"), + Webhook: whc, + } + assert.NoError(t, whf.Validate(ctx, &mgr.config, spec, mgr.tlsConfigs, LifecyclePhaseStarting)) - return mgr.newWebhookAction(context.Background(), whc) + return whf.NewDispatcher(context.Background(), &mgr.config, spec).(*webhookAction[testESConfig, testData]) } func TestWebhooksConfigValidate(t *testing.T) { - wc := &WebhookConfig{} - assert.Regexp(t, "FF00216", wc.validate(context.Background(), nil)) + whc := &WebhookConfig{} + whf := &webhookDispatcherFactory[testESConfig, testData]{} + spec := &EventStreamSpec[testESConfig]{ + Webhook: whc, + } + assert.Regexp(t, "FF00216", whf.Validate(context.Background(), nil, spec, nil, LifecyclePhaseStarting)) u := "http://test.example" - wc.URL = &u - assert.NoError(t, wc.validate(context.Background(), nil)) + whc.URL = &u + assert.NoError(t, whf.Validate(context.Background(), nil, spec, nil, LifecyclePhaseStarting)) tlsConfName := "wrong" - wc = &WebhookConfig{ - URL: &u, - TLSConfigName: &tlsConfName, - } - assert.Regexp(t, "FF00223", wc.validate(context.Background(), nil)) + whc.TLSConfigName = &tlsConfName + assert.Regexp(t, "FF00223", whf.Validate(context.Background(), nil, spec, make(map[string]*tls.Config), LifecyclePhaseStarting)) } diff --git a/pkg/eventstreams/websockets.go b/pkg/eventstreams/websockets.go index 96fe295..95751f2 100644 --- a/pkg/eventstreams/websockets.go +++ b/pkg/eventstreams/websockets.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,6 +18,7 @@ package eventstreams import ( "context" + "crypto/tls" "database/sql/driver" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -37,6 +38,10 @@ type WebSocketConfig struct { DistributionMode *DistributionMode `ffstruct:"wsconfig" json:"distributionMode,omitempty"` } +type webSocketDispatcherFactory[CT any, DT any] struct { + esm *esManager[CT, DT] +} + // Store in DB as JSON func (wc *WebSocketConfig) Scan(src interface{}) error { return fftypes.JSONScan(src, wc) @@ -50,8 +55,12 @@ func (wc *WebSocketConfig) Value() (driver.Value, error) { return fftypes.JSONValue(wc) } -func (wc *WebSocketConfig) validate(ctx context.Context, defaults *ConfigWebsocketDefaults, setDefaults bool) error { - return checkSet(ctx, setDefaults, "distributionMode", &wc.DistributionMode, defaults.DefaultDistributionMode, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "distmode", v) }) +func (wsf *webSocketDispatcherFactory[CT, DT]) Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], _ map[string]*tls.Config, phase LifecyclePhase) error { + if spec.WebSocket == nil { + spec.WebSocket = &WebSocketConfig{} + } + setDefaults := phase == LifecyclePhaseStarting + return checkSet(ctx, setDefaults, "distributionMode", &spec.WebSocket.DistributionMode, conf.Defaults.WebSocketDefaults.DefaultDistributionMode, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "distmode", v) }) } type webSocketAction[DT any] struct { @@ -60,11 +69,11 @@ type webSocketAction[DT any] struct { wsChannels wsserver.WebSocketChannels } -func newWebSocketAction[DT any](wsChannels wsserver.WebSocketChannels, spec *WebSocketConfig, topic string) *webSocketAction[DT] { +func (wsf *webSocketDispatcherFactory[CT, DT]) NewDispatcher(_ context.Context, _ *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT] { return &webSocketAction[DT]{ - spec: spec, - wsChannels: wsChannels, - topic: topic, + spec: spec.WebSocket, + wsChannels: wsf.esm.wsChannels, + topic: *spec.Name, } } diff --git a/pkg/eventstreams/websockets_test.go b/pkg/eventstreams/websockets_test.go index deb291e..d4610c4 100644 --- a/pkg/eventstreams/websockets_test.go +++ b/pkg/eventstreams/websockets_test.go @@ -22,22 +22,36 @@ import ( "testing" "github.com/hyperledger/firefly-common/mocks/wsservermocks" + "github.com/hyperledger/firefly-common/pkg/ffapi" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/wsserver" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func mockWSChannels(wsc *wsservermocks.WebSocketChannels) (chan interface{}, chan interface{}, chan *wsserver.WebSocketCommandMessageOrError) { senderChannel := make(chan interface{}, 1) broadcastChannel := make(chan interface{}, 1) receiverChannel := make(chan *wsserver.WebSocketCommandMessageOrError, 1) - wsc.On("GetChannels", "ut_stream").Return((chan<- interface{})(senderChannel), (chan<- interface{})(broadcastChannel), (<-chan *wsserver.WebSocketCommandMessageOrError)(receiverChannel)) + wsc.On("GetChannels", "ut_stream").Return((chan<- interface{})(senderChannel), (chan<- interface{})(broadcastChannel), (<-chan *wsserver.WebSocketCommandMessageOrError)(receiverChannel)).Maybe() return senderChannel, broadcastChannel, receiverChannel } +func newTestWebSocketsFactory(t *testing.T) (context.Context, *esManager[testESConfig, testData], *wsservermocks.WebSocketChannels, *webSocketDispatcherFactory[testESConfig, testData]) { + ctx, mgr, _, done := newMockESManager(t, func(mdb *mockPersistence) { + mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil) + }) + done() + + mws := wsservermocks.NewWebSocketChannels(t) + mgr.wsChannels = mws + + return ctx, mgr, mws, &webSocketDispatcherFactory[testESConfig, testData]{esm: mgr} +} + func TestWSAttemptIgnoreWrongAcks(t *testing.T) { - mws := &wsservermocks.WebSocketChannels{} + ctx, mgr, mws, whf := newTestWebSocketsFactory(t) _, _, rc := mockWSChannels(mws) go func() { @@ -50,9 +64,13 @@ func TestWSAttemptIgnoreWrongAcks(t *testing.T) { }() dmw := DistributionModeBroadcast - wsa := newWebSocketAction[testData](mws, &WebSocketConfig{ - DistributionMode: &dmw, - }, "ut_stream") + spec := &EventStreamSpec[testESConfig]{ + Name: ptrTo("ut_stream"), + WebSocket: &WebSocketConfig{ + DistributionMode: &dmw, + }, + } + wsa := whf.NewDispatcher(ctx, &mgr.config, spec).(*webSocketAction[testData]) err := wsa.AttemptDispatch(context.Background(), 0, &EventBatch[testData]{ StreamID: fftypes.NewUUID().String(), @@ -69,14 +87,18 @@ func TestWSAttemptIgnoreWrongAcks(t *testing.T) { func TestWSattemptDispatchExitPushingEvent(t *testing.T) { - mws := &wsservermocks.WebSocketChannels{} + ctx, mgr, mws, whf := newTestWebSocketsFactory(t) _, bc, _ := mockWSChannels(mws) bc <- []*fftypes.JSONAny{} // block the broadcast channel dmw := DistributionModeBroadcast - wsa := newWebSocketAction[testData](mws, &WebSocketConfig{ - DistributionMode: &dmw, - }, "ut_stream") + spec := &EventStreamSpec[testESConfig]{ + Name: ptrTo("ut_stream"), + WebSocket: &WebSocketConfig{ + DistributionMode: &dmw, + }, + } + wsa := whf.NewDispatcher(ctx, &mgr.config, spec).(*webSocketAction[testData]) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -93,13 +115,17 @@ func TestWSattemptDispatchExitPushingEvent(t *testing.T) { func TestWSattemptDispatchExitReceivingReply(t *testing.T) { - mws := &wsservermocks.WebSocketChannels{} + ctx, mgr, mws, whf := newTestWebSocketsFactory(t) _, _, rc := mockWSChannels(mws) dmw := DistributionModeBroadcast - wsa := newWebSocketAction[testData](mws, &WebSocketConfig{ - DistributionMode: &dmw, - }, "ut_stream") + spec := &EventStreamSpec[testESConfig]{ + Name: ptrTo("ut_stream"), + WebSocket: &WebSocketConfig{ + DistributionMode: &dmw, + }, + } + wsa := whf.NewDispatcher(ctx, &mgr.config, spec).(*webSocketAction[testData]) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -110,16 +136,20 @@ func TestWSattemptDispatchExitReceivingReply(t *testing.T) { func TestWSattemptDispatchNackFromClient(t *testing.T) { - mws := &wsservermocks.WebSocketChannels{} + ctx, mgr, mws, whf := newTestWebSocketsFactory(t) _, _, rc := mockWSChannels(mws) rc <- &wsserver.WebSocketCommandMessageOrError{ Err: fmt.Errorf("pop"), } dmw := DistributionModeBroadcast - wsa := newWebSocketAction[testData](mws, &WebSocketConfig{ - DistributionMode: &dmw, - }, "ut_stream") + spec := &EventStreamSpec[testESConfig]{ + Name: ptrTo("ut_stream"), + WebSocket: &WebSocketConfig{ + DistributionMode: &dmw, + }, + } + wsa := whf.NewDispatcher(ctx, &mgr.config, spec).(*webSocketAction[testData]) err := wsa.waitForAck(context.Background(), rc, -1) assert.Regexp(t, "pop", err) diff --git a/pkg/ffapi/apirequest.go b/pkg/ffapi/apirequest.go index 132e6d4..ea3c060 100644 --- a/pkg/ffapi/apirequest.go +++ b/pkg/ffapi/apirequest.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -35,6 +35,7 @@ type APIRequest struct { AlwaysPaginate bool } +// Encourage consideration for new APIs of using ItemsResult() - generic and documented, and always returns the same output for all queries // FilterResult is a helper to transform a filter result into a REST API standard payload func (r *APIRequest) FilterResult(items interface{}, res *FilterResult, err error) (interface{}, error) { itemsVal := reflect.ValueOf(items) @@ -53,3 +54,15 @@ func (r *APIRequest) FilterResult(items interface{}, res *FilterResult, err erro return items, err } + +// ItemsResult is a helper to transform a filter result into a REST API standard payload +func ItemsResult[T any](items []T, res *FilterResult, err error) (*ItemsResultTyped[T], error) { + response := &ItemsResultTyped[T]{ + Items: items, + } + if res != nil { + response.Total = res.TotalCount + } + response.Count = len(items) + return response, err +} diff --git a/pkg/ffapi/apirequest_test.go b/pkg/ffapi/apirequest_test.go index d86f1c7..fd7ea0c 100644 --- a/pkg/ffapi/apirequest_test.go +++ b/pkg/ffapi/apirequest_test.go @@ -75,3 +75,24 @@ func TestFilterResultAlwaysPaginateNoFilterResult(t *testing.T) { Items: []string{"test"}, }, f) } + +func TestItemsResult(t *testing.T) { + f, err := ItemsResult([]string{"item1"}, &FilterResult{}, nil) + assert.NoError(t, err) + assert.Equal(t, []string{"item1"}, f.Items) + assert.Equal(t, 1, f.Count) + assert.Nil(t, f.Total) + + ten := int64(10) + f, err = ItemsResult([]string{"item1"}, &FilterResult{ + TotalCount: &ten, + }, nil) + assert.NoError(t, err) + assert.Equal(t, []string{"item1"}, f.Items) + assert.Equal(t, 1, f.Count) + assert.Equal(t, int64(10), *f.Total) +} + +func TestCheckItemsResultDocs(t *testing.T) { + CheckObjectDocumented(&ItemsResultTyped[string]{}) +} diff --git a/pkg/ffapi/openapi3.go b/pkg/ffapi/openapi3.go index 112c1ca..c67868b 100644 --- a/pkg/ffapi/openapi3.go +++ b/pkg/ffapi/openapi3.go @@ -116,7 +116,7 @@ func (sg *SwaggerGen) getPathItem(doc *openapi3.T, path string) *openapi3.PathIt if doc.Paths == nil { doc.Paths = &openapi3.Paths{} } - pi := doc.Paths.Value(path) + pi := doc.Paths.Find(path) if pi != nil { return pi } @@ -302,7 +302,7 @@ func (sg *SwaggerGen) addOutput(ctx context.Context, doc *openapi3.T, route *Rou } } for _, code := range route.JSONOutputCodes { - op.Responses.Map()[strconv.FormatInt(int64(code), 10)] = &openapi3.ResponseRef{ + op.Responses.Set(strconv.FormatInt(int64(code), 10), &openapi3.ResponseRef{ Value: &openapi3.Response{ Description: &s, Content: openapi3.Content{ @@ -311,7 +311,7 @@ func (sg *SwaggerGen) addOutput(ctx context.Context, doc *openapi3.T, route *Rou }, }, }, - } + }) } } diff --git a/pkg/ffapi/openapi3_test.go b/pkg/ffapi/openapi3_test.go index 1514d99..ee1eb98 100644 --- a/pkg/ffapi/openapi3_test.go +++ b/pkg/ffapi/openapi3_test.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -263,6 +263,7 @@ func TestWildcards(t *testing.T) { Version: "1.0", BaseURL: "http://localhost:12345/api/v1", }).Generate(context.Background(), routes) + assert.NotNil(t, swagger.Paths.Find("/namespaces/{ns}/example1/{id}")) assert.NotNil(t, swagger.Paths.Value("/namespaces/{ns}/example1/{id}")) } @@ -282,19 +283,19 @@ func TestFFExcludeTag(t *testing.T) { Version: "1.0", BaseURL: "http://localhost:12345/api/v1", }).Generate(context.Background(), routes) - assert.NotNil(t, swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.RequestBody.Value) - length, err := swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("length") + assert.NotNil(t, swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.RequestBody.Value) + length, err := swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("length") assert.NoError(t, err) assert.NotNil(t, length) - width, err := swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("width") + width, err := swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("width") assert.NoError(t, err) assert.NotNil(t, width) - _, err = swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("secret") - assert.Regexp(t, `no schema "secret"`, err) - _, err = swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("conditional") - assert.Regexp(t, `no schema "conditional"`, err) - _, err = swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("conditionalInput") - assert.Regexp(t, `no schema "conditionalInput"`, err) + _, err = swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("secret") + assert.Regexp(t, "no schema", err) + _, err = swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("conditional") + assert.Regexp(t, "no schema", err) + _, err = swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.RequestBody.Value.Content.Get("application/json").Schema.Value.Properties.JSONLookup("conditionalInput") + assert.Regexp(t, "no schema", err) } func TestPanicOnMissingDescription(t *testing.T) { @@ -377,8 +378,10 @@ func TestPreTranslatedRouteDescription(t *testing.T) { Version: "1.0", BaseURL: "http://localhost:12345/api/v1", }).Generate(context.Background(), routes) + assert.NotNil(t, swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.RequestBody.Value) + description := swagger.Paths.Find("/namespaces/{ns}/example1/test").Post.Description assert.NotNil(t, swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.RequestBody.Value) - description := swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.Description + description = swagger.Paths.Value("/namespaces/{ns}/example1/test").Post.Description assert.Equal(t, "this is a description", description) } diff --git a/pkg/ffapi/openapihandler.go b/pkg/ffapi/openapihandler.go index b0dd445..921d8b4 100644 --- a/pkg/ffapi/openapihandler.go +++ b/pkg/ffapi/openapihandler.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/pkg/ffapi/query_fields.go b/pkg/ffapi/query_fields.go index d7122c8..b379bb4 100644 --- a/pkg/ffapi/query_fields.go +++ b/pkg/ffapi/query_fields.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/pkg/ffapi/restfilter_json.go b/pkg/ffapi/restfilter_json.go index 02b0fff..38dd31b 100644 --- a/pkg/ffapi/restfilter_json.go +++ b/pkg/ffapi/restfilter_json.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -30,12 +30,19 @@ import ( var allMods = []string{"not", "caseInsensitive"} var justCaseInsensitive = []string{"caseInsensitive"} +// Note if ItemsResultTyped below might be preferred for new APIs (if you are able to adopt always-return {items:[]} style) type FilterResultsWithCount struct { Count int64 `json:"count"` Total *int64 `json:"total,omitempty"` // omitted if a count was not calculated (AlwaysPaginate enabled, and count not specified) Items interface{} `json:"items"` } +type ItemsResultTyped[T any] struct { + Count int `ffstruct:"CollectionResults" json:"count"` + Total *int64 `ffstruct:"CollectionResults" json:"total,omitempty"` // omitted if a count was not calculated (AlwaysPaginate enabled, and count not specified) + Items []T `ffstruct:"CollectionResults" json:"items"` +} + type filterModifiers struct { negate bool caseInsensitive bool diff --git a/pkg/ffresty/ffresty.go b/pkg/ffresty/ffresty.go index cf83ef0..656f889 100644 --- a/pkg/ffresty/ffresty.go +++ b/pkg/ffresty/ffresty.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -71,7 +71,7 @@ type HTTPConfig struct { HTTPPassthroughHeadersEnabled bool `ffstruct:"RESTConfig" json:"httpPassthroughHeadersEnabled,omitempty"` HTTPHeaders fftypes.JSONObject `ffstruct:"RESTConfig" json:"headers,omitempty"` HTTPTLSHandshakeTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"tlsHandshakeTimeout,omitempty"` - HTTPCustomClient interface{} `ffstruct:"RESTConfig" json:"httpCustomClient,omitempty"` + HTTPCustomClient interface{} `json:"-"` TLSClientConfig *tls.Config `json:"-"` // should be built from separate TLSConfig using fftls utils OnCheckRetry func(res *resty.Response, err error) bool `json:"-"` // response could be nil on err OnBeforeRequest func(req *resty.Request) error `json:"-"` // called before each request, even retry diff --git a/pkg/fftypes/timeutils.go b/pkg/fftypes/timeutils.go index 1317eb4..de8db53 100644 --- a/pkg/fftypes/timeutils.go +++ b/pkg/fftypes/timeutils.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -134,8 +134,11 @@ func (ft *FFTime) Scan(src interface{}) error { } // Value implements sql.Valuer -func (ft FFTime) Value() (driver.Value, error) { - if time.Time(ft).IsZero() { +func (ft *FFTime) Value() (driver.Value, error) { + if ft == nil { + return nil, nil + } + if time.Time(*ft).IsZero() { return int64(0), nil } return ft.UnixNano(), nil @@ -242,6 +245,9 @@ func (fd *FFDuration) Scan(src interface{}) error { // Value implements sql.Valuer func (fd *FFDuration) Value() (driver.Value, error) { + if fd == nil { + return nil, nil + } return fd.String(), nil } diff --git a/pkg/fftypes/timeutils_test.go b/pkg/fftypes/timeutils_test.go index e229fcd..5bfc62b 100644 --- a/pkg/fftypes/timeutils_test.go +++ b/pkg/fftypes/timeutils_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -86,9 +86,14 @@ func TestFFTimeDatabaseSerialization(t *testing.T) { now := Now() zero := ZeroTime() - var ft = &zero + var ft *FFTime v, err := ft.Value() assert.NoError(t, err) + assert.Nil(t, v) + + ft = &zero + v, err = ft.Value() + assert.NoError(t, err) assert.Equal(t, int64(0), v) ft = now @@ -262,7 +267,8 @@ func TestFFDurationParseValue(t *testing.T) { var pfd *FFDuration v, err := pfd.Value() assert.NoError(t, err) - assert.Equal(t, "", v) + assert.Equal(t, "", pfd.String()) + assert.Nil(t, v) pfd = &fd *pfd = FFDuration(12345) * FFDuration(time.Millisecond) diff --git a/pkg/i18n/en_base_field_descriptions.go b/pkg/i18n/en_base_field_descriptions.go index 862be88..bb2c2a5 100644 --- a/pkg/i18n/en_base_field_descriptions.go +++ b/pkg/i18n/en_base_field_descriptions.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -42,4 +42,65 @@ var ( FilterJSONSort = ffm("FilterJSON.sort", "Array of fields to sort by. A '-' prefix on a field requests that field is sorted in descending order") FilterJSONCount = ffm("FilterJSON.count", "If true, the total number of entries that could be returned from the database will be calculated and returned as a 'total' (has a performance cost)") FilterJSONOr = ffm("FilterJSON.or", "Array of sub-queries where any sub-query can match to return results (OR combined). Note that within each sub-query all filters must match (AND combined)") + + EventStreamBatchSize = ffm("eventstream.batchSize", "Maximum number of events to deliver in each batch") + EventStreamBatchTimeout = ffm("eventstream.batchTimeout", "Amount of time to wait for events to arrive before delivering an incomplete batch") + EventStreamBlockedRetryDelay = ffm("eventstream.blockedRetryDelay", "Amount of time to wait between retries, when a stream is blocked") + EventStreamConfig = ffm("eventstream.config", "Additional configuration for the event stream") + EventStreamCreated = ffm("eventstream.created", "Time the event stream was created") + EventStreamUpdated = ffm("eventstream.updated", "Time the event stream was last updated") + EventStreamErrorHandling = ffm("eventstream.errorHandling", "When an error is encountered, and short retries are exhausted, whether to skip the event or block the stream (default=block)") + EventStreamID = ffm("eventstream.id", "ID of the event stream") + EventStreamIdentity = ffm("eventstream.identity", "Identity context for the event stream") + EventStreamInitialSequenceID = ffm("eventstream.initialSequenceID", "Initial sequence ID to begin event delivery from") + EventStreamName = ffm("eventstream.name", "Unique name for the event stream") + EventStreamRetryTimeout = ffm("eventstream.retryTimeout", "Short retry timeout before error handling, in case of webhook based delivery") + EventStreamStatus = ffm("eventstream.status", "Status information for the event stream") + EventStreamTopicFilter = ffm("eventstream.topicFilter", "Regular expression to apply to the topic of each event for server-side filtering") + EventStreamType = ffm("eventstream.type", "Delivery type for the event stream") + EventStreamWebHook = ffm("eventstream.webhook", "Webhook configuration options") + EventStreamWebSocket = ffm("eventstream.websocket", "WebSocket configuration options") + EventStreamStatistics = ffm("EventStream.statistics", "Information about the status and operation of the event stream") + EventStreamStatusExt = ffm("EventStream.status", "Status of the event stream") + + EventStreamWHConfigHeaders = ffm("whconfig.headers", "Headers to add to the HTTP call") + EventStreamWHConfigHTTP = ffm("whconfig.http", "Base client config for the Webhook HTTP requests") + EventStreamWHConfigMethod = ffm("whconfig.method", "HTTP method to use for the HTTP requests") + EventStreamWHTLSConfigName = ffm("whconfig.tlsConfigName", "Name of a TLS configuration to use for this Webhook") + EventStreamWHURL = ffm("whconfig.url", "URL to invoke") + + EventStreamWSDistributionMode = ffm("wsconfig.distributionMode", "Whether to 'broadcast' messages (at most once), or 'load_balance' requests between connections with acknowledgement (at least once)") + + EventStreamStatisticsStartTime = ffm("EventStreamStatistics.startTime", "Time the stream started") + EventStreamStatisticsLastDispatchTime = ffm("EventStreamStatistics.lastDispatchTime", "Time the stream last dispatched a message") + EventStreamStatisticsLastDispatchBatch = ffm("EventStreamStatistics.lastDispatchBatch", "Batch number of the last dispatched batch") + EventStreamStatisticsLastDispatchAttempts = ffm("EventStreamStatistics.lastDispatchAttempts", "Number of attempts to dispatch the current batch") + EventStreamStatisticsLastDispatchFailure = ffm("EventStreamStatistics.lastDispatchFailure", "Error message for the last failure that occurred") + EventStreamStatisticsLastDispatchComplete = ffm("EventStreamStatistics.lastDispatchComplete", "Completion status of the last batch") + EventStreamStatisticsHighestDetected = ffm("EventStreamStatistics.highestDetected", "Highest sequence ID detected") + EventStreamStatisticsHighestDispatched = ffm("EventStreamStatistics.highestDispatched", "Highest sequence ID dispatched") + EventStreamStatisticsCheckpoint = ffm("EventStreamStatistics.checkpoint", "Current checkpoint sequence ID") + + RESTConfigAuthPassword = ffm("RESTConfig.authPassword", "Password for the HTTP/HTTPS Basic Auth header") + RESTConfigAuthUsername = ffm("RESTConfig.authUsername", "Username for the HTTP/HTTPS Basic Auth header") + RESTConfigConnectionTimeout = ffm("RESTConfig.connectionTimeout", "HTTP connection timeout") + RESTConfigExpectContinueTimeout = ffm("RESTConfig.expectContinueTimeout", "Time to wait for the first response from the server after connecting") + RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call") + RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection") + RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool") + RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host") + RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool") + RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool") + RESTConfigProxyURL = ffm("RESTConfig.proxyURL", "URL of a proxy server to use for connections") + RESTConfigRequestTimeout = ffm("RESTConfig.requestTimeout", "Maximum time any individual request can take") + RESTConfigRetry = ffm("RESTConfig.retry", "Whether to automatically retry failed HTTP requests") + RESTConfigRetryCount = ffm("RESTConfig.retryCount", "Maximum number of times to retry") + RESTConfigRetryErrorStatusCodeRegex = ffm("RESTConfig.retryErrorStatusCodeRegex", "Regular expression to apply to the status codes of failed request to determine whether to retry") + RESTConfigRetryInitialDelay = ffm("RESTConfig.retryInitialDelay", "Time to wait before the first retry") + RESTConfigRetryMaximumDelay = ffm("RESTConfig.retryMaximumDelay", "Maximum time to wait between retries") + RESTConfigTLSHandshakeTimeout = ffm("RESTConfig.tlsHandshakeTimeout", "Maximum time to wait for the TLS handshake to complete") + + CollectionResultsCount = ffm("CollectionResults.count", "Number of items returned from this call") + CollectionResultsTotal = ffm("CollectionResults.total", "Number of items total that could be returned, if a count was requested") + CollectionResultsItems = ffm("CollectionResults.items", "The array of items") ) diff --git a/pkg/i18n/en_base_messages.go b/pkg/i18n/en_base_messages.go index edc3168..f1c7ac5 100644 --- a/pkg/i18n/en_base_messages.go +++ b/pkg/i18n/en_base_messages.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -36,7 +36,7 @@ var ( APIFilterCountDesc = ffm("api.filterCount", "Return a total count as well as items (adds extra database processing)") APIFilterFieldsDesc = ffm("api.filterFields", "Comma separated list of fields to return") - ResourceBaseID = ffm("ResourceBase.id", "The UUID of the service") + ResourceBaseID = ffm("ResourceBase.id", "The UUID of the resource") ResourceBaseCreated = ffm("ResourceBase.created", "The time the resource was created") ResourceBaseUpdated = ffm("ResourceBase.updated", "The time the resource was last updated") ) diff --git a/test/dbmigrations/000001_create_crudables_table.up.sql b/test/dbmigrations/000001_create_crudables_table.up.sql index 738dba2..39770da 100644 --- a/test/dbmigrations/000001_create_crudables_table.up.sql +++ b/test/dbmigrations/000001_create_crudables_table.up.sql @@ -9,6 +9,7 @@ CREATE TABLE crudables ( field2 VARCHAR(65), field3 TEXT, field4 BIGINT, - field5 BOOLEAN + field5 BOOLEAN, + field6 BIGINT ); CREATE UNIQUE INDEX crudables_id ON crudables(ns, id); diff --git a/test/es_demo_migrations/000001_create_eventstreams_table.up.sql b/test/es_demo_migrations/000001_create_eventstreams_table.up.sql index 3a861b7..8b3be23 100644 --- a/test/es_demo_migrations/000001_create_eventstreams_table.up.sql +++ b/test/es_demo_migrations/000001_create_eventstreams_table.up.sql @@ -8,6 +8,7 @@ CREATE TABLE eventstreams ( type VARCHAR(64), initial_sequence_id TEXT, topic_filter TEXT, + identity TEXT, config TEXT, error_handling TEXT, batch_size INT,