Provide more flexibility on how EventStream specs are stored #120

Merged · 13 commits · Jan 22, 2024
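Summary of the change: the event streams package previously took a free-form `CT any` type parameter for the custom part of a stream's configuration, wrapping it in `EventStreamSpec[CT]`. This PR inverts that: the generic parameter is now the whole spec type, constrained to a new `EventStreamSpec` interface, with a ready-made `GenericEventStream` implementation for applications that don't need custom storage. Judging purely from the call sites in the diff below, the constraint presumably requires at least the following; this is an inference for orientation, not the package's exact definition:

```go
// Sketch of the EventStreamSpec constraint as implied by call sites in this
// diff (as.spec.GetID(), as.spec.ESFields()). EventStreamSpecFields is a
// placeholder name here; the real interface in pkg/eventstreams may also
// require SQL scan/value plumbing and a runtime-status method.
type EventStreamSpec interface {
	GetID() string                    // stream ID, reused as the checkpoint ID
	ESFields() *EventStreamSpecFields // shared tunables: BatchSize, BatchTimeout, RetryTimeout, ...
}
```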
37 changes: 11 additions & 26 deletions examples/ffpubsub.go
@@ -19,7 +19,6 @@ package main
import (
"bufio"
"context"
"database/sql/driver"
"fmt"
"net/http"
"net/http/httptest"
@@ -47,9 +46,7 @@ func main() {

// This demo: Just create a single event stream
// Real world: You would create ffapi.Route objects for the CRUD actions on mgr
- _, err := mgr.UpsertStream(ctx, &eventstreams.EventStreamSpec[pubSubConfig]{
- Name: ptrTo("demo"),
- })
+ _, err := mgr.UpsertStream(ctx, "demo", &eventstreams.GenericEventStream{})
assertNoError(err)

// Read lines from standard in, and pass them to all active routines
@@ -65,19 +62,7 @@ func main() {
}
}

- type pubSubESManager = eventstreams.Manager[pubSubConfig]
-
- type pubSubConfig struct {
- // no extra config
- }
-
- func (psc *pubSubConfig) Scan(src interface{}) error {
- return fftypes.JSONScan(src, psc)
- }
-
- func (psc *pubSubConfig) Value() (driver.Value, error) {
- return fftypes.JSONValue(psc)
- }
+ type pubSubESManager = eventstreams.Manager[*eventstreams.GenericEventStream]

type pubSubMessage struct {
Message string `json:"message"`
@@ -93,11 +78,15 @@ func (ims *inMemoryStream) NewID() string {
return fftypes.NewUUID().String()
}

- func (ims *inMemoryStream) Validate(_ context.Context, _ *pubSubConfig) error {
+ func (ims *inMemoryStream) Validate(_ context.Context, _ *eventstreams.GenericEventStream) error {
return nil // no config defined in pubSubConfig to validate
}

- func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.EventStreamSpec[pubSubConfig], checkpointSequenceID string, deliver eventstreams.Deliver[pubSubMessage]) (err error) {
+ func (ims *inMemoryStream) WithRuntimeStatus(spec *eventstreams.GenericEventStream, status eventstreams.EventStreamStatus, stats *eventstreams.EventStreamStatistics) *eventstreams.GenericEventStream {
+ return spec.WithRuntimeStatus(status, stats)
+ }
+
+ func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.GenericEventStream, checkpointSequenceID string, deliver eventstreams.Deliver[pubSubMessage]) (err error) {
var index int
if checkpointSequenceID != "" {
index, err = strconv.Atoi(checkpointSequenceID)
@@ -135,10 +124,6 @@ func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.EventStreamSpe
}
}

- func ptrTo[T any](v T) *T {
- return &v
- }

func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
// Use SQLite in-memory DB
conf := config.RootSection("ffpubsub")
@@ -161,13 +146,13 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
u.Scheme = "ws"
log.L(ctx).Infof("Running on: %s", u)

- p := eventstreams.NewEventStreamPersistence[pubSubConfig](sql, dbsql.UUIDValidator)
- c := eventstreams.GenerateConfig[pubSubConfig, pubSubMessage](ctx)
+ p := eventstreams.NewGenericEventStreamPersistence(sql, dbsql.UUIDValidator)
+ c := eventstreams.GenerateConfig[*eventstreams.GenericEventStream, pubSubMessage](ctx)
ims := &inMemoryStream{
messages: []string{},
newMessages: *sync.NewCond(new(sync.Mutex)),
}
- mgr, err := eventstreams.NewEventStreamManager[pubSubConfig, pubSubMessage](ctx, c, p, wsServer, ims)
+ mgr, err := eventstreams.NewEventStreamManager[*eventstreams.GenericEventStream, pubSubMessage](ctx, c, p, wsServer, ims)
assertNoError(err)
return mgr, ims, func() {
log.L(ctx).Infof("Shutting down")
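Net effect on the example: the bespoke `pubSubConfig` type, its `database/sql/driver` Scan/Value plumbing, and the local `ptrTo` helper all disappear; the stock `GenericEventStream` is used throughout. An application that still needs extra per-stream fields could plausibly follow the same pattern the deleted code used: a JSON-serialized spec type of its own, reusing the fftypes helpers visible in the removed lines. A sketch under that assumption (`myStreamSpec` and the embedding approach are illustrative, not part of this PR):

```go
// Illustrative only: a custom spec carrying one extra field. Whether plain
// embedding of GenericEventStream satisfies the EventStreamSpec constraint
// depends on its exact method set, which this diff only partially shows.
type myStreamSpec struct {
	eventstreams.GenericEventStream
	Topic *string `json:"topic,omitempty"`
}

// The same JSON-column serialization pattern the removed pubSubConfig used.
func (s *myStreamSpec) Scan(src interface{}) error   { return fftypes.JSONScan(src, s) }
func (s *myStreamSpec) Value() (driver.Value, error) { return fftypes.JSONValue(s) }
```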
34 changes: 20 additions & 14 deletions pkg/eventstreams/activestream.go
@@ -1,4 +1,4 @@
- // Copyright © 2023 Kaleido, Inc.
+ // Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -33,7 +33,7 @@ type eventStreamBatch[DataType any] struct {
batchTimer *time.Timer
}

- type activeStream[CT any, DT any] struct {
+ type activeStream[CT EventStreamSpec, DT any] struct {
*eventStream[CT, DT]
ctx context.Context
cancelCtx func()
@@ -60,7 +60,7 @@ func (es *eventStream[CT, DT]) newActiveStream() *activeStream[CT, DT] {
},
eventLoopDone: make(chan struct{}),
batchLoopDone: make(chan struct{}),
- events: make(chan *Event[DT], *es.spec.BatchSize),
+ events: make(chan *Event[DT], *es.spec.ESFields().BatchSize),
}
go as.runEventLoop()
go as.runBatchLoop()
@@ -98,17 +98,18 @@ func (as *activeStream[CT, DT]) loadCheckpoint() (sequencedID string, err error)
}
if cp != nil && cp.SequenceID != nil {
sequencedID = *cp.SequenceID
- } else if as.spec.InitialSequenceID != nil {
- sequencedID = *as.spec.InitialSequenceID
+ } else if as.spec.ESFields().InitialSequenceID != nil {
+ sequencedID = *as.spec.ESFields().InitialSequenceID
}
return true, err
})
return sequencedID, err
}

func (as *activeStream[CT, DT]) checkFilter(event *Event[DT]) bool {
- if as.spec.topicFilterRegexp != nil {
- return as.spec.topicFilterRegexp.Match([]byte(event.Topic))
+ esSpec := as.spec.ESFields()
+ if esSpec.topicFilterRegexp != nil {
+ return esSpec.topicFilterRegexp.Match([]byte(event.Topic))
}
return true
}
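A recurring mechanical change through this file: with the spec now an interface value, shared settings are read via the `ESFields()` accessor rather than as direct struct fields, and functions that consult several settings cache the result once, as `checkFilter` does above. The idiom in isolation (a hypothetical helper; field names as they appear in this diff, everything else assumed):

```go
// Hypothetical helper showing the accessor-caching idiom used throughout
// this file: one interface call, then plain field reads.
func (as *activeStream[CT, DT]) batchSettings() (int, time.Duration) {
	esSpec := as.spec.ESFields()
	return *esSpec.BatchSize, time.Duration(*esSpec.BatchTimeout)
}
```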
@@ -149,8 +150,9 @@ func (as *activeStream[CT, DT]) runSourceLoop(initialCheckpointSequenceID string
func (as *activeStream[CT, DT]) runBatchLoop() {
defer close(as.batchLoopDone)

+ esSpec := as.spec.ESFields()
var batch *eventStreamBatch[DT]
- batchTimeout := time.Duration(*as.spec.BatchTimeout)
+ batchTimeout := time.Duration(*esSpec.BatchTimeout)
var noBatchActive <-chan time.Time = make(chan time.Time) // never pops
batchTimedOut := noBatchActive
for {
@@ -173,15 +175,15 @@ func (as *activeStream[CT, DT]) runBatchLoop() {
batch = &eventStreamBatch[DT]{
number: as.batchNumber,
batchTimer: time.NewTimer(batchTimeout),
- events: make([]*Event[DT], 0, *as.spec.BatchSize),
+ events: make([]*Event[DT], 0, *esSpec.BatchSize),
}
batchTimedOut = batch.batchTimer.C
}
batch.events = append(batch.events, event)
}
}
batchDispatched := false
- if batch != nil && (len(batch.events) >= *as.spec.BatchSize || timedOut) {
+ if batch != nil && (len(batch.events) >= *esSpec.BatchSize || timedOut) {
// attempt dispatch (only returns err on exit)
if err := as.dispatchBatch(batch); err != nil {
log.L(as.ctx).Debugf("batch loop done: %s", err)
@@ -241,6 +243,7 @@ func (as *activeStream[CT, DT]) checkpointRoutine() {
return // We're done
}
err := as.retry.Do(as.ctx, "checkpoint", func(attempt int) (retry bool, err error) {
+ log.L(as.bgCtx).Debugf("Writing checkpoint id=%s sequenceID=%s", as.spec.GetID(), checkpointSequenceID)
_, err = as.esm.persistence.Checkpoints().Upsert(as.ctx, &EventStreamCheckpoint{
ID: ptrTo(as.spec.GetID()), // the ID of the stream is the ID of the checkpoint
SequenceID: &checkpointSequenceID,
@@ -266,9 +269,12 @@ func (as *activeStream[CT, DT]) dispatchBatch(batch *eventStreamBatch[DT]) (err
as.LastDispatchAttempts = 0
as.LastDispatchStatus = DispatchStatusDispatching
as.HighestDispatched = batch.events[len(batch.events)-1].SequenceID
+ esSpec := as.spec.ESFields()
for {
// Short exponential back-off retry
err := as.retry.Do(as.ctx, "action", func(_ int) (retry bool, err error) {
log.L(as.ctx).Debugf("Batch %d attempt %d dispatching. Len=%d",
batch.number, as.LastDispatchAttempts, len(batch.events))
err = as.action.AttemptDispatch(as.ctx, as.LastDispatchAttempts, &EventBatch[DT]{
Type: MessageTypeEventBatch,
StreamID: as.spec.GetID(),
@@ -281,7 +287,7 @@
as.LastDispatchAttempts++
as.LastDispatchFailure = err.Error()
as.LastDispatchStatus = DispatchStatusRetrying
- return time.Since(*as.LastDispatchTime.Time()) < time.Duration(*as.spec.RetryTimeout), err
+ return time.Since(*as.LastDispatchTime.Time()) < time.Duration(*esSpec.RetryTimeout), err
}
as.LastDispatchStatus = DispatchStatusComplete
return false, nil
@@ -292,14 +298,14 @@
// We're in blocked retry delay
as.LastDispatchStatus = DispatchStatusBlocked
log.L(as.ctx).Errorf("Batch failed short retry after %.2fs secs. ErrorHandling=%s BlockedRetryDelay=%.2fs ",
- time.Since(*as.LastDispatchTime.Time()).Seconds(), *as.spec.ErrorHandling, time.Duration(*as.spec.BlockedRetryDelay).Seconds())
- if *as.spec.ErrorHandling == ErrorHandlingTypeSkip {
+ time.Since(*as.LastDispatchTime.Time()).Seconds(), *esSpec.ErrorHandling, time.Duration(*esSpec.BlockedRetryDelay).Seconds())
+ if *esSpec.ErrorHandling == ErrorHandlingTypeSkip {
// Swallow the error now we have logged it
as.LastDispatchStatus = DispatchStatusSkipped
return nil
}
select {
- case <-time.After(time.Duration(*as.spec.BlockedRetryDelay)):
+ case <-time.After(time.Duration(*esSpec.BlockedRetryDelay)):
case <-as.ctx.Done():
// Only way we exit with error, is if the context is cancelled
return i18n.NewError(as.ctx, i18n.MsgContextCanceled)
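The dispatch error-handling semantics are unchanged by this PR, just re-plumbed through `esSpec`: a failed batch is retried with short backoff until `RetryTimeout`; after that the stream either skips the batch (`ErrorHandlingTypeSkip`) or blocks, sleeping `BlockedRetryDelay` between further rounds, until the context is cancelled. Condensed into a sketch with simplified stand-in types (not the package's code):

```go
package sketch

import (
	"context"
	"time"
)

type errorHandlingType int

const (
	errorHandlingSkip errorHandlingType = iota
	errorHandlingBlock
)

// policy stands in for the relevant ESFields() settings.
type policy struct {
	RetryTimeout      time.Duration
	BlockedRetryDelay time.Duration
	ErrorHandling     errorHandlingType
}

// dispatchWithPolicy mirrors dispatchBatch's decision logic: short retries
// until RetryTimeout, then either skip the batch or block and keep trying.
func dispatchWithPolicy(ctx context.Context, p policy, attempt func() error) error {
	for {
		start := time.Now()
		for { // exponential backoff elided; retry until RetryTimeout
			if attempt() == nil {
				return nil // dispatched
			}
			if time.Since(start) >= p.RetryTimeout {
				break
			}
		}
		if p.ErrorHandling == errorHandlingSkip {
			return nil // swallow the error after logging it, as above
		}
		select { // block: sleep, then start another retry round
		case <-time.After(p.BlockedRetryDelay):
		case <-ctx.Done():
			return ctx.Err() // cancellation is the only error path
		}
	}
}
```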
43 changes: 26 additions & 17 deletions pkg/eventstreams/activestream_test.go
@@ -33,7 +33,7 @@ func TestCheckpointContextClose(t *testing.T) {
})
defer done()

- as := &activeStream[testESConfig, testData]{
+ as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -47,12 +47,12 @@ func TestRunSourceLoopDone(t *testing.T) {
ctx, es, mes, done := newTestEventStream(t)
defer done()

- as := &activeStream[testESConfig, testData]{
+ as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)

- mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+ mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
deliver(nil)
return nil
}
@@ -66,13 +66,13 @@ func TestRunSourceEventsBlockedExit(t *testing.T) {
ctx, es, mes, done := newTestEventStream(t)
defer done()

- as := &activeStream[testESConfig, testData]{
+ as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)

as.events = make(chan *Event[testData])
- mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+ mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
deliver([]*Event[testData]{{ /* will block */ }})
return nil
}
@@ -92,7 +92,7 @@ func TestBatchTimeout(t *testing.T) {
es.spec.BatchTimeout = ptrTo(fftypes.FFDuration(1 * time.Millisecond))

delivered := false
- mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
+ mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
if delivered {
<-ctx.Done()
} else {
@@ -135,7 +135,7 @@ func TestQueuedCheckpointAsync(t *testing.T) {
})
defer done()

- as := &activeStream[testESConfig, testData]{
+ as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -156,7 +156,7 @@ func TestQueuedCheckpointCancel(t *testing.T) {
})
defer done()

- as := &activeStream[testESConfig, testData]{
+ as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -172,7 +172,7 @@ func TestDispatchSkipError(t *testing.T) {
ctx, es, _, done := newTestEventStream(t)
defer done()

- as := &activeStream[testESConfig, testData]{
+ as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
@@ -196,31 +196,40 @@ func TestDispatchBlockError(t *testing.T) {
ctx, es, _, done := newTestEventStream(t)
defer done()

- as := &activeStream[testESConfig, testData]{
- eventStream: es,
+ as := &activeStream[*GenericEventStream, testData]{
+ eventStream: es,
+ events: make(chan *Event[testData]),
+ batchLoopDone: make(chan struct{}),
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)

as.spec.BatchSize = ptrTo(1)
as.spec.RetryTimeout = ptrTo(fftypes.FFDuration(1 * time.Microsecond))
- as.spec.BlockedRetryDelay = ptrTo(fftypes.FFDuration(1 * time.Microsecond))
+ as.spec.BlockedRetryDelay = ptrTo(fftypes.FFDuration(10 * time.Millisecond))
as.spec.ErrorHandling = ptrTo(ErrorHandlingTypeBlock)
- callCount := 0
+ calls := make(chan bool)
as.action = &mockAction{
attemptDispatch: func(ctx context.Context, attempt int, events *EventBatch[testData]) error {
+ calls <- true
- callCount++
- if callCount > 1 {
- as.cancelCtx()
- }
return fmt.Errorf("pop")
},
}

+ dispatchDone := make(chan struct{})
go func() {
- err := as.dispatchBatch(&eventStreamBatch[testData]{
- events: []*Event[testData]{{}},
- })
- assert.Error(t, err)
+ defer close(dispatchDone)
+ as.runBatchLoop()
}()

+ as.events <- &Event[testData]{}
+
+ <-calls
+ <-calls
+ as.cancelCtx()
+ <-dispatchDone

}
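On the reworked `TestDispatchBlockError`: the old version called `dispatchBatch` directly and had the mock cancel the context after a counted number of attempts. The new version drives the real `runBatchLoop` and synchronizes through an unbuffered `calls` channel, so the test deterministically observes two dispatch attempts before cancelling. The idiom in isolation, restated with explanatory comments (names from the diff; `loopDone` here plays the role of `dispatchDone`):

```go
// The rendezvous idiom from the reworked test: an unbuffered channel makes
// each dispatch attempt park until the test acknowledges it.
calls := make(chan bool)
as.action = &mockAction{
	attemptDispatch: func(ctx context.Context, attempt int, events *EventBatch[testData]) error {
		calls <- true            // each attempt blocks here until the test receives
		return fmt.Errorf("pop") // always fail, exercising the blocked-retry path
	},
}

loopDone := make(chan struct{})
go func() {
	defer close(loopDone)
	as.runBatchLoop()
}()

as.events <- &Event[testData]{} // one event fills the size-1 batch

<-calls        // attempt 1: the initial dispatch fails
<-calls        // attempt 2: typically after the blocked-retry delay (RetryTimeout is tiny)
as.cancelCtx() // only now end the loop, deterministically
<-loopDone
```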
10 changes: 5 additions & 5 deletions pkg/eventstreams/config.go
@@ -32,12 +32,12 @@ import (
// Generics:
// - CT is the Configuration Type - the custom extensions to the configuration schema
// - DT is the Data Type - the payload type that will be delivered to the application
- type DispatcherFactory[CT any, DT any] interface {
- Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
- NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT]
+ type DispatcherFactory[CT EventStreamSpec, DT any] interface {
+ Validate(ctx context.Context, conf *Config[CT, DT], spec CT, tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
+ NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec CT) Dispatcher[DT]
}

- type Config[CT any, DT any] struct {
+ type Config[CT EventStreamSpec, DT any] struct {
TLSConfigs map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
Retry *retry.Retry `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
DisablePrivateIPs bool `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
@@ -135,7 +135,7 @@ func InitConfig(conf config.Section) {

// Optional function to generate config directly from YAML configuration using the config package.
// You can also generate the configuration programmatically
- func GenerateConfig[CT any, DT any](ctx context.Context) *Config[CT, DT] {
+ func GenerateConfig[CT EventStreamSpec, DT any](ctx context.Context) *Config[CT, DT] {
httpDefaults, _ := ffresty.GenerateConfig(ctx, WebhookDefaultsConfig)
tlsConfigs := map[string]*fftls.Config{}
for i := 0; i < TLSConfigs.ArraySize(); i++ {
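`DispatcherFactory`, `Config`, and `GenerateConfig` all move from `CT any` to `CT EventStreamSpec`, and the factory methods now receive the spec as the generic value `CT` itself rather than wrapped in `*EventStreamSpec[CT]`. A minimal factory against the new shape might look like the following; the `Dispatcher[DT]` method set isn't shown in this diff, so the nil return stands in for a real implementation:

```go
// Sketch of a no-op factory satisfying the updated interface, instantiated
// with the stock *GenericEventStream spec type (written as if inside the
// eventstreams package; illustrative, not from this PR).
type noopDispatcherFactory[DT any] struct{}

func (f *noopDispatcherFactory[DT]) Validate(ctx context.Context, conf *Config[*GenericEventStream, DT], spec *GenericEventStream, tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error {
	return nil // nothing beyond the generic spec to validate in this sketch
}

func (f *noopDispatcherFactory[DT]) NewDispatcher(ctx context.Context, conf *Config[*GenericEventStream, DT], spec *GenericEventStream) Dispatcher[DT] {
	return nil // a real factory would construct and return a Dispatcher[DT] here
}
```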
2 changes: 1 addition & 1 deletion pkg/eventstreams/config_test.go
@@ -34,7 +34,7 @@ func TestGenerateConfigTLS(t *testing.T) {
tls0.Set(ConfigTLSConfigName, "tls0")
tls0.SubSection("tls").Set(fftls.HTTPConfTLSCAFile, t.TempDir())

- c := GenerateConfig[testESConfig, testData](context.Background())
+ c := GenerateConfig[*GenericEventStream, testData](context.Background())
assert.True(t, c.TLSConfigs["tls0"].Enabled)

}