Skip to content

Commit

Permalink
Fix incorrect use of HTTP context on background stream
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jan 22, 2024
1 parent 172ff62 commit b66b288
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
8 changes: 4 additions & 4 deletions pkg/eventstreams/eventstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,24 +236,24 @@ type EventStreamActions[CT any] interface {
}

func (esm *esManager[CT, DT]) initEventStream(
bgCtx context.Context,
spec CT,
) (es *eventStream[CT, DT], err error) {
// Validate
factory, err := esm.validateStream(bgCtx, spec, LifecyclePhaseStarting)
factory, err := esm.validateStream(esm.bgCtx, spec, LifecyclePhaseStarting)
if err != nil {
return nil, err
}

streamCtx := log.WithLogField(esm.bgCtx, "eventstream", *spec.ESFields().Name)
es = &eventStream[CT, DT]{
bgCtx: log.WithLogField(bgCtx, "eventstream", *spec.ESFields().Name),
bgCtx: streamCtx,
esm: esm,
spec: spec,
persistence: esm.persistence,
retry: esm.config.Retry,
}

es.action = factory.NewDispatcher(es.bgCtx, &esm.config, spec)
es.action = factory.NewDispatcher(streamCtx, &esm.config, spec)

log.L(es.bgCtx).Infof("Initialized Event Stream")
if *spec.ESFields().Status == EventStreamStatusStarted {
Expand Down
6 changes: 3 additions & 3 deletions pkg/eventstreams/eventstreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newTestEventStream(t *testing.T, extraSetup ...func(mdb *mockPersistence))
mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil)
})
ctx, mgr, mes, done := newMockESManager(t, extraSetup...)
es, err := mgr.initEventStream(ctx, &GenericEventStream{
es, err := mgr.initEventStream(&GenericEventStream{
ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()},
EventStreamSpecFields: EventStreamSpecFields{
Name: ptrTo(t.Name()),
Expand Down Expand Up @@ -161,15 +161,15 @@ func TestValidate(t *testing.T) {
_, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
assert.Regexp(t, "FF00216", err)

_, err = es.esm.initEventStream(ctx, es.spec)
_, err = es.esm.initEventStream(es.spec)
assert.Regexp(t, "FF00216", err)

customType := fftypes.FFEnumValue("estype", "custom1")
es.spec.Type = &customType
_, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation)
assert.Regexp(t, "FF00217", err)

_, err = es.esm.initEventStream(ctx, es.spec)
_, err = es.esm.initEventStream(es.spec)
assert.Regexp(t, "FF00217", err)

}
Expand Down
7 changes: 5 additions & 2 deletions pkg/eventstreams/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Runtime[ConfigType EventStreamSpec, DataType any] interface {
}

type esManager[CT EventStreamSpec, DT any] struct {
bgCtx context.Context
config Config[CT, DT]
mux sync.Mutex
streams map[string]*eventStream[CT, DT]
Expand All @@ -100,6 +101,7 @@ func NewEventStreamManager[CT EventStreamSpec, DT any](ctx context.Context, conf
}
}
esm := &esManager[CT, DT]{
bgCtx: ctx,
config: *config,
tlsConfigs: tlsConfigs,
runtime: source,
Expand Down Expand Up @@ -168,7 +170,7 @@ func (esm *esManager[CT, DT]) initialize(ctx context.Context) error {
return err
}
} else {
es, err := esm.initEventStream(ctx, esSpec)
es, err := esm.initEventStream(esSpec)
if err != nil {
return err
}
Expand Down Expand Up @@ -238,7 +240,8 @@ func (esm *esManager[CT, DT]) reInit(ctx context.Context, spec CT, existing *eve
return err
}
}
es, err := esm.initEventStream(ctx, spec)
// initializing the event stream happens outside of this context (must be long lived)
es, err := esm.initEventStream(spec)
if err != nil {
return err
}
Expand Down

0 comments on commit b66b288

Please sign in to comment.