Skip to content

Commit

Permalink
Merge branch 'main' into 380268299-implementing-the-alternative
Browse files Browse the repository at this point in the history
  • Loading branch information
danieljbruce authored Nov 27, 2024
2 parents 96a9a36 + b022a71 commit 2a553dd
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions test/readrows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,64 @@ describe('Bigtable/ReadRows', () => {
}
})();
});
it('should return row data in the right order with a predictable sleep function', function (done) {
this.timeout(600000);
const keyFrom = undefined;
const keyTo = undefined;
// the server will error after sending this chunk (not row)
const errorAfterChunkNo = 100;
const dataResults = [];

// keyTo and keyFrom are not provided so they will be determined from
// the request that is passed in.
service.setService({
ReadRows: ReadRowsImpl.createService({
errorAfterChunkNo: 100, // the server will error after sending this chunk (not row)
valueSize: 1,
chunkSize: 1,
chunksPerResponse: 1,
debugLog,
}) as ServerImplementationInterface,
});
const sleep = (ticks: number) => {
// Adds an event to the end of the event loop `ticks` times
// This creates a predictable delay using the event loop and
// allows the streams to create a predictable amount of back pressure.
return new Promise(resolve => {
const nextEventLoop = () => {
if (ticks > 0) {
ticks = ticks - 1;
setImmediate(nextEventLoop);
} else {
resolve(ticks);
}
};
nextEventLoop();
});
};
(async () => {
try {
// 150 rows must be enough to reproduce issues with losing the data and to create backpressure
const stream = table.createReadStream({
start: '00000000',
end: '00000150',
});

for await (const row of stream) {
dataResults.push(row.id);
// sleep parameter needs to be high enough to produce backpressure.
await sleep(4000);
}
const expectedResults = Array.from(Array(150).keys())
.map(i => '00000000' + i.toString())
.map(i => i.slice(-8));
assert.deepStrictEqual(dataResults, expectedResults);
done();
} catch (error) {
done(error);
}
})();
});

after(async () => {
server.shutdown(() => {});
Expand Down

0 comments on commit 2a553dd

Please sign in to comment.