diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 2c1b39c247..ba3c10bc9b 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -196,6 +196,8 @@ func TestBSBProducerCallerCancelsCtx(t *testing.T) { BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), } + pubConfig.BufferedStorageConfig.NumWorkers = 1 + // the buffering runs async, test needs to stub datastore methods for potential invocation, // but is race, since test also cancels the backend context which started the buffer, // so, not deterministic, no assert on these. @@ -205,15 +207,20 @@ func TestBSBProducerCallerCancelsCtx(t *testing.T) { FilesPerPartition: 1, }) - mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(makeSingleLCMBatch(2), nil) + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd"). + Run(func(args mock.Arguments) { + cancel() + }). + Return(makeSingleLCMBatch(2), nil) + // this second attempt needs to be mocked, ledger buffer queues this 'next' sequence task automatically + // in getFromLedgerQueue after it receives "FFFFFFFD--2.xdr.zstd", the ctx is not checked then or in + // the async worker routine that receives the task. mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil) appCallback := func(lcm xdr.LedgerCloseMeta) error { return nil } - cancel() - datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil }