From b66b288c58c4ec612fa1b204f4d1d7f595622330 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Sun, 21 Jan 2024 22:14:19 -0500 Subject: [PATCH] Fix incorrect use of HTTP context on background stream Signed-off-by: Peter Broadhurst --- pkg/eventstreams/eventstreams.go | 8 ++++---- pkg/eventstreams/eventstreams_test.go | 6 +++--- pkg/eventstreams/manager.go | 7 +++++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/eventstreams/eventstreams.go b/pkg/eventstreams/eventstreams.go index de68016..b768c5a 100644 --- a/pkg/eventstreams/eventstreams.go +++ b/pkg/eventstreams/eventstreams.go @@ -236,24 +236,24 @@ type EventStreamActions[CT any] interface { } func (esm *esManager[CT, DT]) initEventStream( - bgCtx context.Context, spec CT, ) (es *eventStream[CT, DT], err error) { // Validate - factory, err := esm.validateStream(bgCtx, spec, LifecyclePhaseStarting) + factory, err := esm.validateStream(esm.bgCtx, spec, LifecyclePhaseStarting) if err != nil { return nil, err } + streamCtx := log.WithLogField(esm.bgCtx, "eventstream", *spec.ESFields().Name) es = &eventStream[CT, DT]{ - bgCtx: log.WithLogField(bgCtx, "eventstream", *spec.ESFields().Name), + bgCtx: streamCtx, esm: esm, spec: spec, persistence: esm.persistence, retry: esm.config.Retry, } - es.action = factory.NewDispatcher(es.bgCtx, &esm.config, spec) + es.action = factory.NewDispatcher(streamCtx, &esm.config, spec) log.L(es.bgCtx).Infof("Initialized Event Stream") if *spec.ESFields().Status == EventStreamStatusStarted { diff --git a/pkg/eventstreams/eventstreams_test.go b/pkg/eventstreams/eventstreams_test.go index 1c05b65..a4f4da9 100644 --- a/pkg/eventstreams/eventstreams_test.go +++ b/pkg/eventstreams/eventstreams_test.go @@ -33,7 +33,7 @@ func newTestEventStream(t *testing.T, extraSetup ...func(mdb *mockPersistence)) mdb.eventStreams.On("GetMany", mock.Anything, mock.Anything).Return([]*GenericEventStream{}, &ffapi.FilterResult{}, nil) }) ctx, mgr, mes, done := newMockESManager(t, extraSetup...) - es, err := mgr.initEventStream(ctx, &GenericEventStream{ + es, err := mgr.initEventStream(&GenericEventStream{ ResourceBase: dbsql.ResourceBase{ID: fftypes.NewUUID()}, EventStreamSpecFields: EventStreamSpecFields{ Name: ptrTo(t.Name()), @@ -161,7 +161,7 @@ func TestValidate(t *testing.T) { _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "FF00216", err) - _, err = es.esm.initEventStream(ctx, es.spec) + _, err = es.esm.initEventStream(es.spec) assert.Regexp(t, "FF00216", err) customType := fftypes.FFEnumValue("estype", "custom1") @@ -169,7 +169,7 @@ func TestValidate(t *testing.T) { _, err = es.esm.validateStream(ctx, es.spec, LifecyclePhasePreInsertValidation) assert.Regexp(t, "FF00217", err) - _, err = es.esm.initEventStream(ctx, es.spec) + _, err = es.esm.initEventStream(es.spec) assert.Regexp(t, "FF00217", err) } diff --git a/pkg/eventstreams/manager.go b/pkg/eventstreams/manager.go index d914c17..fcc1f6b 100644 --- a/pkg/eventstreams/manager.go +++ b/pkg/eventstreams/manager.go @@ -75,6 +75,7 @@ type Runtime[ConfigType EventStreamSpec, DataType any] interface { } type esManager[CT EventStreamSpec, DT any] struct { + bgCtx context.Context config Config[CT, DT] mux sync.Mutex streams map[string]*eventStream[CT, DT] @@ -100,6 +101,7 @@ func NewEventStreamManager[CT EventStreamSpec, DT any](ctx context.Context, conf } } esm := &esManager[CT, DT]{ + bgCtx: ctx, config: *config, tlsConfigs: tlsConfigs, runtime: source, @@ -168,7 +170,7 @@ func (esm *esManager[CT, DT]) initialize(ctx context.Context) error { return err } } else { - es, err := esm.initEventStream(ctx, esSpec) + es, err := esm.initEventStream(esSpec) if err != nil { return err } @@ -238,7 +240,8 @@ func (esm *esManager[CT, DT]) reInit(ctx context.Context, spec CT, existing *eve return err } } - es, err := esm.initEventStream(ctx, spec) + // initializing the event stream happens outside of this context (must be long lived) + es, err := esm.initEventStream(spec) if err != nil { return err }