Skip to content

Commit

Permalink
Merge pull request #120 from hyperledger/es-flexibility
Browse files Browse the repository at this point in the history
Provide more flexibility on how EventStream specs are stored
  • Loading branch information
peterbroadhurst authored Jan 22, 2024
2 parents ba21054 + 00303f7 commit a4a1c3c
Show file tree
Hide file tree
Showing 17 changed files with 637 additions and 515 deletions.
37 changes: 11 additions & 26 deletions examples/ffpubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"bufio"
"context"
"database/sql/driver"
"fmt"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -47,9 +46,7 @@ func main() {

// This demo: Just create a single event stream
// Real world: You would create ffapi.Route objects for the CRUD actions on mgr
_, err := mgr.UpsertStream(ctx, &eventstreams.EventStreamSpec[pubSubConfig]{
Name: ptrTo("demo"),
})
_, err := mgr.UpsertStream(ctx, "demo", &eventstreams.GenericEventStream{})
assertNoError(err)

// Read lines from standard in, and pass them to all active routines
Expand All @@ -65,19 +62,7 @@ func main() {
}
}

type pubSubESManager = eventstreams.Manager[pubSubConfig]

type pubSubConfig struct {
// no extra config
}

func (psc *pubSubConfig) Scan(src interface{}) error {
return fftypes.JSONScan(src, psc)
}

func (psc *pubSubConfig) Value() (driver.Value, error) {
return fftypes.JSONValue(psc)
}
type pubSubESManager = eventstreams.Manager[*eventstreams.GenericEventStream]

