diff --git a/examples/ffpubsub.go b/examples/ffpubsub.go
index d5d2514..398617a 100644
--- a/examples/ffpubsub.go
+++ b/examples/ffpubsub.go
@@ -19,7 +19,6 @@ package main
 import (
 	"bufio"
 	"context"
-	"database/sql/driver"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -47,9 +46,7 @@ func main() {
 
 	// This demo: Just create a single event stream
 	// Real world: You would create ffapi.Route objects for the CRUD actions on mgr
-	_, err := mgr.UpsertStream(ctx, &eventstreams.EventStreamSpec[pubSubConfig]{
-		Name: ptrTo("demo"),
-	})
+	_, err := mgr.UpsertStream(ctx, "demo", &eventstreams.GenericEventStream{})
 	assertNoError(err)
 
 	// Read lines from standard in, and pass them to all active routines
@@ -65,19 +62,7 @@ func main() {
 	}
 }
 
-type pubSubESManager = eventstreams.Manager[pubSubConfig]
-
-type pubSubConfig struct {
-	// no extra config
-}
-
-func (psc *pubSubConfig) Scan(src interface{}) error {
-	return fftypes.JSONScan(src, psc)
-}
-
-func (psc *pubSubConfig) Value() (driver.Value, error) {
-	return fftypes.JSONValue(psc)
-}
+type pubSubESManager = eventstreams.Manager[*eventstreams.GenericEventStream]
 
 type pubSubMessage struct {
 	Message string `json:"message"`
@@ -93,11 +78,15 @@ func (ims *inMemoryStream) NewID() string {
 	return fftypes.NewUUID().String()
 }
 
-func (ims *inMemoryStream) Validate(_ context.Context, _ *pubSubConfig) error {
+func (ims *inMemoryStream) Validate(_ context.Context, _ *eventstreams.GenericEventStream) error {
 	return nil // no config defined in pubSubConfig to validate
 }
 
-func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.EventStreamSpec[pubSubConfig], checkpointSequenceID string, deliver eventstreams.Deliver[pubSubMessage]) (err error) {
+func (ims *inMemoryStream) WithRuntimeStatus(spec *eventstreams.GenericEventStream, status eventstreams.EventStreamStatus, stats *eventstreams.EventStreamStatistics) *eventstreams.GenericEventStream {
+	return spec.WithRuntimeStatus(status, stats)
+}
+
+func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.GenericEventStream, checkpointSequenceID string, deliver eventstreams.Deliver[pubSubMessage]) (err error) {
 	var index int
 	if checkpointSequenceID != "" {
 		index, err = strconv.Atoi(checkpointSequenceID)
@@ -135,10 +124,6 @@ func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.EventStreamSpe
 	}
 }
 
-func ptrTo[T any](v T) *T {
-	return &v
-}
-
 func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
 	// Use SQLite in-memory DB
 	conf := config.RootSection("ffpubsub")
@@ -161,13 +146,13 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
 	u.Scheme = "ws"
 	log.L(ctx).Infof("Running on: %s", u)
 
-	p := eventstreams.NewEventStreamPersistence[pubSubConfig](sql, dbsql.UUIDValidator)
-	c := eventstreams.GenerateConfig[pubSubConfig, pubSubMessage](ctx)
+	p := eventstreams.NewGenericEventStreamPersistence(sql, dbsql.UUIDValidator)
+	c := eventstreams.GenerateConfig[*eventstreams.GenericEventStream, pubSubMessage](ctx)
 	ims := &inMemoryStream{
 		messages:    []string{},
 		newMessages: *sync.NewCond(new(sync.Mutex)),
 	}
-	mgr, err := eventstreams.NewEventStreamManager[pubSubConfig, pubSubMessage](ctx, c, p, wsServer, ims)
+	mgr, err := eventstreams.NewEventStreamManager[*eventstreams.GenericEventStream, pubSubMessage](ctx, c, p, wsServer, ims)
 	assertNoError(err)
 	return mgr, ims, func() {
 		log.L(ctx).Infof("Shutting down")
diff --git a/pkg/eventstreams/activestream.go b/pkg/eventstreams/activestream.go
index d6dcd3e..dabfc7c 100644
--- a/pkg/eventstreams/activestream.go
+++ b/pkg/eventstreams/activestream.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
 //
 // SPDX-License-Identifier: Apache-2.0
 //
@@ -33,7 +33,7 @@ type eventStreamBatch[DataType any] struct {
 	batchTimer *time.Timer
 }
 
-type activeStream[CT any, DT any] struct {
+type activeStream[CT EventStreamSpec, DT any] struct {
 	*eventStream[CT, DT]
 	ctx           context.Context
 	cancelCtx     func()
@@ -60,7 +60,7 @@ func (es *eventStream[CT, DT]) newActiveStream() *activeStream[CT, DT] {
 		},
 		eventLoopDone: make(chan struct{}),
 		batchLoopDone: make(chan struct{}),
-		events:        make(chan *Event[DT], *es.spec.BatchSize),
+		events:        make(chan *Event[DT], *es.spec.ESFields().BatchSize),
 	}
 	go as.runEventLoop()
 	go as.runBatchLoop()
@@ -98,8 +98,8 @@ func (as *activeStream[CT, DT]) loadCheckpoint() (sequencedID string, err error)
 		}
 		if cp != nil && cp.SequenceID != nil {
 			sequencedID = *cp.SequenceID
-		} else if as.spec.InitialSequenceID != nil {
-			sequencedID = *as.spec.InitialSequenceID
+		} else if as.spec.ESFields().InitialSequenceID != nil {
+			sequencedID = *as.spec.ESFields().InitialSequenceID
 		}
 		return true, err
 	})
@@ -107,8 +107,9 @@ func (as *activeStream[CT, DT]) loadCheckpoint() (sequencedID string, err error)
 }
 
 func (as *activeStream[CT, DT]) checkFilter(event *Event[DT]) bool {
-	if as.spec.topicFilterRegexp != nil {
-		return as.spec.topicFilterRegexp.Match([]byte(event.Topic))
+	esSpec := as.spec.ESFields()
+	if esSpec.topicFilterRegexp != nil {
+		return esSpec.topicFilterRegexp.Match([]byte(event.Topic))
 	}
 	return true
 }
@@ -149,8 +150,9 @@ func (as *activeStream[CT, DT]) runSourceLoop(initialCheckpointSequenceID string
 
 func (as *activeStream[CT, DT]) runBatchLoop() {
 	defer close(as.batchLoopDone)
+	esSpec := as.spec.ESFields()
 	var batch *eventStreamBatch[DT]
-	batchTimeout := time.Duration(*as.spec.BatchTimeout)
+	batchTimeout := time.Duration(*esSpec.BatchTimeout)
 	var noBatchActive <-chan time.Time = make(chan time.Time) // never pops
 	batchTimedOut := noBatchActive
 	for {
@@ -173,7 +175,7 @@ func (as *activeStream[CT, DT]) runBatchLoop() {
 					batch = &eventStreamBatch[DT]{
 						number:     as.batchNumber,
 						batchTimer: time.NewTimer(batchTimeout),
-						events:     make([]*Event[DT], 0, *as.spec.BatchSize),
+						events:     make([]*Event[DT], 0, *esSpec.BatchSize),
 					}
 					batchTimedOut = batch.batchTimer.C
 				}
@@ -181,7 +183,7 @@ func (as *activeStream[CT, DT]) runBatchLoop() {
 			}
 		}
 		batchDispatched := false
-		if batch != nil && (len(batch.events) >= *as.spec.BatchSize || timedOut) {
+		if batch != nil && (len(batch.events) >= *esSpec.BatchSize || timedOut) {
 			// attempt dispatch (only returns err on exit)
 			if err := as.dispatchBatch(batch); err != nil {
 				log.L(as.ctx).Debugf("batch loop done: %s", err)
@@ -241,6 +243,7 @@ func (as *activeStream[CT, DT]) checkpointRoutine() {
 				return // We're done
 			}
 			err := as.retry.Do(as.ctx, "checkpoint", func(attempt int) (retry bool, err error) {
+				log.L(as.bgCtx).Debugf("Writing checkpoint id=%s sequenceID=%s", as.spec.GetID(), checkpointSequenceID)
 				_, err = as.esm.persistence.Checkpoints().Upsert(as.ctx, &EventStreamCheckpoint{
 					ID:         ptrTo(as.spec.GetID()), // the ID of the stream is the ID of the checkpoint
 					SequenceID: &checkpointSequenceID,
@@ -266,9 +269,12 @@ func (as *activeStream[CT, DT]) dispatchBatch(batch *eventStreamBatch[DT]) (err
 	as.LastDispatchAttempts = 0
 	as.LastDispatchStatus = DispatchStatusDispatching
 	as.HighestDispatched = batch.events[len(batch.events)-1].SequenceID
+	esSpec := as.spec.ESFields()
 	for {
 		// Short exponential back-off retry
 		err := as.retry.Do(as.ctx, "action", func(_ int) (retry bool, err error) {
+			log.L(as.ctx).Debugf("Batch %d attempt %d dispatching. Len=%d",
+				batch.number, as.LastDispatchAttempts, len(batch.events))
 			err = as.action.AttemptDispatch(as.ctx, as.LastDispatchAttempts, &EventBatch[DT]{
 				Type:     MessageTypeEventBatch,
 				StreamID: as.spec.GetID(),
@@ -281,7 +287,7 @@ func (as *activeStream[CT, DT]) dispatchBatch(batch *eventStreamBatch[DT]) (err
 				as.LastDispatchAttempts++
 				as.LastDispatchFailure = err.Error()
 				as.LastDispatchStatus = DispatchStatusRetrying
-				return time.Since(*as.LastDispatchTime.Time()) < time.Duration(*as.spec.RetryTimeout), err
+				return time.Since(*as.LastDispatchTime.Time()) < time.Duration(*esSpec.RetryTimeout), err
 			}
 			as.LastDispatchStatus = DispatchStatusComplete
 			return false, nil
@@ -292,14 +298,14 @@ func (as *activeStream[CT, DT]) dispatchBatch(batch *eventStreamBatch[DT]) (err
 			// We're in blocked retry delay
 			as.LastDispatchStatus = DispatchStatusBlocked
 			log.L(as.ctx).Errorf("Batch failed short retry after %.2fs secs. ErrorHandling=%s BlockedRetryDelay=%.2fs ",
-				time.Since(*as.LastDispatchTime.Time()).Seconds(), *as.spec.ErrorHandling, time.Duration(*as.spec.BlockedRetryDelay).Seconds())
-			if *as.spec.ErrorHandling == ErrorHandlingTypeSkip {
+				time.Since(*as.LastDispatchTime.Time()).Seconds(), *esSpec.ErrorHandling, time.Duration(*esSpec.BlockedRetryDelay).Seconds())
+			if *esSpec.ErrorHandling == ErrorHandlingTypeSkip {
 				// Swallow the error now we have logged it
 				as.LastDispatchStatus = DispatchStatusSkipped
 				return nil
 			}
 			select {
-			case <-time.After(time.Duration(*as.spec.BlockedRetryDelay)):
+			case <-time.After(time.Duration(*esSpec.BlockedRetryDelay)):
 			case <-as.ctx.Done():
 				// Only way we exit with error, is if the context is cancelled
 				return i18n.NewError(as.ctx, i18n.MsgContextCanceled)
diff --git a/pkg/eventstreams/activestream_test.go b/pkg/eventstreams/activestream_test.go
index 13fcda9..5b3ee88 100644
--- a/pkg/eventstreams/activestream_test.go
+++ b/pkg/eventstreams/activestream_test.go
@@ -33,7 +33,7 @@ func TestCheckpointContextClose(t *testing.T) {
 	})
 	defer done()
 
-	as := &activeStream[testESConfig, testData]{
+	as := &activeStream[*GenericEventStream, testData]{
 		eventStream: es,
 	}
 	as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -47,12 +47,12 @@ func TestRunSourceLoopDone(t *testing.T) {
 	ctx, es, mes, done := newTestEventStream(t)
 	defer done()
 
-	as := &activeStream[testESConfig, testData]{
+	as := &activeStream[*GenericEventStream, testData]{
 		eventStream: es,
 	}
 	as.ctx, as.cancelCtx = context.WithCancel(ctx)
 
-	mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+	mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
 		deliver(nil)
 		return nil
 	}
@@ -66,13 +66,13 @@ func TestRunSourceEventsBlockedExit(t *testing.T) {
 	ctx, es, mes, done := newTestEventStream(t)
 	defer done()
 
-	as := &activeStream[testESConfig, testData]{
+	as := &activeStream[*GenericEventStream, testData]{
 		eventStream: es,
 	}
 	as.ctx, as.cancelCtx = context.WithCancel(ctx)
 	as.events = make(chan *Event[testData])
 
-	mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+	mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
 		deliver([]*Event[testData]{{ /* will block */ }})
 		return nil
 	}
@@ -92,7 +92,7 @@ func TestBatchTimeout(t *testing.T) {
 	es.spec.BatchTimeout = ptrTo(fftypes.FFDuration(1 * time.Millisecond))
 
 	delivered := false
-	mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+	mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
 		if delivered {
 			<-ctx.Done()
 		} else {
@@ -135,7 +135,7 @@ func TestQueuedCheckpointAsync(t *testing.T) {
 	})
 	defer done()
 
-	as := &activeStream[testESConfig, testData]{
+	as := &activeStream[*GenericEventStream, testData]{
 		eventStream: es,
 	}
 	as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -156,7 +156,7 @@ func TestQueuedCheckpointCancel(t *testing.T) {
 	})
 	defer done()
 
-	as := &activeStream[testESConfig, testData]{
+	as := &activeStream[*GenericEventStream, testData]{
 		eventStream: es,
 	}
 	as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -172,7 +172,7 @@ func TestDispatchSkipError(t *testing.T) {
 	ctx, es, _, done := newTestEventStream(t)
 	defer done()
 
-	as := &activeStream[testESConfig, testData]{
+	as := &activeStream[*GenericEventStream, testData]{
 		eventStream: es,
 	}
 	as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -196,31 +196,40 @@ func TestDispatchBlockError(t *testing.T) {
 	ctx, es, _, done := newTestEventStream(t)
 	defer done()
 
-	as := &activeStream[testESConfig, testData]{
-		eventStream: es,
+	as := &activeStream[*GenericEventStream, testData]{
+		eventStream:   es,
+		events:        make(chan *Event[testData]),
+		batchLoopDone: make(chan struct{}),
 	}
 	as.ctx, as.cancelCtx = context.WithCancel(ctx)
+	as.spec.BatchSize = ptrTo(1)
 	as.spec.RetryTimeout = ptrTo(fftypes.FFDuration(1 * time.Microsecond))
-	as.spec.BlockedRetryDelay = ptrTo(fftypes.FFDuration(1 * time.Microsecond))
+	as.spec.BlockedRetryDelay = ptrTo(fftypes.FFDuration(10 * time.Millisecond))
 	as.spec.ErrorHandling = ptrTo(ErrorHandlingTypeBlock)
 
+	callCount := 0
 	calls := make(chan bool)
 	as.action = &mockAction{
 		attemptDispatch: func(ctx context.Context, attempt int, events *EventBatch[testData]) error {
 			calls <- true
+			callCount++
+			if callCount > 1 {
+				as.cancelCtx()
+			}
 			return fmt.Errorf("pop")
 		},
 	}
 
+	dispatchDone := make(chan struct{})
 	go func() {
-		err := as.dispatchBatch(&eventStreamBatch[testData]{
-			events: []*Event[testData]{{}},
-		})
-		assert.Error(t, err)
+		defer close(dispatchDone)
+		as.runBatchLoop()
 	}()
 
+	as.events <- &Event[testData]{}
+	<-calls
 	<-calls
-	as.cancelCtx()
+	<-dispatchDone
 }
diff --git a/pkg/eventstreams/config.go b/pkg/eventstreams/config.go
index cdc4b85..caa8dee 100644
--- a/pkg/eventstreams/config.go
+++ b/pkg/eventstreams/config.go
@@ -32,12 +32,12 @@ import (
 // Generics:
 //  - CT is the Configuration Type - the custom extensions to the configuration schema
 //  - DT is the Data Type - the payload type that will be delivered to the application
-type DispatcherFactory[CT any, DT any] interface {
-	Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
-	NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT]
+type DispatcherFactory[CT EventStreamSpec, DT any] interface {
+	Validate(ctx context.Context, conf *Config[CT, DT], spec CT, tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
+	NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec CT) Dispatcher[DT]
 }
 
-type Config[CT any, DT any] struct {
+type Config[CT EventStreamSpec, DT any] struct {
 	TLSConfigs        map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
 	Retry             *retry.Retry             `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
 	DisablePrivateIPs bool                     `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
@@ -135,7 +135,7 @@ func InitConfig(conf config.Section) {
 
 // Optional function to generate config directly from YAML configuration using the config package.
 // You can also generate the configuration programmatically
-func GenerateConfig[CT any, DT any](ctx context.Context) *Config[CT, DT] {
+func GenerateConfig[CT EventStreamSpec, DT any](ctx context.Context) *Config[CT, DT] {
 	httpDefaults, _ := ffresty.GenerateConfig(ctx, WebhookDefaultsConfig)
 	tlsConfigs := map[string]*fftls.Config{}
 	for i := 0; i < TLSConfigs.ArraySize(); i++ {
diff --git a/pkg/eventstreams/config_test.go b/pkg/eventstreams/config_test.go
index 1f44c88..2e05099 100644
--- a/pkg/eventstreams/config_test.go
+++ b/pkg/eventstreams/config_test.go
@@ -34,7 +34,7 @@ func TestGenerateConfigTLS(t *testing.T) {
 	tls0.Set(ConfigTLSConfigName, "tls0")
 	tls0.SubSection("tls").Set(fftls.HTTPConfTLSCAFile, t.TempDir())
 
-	c := GenerateConfig[testESConfig, testData](context.Background())
+	c := GenerateConfig[*GenericEventStream, testData](context.Background())
 
 	assert.True(t, c.TLSConfigs["tls0"].Enabled)
 }
diff --git a/pkg/eventstreams/e2e_test.go b/pkg/eventstreams/e2e_test.go
index 31aa145..b47826f 100644
--- a/pkg/eventstreams/e2e_test.go
+++ b/pkg/eventstreams/e2e_test.go
@@ -18,7 +18,6 @@ package eventstreams
 
 import (
 	"context"
-	"database/sql/driver"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -37,18 +36,6 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-type testESConfig struct {
-	Config1 string `json:"config1"`
-}
-
-func (tc *testESConfig) Scan(src interface{}) error {
-	return fftypes.JSONScan(src, tc)
-}
-
-func (tc *testESConfig) Value() (driver.Value, error) {
-	return fftypes.JSONValue(tc)
-}
-
 type testData struct {
 	Field1 int `json:"field1"`
 }
@@ -59,14 +46,18 @@ type testSource struct {
 	sequenceStartedWith string
 }
 
-func (ts *testSource) Validate(ctx context.Context, config *testESConfig) error {
-	if config.Config1 == "" {
-		return fmt.Errorf("config1 missing")
+func (ts *testSource) Validate(ctx context.Context, conf *GenericEventStream) error {
+	if conf.Name != nil && *conf.Name == "validator_fail_instruction" {
+		return fmt.Errorf("validator_failed")
 	}
 	return nil
 }
 
-func (ts *testSource) Run(ctx context.Context, spec *EventStreamSpec[testESConfig], checkpointSequenceID string, deliver Deliver[testData]) error {
+func (ts *testSource) WithRuntimeStatus(spec *GenericEventStream, status EventStreamStatus, stats *EventStreamStatistics) *GenericEventStream {
+	return spec.WithRuntimeStatus(status, stats)
+}
+
+func (ts *testSource) Run(ctx context.Context, spec *GenericEventStream, checkpointSequenceID string, deliver Deliver[testData]) error {
 	msgNumber := 0
 	ts.startCount++
 	ts.sequenceStartedWith = checkpointSequenceID
@@ -113,15 +104,16 @@ func TestE2E_DeliveryWebSockets(t *testing.T) {
 	ts := &testSource{started: make(chan struct{})}
 	close(ts.started) // start delivery immediately - will block as no WS connected
 
-	mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
+	mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts)
 	assert.NoError(t, err)
 
 	// Create a stream to sub-select one topic
-	es1 := &EventStreamSpec[testESConfig]{
-		TopicFilter: ptrTo("topic_1"), // only one of the topics
-		Type:        &EventStreamTypeWebSocket,
-		BatchSize:   ptrTo(10),
-		Config:      &testESConfig{Config1: "1111"},
+	es1 := &GenericEventStream{
+		Type: &EventStreamTypeWebSocket,
+		EventStreamSpecFields: EventStreamSpecFields{
+			TopicFilter: ptrTo("topic_1"), // only one of the topics
+			BatchSize:   ptrTo(10),
+		},
 	}
 	created, err := mgr.UpsertStream(ctx, "stream1", es1)
 	assert.NoError(t, err)
@@ -161,15 +153,16 @@ func TestE2E_DeliveryWebSocketsNack(t *testing.T) {
 	ts := &testSource{started: make(chan struct{})}
 	close(ts.started) // start delivery immediately - will block as no WS connected
 
-	mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
+	mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts)
 	assert.NoError(t, err)
 
 	// Create a stream to sub-select one topic
-	es1 := &EventStreamSpec[testESConfig]{
-		TopicFilter: ptrTo("topic_1"), // only one of the topics
-		Type:        &EventStreamTypeWebSocket,
-		BatchSize:   ptrTo(10),
-		Config:      &testESConfig{Config1: "1111"},
+	es1 := &GenericEventStream{
+		Type: &EventStreamTypeWebSocket,
+		EventStreamSpecFields: EventStreamSpecFields{
+			TopicFilter: ptrTo("topic_1"), // only one of the topics
+			BatchSize:   ptrTo(10),
+		},
 	}
 	created, err := mgr.UpsertStream(ctx, "stream1", es1)
 	assert.NoError(t, err)
@@ -206,15 +199,16 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) {
 	ts := &testSource{started: make(chan struct{})}
 	close(ts.started) // start delivery immediately - will block as no WS connected
 
-	mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
+	mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts)
 	assert.NoError(t, err)
 
 	// Create a stream to sub-select one topic
-	es1 := &EventStreamSpec[testESConfig]{
-		TopicFilter: ptrTo("topic_1"), // only one of the topics
-		Type:        &EventStreamTypeWebSocket,
-		BatchSize:   ptrTo(10),
-		Config:      &testESConfig{Config1: "1111"},
+	es1 := &GenericEventStream{
+		Type: &EventStreamTypeWebSocket,
+		EventStreamSpecFields: EventStreamSpecFields{
+			TopicFilter: ptrTo("topic_1"), // only one of the topics
+			BatchSize:   ptrTo(10),
+		},
 	}
 	created, err := mgr.UpsertStream(ctx, "stream1", es1)
 	assert.NoError(t, err)
@@ -232,7 +226,7 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) {
 	assert.Equal(t, 1, ts.startCount)
 
 	// Wait for the checkpoint to be reflected in the status
-	var ess *EventStreamWithStatus[testESConfig]
+	var ess *GenericEventStream
 	for ess == nil || ess.Statistics == nil || ess.Statistics.Checkpoint == "" {
 		time.Sleep(1 * time.Millisecond)
 		ess, err = mgr.GetStreamByID(ctx, es1.GetID())
@@ -251,7 +245,7 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) {
 	assert.Equal(t, 2, ts.startCount)
 
 	// Reset it and check we get the reset
-	err = mgr.ResetStream(ctx, es1.GetID(), "first")
+	err = mgr.ResetStream(ctx, es1.GetID(), ptrTo("first"))
 	assert.NoError(t, err)
 	wsReceiveAck(ctx, t, wsc, func(batch *EventBatch[testData]) {})
 	assert.Equal(t, "first", ts.sequenceStartedWith)
@@ -266,7 +260,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {
 	ts := &testSource{started: make(chan struct{})}
 	close(ts.started) // start delivery immediately - will block as no WS connected
 
-	mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
+	mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts)
 	assert.NoError(t, err)
 
 	got100 := make(chan struct{})
@@ -298,11 +292,12 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) {
 	defer whServer.Close()
 
 	// Create a stream to sub-select one topic
-	es1 := &EventStreamSpec[testESConfig]{
-		TopicFilter: ptrTo("topic_1"), // only one of the topics
-		Type:        &EventStreamTypeWebhook,
-		BatchSize:   ptrTo(10),
-		Config:      &testESConfig{Config1: "1111"},
+	es1 := &GenericEventStream{
+		Type: &EventStreamTypeWebhook,
+		EventStreamSpecFields: EventStreamSpecFields{
+			TopicFilter: ptrTo("topic_1"), // only one of the topics
+			BatchSize:   ptrTo(10),
+		},
 		Webhook: &WebhookConfig{
 			URL:    ptrTo(whServer.URL + "/some/path"),
 			Method: ptrTo("PUT"),
@@ -336,7 +331,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {
 	ts := &testSource{started: make(chan struct{})}
 	close(ts.started) // start delivery immediately - will block as no WS connected
 
-	mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
+	mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts)
 	assert.NoError(t, err)
 
 	gotFiveTimes := make(chan struct{})
@@ -367,11 +362,12 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) {
 	defer whServer.Close()
 
 	// Create a stream to sub-select one topic
-	es1 := &EventStreamSpec[testESConfig]{
-		TopicFilter: ptrTo("topic_1"), // only one of the topics
-		Type:        &EventStreamTypeWebhook,
-		BatchSize:   ptrTo(10),
-		Config:      &testESConfig{Config1: "1111"},
+	es1 := &GenericEventStream{
+		Type: &EventStreamTypeWebhook,
+		EventStreamSpecFields: EventStreamSpecFields{
+			TopicFilter: ptrTo("topic_1"), // only one of the topics
+			BatchSize:   ptrTo(10),
+		},
 		Webhook: &WebhookConfig{
 			URL:    ptrTo(whServer.URL + "/some/path"),
 			Method: ptrTo("PUT"),
@@ -405,15 +401,14 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
 		started: make(chan struct{}), // we never start it
 	}
 
-	mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), p, wss, ts)
+	mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts)
 	assert.NoError(t, err)
 
 	// Create first event stream started
-	es1 := &EventStreamSpec[testESConfig]{
-		TopicFilter: ptrTo("topic1"), // only one of the topics
-		Type:        &EventStreamTypeWebSocket,
-		Config: &testESConfig{
-			Config1: "confValue1",
+	es1 := &GenericEventStream{
+		Type: &EventStreamTypeWebSocket,
+		EventStreamSpecFields: EventStreamSpecFields{
+			TopicFilter: ptrTo("topic1"), // only one of the topics
 		},
 	}
 	created, err := mgr.UpsertStream(ctx, "stream1", es1)
@@ -421,12 +416,11 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
 	assert.True(t, created)
 
 	// Create second event stream stopped
-	es2 := &EventStreamSpec[testESConfig]{
-		TopicFilter: ptrTo("topic2"), // only one of the topics
-		Type:        &EventStreamTypeWebSocket,
-		Status:      &EventStreamStatusStopped,
-		Config: &testESConfig{
-			Config1: "confValue2",
+	es2 := &GenericEventStream{
+		Type: &EventStreamTypeWebSocket,
+		EventStreamSpecFields: EventStreamSpecFields{
+			TopicFilter: ptrTo("topic2"), // only one of the topics
+			Status:      &EventStreamStatusStopped,
 		},
 	}
 	created, err = mgr.UpsertStream(ctx, "stream2", es2)
@@ -434,20 +428,19 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
 	assert.True(t, created)
 
 	// Find the second one by topic filter
-	esList, _, err := mgr.ListStreams(ctx, EventStreamFilters.NewFilter(ctx).Eq("topicfilter", "topic2"))
+	esList, _, err := mgr.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).Eq("topicfilter", "topic2"))
 	assert.NoError(t, err)
 	assert.Len(t, esList, 1)
 	assert.Equal(t, "stream2", *esList[0].Name)
 	assert.Equal(t, "topic2", *esList[0].TopicFilter)
 	assert.Equal(t, 50, *esList[0].BatchSize) // picked up default when it was loaded
-	assert.Equal(t, "confValue2", esList[0].Config.Config1)
-	assert.Equal(t, EventStreamStatusStopped, esList[0].Status)
+	assert.Equal(t, EventStreamStatusStopped, *esList[0].Status)
 
 	// Get the first by ID
 	es1c, err := mgr.GetStreamByID(ctx, es1.GetID(), dbsql.FailIfNotFound)
 	assert.NoError(t, err)
 	assert.Equal(t, "stream1", *es1c.Name)
-	assert.Equal(t, EventStreamStatusStarted, es1c.Status)
+	assert.Equal(t, EventStreamStatusStarted, *es1c.Status)
 
 	// Rename second event stream
 	es2.Name = ptrTo("stream2a")
@@ -460,13 +453,13 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
 	assert.NoError(t, err)
 	es2c, err := mgr.GetStreamByID(ctx, es2.GetID(), dbsql.FailIfNotFound)
 	assert.NoError(t, err)
-	assert.Equal(t, EventStreamStatusStarted, es2c.Status)
+	assert.Equal(t, EventStreamStatusStarted, *es2c.Status)
 	assert.Equal(t, "stream2a", *es2c.Name)
 
 	err = mgr.StopStream(ctx, es2.GetID())
 	assert.NoError(t, err)
 	es2c, err = mgr.GetStreamByID(ctx, es2.GetID(), dbsql.FailIfNotFound)
 	assert.NoError(t, err)
-	assert.Equal(t, EventStreamStatusStopped, es2c.Status)
+	assert.Equal(t, EventStreamStatusStopped, *es2c.Status)
 
 	err = mgr.DeleteStream(ctx, es2.GetID())
 	assert.NoError(t, err)
@@ -475,7 +468,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Check no streams left
-	esList, _, err = mgr.ListStreams(ctx, EventStreamFilters.NewFilter(ctx).And())
+	esList, _, err = mgr.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).And())
 	assert.NoError(t, err)
 	assert.Empty(t, esList)
 
@@ -501,7 +494,7 @@ func wsReceiveNack(ctx context.Context, t *testing.T, wsc wsclient.WSClient, cb
 	assert.NoError(t, err)
 }
 
-func setupE2ETest(t *testing.T, extraSetup ...func()) (context.Context, Persistence[testESConfig], wsserver.WebSocketChannels, wsclient.WSClient, func()) {
+func setupE2ETest(t *testing.T, extraSetup ...func()) (context.Context, Persistence[*GenericEventStream], wsserver.WebSocketChannels, wsclient.WSClient, func()) {
 	logrus.SetLevel(logrus.TraceLevel)
 
 	ctx := context.Background()
@@ -527,7 +520,7 @@ func setupE2ETest(t *testing.T, extraSetup ...func()) (context.Context, Persiste
 	db, err := dbsql.NewSQLiteProvider(ctx, dbConf)
 	assert.NoError(t, err)
 
-	p := NewEventStreamPersistence[testESConfig](db, dbsql.UUIDValidator)
+	p := NewGenericEventStreamPersistence(db, dbsql.UUIDValidator)
 	p.EventStreams().Validate()
 	p.Checkpoints().Validate()
diff --git a/pkg/eventstreams/eventstreams.go b/pkg/eventstreams/eventstreams.go
index 9278063..105731d 100644
--- a/pkg/eventstreams/eventstreams.go
+++ b/pkg/eventstreams/eventstreams.go
@@ -23,6 +23,7 @@ import (
 	"regexp"
 	"sync"
 
+	"github.com/hyperledger/firefly-common/pkg/dbsql"
 	"github.com/hyperledger/firefly-common/pkg/fftypes"
 	"github.com/hyperledger/firefly-common/pkg/i18n"
 	"github.com/hyperledger/firefly-common/pkg/log"
@@ -77,17 +78,21 @@ type DBSerializable interface {
 	driver.Valuer
 }
 
-type EventStreamSpec[CT any] struct {
-	ID      *string         `ffstruct:"eventstream" json:"id"`
-	Created *fftypes.FFTime `ffstruct:"eventstream" json:"created"`
-	Updated *fftypes.FFTime `ffstruct:"eventstream" json:"updated"`
+type EventStreamSpec interface {
+	dbsql.Resource
+	SetID(s string)
+	ESFields() *EventStreamSpecFields
+	ESType() EventStreamType          // separated from fields to allow choice on restrictions
+	WebhookConf() *WebhookConfig      // can return nil if Webhooks not supported
+	WebSocketConf() *WebSocketConfig  // can return nil if WebSockets not supported
+	IsNil() bool                      // needed as quirk of using interfaces with generics
+}
+
+type EventStreamSpecFields struct {
 	Name   *string            `ffstruct:"eventstream" json:"name,omitempty"`
 	Status *EventStreamStatus `ffstruct:"eventstream" json:"status,omitempty"`
-	Type              *EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"`
-	InitialSequenceID *string          `ffstruct:"eventstream" json:"initialSequenceID,omitempty"`
+	InitialSequenceID *string `ffstruct:"eventstream" json:"initialSequenceId,omitempty"`
 	TopicFilter       *string `ffstruct:"eventstream" json:"topicFilter,omitempty"`
-	Identity          *string `ffstruct:"eventstream" json:"identity,omitempty"`
-	Config            *CT     `ffstruct:"eventstream" json:"config,omitempty"`
 
 	ErrorHandling *ErrorHandlingType `ffstruct:"eventstream" json:"errorHandling"`
 	BatchSize     *int               `ffstruct:"eventstream" json:"batchSize"`
@@ -95,27 +100,9 @@ type EventStreamSpec[CT any] struct {
 	RetryTimeout      *fftypes.FFDuration `ffstruct:"eventstream" json:"retryTimeout"`
 	BlockedRetryDelay *fftypes.FFDuration `ffstruct:"eventstream" json:"blockedRetryDelay"`
 
-	Webhook   *WebhookConfig   `ffstruct:"eventstream" json:"webhook,omitempty"`
-	WebSocket *WebSocketConfig `ffstruct:"eventstream" json:"websocket,omitempty"`
-
 	topicFilterRegexp *regexp.Regexp
 }
 
-func (esc *EventStreamSpec[CT]) GetID() string {
-	if esc.ID == nil {
-		return ""
-	}
-	return *esc.ID
-}
-
-func (esc *EventStreamSpec[CT]) SetCreated(t *fftypes.FFTime) {
-	esc.Created = t
-}
-
-func (esc *EventStreamSpec[CT]) SetUpdated(t *fftypes.FFTime) {
-	esc.Updated = t
-}
-
 type EventStreamStatistics struct {
 	StartTime        *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"startTime"`
 	LastDispatchTime *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"lastDispatchTime"`
@@ -128,12 +115,6 @@ type EventStreamStatistics struct {
 	Checkpoint string `ffstruct:"EventStreamStatistics" json:"checkpoint"`
 }
 
-type EventStreamWithStatus[CT any] struct {
-	*EventStreamSpec[CT]
-	Status     EventStreamStatus      `ffstruct:"EventStream" json:"status"`
-	Statistics *EventStreamStatistics `ffstruct:"EventStream" json:"statistics,omitempty"`
-}
-
 type EventStreamCheckpoint struct {
 	ID      *string         `ffstruct:"EventStreamCheckpoint" json:"id"`
 	Created *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"created"`
@@ -181,7 +162,8 @@ func checkSet[T any](ctx context.Context, storeDefaults bool, fieldName string,
 // Optionally it stores the defaults back on the structure, to ensure no nil fields.
 //  - When using at runtime: true, so later code doesn't need to worry about nil checks / defaults
 //  - When storing to the DB: false, so defaults can be applied dynamically
-func (esm *esManager[CT, DT]) validateStream(ctx context.Context, esc *EventStreamSpec[CT], phase LifecyclePhase) (factory DispatcherFactory[CT, DT], err error) {
+func (esm *esManager[CT, DT]) validateStream(ctx context.Context, spec CT, phase LifecyclePhase) (factory DispatcherFactory[CT, DT], err error) {
+	esc := spec.ESFields()
 	if esc.Name == nil {
 		return nil, i18n.NewError(ctx, i18n.MsgMissingRequiredField, "name")
 	}
@@ -192,7 +174,7 @@ func (esm *esManager[CT, DT]) validateStream(ctx context.Context, esc *EventStre
 		}
 	}
 	defaults := esm.config.Defaults
-	if err := esm.runtime.Validate(ctx, esc.Config); err != nil {
+	if err := esm.runtime.Validate(ctx, spec); err != nil {
 		return nil, err
 	}
 	err = fftypes.ValidateFFNameField(ctx, *esc.Name, "name")
@@ -215,27 +197,30 @@ func (esm *esManager[CT, DT]) validateStream(ctx context.Context, esc *EventStre
 	if err == nil {
 		err = checkSet(ctx, setDefaults, "errorHandling", &esc.ErrorHandling, defaults.ErrorHandling, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "ehtype", v) })
 	}
+	esType := spec.ESType()
 	if err == nil {
-		err = checkSet(ctx, true /* type always applied */, "type", &esc.Type, EventStreamTypeWebSocket, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "estype", v) })
+		if !fftypes.FFEnumValid(ctx, "estype", esType) {
+			err = i18n.NewError(ctx, i18n.MsgInvalidValue, esType, "type")
+		}
 	}
 	if err != nil {
 		return nil, err
 	}
-	factory = esm.dispatchers[*esc.Type]
+	factory = esm.dispatchers[esType]
 	if factory == nil {
-		return nil, i18n.NewError(ctx, i18n.MsgESInvalidType, *esc.Type)
+		return nil, i18n.NewError(ctx, i18n.MsgESInvalidType, esType)
 	}
-	err = factory.Validate(ctx, &esm.config, esc, esm.tlsConfigs, phase)
+	err = factory.Validate(ctx, &esm.config, spec, esm.tlsConfigs, phase)
 	if err != nil {
 		return nil, err
 	}
 	return factory, nil
 }
 
-type eventStream[CT any, DT any] struct {
+type eventStream[CT EventStreamSpec, DT any] struct {
 	bgCtx       context.Context
 	esm         *esManager[CT, DT]
-	spec        *EventStreamSpec[CT]
+	spec        CT
 	mux         sync.Mutex
 	action      Dispatcher[DT]
 	activeState *activeStream[CT, DT]
@@ -247,31 +232,31 @@ type eventStream[CT any, DT any] struct {
 type EventStreamActions[CT any] interface {
 	Start(ctx context.Context) error
 	Stop(ctx context.Context) error
-	Status(ctx context.Context) *EventStreamWithStatus[CT]
+	Status(ctx context.Context) CT
 }
 
 func (esm *esManager[CT, DT]) initEventStream(
-	bgCtx context.Context,
-	spec *EventStreamSpec[CT],
+	spec CT,
 ) (es *eventStream[CT, DT], err error) {
 	// Validate
-	factory, err := esm.validateStream(bgCtx, spec, LifecyclePhaseStarting)
+	factory, err := esm.validateStream(esm.bgCtx, spec, LifecyclePhaseStarting)
 	if err != nil {
 		return nil, err
 	}
+	streamCtx := log.WithLogField(esm.bgCtx, "eventstream", *spec.ESFields().Name)
 	es = &eventStream[CT, DT]{
-		bgCtx:       log.WithLogField(bgCtx, "eventstream", *spec.Name),
+		bgCtx:       streamCtx,
 		esm:         esm,
 		spec:        spec,
 		persistence: esm.persistence,
 		retry:       esm.config.Retry,
 	}
-	es.action = factory.NewDispatcher(es.bgCtx, &esm.config, spec)
+	es.action = factory.NewDispatcher(streamCtx, &esm.config, spec)
 	log.L(es.bgCtx).Infof("Initialized Event Stream")
-	if *spec.Status == EventStreamStatusStarted {
+	if *spec.ESFields().Status == EventStreamStatusStarted {
 		// Start up the stream
 		es.ensureActive()
 	}
@@ -290,7 +275,7 @@ func (es *eventStream[CT, DT]) requestStop(ctx context.Context) chan struct{} {
 	if es.stopping != nil {
 		return es.stopping
 	}
-	persistedStatus := *es.spec.Status
+	persistedStatus := *es.spec.ESFields().Status
 	// Cancel the active context, and create the stopping task
 	activeState.cancelCtx()
 	closedWhenStopped := make(chan struct{})
@@ -321,7 +306,7 @@ func (es *eventStream[CT, DT]) checkSetStatus(ctx context.Context, targetStatus
 
 	transition := func(runtime, persited EventStreamStatus) {
 		newRuntimeStatus = runtime
-		es.spec.Status = &persited
+		es.spec.ESFields().Status = &persited
 		changeToPersist = &persited
 	}
 
@@ -331,7 +316,7 @@ func (es *eventStream[CT, DT]) checkSetStatus(ctx context.Context, targetStatus
 	}
 
 	// Check valid state transitions based on the persisted status, and whether we are stopping
-	switch *es.spec.Status {
+	switch *es.spec.ESFields().Status {
 	case EventStreamStatusDeleted:
 		newRuntimeStatus = EventStreamStatusDeleted
 		if es.stopping != nil {
@@ -380,7 +365,7 @@ func (es *eventStream[CT, DT]) checkSetStatus(ctx context.Context, targetStatus
 }
 
 func (es *eventStream[CT, DT]) persistStatus(ctx context.Context, targetStatus EventStreamStatus) error {
-	fb := EventStreamFilters.NewUpdate(ctx)
+	fb := GenericEventStreamFilters.NewUpdate(ctx)
 	return es.esm.persistence.EventStreams().Update(ctx, es.spec.GetID(), fb.Set("status", targetStatus))
 }
 
@@ -443,11 +428,7 @@ func (es *eventStream[CT, DT]) start(ctx context.Context) error {
 	return nil
 }
 
-func (es *eventStream[CT, DT]) Status(ctx context.Context) *EventStreamWithStatus[CT] {
+func (es *eventStream[CT, DT]) WithStatus(ctx context.Context) CT {
 	status, _, statistics, _ := es.checkSetStatus(ctx, nil)
-	return &EventStreamWithStatus[CT]{
-		EventStreamSpec: es.spec,
-		Status:          status,
-		Statistics:      statistics,
-	}
+	return es.esm.runtime.WithRuntimeStatus(es.spec, status, statistics)
 }
diff --git a/pkg/eventstreams/eventstreams_test.go b/pkg/eventstreams/eventstreams_test.go
index 8e284a5..a4f4da9 100644
--- a/pkg/eventstreams/eventstreams_test.go
+++ b/pkg/eventstreams/eventstreams_test.go
@@ -21,21 +21,24 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/hyperledger/firefly-common/pkg/dbsql"
 	"github.com/hyperledger/firefly-common/pkg/ffapi"
 	"github.com/hyperledger/firefly-common/pkg/fftypes"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 )
 
-func newTestEventStream(t *testing.T, extraSetup ...func(mdb *mockPersistence)) (context.Context, *eventStream[testESConfig, testData], *mockEventSource, func()) {
+func newTestEventStream(t *testing.T, extraSetup ...func(mdb *mockPersistence)) (context.Context, *eventStream[*GenericEventStream, testData], *mockEventSource, func()) {
 	extraSetup = append(extraSetup, func(mdb *mockPersistence) {
-		mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil)
+		mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil)
 	})
 	ctx, mgr, mes, done := newMockESManager(t, extraSetup...)
-	es, err := mgr.initEventStream(ctx, &EventStreamSpec[testESConfig]{
-		ID:     ptrTo(fftypes.NewUUID().String()),
-		Name:   ptrTo(t.Name()),
-		Status: ptrTo(EventStreamStatusStopped),
+	es, err := mgr.initEventStream(&GenericEventStream{
+		ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()},
+		EventStreamSpecFields: EventStreamSpecFields{
+			Name:   ptrTo(t.Name()),
+			Status: ptrTo(EventStreamStatusStopped),
+		},
 	})
 	assert.NoError(t, err)
 
@@ -44,8 +47,8 @@ func newTestEventStream(t *testing.T, extraSetup ...func(mdb *mockPersistence))
 
 func TestEventStreamFields(t *testing.T) {
-	es := &EventStreamSpec[testESConfig]{
-		ID: ptrTo(fftypes.NewUUID().String()),
+	es := &GenericEventStream{
+		ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()},
 	}
 	assert.Equal(t, es.GetID(), es.GetID())
 	t1 := fftypes.Now()
@@ -123,7 +126,7 @@ func TestValidate(t *testing.T) {
 	ctx, es, _, done := newTestEventStream(t)
 	done()
 
-	es.spec = &EventStreamSpec[testESConfig]{}
+	es.spec = &GenericEventStream{}
 	_, err := es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
 	assert.Regexp(t, "FF00112", err)
 
@@ -131,12 +134,12 @@ func TestValidate(t *testing.T) {
 	_, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
 	assert.NoError(t, err)
 
-	es.esm.runtime.(*mockEventSource).validate = func(ctx context.Context, conf *testESConfig) error {
+	es.esm.runtime.(*mockEventSource).validate = func(ctx context.Context, conf *GenericEventStream) error {
 		return fmt.Errorf("pop")
 	}
 	_, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
 	assert.Regexp(t, "pop", err)
-	es.esm.runtime.(*mockEventSource).validate = func(ctx context.Context, conf *testESConfig) error { return nil }
+	es.esm.runtime.(*mockEventSource).validate = func(ctx context.Context, conf *GenericEventStream) error { return nil }
 
 	es.spec.TopicFilter = ptrTo("((((!Bad Regexp[")
 	_, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
@@ -158,7 +161,7 @@ func TestValidate(t *testing.T) {
 	_, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
 	assert.Regexp(t, "FF00216", err)
 
-	_, err = es.esm.initEventStream(ctx, es.spec)
+	_, err = es.esm.initEventStream(es.spec)
 	assert.Regexp(t, "FF00216", err)
 
 	customType := fftypes.FFEnumValue("estype", "custom1")
@@ -166,7 +169,7 @@ func TestValidate(t *testing.T) {
 	_, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
 	assert.Regexp(t, "FF00217", err)
 
-	_, err = es.esm.initEventStream(ctx, es.spec)
+	_, err = es.esm.initEventStream(es.spec)
 	assert.Regexp(t, "FF00217", err)
 }
 
@@ -175,7 +178,7 @@ func TestRequestStopAlreadyStopping(t *testing.T) {
 	ctx, es, _, done := newTestEventStream(t)
 	defer done()
 
-	es.activeState = &activeStream[testESConfig, testData]{}
+	es.activeState = &activeStream[*GenericEventStream, testData]{}
 	es.stopping = make(chan struct{})
 	s := es.requestStop(ctx)
 	close(s)
@@ -190,7 +193,7 @@ func TestRequestStopPersistFail(t *testing.T) {
 	defer done()
 
 	es.spec.Status = ptrTo(EventStreamStatusDeleted)
-	as := &activeStream[testESConfig, testData]{
+	as := &activeStream[*GenericEventStream, testData]{
 		eventLoopDone: make(chan struct{}),
 		batchLoopDone: make(chan struct{}),
 	}
@@ -291,7 +294,7 @@ func TestSuspendTimeout(t *testing.T) {
 	ctx, es, _, done := newTestEventStream(t)
 	done()
 
-	es.activeState = &activeStream[testESConfig, testData]{}
+	es.activeState = &activeStream[*GenericEventStream, testData]{}
 	es.stopping = make(chan struct{})
 	err := es.suspend(ctx)
 	assert.Regexp(t, "FF00229", err)
@@ -299,10 +302,10 @@
 }
 
 func TestGetIDNil(t *testing.T) {
-	assert.Empty(t, (&EventStreamSpec[testESConfig]{}).GetID())
+	assert.Empty(t, (&GenericEventStream{}).GetID())
 	assert.Empty(t, (&EventStreamCheckpoint{}).GetID())
 }
 
 func TestCheckDocs(t *testing.T) {
-	ffapi.CheckObjectDocumented(&EventStreamWithStatus[struct{}]{})
+	ffapi.CheckObjectDocumented(&GenericEventStream{})
 }
diff --git a/pkg/eventstreams/manager.go b/pkg/eventstreams/manager.go
index bf114d4..71dc19f 100644
--- a/pkg/eventstreams/manager.go
+++ b/pkg/eventstreams/manager.go
@@ -19,7 +19,6 @@ package eventstreams
 import (
 	"context"
 	"crypto/tls"
-	"fmt"
 	"sync"
 
 	"github.com/hyperledger/firefly-common/pkg/dbsql"
@@ -30,14 +29,14 @@ import (
 	"github.com/hyperledger/firefly-common/pkg/wsserver"
 )
 
-type Manager[CT any] interface {
-	UpsertStream(ctx context.Context, nameOrID string, esSpec *EventStreamSpec[CT]) (bool, error)
-	GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (*EventStreamWithStatus[CT], error)
-	GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (*EventStreamWithStatus[CT], error)
-	ListStreams(ctx context.Context, filter ffapi.Filter) ([]*EventStreamWithStatus[CT], *ffapi.FilterResult, error)
+type Manager[CT EventStreamSpec] interface {
+	UpsertStream(ctx context.Context, nameOrID string, esSpec CT) (bool, error)
+	GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (CT, error)
+	GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (CT, error)
+	ListStreams(ctx context.Context, filter ffapi.Filter) ([]CT, *ffapi.FilterResult, error)
 	StopStream(ctx context.Context, nameOrID string) error
 	StartStream(ctx context.Context, nameOrID string) error
-	ResetStream(ctx context.Context, nameOrID string, sequenceID string) error
+	ResetStream(ctx context.Context, nameOrID string, sequenceID *string, preStartCallbacks ...func(ctx context.Context, spec CT) error) error
 	DeleteStream(ctx context.Context, nameOrID string) error
 	Close(ctx context.Context)
 }
@@ -55,11 +54,13 @@ type Deliver[DT any] func(events []*Event[DT]) SourceInstruction
 // Generics:
 //  - ConfigType is the Configuration Type - the custom extensions to the configuration schema
 //  - DataType is the Data Type - the payload type that will be delivered to the application
-type Runtime[ConfigType any, DataType any] interface {
+type Runtime[ConfigType EventStreamSpec, DataType any] interface {
 	// Generate a new unique resource ID (such as a UUID)
 	NewID() string
+	// Return a COPY of the config object with runtime status and statistics (runtime enrichment)
+	WithRuntimeStatus(spec ConfigType, status EventStreamStatus, stats *EventStreamStatistics) ConfigType
 	// Type specific config validation goes here
-	Validate(ctx context.Context, config *ConfigType) error
+	Validate(ctx context.Context, config ConfigType) error
 	// The run function should execute in a loop detecting events until instructed to stop:
 	// - The Run function should block when no events are available
 	// - Must detect if the context is closed (see below)
@@ -70,10 +71,11 @@
 	//   1. In any blocking i/o functions
 	//   2. To wake any sleeps early, such as batch polling scenarios
 	// - If the function returns without an Exit instruction, it will be restarted from the last checkpoint
-	Run(ctx context.Context, spec *EventStreamSpec[ConfigType], checkpointSequenceID string, deliver Deliver[DataType]) error
+	Run(ctx context.Context, spec ConfigType, checkpointSequenceID string, deliver Deliver[DataType]) error
 }
 
-type esManager[CT any, DT any] struct {
+type esManager[CT EventStreamSpec, DT any] struct {
+	bgCtx       context.Context
 	config      Config[CT, DT]
 	mux         sync.Mutex
 	streams     map[string]*eventStream[CT, DT]
@@ -84,12 +86,8 @@ type esManager[CT any, DT any] struct {
 	dispatchers map[EventStreamType]DispatcherFactory[CT, DT]
 }
 
-func NewEventStreamManager[CT any, DT any](ctx context.Context, config *Config[CT, DT], p Persistence[CT], wsChannels wsserver.WebSocketChannels, source Runtime[CT, DT]) (es Manager[CT], err error) {
+func NewEventStreamManager[CT EventStreamSpec, DT any](ctx context.Context, config *Config[CT, DT], p Persistence[CT], wsChannels wsserver.WebSocketChannels, source Runtime[CT, DT]) (es Manager[CT], err error) {
-	var confExample interface{} = new(CT)
-	if _, isDBSerializable := (confExample).(DBSerializable); !isDBSerializable {
-		panic(fmt.Sprintf("Config type must be DB serializable: %T", confExample))
-	}
 	if config.Retry == nil {
 		return nil, i18n.NewError(ctx, i18n.MsgESConfigNotInitialized)
 	}
@@ -103,6 +101,7 @@ func NewEventStreamManager[CT any, DT any](ctx context.Context, config *Config[C
 		}
 	}
 	esm := &esManager[CT, DT]{
+		bgCtx:       ctx,
 		config:      *config,
 		tlsConfigs:  tlsConfigs,
 		runtime:     source,
@@ -123,7 +122,7 @@ func NewEventStreamManager[CT any, DT any](ctx context.Context, config *Config[C
 }
 
 func (esm *esManager[CT, DT]) addStream(ctx context.Context, es *eventStream[CT, DT]) {
-	log.L(ctx).Infof("Adding stream '%s' [%s] (%s)", *es.spec.Name, es.spec.GetID(), es.Status(ctx).Status)
+	log.L(ctx).Infof("Adding stream '%s' [%s] (%s)", *es.spec.ESFields().Name, es.spec.GetID(), es.WithStatus(ctx).ESFields().Status)
 	esm.mux.Lock()
 	defer esm.mux.Unlock()
 	esm.streams[es.spec.GetID()] = es
@@ -136,11 +135,11 @@ func (esm *esManager[CT, DT]) getStream(id string) *eventStream[CT, DT] {
 }
 
 func (esm *esManager[CT, DT]) getStreamByNameOrID(ctx context.Context, nameOrID string) (*eventStream[CT, DT], error) {
-	stream, err := esm.GetStreamByNameOrID(ctx, nameOrID)
+	stream, err := esm.GetStreamByNameOrID(ctx, nameOrID, dbsql.FailIfNotFound)
 	if err != nil {
 		return nil, err
 	}
-	es := esm.getStream(*stream.ID)
+	es := esm.getStream(stream.GetID())
 	if es == nil {
 		return nil, i18n.NewError(ctx, i18n.Msg404NoResult)
 	}
@@ -157,7 +156,7 @@ func (esm *esManager[CT, DT]) initialize(ctx context.Context) error {
 	const pageSize = 25
 	var skip uint64
 	for {
-		fb := EventStreamFilters.NewFilter(ctx)
+		fb := GenericEventStreamFilters.NewFilter(ctx)
 		streams, _, err := esm.persistence.EventStreams().GetMany(ctx, fb.And().Skip(skip).Limit(pageSize))
 		if err != nil {
 			return err
@@ -166,12 +165,12 @@ func (esm *esManager[CT, DT]) initialize(ctx context.Context) error {
 			break
 		}
 		for _, esSpec := range streams {
-			if *esSpec.Status == EventStreamStatusDeleted {
+			if *esSpec.ESFields().Status == EventStreamStatusDeleted {
 				if err := esm.persistence.EventStreams().Delete(ctx, esSpec.GetID()); err != nil {
 					return err
 				}
 			} else {
-				es, err := esm.initEventStream(ctx, esSpec)
+				es, err := esm.initEventStream(esSpec)
 				if err != nil {
 					return err
 				}
@@ -183,13 +182,14 @@ func (esm *esManager[CT, DT]) initialize(ctx context.Context) error {
 	return nil
 }
 
-func (esm *esManager[CT, DT]) UpsertStream(ctx context.Context, nameOrID string, esSpec *EventStreamSpec[CT]) (bool, error) {
+func (esm *esManager[CT, DT]) UpsertStream(ctx context.Context, nameOrID string, spec CT) (bool, error) {
+	esSpec := spec.ESFields()
 	validID := nameOrID != "" && esm.persistence.IDValidator()(ctx, nameOrID) == nil
 	var existing *eventStream[CT, DT]
 	if validID {
 		// Updating by ID
-		esSpec.ID = &nameOrID
+		spec.SetID(nameOrID)
 	} else if nameOrID != "" {
 		// Upserting by name
 		existingNamed, err := esm.persistence.EventStreams().GetByName(ctx, nameOrID)
@@ -197,18 +197,18 @@ func (esm *esManager[CT, DT]) UpsertStream(ctx context.Context, nameOrID string,
 			return false, err
 		}
 		// If it exists, then we're updating by ID now
-		if existingNamed != nil {
-			esSpec.ID = existingNamed.ID
+		if !existingNamed.IsNil() {
+			spec.SetID(existingNamed.GetID())
 		}
 	}
 	if (esSpec.Name == nil || len(*esSpec.Name) == 0) && len(nameOrID) > 0 {
 		esSpec.Name = &nameOrID
 	}
-	if esSpec.ID == nil || len(*esSpec.ID) == 0 {
-		esSpec.ID = ptrTo(esm.runtime.NewID())
+	if spec.GetID() == "" {
+		spec.SetID(esm.runtime.NewID())
 	} else {
-		existing = esm.getStream(esSpec.GetID())
+		existing = esm.getStream(spec.GetID())
 	}
 
 	// Only statuses that can be asserted externally are started/stopped
@@ -222,30 +222,31 @@ func (esm *esManager[CT, DT]) UpsertStream(ctx context.Context, nameOrID string,
 	// Do a validation that does NOT update the defaults into the structure, so that
 	// the defaults are not persisted into the DB. This means that if the defaults are
 	// changed then any streams with nil fields will pick up the new defaults.
-	if _, err := esm.validateStream(ctx, esSpec, LifecyclePhasePreInsertValidation); err != nil {
+	if _, err := esm.validateStream(ctx, spec, LifecyclePhasePreInsertValidation); err != nil {
 		return false, err
 	}
 
-	isNew, err := esm.persistence.EventStreams().Upsert(ctx, esSpec, dbsql.UpsertOptimizationExisting)
+	isNew, err := esm.persistence.EventStreams().Upsert(ctx, spec, dbsql.UpsertOptimizationExisting)
 	if err != nil {
 		return false, err
 	}
-	return isNew, esm.reInit(ctx, esSpec, existing)
+	return isNew, esm.reInit(ctx, spec, existing)
 }
 
-func (esm *esManager[CT, DT]) reInit(ctx context.Context, esSpec *EventStreamSpec[CT], existing *eventStream[CT, DT]) error {
+func (esm *esManager[CT, DT]) reInit(ctx context.Context, spec CT, existing *eventStream[CT, DT]) error {
 	// Runtime handling now the DB is updated
 	if existing != nil {
 		if err := existing.suspend(ctx); err != nil {
 			return err
 		}
 	}
-	es, err := esm.initEventStream(ctx, esSpec)
+	// initializing the event stream happens outside of this context (must be long lived)
+	es, err := esm.initEventStream(spec)
 	if err != nil {
 		return err
 	}
 	esm.addStream(ctx, es)
-	if *es.spec.Status == EventStreamStatusStarted {
+	if *es.spec.ESFields().Status == EventStreamStatusStarted {
 		es.ensureActive()
 	}
 	return nil
@@ -263,7 +264,7 @@ func (esm *esManager[CT, DT]) DeleteStream(ctx context.Context, nameOrID string)
 	if err := esm.persistence.EventStreams().Delete(ctx, nameOrID); err != nil {
 		return err
 	}
-	esm.removeStream(*es.spec.ID)
+	esm.removeStream(es.spec.GetID())
 	return nil
 }
 
@@ -275,7 +276,7 @@ func (esm *esManager[CT, DT]) StopStream(ctx context.Context, nameOrID string) e
 	return es.stop(ctx)
 }
 
-func (esm *esManager[CT, DT]) ResetStream(ctx context.Context, nameOrID string, sequenceID string) error {
+func (esm *esManager[CT, DT]) ResetStream(ctx context.Context, nameOrID string, sequenceID *string, preStartCallbacks ...func(ctx context.Context, spec CT) error) error {
 	es, err := esm.getStreamByNameOrID(ctx, nameOrID)
 	if err != nil {
 		return err
@@ -284,20 +285,30 @@ func (esm *esManager[CT, DT]) ResetStream(ctx context.Context, nameOrID string,
 	if err := es.suspend(ctx); err != nil {
 		return err
 	}
+	esSpec := es.spec.ESFields()
 	// delete any existing checkpoint
-	if err := esm.persistence.Checkpoints().DeleteMany(ctx, CheckpointFilters.NewFilter(ctx).Eq("id", *es.spec.ID)); err != nil {
+	if err := esm.persistence.Checkpoints().DeleteMany(ctx, CheckpointFilters.NewFilter(ctx).Eq("id", es.spec.GetID())); err != nil {
 		return err
 	}
 	// store the initial_sequence_id back to the object, and update our in-memory record
-	es.spec.InitialSequenceID = &sequenceID
-	if err := esm.persistence.EventStreams().UpdateSparse(ctx, &EventStreamSpec[CT]{
-		ID:                es.spec.ID,
-		InitialSequenceID: &sequenceID,
-	}); err != nil {
-		return err
+	if sequenceID != nil {
+		esSpec.InitialSequenceID = sequenceID
+
+		if err := esm.persistence.EventStreams().Update(ctx, es.spec.GetID(),
+			esm.persistence.EventStreams().GetQueryFactory().NewUpdate(ctx).Set("initialsequenceid", *sequenceID),
+		); err != nil {
+			return err
+		}
+	}
+
+	// Plug point for logic that might need to do other custom reset logic before restart
+	for _, cb := range preStartCallbacks {
+		if err := cb(ctx, es.spec); err != nil {
+			return err
+		}
 	}
 	// if the spec status is running, restart it
-	if *es.spec.Status == EventStreamStatusStarted {
+	if *esSpec.Status == EventStreamStatusStarted {
 		return es.start(ctx)
 	}
 	return nil
@@ -311,33 +322,30 @@ func (esm *esManager[CT, DT]) StartStream(ctx context.Context, nameOrID string)
 	return es.start(ctx)
 }
 
-func (esm *esManager[CT, DT]) enrichGetStream(ctx context.Context, esSpec *EventStreamSpec[CT]) *EventStreamWithStatus[CT] {
+func (esm *esManager[CT, DT]) enrichGetStream(ctx context.Context, esSpec CT) CT {
 	// Grab the live status
 	if es := esm.getStream(esSpec.GetID()); es != nil {
-		return es.Status(ctx)
+		return es.WithStatus(ctx)
 	}
 	// Fallback to unknown status rather than failing
 	log.L(ctx).Errorf("No in-memory state for stream '%s'", esSpec.GetID())
-	return &EventStreamWithStatus[CT]{
-		EventStreamSpec: esSpec,
-		Status:          EventStreamStatusUnknown,
-	}
+	return esm.runtime.WithRuntimeStatus(esSpec, EventStreamStatusUnknown, nil)
 }
 
-func (esm *esManager[CT, DT]) ListStreams(ctx context.Context, filter ffapi.Filter) ([]*EventStreamWithStatus[CT], *ffapi.FilterResult, error) {
+func (esm *esManager[CT, DT]) ListStreams(ctx context.Context, filter ffapi.Filter) ([]CT, *ffapi.FilterResult, error) {
 	results, fr, err := esm.persistence.EventStreams().GetMany(ctx, filter)
 	if err != nil {
 		return nil, nil, err
 	}
 	// Fill in live status and defaults
-	enriched := make([]*EventStreamWithStatus[CT], len(results))
+	enriched := make([]CT, len(results))
 	for i, esSpec := range results {
 		enriched[i] = esm.enrichGetStream(ctx, esSpec)
 	}
 	return enriched, fr, err
 }
 
-func (esm *esManager[CT, DT]) GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (es *EventStreamWithStatus[CT], err error) {
+func (esm *esManager[CT, DT]) GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (es CT, err error) {
 	esSpec, err := esm.persistence.EventStreams().GetByID(ctx, id, opts...)
 	if err == nil {
 		es = esm.enrichGetStream(ctx, esSpec)
@@ -345,9 +353,9 @@ func (esm *esManager[CT, DT]) GetStreamByID(ctx context.Context, id string, opts
 	return
 }
 
-func (esm *esManager[CT, DT]) GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (es *EventStreamWithStatus[CT], err error) {
+func (esm *esManager[CT, DT]) GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (es CT, err error) {
 	esSpec, err := esm.persistence.EventStreams().GetByUUIDOrName(ctx, nameOrID, opts...)
-	if err == nil {
+	if err == nil && !esSpec.IsNil() {
 		es = esm.enrichGetStream(ctx, esSpec)
 	}
 	return
diff --git a/pkg/eventstreams/manager_test.go b/pkg/eventstreams/manager_test.go
index 162e167..b0cfc47 100644
--- a/pkg/eventstreams/manager_test.go
+++ b/pkg/eventstreams/manager_test.go
@@ -34,22 +34,26 @@ import (
 )
 
 type mockEventSource struct {
-	validate func(ctx context.Context, conf *testESConfig) error
-	run      func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error
+	validate func(ctx context.Context, conf *GenericEventStream) error
+	run      func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error
 }
 
 func (mes *mockEventSource) NewID() string {
 	return fftypes.NewUUID().String()
 }
 
-func (mes *mockEventSource) Run(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+func (mes *mockEventSource) Run(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
 	return mes.run(ctx, es, checkpointSequenceId, deliver)
 }
 
-func (mes *mockEventSource) Validate(ctx context.Context, conf *testESConfig) error {
+func (mes *mockEventSource) Validate(ctx context.Context, conf *GenericEventStream) error {
 	return mes.validate(ctx, conf)
 }
 
+func (mes *mockEventSource) WithRuntimeStatus(spec *GenericEventStream, status EventStreamStatus, stats *EventStreamStatistics) *GenericEventStream {
+	return spec.WithRuntimeStatus(status, stats)
+}
+
 type mockAction struct {
 	attemptDispatch func(ctx context.Context, attempt int, events *EventBatch[testData]) error
 }
@@ -59,11 +63,11 @@ func (ma *mockAction) AttemptDispatch(ctx context.Context, attempt int, events *
 }
 
 type mockPersistence struct {
-	eventStreams *crudmocks.CRUD[*EventStreamSpec[testESConfig]]
+	eventStreams *crudmocks.CRUD[*GenericEventStream]
 	checkpoints  *crudmocks.CRUD[*EventStreamCheckpoint]
 }
 
-func (mp *mockPersistence) EventStreams() dbsql.CRUD[*EventStreamSpec[testESConfig]] {
+func (mp *mockPersistence) EventStreams() dbsql.CRUD[*GenericEventStream] {
 	return mp.eventStreams
 }
 func (mp *mockPersistence) Checkpoints() dbsql.CRUD[*EventStreamCheckpoint] {
@@ -74,13 +78,14 @@ func (mp *mockPersistence) IDValidator() IDValidator {
 }
 func (mp *mockPersistence) Close() {}
 
-func newMockESManager(t *testing.T, extraSetup ...func(mp *mockPersistence)) (context.Context, *esManager[testESConfig, testData], *mockEventSource, func()) {
+func newMockESManager(t *testing.T, extraSetup ...func(mp *mockPersistence)) (context.Context, *esManager[*GenericEventStream, testData], *mockEventSource, func()) {
 	logrus.SetLevel(logrus.DebugLevel)
 
 	mp := &mockPersistence{
-		eventStreams: crudmocks.NewCRUD[*EventStreamSpec[testESConfig]](t),
+		eventStreams: crudmocks.NewCRUD[*GenericEventStream](t),
 		checkpoints:  crudmocks.NewCRUD[*EventStreamCheckpoint](t),
 	}
+	mp.eventStreams.On("GetQueryFactory").Return(GenericEventStreamFilters).Maybe()
 	ctx, cancelCtx := context.WithCancel(context.Background())
 	config.RootConfigReset()
@@ -96,23 +101,23 @@ func newMockESManager(t *testing.T, extraSetup ...func(mp *mockPersistence)) (co
 	}
 
 	mes := &mockEventSource{
-		validate: func(ctx context.Context, conf *testESConfig) error { return nil },
-		run: func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+		validate: func(ctx context.Context, conf *GenericEventStream) error { return nil },
+		run: func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
 			<-ctx.Done()
 			return nil
 		},
 	}
-	mgr, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, mes)
+	mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), mp, nil, mes)
 	assert.NoError(t, err)
-	return ctx, mgr.(*esManager[testESConfig, testData]), mes, func() {
+	return ctx, mgr.(*esManager[*GenericEventStream, testData]), mes, func() {
 		mgr.Close(ctx)
 		cancelCtx()
 	}
 }
 
 func TestNewManagerFailBadTLS(t *testing.T) {
-	_, err := NewEventStreamManager[testESConfig, testData](context.Background(), &Config[testESConfig, testData]{
+	_, err := NewEventStreamManager[*GenericEventStream, testData](context.Background(), &Config[*GenericEventStream, testData]{
 		Retry: &retry.Retry{},
 		TLSConfigs: map[string]*fftls.Config{
 			"tls0": {
@@ -125,46 +130,34 @@ func TestNewManagerFailBadTLS(t *testing.T) {
 
 }
 
-func TestNewManagerBadConfStruct(t *testing.T) {
-	assert.Panics(t, func() {
-		_, _ = NewEventStreamManager[string /* must be DBSerializable */, testData](context.Background(), &Config[string, testData]{
-			Retry: &retry.Retry{},
-			TLSConfigs: map[string]*fftls.Config{
-				"tls0": {
-					Enabled: true,
-					CAFile:  t.TempDir(),
-				},
-			},
-		}, nil, nil, nil)
-	})
-}
-
 func TestNewManagerBadConfState(t *testing.T) {
-	_, err := NewEventStreamManager[testESConfig, testData](context.Background(), &Config[testESConfig, testData]{}, nil, nil, &mockEventSource{})
+	_, err := NewEventStreamManager[*GenericEventStream, testData](context.Background(), &Config[*GenericEventStream, testData]{}, nil, nil, &mockEventSource{})
 	assert.Regexp(t, "FF00237", err)
 }
 
 func TestInitFail(t *testing.T) {
 	mp := &mockPersistence{
-		eventStreams: crudmocks.NewCRUD[*EventStreamSpec[testESConfig]](t),
+		eventStreams: crudmocks.NewCRUD[*GenericEventStream](t),
 		checkpoints:  crudmocks.NewCRUD[*EventStreamCheckpoint](t),
 	}
-	mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, fmt.Errorf("pop"))
+	mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, fmt.Errorf("pop"))
 
 	ctx := context.Background()
 	InitConfig(config.RootSection("ut"))
-	_, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, nil)
+	_, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), mp, nil, nil)
 	assert.Regexp(t, "pop", err)
 }
 
 func TestInitWithStreams(t *testing.T) {
-	es := &EventStreamSpec[testESConfig]{
-		ID:     ptrTo(fftypes.NewUUID().String()),
-		Name:   ptrTo("stream1"),
-		Status: ptrTo(EventStreamStatusStarted),
+	es := &GenericEventStream{
+		ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()},
+		EventStreamSpecFields: EventStreamSpecFields{
+			Name:   ptrTo("stream1"),
+			Status: ptrTo(EventStreamStatusStarted),
+		},
 	}
 	_, _, _, done := newMockESManager(t, func(mp *mockPersistence) {
-		mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once()
-		mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once()
+		mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once()
+		mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once()
 		mp.checkpoints.On("GetByID", mock.Anything, es.GetID()).Return((*EventStreamCheckpoint)(nil), nil)
 	})
 	defer done()
@@ -173,37 +166,41 @@ func TestInitWithStreams(t *testing.T) {
 
 func TestInitWithStreamsCleanupFail(t *testing.T) {
 	mp := &mockPersistence{
-		eventStreams: crudmocks.NewCRUD[*EventStreamSpec[testESConfig]](t),
+		eventStreams: crudmocks.NewCRUD[*GenericEventStream](t),
 		checkpoints:  crudmocks.NewCRUD[*EventStreamCheckpoint](t),
 	}
 	ctx := context.Background()
 	InitConfig(config.RootSection("ut"))
-	es := &EventStreamSpec[testESConfig]{
-		ID:     ptrTo(fftypes.NewUUID().String()),
-		Name:   ptrTo("stream1"),
-		Status: ptrTo(EventStreamStatusDeleted),
+	es := &GenericEventStream{
+		ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()},
+		EventStreamSpecFields: EventStreamSpecFields{
+			Name:   ptrTo("stream1"),
+			Status: ptrTo(EventStreamStatusDeleted),
+		},
 	}
-	mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once()
+	mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once()
 	mp.eventStreams.On("Delete", mock.Anything, es.GetID()).Return(fmt.Errorf("pop"))
-	_, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, nil)
+	_, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), mp, nil, nil)
 	assert.Regexp(t, "pop", err)
 }
 
 func TestInitWithStreamsInitFail(t *testing.T) {
 	mp := &mockPersistence{
-		eventStreams: crudmocks.NewCRUD[*EventStreamSpec[testESConfig]](t),
+		eventStreams: crudmocks.NewCRUD[*GenericEventStream](t),
 		checkpoints:  crudmocks.NewCRUD[*EventStreamCheckpoint](t),
 	}
 	ctx := context.Background()
 	InitConfig(config.RootSection("ut"))
-	es := &EventStreamSpec[testESConfig]{
-		ID:     ptrTo(fftypes.NewUUID().String()),
-		Name:   ptrTo("stream1"),
-		Status: ptrTo(EventStreamStatusStarted),
+	es := &GenericEventStream{
+		ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()},
+		EventStreamSpecFields: EventStreamSpecFields{
+			Name:   ptrTo("stream1"),
+			Status: ptrTo(EventStreamStatusStarted),
+		},
 	}
-	mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once()
-	_, err := NewEventStreamManager[testESConfig, testData](ctx, GenerateConfig[testESConfig, testData](ctx), mp, nil, &mockEventSource{
-		validate: func(ctx context.Context, conf *testESConfig) error {
+	mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once()
+	_, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), mp, nil,
&mockEventSource{ + validate: func(ctx context.Context, conf *GenericEventStream) error { return fmt.Errorf("pop") }, }) @@ -211,15 +208,17 @@ func TestInitWithStreamsInitFail(t *testing.T) { } func TestUpsertStreamByNameDeleted(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), - ID: ptrTo(fftypes.NewUUID().String()), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { mp.eventStreams.On("GetByName", mock.Anything, "stream1").Return(es, nil) - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) defer done() @@ -230,15 +229,17 @@ func TestUpsertStreamByNameDeleted(t *testing.T) { } func TestUpsertStreamByNameFailLookup(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), - ID: ptrTo(fftypes.NewUUID().String()), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetByName", mock.Anything, "stream1").Return((*EventStreamSpec[testESConfig])(nil), fmt.Errorf("pop")) - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByName", mock.Anything, "stream1").Return((*GenericEventStream)(nil), fmt.Errorf("pop")) + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) defer done() @@ -249,32 +250,36 @@ func TestUpsertStreamByNameFailLookup(t *testing.T) { } func TestUpsertStreamByIDDeleted(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), - ID: ptrTo(fftypes.NewUUID().String()), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, 
mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) defer done() esm.getStream(es.GetID()).spec.Status = ptrTo(EventStreamStatusDeleted) - _, err := esm.UpsertStream(ctx, *es.ID, es) + _, err := esm.UpsertStream(ctx, es.GetID(), es) assert.Regexp(t, "FF00236", err) } func TestUpsertStreamBadUpdate(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), - ID: ptrTo(fftypes.NewUUID().String()), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) defer done() @@ -286,14 +291,16 @@ func TestUpsertStreamBadUpdate(t *testing.T) { } func TestUpsertStreamUpsertFail(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), - ID: ptrTo(fftypes.NewUUID().String()), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("Upsert", mock.Anything, mock.Anything, dbsql.UpsertOptimizationExisting).Return(false, fmt.Errorf("pop")).Once() }) defer done() @@ -304,18 +311,20 @@ func TestUpsertStreamUpsertFail(t *testing.T) { } func TestUpsertReInitExistingFailTimeout(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) done() - 
existing := &eventStream[testESConfig, testData]{ - activeState: &activeStream[testESConfig, testData]{}, + existing := &eventStream[*GenericEventStream, testData]{ + activeState: &activeStream[*GenericEventStream, testData]{}, stopping: make(chan struct{}), } err := esm.reInit(ctx, es, existing) @@ -324,13 +333,15 @@ func TestUpsertReInitExistingFailTimeout(t *testing.T) { } func TestUpsertReInitExistingFailInit(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(fftypes.FFEnum("wrong")), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(fftypes.FFEnum("wrong")), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) done() @@ -341,9 +352,9 @@ func TestUpsertReInitExistingFailInit(t *testing.T) { func TestDeleteStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ - ID: ptrTo("does not exist"), + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(&GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, // exists in DB, but does not exist in runtime state }, nil).Once() }) defer done() @@ -355,8 +366,8 @@ func TestDeleteStreamNotKnown(t *testing.T) { func TestDeleteStreamNotFound(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return((*EventStreamSpec[testESConfig])(nil), fmt.Errorf("not found")).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return((*GenericEventStream)(nil), fmt.Errorf("not found")).Once() }) defer done() @@ -367,23 +378,23 @@ func TestDeleteStreamNotFound(t *testing.T) { func TestResetStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ - ID: ptrTo("does not exist"), + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(&GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, // exists 
in DB, but does not exist in runtime state }, nil).Once() }) defer done() - err := esm.ResetStream(ctx, fftypes.NewUUID().String(), "") + err := esm.ResetStream(ctx, fftypes.NewUUID().String(), nil) assert.Regexp(t, "FF00164", err) } func TestStopStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ - ID: ptrTo("does not exist"), + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(&GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, // exists in DB, but does not exist in runtime state }, nil).Once() }) defer done() @@ -395,9 +406,9 @@ func TestStopStreamNotKnown(t *testing.T) { func TestStartStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{ - ID: ptrTo("does not exist"), + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(&GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, // exists in DB, but does not exist in runtime state }, nil).Once() }) defer done() @@ -409,28 +420,30 @@ func TestStartStreamNotKnown(t *testing.T) { func TestEnrichStreamNotKnown(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) defer done() - es := esm.enrichGetStream(ctx, &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), + es := esm.enrichGetStream(ctx, &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, }) assert.NotNil(t, es) - assert.Equal(t, EventStreamStatusUnknown, es.Status) + assert.Equal(t, EventStreamStatusUnknown, *es.Status) } func TestDeleteStreamFail(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(es, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + 
mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(es, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() }) defer done() @@ -441,15 +454,17 @@ func TestDeleteStreamFail(t *testing.T) { } func TestDeleteStreamFailDelete(t *testing.T) { - es := &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + es := &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(es, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{es}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(es, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{es}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() mp.eventStreams.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() mp.eventStreams.On("Delete", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() }) @@ -461,109 +476,154 @@ func TestDeleteStreamFailDelete(t *testing.T) { } func TestResetStreamStopFailTimeout(t *testing.T) { - existing := &eventStream[testESConfig, testData]{ - activeState: &activeStream[testESConfig, testData]{}, + existing := &eventStream[*GenericEventStream, testData]{ + activeState: &activeStream[*GenericEventStream, testData]{}, stopping: make(chan struct{}), - spec: &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + spec: &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) done() + existing.esm = esm esm.addStream(ctx, existing) - err := esm.ResetStream(ctx, existing.spec.GetID(), "") + err := esm.ResetStream(ctx, existing.spec.GetID(), nil) assert.Regexp(t, "FF00229", err) } func 
TestResetStreamStopFailDeleteCheckpoint(t *testing.T) { - existing := &eventStream[testESConfig, testData]{ - spec: &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + existing := &eventStream[*GenericEventStream, testData]{ + spec: &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() }) done() + existing.esm = esm esm.addStream(ctx, existing) - err := esm.ResetStream(ctx, existing.spec.GetID(), "") + err := esm.ResetStream(ctx, existing.spec.GetID(), nil) assert.Regexp(t, "pop", err) } func TestResetStreamStopFailUpdateSequence(t *testing.T) { - existing := &eventStream[testESConfig, testData]{ - spec: &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + existing := &eventStream[*GenericEventStream, testData]{ + spec: &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, }, } ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(nil).Once() - mp.eventStreams.On("UpdateSparse", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() + mp.eventStreams.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() }) done() + existing.esm = esm esm.addStream(ctx, existing) - err := esm.ResetStream(ctx, existing.spec.GetID(), "12345") + err := esm.ResetStream(ctx, existing.spec.GetID(), ptrTo("12345")) assert.Regexp(t, "pop", err) } func TestResetStreamNoOp(t *testing.T) { - existing := &eventStream[testESConfig, testData]{ - spec: &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + existing := &eventStream[*GenericEventStream, testData]{ + spec: &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, }, } ctx, esm, _, done := 
newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(existing.spec, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(nil).Once() - mp.eventStreams.On("UpdateSparse", mock.Anything, mock.Anything).Return(nil).Once() + mp.eventStreams.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() }) done() + existing.esm = esm esm.addStream(ctx, existing) - err := esm.ResetStream(ctx, existing.spec.GetID(), "12345") + called := false + err := esm.ResetStream(ctx, existing.spec.GetID(), ptrTo("12345"), func(_ context.Context, spec *GenericEventStream) error { + called = true + assert.Equal(t, existing.spec, spec) + return nil + }) assert.NoError(t, err) + assert.True(t, called) + +} + +func TestResetStreamCallbackErr(t *testing.T) { + existing := &eventStream[*GenericEventStream, testData]{ + spec: &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, + }, + } + ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(existing.spec, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.checkpoints.On("DeleteMany", mock.Anything, mock.Anything).Return(nil).Once() + mp.eventStreams.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + }) + done() + existing.esm = esm + + esm.addStream(ctx, existing) + err := esm.ResetStream(ctx, existing.spec.GetID(), ptrTo("12345"), func(_ context.Context, spec *GenericEventStream) error { + return fmt.Errorf("pop") + }) + assert.Regexp(t, "pop", err) } func TestListStreamsFail(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, fmt.Errorf("pop")).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, fmt.Errorf("pop")).Once() }) defer done() - _, _, err := esm.ListStreams(ctx, EventStreamFilters.NewFilter(ctx).And()) + _, _, err := esm.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).And()) assert.Regexp(t, "pop", err) } func TestGetStreamByIDFail(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetByID", mock.Anything, mock.Anything).Return((*EventStreamSpec[testESConfig])(nil), 
fmt.Errorf("pop")).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByID", mock.Anything, mock.Anything).Return((*GenericEventStream)(nil), fmt.Errorf("pop")).Once() }) defer done() @@ -574,29 +634,32 @@ func TestGetStreamByIDFail(t *testing.T) { func TestGetStreamByNameOrID(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() - mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything).Return(&EventStreamSpec[testESConfig]{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetByUUIDOrName", mock.Anything, mock.Anything, dbsql.FailIfNotFound).Return(&GenericEventStream{}, nil).Once() }) defer done() - _, err := esm.GetStreamByNameOrID(ctx, "stream1") + _, err := esm.GetStreamByNameOrID(ctx, "stream1", dbsql.FailIfNotFound) assert.NoError(t, err) } func TestCloseSuspendFail(t *testing.T) { ctx, esm, _, done := newMockESManager(t, func(mp *mockPersistence) { - mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil).Once() + mp.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil).Once() }) done() - existing := &eventStream[testESConfig, testData]{ - spec: &EventStreamSpec[testESConfig]{ - ID: ptrTo(fftypes.NewUUID().String()), - Name: ptrTo("stream1"), - Status: ptrTo(EventStreamStatusStopped), + existing := &eventStream[*GenericEventStream, testData]{ + spec: &GenericEventStream{ + ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + Status: ptrTo(EventStreamStatusStopped), + }, }, - activeState: &activeStream[testESConfig, testData]{}, + esm: esm, + activeState: &activeStream[*GenericEventStream, testData]{}, stopping: make(chan struct{}), } esm.addStream(ctx, existing) diff --git a/pkg/eventstreams/persistence.go b/pkg/eventstreams/persistence.go index 49a1e4e..3de985b 100644 --- a/pkg/eventstreams/persistence.go +++ b/pkg/eventstreams/persistence.go @@ -22,27 +22,16 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/hyperledger/firefly-common/pkg/dbsql" "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly-common/pkg/fftypes" ) -type Persistence[CT any] interface { - EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] +type Persistence[CT EventStreamSpec] interface { + EventStreams() dbsql.CRUD[CT] Checkpoints() dbsql.CRUD[*EventStreamCheckpoint] IDValidator() IDValidator Close() } -var EventStreamFilters = &ffapi.QueryFields{ - "id": &ffapi.StringField{}, - "created": &ffapi.TimeField{}, - "updated": &ffapi.TimeField{}, - "name": &ffapi.StringField{}, - "status": &ffapi.StringField{}, - "type": &ffapi.StringField{}, - "topicfilter": &ffapi.StringField{}, - "identity": &ffapi.StringField{}, - "config": &ffapi.JSONField{}, -} - var CheckpointFilters = &ffapi.QueryFields{ "id": &ffapi.StringField{}, "created": &ffapi.TimeField{}, @@ -52,8 +41,93 @@ var CheckpointFilters = &ffapi.QueryFields{ type IDValidator func(ctx context.Context, idStr string) error -func NewEventStreamPersistence[CT any](db *dbsql.Database, idValidator IDValidator) 
Persistence[CT] { - return &esPersistence[CT]{ +// This is a base object and set of filters that you can use if: +// - You are happy exposing all the built-in types of consumer (webhooks/websockets) +// - You do not need to extend the configuration in any way +// - You are happy using UUIDs for your IDs per dbsql.ResourceBase semantics +// +// A pre-built persistence library and sample migrations are provided that work +// with this structure. +// +// The design of the generic is such that you can start with the generic structure, +// and then move to your own structure later if you want to add more fields. +// +// When you are ready to extend, you need to: +// 1. Copy the GenericEventStream source into your own repo, and rename it appropriately. +// Then you can add your extra configuration fields to it. +// 2. Copy the EventStreams() and Checkpoints() CRUD factories into your own repo, +// and extend them with additional columns etc. as you see fit. +type GenericEventStream struct { + dbsql.ResourceBase + Type *EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"` + EventStreamSpecFields + Webhook *WebhookConfig `ffstruct:"eventstream" json:"webhook,omitempty"` + WebSocket *WebSocketConfig `ffstruct:"eventstream" json:"websocket,omitempty"` + Statistics *EventStreamStatistics `ffstruct:"EventStream" json:"statistics,omitempty"` +} + +var GenericEventStreamFilters = &ffapi.QueryFields{ + "id": &ffapi.StringField{}, + "created": &ffapi.TimeField{}, + "updated": &ffapi.TimeField{}, + "name": &ffapi.StringField{}, + "status": &ffapi.StringField{}, + "type": &ffapi.StringField{}, + "topicfilter": &ffapi.StringField{}, + "initialsequenceid": &ffapi.StringField{}, +} + +func (ges *GenericEventStream) SetID(s string) { + ges.ID = fftypes.MustParseUUID(s) +} + +func (ges *GenericEventStream) IsNil() bool { + return ges == nil +} + +func (ges *GenericEventStream) ESFields() *EventStreamSpecFields { + return &ges.EventStreamSpecFields +} + +func (ges *GenericEventStream) ESType() EventStreamType { + if ges.Type == nil { + ges.Type = &EventStreamTypeWebSocket + } + return *ges.Type +} + +func (ges *GenericEventStream) WebhookConf() *WebhookConfig { + if ges.Webhook == nil { + ges.Webhook = &WebhookConfig{} + } + return ges.Webhook +} + +func (ges *GenericEventStream) WebSocketConf() *WebSocketConfig { + if ges.WebSocket == nil { + ges.WebSocket = &WebSocketConfig{} + } + return ges.WebSocket +} + +func (ges *GenericEventStream) WithRuntimeStatus(status EventStreamStatus, stats *EventStreamStatistics) *GenericEventStream { + newGES := &GenericEventStream{ + ResourceBase: ges.ResourceBase, + Type: ges.Type, + EventStreamSpecFields: ges.EventStreamSpecFields, + Webhook: ges.Webhook, + WebSocket: ges.WebSocket, + Statistics: stats, + } + newGES.Status = &status + return newGES +} + +// NewGenericEventStreamPersistence is a helper that builds persistence with no extra config +// Users of this package can use this in cases where they do not have any additional configuration +// that needs to be persisted, and are happy using dbsql.ResourceBase for IDs.
+func NewGenericEventStreamPersistence(db *dbsql.Database, idValidator IDValidator) Persistence[*GenericEventStream] { + return &esPersistence[*GenericEventStream]{ db: db, idValidator: idValidator, } @@ -68,8 +142,8 @@ func (p *esPersistence[CT]) IDValidator() IDValidator { return p.idValidator } -func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] { - return &dbsql.CrudBase[*EventStreamSpec[CT]]{ +func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*GenericEventStream] { + return &dbsql.CrudBase[*GenericEventStream]{ DB: p.db, Table: "eventstreams", Columns: []string{ @@ -81,8 +155,6 @@ func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] { "type", "initial_sequence_id", "topic_filter", - "identity", - "config", "error_handling", "batch_size", "batch_timeout", @@ -92,16 +164,17 @@ func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] { "websocket_config", }, FilterFieldMap: map[string]string{ - "topicfilter": "topic_filter", + "initialsequenceid": "initial_sequence_id", + "topicfilter": "topic_filter", }, - NilValue: func() *EventStreamSpec[CT] { return nil }, - NewInstance: func() *EventStreamSpec[CT] { return &EventStreamSpec[CT]{} }, + NilValue: func() *GenericEventStream { return nil }, + NewInstance: func() *GenericEventStream { return &GenericEventStream{} }, ScopedFilter: func() sq.Eq { return sq.Eq{} }, EventHandler: nil, // set below NameField: "name", - QueryFactory: EventStreamFilters, + QueryFactory: GenericEventStreamFilters, IDValidator: p.idValidator, - GetFieldPtr: func(inst *EventStreamSpec[CT], col string) interface{} { + GetFieldPtr: func(inst *GenericEventStream, col string) interface{} { switch col { case dbsql.ColumnID: return &inst.ID @@ -119,10 +192,6 @@ func (p *esPersistence[CT]) EventStreams() dbsql.CRUD[*EventStreamSpec[CT]] { return &inst.InitialSequenceID case "topic_filter": return &inst.TopicFilter - case "identity": - return &inst.Identity - case "config": - return &inst.Config case "error_handling": return &inst.ErrorHandling case "batch_size": diff --git a/pkg/eventstreams/webhooks.go b/pkg/eventstreams/webhooks.go index 7323e6a..d845283 100644 --- a/pkg/eventstreams/webhooks.go +++ b/pkg/eventstreams/webhooks.go @@ -51,14 +51,11 @@ func (wc *WebhookConfig) Value() (driver.Value, error) { return fftypes.JSONValue(wc) } -type webhookDispatcherFactory[CT any, DT any] struct{} +type webhookDispatcherFactory[CT EventStreamSpec, DT any] struct{} // validate initializes the config ready for use -func (wdf *webhookDispatcherFactory[CT, DT]) Validate(ctx context.Context, _ *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, _ LifecyclePhase) error { - if spec.Webhook == nil { - spec.Webhook = &WebhookConfig{} - } - whc := spec.Webhook +func (wdf *webhookDispatcherFactory[CT, DT]) Validate(ctx context.Context, _ *Config[CT, DT], spec CT, tlsConfigs map[string]*tls.Config, _ LifecyclePhase) error { + whc := spec.WebhookConf() if whc.URL == nil || *whc.URL == "" { return i18n.NewError(ctx, i18n.MsgMissingWebhookURL) } @@ -79,18 +76,19 @@ type webhookAction[CT any, DT any] struct { client *resty.Client } -func (wdf *webhookDispatcherFactory[CT, DT]) NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT] { - httpConf := spec.Webhook.HTTP +func (wdf *webhookDispatcherFactory[CT, DT]) NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec CT) Dispatcher[DT] { + whc := spec.WebhookConf() + httpConf := whc.HTTP if httpConf 
== nil { httpConf = &conf.Defaults.WebhookDefaults.HTTPConfig } - httpConf.TLSClientConfig = spec.Webhook.tlsConfig + httpConf.TLSClientConfig = whc.tlsConfig client := ffresty.NewWithConfig(ctx, ffresty.Config{ - URL: *spec.Webhook.URL, + URL: *whc.URL, HTTPConfig: *httpConf, }) return &webhookAction[CT, DT]{ - spec: spec.Webhook, + spec: whc, disablePrivateIPs: conf.DisablePrivateIPs, client: client, } diff --git a/pkg/eventstreams/webhooks_test.go b/pkg/eventstreams/webhooks_test.go index 8dcbb18..e55bae1 100644 --- a/pkg/eventstreams/webhooks_test.go +++ b/pkg/eventstreams/webhooks_test.go @@ -33,10 +33,10 @@ import ( "github.com/stretchr/testify/mock" ) -func newTestWebhooks(t *testing.T, whc *WebhookConfig, tweaks ...func()) *webhookAction[testESConfig, testData] { +func newTestWebhooks(t *testing.T, whc *WebhookConfig, tweaks ...func()) *webhookAction[*GenericEventStream, testData] { ctx, mgr, _, done := newMockESManager(t, func(mdb *mockPersistence) { - mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil) + mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil) WebhookDefaultsConfig.Set(ffresty.HTTPConfigRequestTimeout, "1s") WebhookDefaultsConfig.SubSection("tls").Set(fftls.HTTPConfTLSInsecureSkipHostVerify, true) for _, tweak := range tweaks { @@ -45,22 +45,25 @@ func newTestWebhooks(t *testing.T, whc *WebhookConfig, tweaks ...func()) *webhoo }) done() - whf := &webhookDispatcherFactory[testESConfig, testData]{} - spec := &EventStreamSpec[testESConfig]{ - Name: ptrTo("stream1"), + whf := &webhookDispatcherFactory[*GenericEventStream, testData]{} + spec := &GenericEventStream{ + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("stream1"), + }, Webhook: whc, } assert.NoError(t, whf.Validate(ctx, &mgr.config, spec, mgr.tlsConfigs, LifecyclePhaseStarting)) - return whf.NewDispatcher(context.Background(), &mgr.config, spec).(*webhookAction[testESConfig, testData]) + return whf.NewDispatcher(context.Background(), &mgr.config, spec).(*webhookAction[*GenericEventStream, testData]) } func TestWebhooksConfigValidate(t *testing.T) { whc := &WebhookConfig{} - whf := &webhookDispatcherFactory[testESConfig, testData]{} - spec := &EventStreamSpec[testESConfig]{ - Webhook: whc, + whf := &webhookDispatcherFactory[*GenericEventStream, testData]{} + spec := &GenericEventStream{ + EventStreamSpecFields: EventStreamSpecFields{}, + Webhook: whc, } assert.Regexp(t, "FF00216", whf.Validate(context.Background(), nil, spec, nil, LifecyclePhaseStarting)) diff --git a/pkg/eventstreams/websockets.go b/pkg/eventstreams/websockets.go index 95751f2..8b00cc5 100644 --- a/pkg/eventstreams/websockets.go +++ b/pkg/eventstreams/websockets.go @@ -38,7 +38,7 @@ type WebSocketConfig struct { DistributionMode *DistributionMode `ffstruct:"wsconfig" json:"distributionMode,omitempty"` } -type webSocketDispatcherFactory[CT any, DT any] struct { +type webSocketDispatcherFactory[CT EventStreamSpec, DT any] struct { esm *esManager[CT, DT] } @@ -55,12 +55,10 @@ func (wc *WebSocketConfig) Value() (driver.Value, error) { return fftypes.JSONValue(wc) } -func (wsf *webSocketDispatcherFactory[CT, DT]) Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], _ map[string]*tls.Config, phase LifecyclePhase) error { - if spec.WebSocket == nil { - spec.WebSocket = &WebSocketConfig{} - } +func (wsf *webSocketDispatcherFactory[CT, DT]) Validate(ctx context.Context, 
conf *Config[CT, DT], spec CT, _ map[string]*tls.Config, phase LifecyclePhase) error { + wsc := spec.WebSocketConf() setDefaults := phase == LifecyclePhaseStarting - return checkSet(ctx, setDefaults, "distributionMode", &spec.WebSocket.DistributionMode, conf.Defaults.WebSocketDefaults.DefaultDistributionMode, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "distmode", v) }) + return checkSet(ctx, setDefaults, "distributionMode", &wsc.DistributionMode, conf.Defaults.WebSocketDefaults.DefaultDistributionMode, func(v fftypes.FFEnum) bool { return fftypes.FFEnumValid(ctx, "distmode", v) }) } type webSocketAction[DT any] struct { @@ -69,11 +67,11 @@ type webSocketAction[DT any] struct { wsChannels wsserver.WebSocketChannels } -func (wsf *webSocketDispatcherFactory[CT, DT]) NewDispatcher(_ context.Context, _ *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT] { +func (wsf *webSocketDispatcherFactory[CT, DT]) NewDispatcher(_ context.Context, _ *Config[CT, DT], spec CT) Dispatcher[DT] { return &webSocketAction[DT]{ - spec: spec.WebSocket, + spec: spec.WebSocketConf(), wsChannels: wsf.esm.wsChannels, - topic: *spec.Name, + topic: *spec.ESFields().Name, } } diff --git a/pkg/eventstreams/websockets_test.go b/pkg/eventstreams/websockets_test.go index d4610c4..f03cd02 100644 --- a/pkg/eventstreams/websockets_test.go +++ b/pkg/eventstreams/websockets_test.go @@ -37,16 +37,16 @@ func mockWSChannels(wsc *wsservermocks.WebSocketChannels) (chan interface{}, cha return senderChannel, broadcastChannel, receiverChannel } -func newTestWebSocketsFactory(t *testing.T) (context.Context, *esManager[testESConfig, testData], *wsservermocks.WebSocketChannels, *webSocketDispatcherFactory[testESConfig, testData]) { +func newTestWebSocketsFactory(t *testing.T) (context.Context, *esManager[*GenericEventStream, testData], *wsservermocks.WebSocketChannels, *webSocketDispatcherFactory[*GenericEventStream, testData]) { ctx, mgr, _, done := newMockESManager(t, func(mdb *mockPersistence) { - mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*EventStreamSpec[testESConfig]{}, &ffapi.FilterResult{}, nil) + mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil) }) done() mws := wsservermocks.NewWebSocketChannels(t) mgr.wsChannels = mws - return ctx, mgr, mws, &webSocketDispatcherFactory[testESConfig, testData]{esm: mgr} + return ctx, mgr, mws, &webSocketDispatcherFactory[*GenericEventStream, testData]{esm: mgr} } func TestWSAttemptIgnoreWrongAcks(t *testing.T) { @@ -64,8 +64,10 @@ func TestWSAttemptIgnoreWrongAcks(t *testing.T) { }() dmw := DistributionModeBroadcast - spec := &EventStreamSpec[testESConfig]{ - Name: ptrTo("ut_stream"), + spec := &GenericEventStream{ + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("ut_stream"), + }, WebSocket: &WebSocketConfig{ DistributionMode: &dmw, }, @@ -92,8 +94,10 @@ func TestWSattemptDispatchExitPushingEvent(t *testing.T) { bc <- []*fftypes.JSONAny{} // block the broadcast channel dmw := DistributionModeBroadcast - spec := &EventStreamSpec[testESConfig]{ - Name: ptrTo("ut_stream"), + spec := &GenericEventStream{ + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("ut_stream"), + }, WebSocket: &WebSocketConfig{ DistributionMode: &dmw, }, @@ -119,8 +123,10 @@ func TestWSattemptDispatchExitReceivingReply(t *testing.T) { _, _, rc := mockWSChannels(mws) dmw := DistributionModeBroadcast - spec := &EventStreamSpec[testESConfig]{ - Name: ptrTo("ut_stream"), + spec := 
&GenericEventStream{ + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("ut_stream"), + }, WebSocket: &WebSocketConfig{ DistributionMode: &dmw, }, @@ -143,8 +149,10 @@ func TestWSattemptDispatchNackFromClient(t *testing.T) { } dmw := DistributionModeBroadcast - spec := &EventStreamSpec[testESConfig]{ - Name: ptrTo("ut_stream"), + spec := &GenericEventStream{ + EventStreamSpecFields: EventStreamSpecFields{ + Name: ptrTo("ut_stream"), + }, WebSocket: &WebSocketConfig{ DistributionMode: &dmw, }, diff --git a/pkg/i18n/en_base_field_descriptions.go b/pkg/i18n/en_base_field_descriptions.go index bb2c2a5..19f1455 100644 --- a/pkg/i18n/en_base_field_descriptions.go +++ b/pkg/i18n/en_base_field_descriptions.go @@ -51,8 +51,7 @@ var ( EventStreamUpdated = ffm("eventstream.updated", "Time the event stream was last updated") EventStreamErrorHandling = ffm("eventstream.errorHandling", "When an error is encountered, and short retries are exhausted, whether to skip the event or block the stream (default=block)") EventStreamID = ffm("eventstream.id", "ID of the event stream") - EventStreamIdentity = ffm("eventstream.identity", "Identity context for the event stream") - EventStreamInitialSequenceID = ffm("eventstream.initialSequenceID", "Initial sequence ID to begin event delivery from") + EventStreamInitialSequenceID = ffm("eventstream.initialSequenceId", "Initial sequence ID to begin event delivery from") EventStreamName = ffm("eventstream.name", "Unique name for the event stream") EventStreamRetryTimeout = ffm("eventstream.retryTimeout", "Short retry timeout before error handling, in case of webhook based delivery") EventStreamStatus = ffm("eventstream.status", "Status information for the event stream") diff --git a/test/es_demo_migrations/000001_create_eventstreams_table.up.sql b/test/es_demo_migrations/000001_create_eventstreams_table.up.sql index 8b3be23..93ed7e6 100644 --- a/test/es_demo_migrations/000001_create_eventstreams_table.up.sql +++ b/test/es_demo_migrations/000001_create_eventstreams_table.up.sql @@ -9,7 +9,6 @@ CREATE TABLE eventstreams ( initial_sequence_id TEXT, topic_filter TEXT, identity TEXT, - config TEXT, error_handling TEXT, batch_size INT, batch_timeout BIGINT,
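For illustration only, and not part of the change set above: a minimal Go sketch of how the reworked APIs might be consumed. It assumes the exported Manager interface carries the same optional *string sequence ID and variadic pre-start callbacks as the esManager.ResetStream method in this diff; the package name, MyEventStream, its Label field, resetDemoStream, the stream name "demo", and the sequence ID "12345" are all made up for the example.

package example

import (
	"context"
	"fmt"

	"github.com/hyperledger/firefly-common/pkg/dbsql"
	"github.com/hyperledger/firefly-common/pkg/eventstreams"
)

// MyEventStream is a hypothetical copy-and-extend of GenericEventStream, per step 1
// of the comment in persistence.go: the same embedded fields, plus one extra
// application-specific field. A real extension would also provide the spec methods
// that GenericEventStream implements (such as SetID, IsNil, ESFields, ESType,
// WebhookConf, WebSocketConf) and a copied CRUD factory with the extra column.
type MyEventStream struct {
	dbsql.ResourceBase
	Type *eventstreams.EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"`
	eventstreams.EventStreamSpecFields
	Webhook   *eventstreams.WebhookConfig   `ffstruct:"eventstream" json:"webhook,omitempty"`
	WebSocket *eventstreams.WebSocketConfig `ffstruct:"eventstream" json:"websocket,omitempty"`
	Label     *string                       `ffstruct:"eventstream" json:"label,omitempty"` // extra field, illustrative only
}

// resetDemoStream drives the reworked reset flow: the manager deletes the checkpoint,
// stores the new initial sequence ID, invokes the pre-start callbacks, and restarts
// the stream if it was running. The Manager signature here is assumed to mirror the
// esManager.ResetStream method shown in the diff.
func resetDemoStream(ctx context.Context, mgr eventstreams.Manager[*eventstreams.GenericEventStream]) error {
	sequenceID := "12345" // hypothetical sequence ID to resume delivery from
	return mgr.ResetStream(ctx, "demo", &sequenceID,
		func(_ context.Context, spec *eventstreams.GenericEventStream) error {
			// custom reset logic runs here, after the checkpoint is cleared and
			// before the stream is restarted
			fmt.Printf("reset stream %s to sequence %s\n", spec.GetID(), sequenceID)
			return nil
		},
	)
}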