type pubSubMessage struct {
Message string `json:"message"`
Expand All @@ -93,11 +78,15 @@ func (ims *inMemoryStream) NewID() string {
return fftypes.NewUUID().String()
}

func (ims *inMemoryStream) Validate(_ context.Context, _ *pubSubConfig) error {
func (ims *inMemoryStream) Validate(_ context.Context, _ *eventstreams.GenericEventStream) error {
return nil // no config defined in pubSubConfig to validate
}

func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.EventStreamSpec[pubSubConfig], checkpointSequenceID string, deliver eventstreams.Deliver[pubSubMessage]) (err error) {
func (ims *inMemoryStream) WithRuntimeStatus(spec *eventstreams.GenericEventStream, status eventstreams.EventStreamStatus, stats *eventstreams.EventStreamStatistics) *eventstreams.GenericEventStream {
return spec.WithRuntimeStatus(status, stats)
}

func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.GenericEventStream, checkpointSequenceID string, deliver eventstreams.Deliver[pubSubMessage]) (err error) {
var index int
if checkpointSequenceID != "" {
index, err = strconv.Atoi(checkpointSequenceID)
Expand Down Expand Up @@ -135,10 +124,6 @@ func (ims *inMemoryStream) Run(_ context.Context, _ *eventstreams.EventStreamSpe
}
}

func ptrTo[T any](v T) *T {
return &v
}

func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
// Use SQLite in-memory DB
conf := config.RootSection("ffpubsub")
Expand All @@ -161,13 +146,13 @@ func setup(ctx context.Context) (pubSubESManager, *inMemoryStream, func()) {
u.Scheme = "ws"
log.L(ctx).Infof("Running on: %s", u)

p := eventstreams.NewEventStreamPersistence[pubSubConfig](sql, dbsql.UUIDValidator)
c := eventstreams.GenerateConfig[pubSubConfig, pubSubMessage](ctx)
p := eventstreams.NewGenericEventStreamPersistence(sql, dbsql.UUIDValidator)
c := eventstreams.GenerateConfig[*eventstreams.GenericEventStream, pubSubMessage](ctx)
ims := &inMemoryStream{
messages: []string{},
newMessages: *sync.NewCond(new(sync.Mutex)),
}
mgr, err := eventstreams.NewEventStreamManager[pubSubConfig, pubSubMessage](ctx, c, p, wsServer, ims)
mgr, err := eventstreams.NewEventStreamManager[*eventstreams.GenericEventStream, pubSubMessage](ctx, c, p, wsServer, ims)
assertNoError(err)
return mgr, ims, func() {
log.L(ctx).Infof("Shutting down")
Expand Down
34 changes: 20 additions & 14 deletions pkg/eventstreams/activestream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -33,7 +33,7 @@ type eventStreamBatch[DataType any] struct {
batchTimer *time.Timer
}

type activeStream[CT any, DT any] struct {
type activeStream[CT EventStreamSpec, DT any] struct {
*eventStream[CT, DT]
ctx context.Context
cancelCtx func()
Expand All @@ -60,7 +60,7 @@ func (es *eventStream[CT, DT]) newActiveStream() *activeStream[CT, DT] {
},
eventLoopDone: make(chan struct{}),
batchLoopDone: make(chan struct{}),
events: make(chan *Event[DT], *es.spec.BatchSize),
events: make(chan *Event[DT], *es.spec.ESFields().BatchSize),
}
go as.runEventLoop()
go as.runBatchLoop()
Expand Down Expand Up @@ -98,17 +98,18 @@ func (as *activeStream[CT, DT]) loadCheckpoint() (sequencedID string, err error)
}
if cp != nil && cp.SequenceID != nil {
sequencedID = *cp.SequenceID
} else if as.spec.InitialSequenceID != nil {
sequencedID = *as.spec.InitialSequenceID
} else if as.spec.ESFields().InitialSequenceID != nil {
sequencedID = *as.spec.ESFields().InitialSequenceID
}
return true, err
})
return sequencedID, err
}

func (as *activeStream[CT, DT]) checkFilter(event *Event[DT]) bool {
if as.spec.topicFilterRegexp != nil {
return as.spec.topicFilterRegexp.Match([]byte(event.Topic))
esSpec := as.spec.ESFields()
if esSpec.topicFilterRegexp != nil {
return esSpec.topicFilterRegexp.Match([]byte(event.Topic))
}
return true
}
Expand Down Expand Up @@ -149,8 +150,9 @@ func (as *activeStream[CT, DT]) runSourceLoop(initialCheckpointSequenceID string
func (as *activeStream[CT, DT]) runBatchLoop() {
defer close(as.batchLoopDone)

esSpec := as.spec.ESFields()
var batch *eventStreamBatch[DT]
batchTimeout := time.Duration(*as.spec.BatchTimeout)
batchTimeout := time.Duration(*esSpec.BatchTimeout)
var noBatchActive <-chan time.Time = make(chan time.Time) // never pops
batchTimedOut := noBatchActive
for {
Expand All @@ -173,15 +175,15 @@ func (as *activeStream[CT, DT]) runBatchLoop() {
batch = &eventStreamBatch[DT]{
number: as.batchNumber,
batchTimer: time.NewTimer(batchTimeout),
events: make([]*Event[DT], 0, *as.spec.BatchSize),
events: make([]*Event[DT], 0, *esSpec.BatchSize),
}
batchTimedOut = batch.batchTimer.C
}
batch.events = append(batch.events, event)
}
}
batchDispatched := false
if batch != nil && (len(batch.events) >= *as.spec.BatchSize || timedOut) {
if batch != nil && (len(batch.events) >= *esSpec.BatchSize || timedOut) {
// attempt dispatch (only returns err on exit)
if err := as.dispatchBatch(batch); err != nil {
log.L(as.ctx).Debugf("batch loop done: %s", err)
Expand Down Expand Up @@ -241,6 +243,7 @@ func (as *activeStream[CT, DT]) checkpointRoutine() {
return // We're done
}
err := as.retry.Do(as.ctx, "checkpoint", func(attempt int) (retry bool, err error) {
log.L(as.bgCtx).Debugf("Writing checkpoint id=%s sequenceID=%s", as.spec.GetID(), checkpointSequenceID)
_, err = as.esm.persistence.Checkpoints().Upsert(as.ctx, &EventStreamCheckpoint{
ID: ptrTo(as.spec.GetID()), // the ID of the stream is the ID of the checkpoint
SequenceID: &checkpointSequenceID,
Expand All @@ -266,9 +269,12 @@ func (as *activeStream[CT, DT]) dispatchBatch(batch *eventStreamBatch[DT]) (err
as.LastDispatchAttempts = 0
as.LastDispatchStatus = DispatchStatusDispatching
as.HighestDispatched = batch.events[len(batch.events)-1].SequenceID
esSpec := as.spec.ESFields()
for {
// Short exponential back-off retry
err := as.retry.Do(as.ctx, "action", func(_ int) (retry bool, err error) {
log.L(as.ctx).Debugf("Batch %d attempt %d dispatching. Len=%d",
batch.number, as.LastDispatchAttempts, len(batch.events))
err = as.action.AttemptDispatch(as.ctx, as.LastDispatchAttempts, &EventBatch[DT]{
Type: MessageTypeEventBatch,
StreamID: as.spec.GetID(),
Expand All @@ -281,7 +287,7 @@ func (as *activeStream[CT, DT]) dispatchBatch(batch *eventStreamBatch[DT]) (err
as.LastDispatchAttempts++
as.LastDispatchFailure = err.Error()
as.LastDispatchStatus = DispatchStatusRetrying
return time.Since(*as.LastDispatchTime.Time()) < time.Duration(*as.spec.RetryTimeout), err
return time.Since(*as.LastDispatchTime.Time()) < time.Duration(*esSpec.RetryTimeout), err
}
as.LastDispatchStatus = DispatchStatusComplete
return false, nil
Expand All @@ -292,14 +298,14 @@ func (as *activeStream[CT, DT]) dispatchBatch(batch *eventStreamBatch[DT]) (err
// We're in blocked retry delay
as.LastDispatchStatus = DispatchStatusBlocked
log.L(as.ctx).Errorf("Batch failed short retry after %.2fs secs. ErrorHandling=%s BlockedRetryDelay=%.2fs ",
time.Since(*as.LastDispatchTime.Time()).Seconds(), *as.spec.ErrorHandling, time.Duration(*as.spec.BlockedRetryDelay).Seconds())
if *as.spec.ErrorHandling == ErrorHandlingTypeSkip {
time.Since(*as.LastDispatchTime.Time()).Seconds(), *esSpec.ErrorHandling, time.Duration(*esSpec.BlockedRetryDelay).Seconds())
if *esSpec.ErrorHandling == ErrorHandlingTypeSkip {
// Swallow the error now we have logged it
as.LastDispatchStatus = DispatchStatusSkipped
return nil
}
select {
case <-time.After(time.Duration(*as.spec.BlockedRetryDelay)):
case <-time.After(time.Duration(*esSpec.BlockedRetryDelay)):
case <-as.ctx.Done():
// Only way we exit with error, is if the context is cancelled
return i18n.NewError(as.ctx, i18n.MsgContextCanceled)
Expand Down
43 changes: 26 additions & 17 deletions pkg/eventstreams/activestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestCheckpointContextClose(t *testing.T) {
})
defer done()

as := &activeStream[testESConfig, testData]{
as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
Expand All @@ -47,12 +47,12 @@ func TestRunSourceLoopDone(t *testing.T) {
ctx, es, mes, done := newTestEventStream(t)
defer done()

as := &activeStream[testESConfig, testData]{
as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)

mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
deliver(nil)
return nil
}
Expand All @@ -66,13 +66,13 @@ func TestRunSourceEventsBlockedExit(t *testing.T) {
ctx, es, mes, done := newTestEventStream(t)
defer done()

as := &activeStream[testESConfig, testData]{
as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)

as.events = make(chan *Event[testData])
mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
deliver([]*Event[testData]{{ /* will block */ }})
return nil
}
Expand All @@ -92,7 +92,7 @@ func TestBatchTimeout(t *testing.T) {
es.spec.BatchTimeout = ptrTo(fftypes.FFDuration(1 * time.Millisecond))

delivered := false
mes.run = func(ctx context.Context, es *EventStreamSpec[testESConfig], checkpointSequenceId string, deliver Deliver[testData]) error {
mes.run = func(ctx context.Context, es *GenericEventStream, checkpointSequenceId string, deliver Deliver[testData]) error {
if delivered {
<-ctx.Done()
} else {
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestQueuedCheckpointAsync(t *testing.T) {
})
defer done()

as := &activeStream[testESConfig, testData]{
as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
Expand All @@ -156,7 +156,7 @@ func TestQueuedCheckpointCancel(t *testing.T) {
})
defer done()

as := &activeStream[testESConfig, testData]{
as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
Expand All @@ -172,7 +172,7 @@ func TestDispatchSkipError(t *testing.T) {
ctx, es, _, done := newTestEventStream(t)
defer done()

as := &activeStream[testESConfig, testData]{
as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)
Expand All @@ -196,31 +196,40 @@ func TestDispatchBlockError(t *testing.T) {
ctx, es, _, done := newTestEventStream(t)
defer done()

as := &activeStream[testESConfig, testData]{
eventStream: es,
as := &activeStream[*GenericEventStream, testData]{
eventStream: es,
events: make(chan *Event[testData]),
batchLoopDone: make(chan struct{}),
}
as.ctx, as.cancelCtx = context.WithCancel(ctx)

as.spec.BatchSize = ptrTo(1)
as.spec.RetryTimeout = ptrTo(fftypes.FFDuration(1 * time.Microsecond))
as.spec.BlockedRetryDelay = ptrTo(fftypes.FFDuration(1 * time.Microsecond))
as.spec.BlockedRetryDelay = ptrTo(fftypes.FFDuration(10 * time.Millisecond))
as.spec.ErrorHandling = ptrTo(ErrorHandlingTypeBlock)
callCount := 0
calls := make(chan bool)
as.action = &mockAction{
attemptDispatch: func(ctx context.Context, attempt int, events *EventBatch[testData]) error {
calls <- true
callCount++
if callCount > 1 {
as.cancelCtx()
}
return fmt.Errorf("pop")
},
}

dispatchDone := make(chan struct{})
go func() {
err := as.dispatchBatch(&eventStreamBatch[testData]{
events: []*Event[testData]{{}},
})
assert.Error(t, err)
defer close(dispatchDone)
as.runBatchLoop()
}()

as.events <- &Event[testData]{}

<-calls
<-calls
as.cancelCtx()
<-dispatchDone

}
10 changes: 5 additions & 5 deletions pkg/eventstreams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (
// Generics:
// - CT is the Configuration Type - the custom extensions to the configuration schema
// - DT is the Data Type - the payload type that will be delivered to the application
type DispatcherFactory[CT any, DT any] interface {
Validate(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT], tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec *EventStreamSpec[CT]) Dispatcher[DT]
type DispatcherFactory[CT EventStreamSpec, DT any] interface {
Validate(ctx context.Context, conf *Config[CT, DT], spec CT, tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec CT) Dispatcher[DT]
}

type Config[CT any, DT any] struct {
type Config[CT EventStreamSpec, DT any] struct {
TLSConfigs map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
Retry *retry.Retry `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
DisablePrivateIPs bool `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
Expand Down Expand Up @@ -135,7 +135,7 @@ func InitConfig(conf config.Section) {

// Optional function to generate config directly from YAML configuration using the config package.
// You can also generate the configuration programmatically
func GenerateConfig[CT any, DT any](ctx context.Context) *Config[CT, DT] {
func GenerateConfig[CT EventStreamSpec, DT any](ctx context.Context) *Config[CT, DT] {
httpDefaults, _ := ffresty.GenerateConfig(ctx, WebhookDefaultsConfig)
tlsConfigs := map[string]*fftls.Config{}
for i := 0; i < TLSConfigs.ArraySize(); i++ {
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventstreams/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestGenerateConfigTLS(t *testing.T) {
tls0.Set(ConfigTLSConfigName, "tls0")
tls0.SubSection("tls").Set(fftls.HTTPConfTLSCAFile, t.TempDir())

c := GenerateConfig[testESConfig, testData](context.Background())
c := GenerateConfig[*GenericEventStream, testData](context.Background())
assert.True(t, c.TLSConfigs["tls0"].Enabled)

}
Loading

0 comments on commit a4a1c3c

Please sign in to comment.