Skip to content

Commit

Permalink
Merge pull request #652 from terascope/more-write-performance
Browse files Browse the repository at this point in the history
v2.1.6 - More write performance
  • Loading branch information
peterdemartini authored Aug 17, 2021
2 parents 2ba9094 + 3c197da commit 93f76d8
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 99 deletions.
2 changes: 1 addition & 1 deletion asset/asset.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "file",
"version": "2.1.5",
"version": "2.1.6",
"description": "A set of processors for working with files"
}
4 changes: 2 additions & 2 deletions asset/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "file",
"version": "2.1.5",
"version": "2.1.6",
"description": "A set of processors for working with files",
"private": true,
"workspaces": {
Expand All @@ -14,7 +14,7 @@
"build:watch": "yarn build --watch"
},
"dependencies": {
"@terascope/file-asset-apis": "^0.4.10",
"@terascope/file-asset-apis": "^0.4.11",
"@terascope/job-components": "^0.52.7",
"csvtojson": "^2.0.10",
"fs-extra": "^10.0.0",
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "file-assets-bundle",
"version": "2.1.5",
"version": "2.1.6",
"description": "A set of processors for working with files",
"repository": "https://github.com/terascope/file-assets.git",
"author": "Terascope, LLC <[email protected]>",
Expand All @@ -27,7 +27,7 @@
"dependencies": {},
"devDependencies": {
"@terascope/eslint-config": "^0.6.0",
"@terascope/file-asset-apis": "^0.4.10",
"@terascope/file-asset-apis": "^0.4.11",
"@terascope/job-components": "^0.52.7",
"@types/fs-extra": "^9.0.12",
"@types/jest": "^27.0.1",
Expand Down
3 changes: 1 addition & 2 deletions packages/file-asset-apis/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/file-asset-apis",
"version": "0.4.10",
"version": "0.4.11",
"description": "file reader and sender apis",
"publishConfig": {
"access": "public"
Expand All @@ -24,7 +24,6 @@
"license": "MIT",
"dependencies": {
"@terascope/utils": "^0.40.5",
"async-mutex": "^0.3.1",
"csvtojson": "^2.0.10",
"fs-extra": "^10.0.0",
"json2csv": "5.0.6",
Expand Down
74 changes: 34 additions & 40 deletions packages/file-asset-apis/src/base/ChunkGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,31 @@ export interface Chunk {
readonly has_more: boolean;
}

const MiB = 1024 * 1024;

/**
* Efficiently breaks up a slice into multiple chunks.
* The behavior will change depending on whether the whole slice has to be
* serialized at once or not.
*
* The chunk size is not guaranteed since we check the length
*/
export class ChunkGenerator {
/**
* Used to determine how big each chunk of a single file should be
*/
static MAX_CHUNK_SIZE_BYTES = (isTest ? 5 : 100) * 1024 * 1024;
static MAX_CHUNK_SIZE_BYTES = (isTest ? 5 : 100) * MiB;

/**
* Used to determine how many bytes the chunk must overflow by to
* defer the data to the next chunk
*/
static MAX_CHUNK_OVERFLOW_BYTES = (isTest ? 0 : 1) * MiB;

/*
* 5MiB - Minimum part size for multipart uploads with Minio
*/
static MIN_CHUNK_SIZE_BYTES = 1024 * 1024 * 5;
static MIN_CHUNK_SIZE_BYTES = 5 * MiB;

constructor(
readonly formatter: Formatter,
Expand Down Expand Up @@ -67,58 +77,46 @@ export class ChunkGenerator {

private async* _chunkByRow(): AsyncIterableIterator<Chunk> {
const chunkSize = getBytes(ChunkGenerator.MAX_CHUNK_SIZE_BYTES);
const maxOverflowBytes = ChunkGenerator.MAX_CHUNK_OVERFLOW_BYTES;
let index = 0;

const buffers: Buffer[] = [];
let buffersLength = 0;
/**
* This will use the length of the string
* to determine the rough number of bytes
*/
let chunkStr = '';
/**
* this is used to count the number of
* small data pieces so we can periodically yield to the event loop
*/
let tooSmallOfDataCount = 0;

let chunk: Chunk|undefined;
for (const [formatted, has_more] of this.formatter.formatIterator(this.slice)) {
for (const [str, has_more] of this.formatter.formatIterator(this.slice)) {
chunk = undefined;

const buf = Buffer.from(formatted, 'utf8');
buffers.push(buf);
buffersLength += buf.length;
chunkStr += str;

/**
* Since a row may push the chunk size over the limit,
* the overflow from the current row buffer needs to
* be deferred until the next iteration
*/
const overflowBytes = buffersLength - chunkSize;
if (overflowBytes > 0) {
const combinedBuffer = Buffer.concat(
buffers,
buffersLength
);
buffersLength = 0;
buffers.length = 0;

const overflowBuf = Buffer.alloc(overflowBytes);
combinedBuffer.copy(overflowBuf, 0, chunkSize);

const uploadBuf = Buffer.alloc(chunkSize, combinedBuffer);
const estimatedOverflowBytes = chunkStr.length - chunkSize;
if (estimatedOverflowBytes > maxOverflowBytes) {
const combinedBuffer = Buffer.from(chunkStr);
chunkStr = combinedBuffer.toString('utf-8', chunkSize);

chunk = {
index,
has_more,
data: uploadBuf,
data: combinedBuffer.slice(0, chunkSize),
};

index++;
buffersLength += overflowBuf.length;
buffers.push(overflowBuf);
} else if (overflowBytes === 0) {
const combinedBuffer = Buffer.concat(
buffers,
buffersLength
);
buffersLength = 0;
buffers.length = 0;
} else if (estimatedOverflowBytes >= 0) {
const combinedBuffer = Buffer.from(chunkStr);
chunkStr = '';

chunk = {
index,
Expand All @@ -138,23 +136,19 @@ export class ChunkGenerator {

// this will ensure we don't block the event loop
// for too long, blocking requests from going out
if (tooSmallOfDataCount % 100 === 99) {
if (tooSmallOfDataCount % 1000 === 999) {
await EventLoop.wait();
}
}

if (buffers.length) {
const combinedBuffer = Buffer.concat(
buffers,
buffersLength
);
buffersLength = 0;
buffers.length = 0;
if (chunkStr.length) {
const uploadBuffer = Buffer.from(chunkStr);
chunkStr = '';

chunk = {
index,
has_more: false,
data: combinedBuffer,
data: uploadBuffer,
};

yield chunk;
Expand Down
45 changes: 16 additions & 29 deletions packages/file-asset-apis/src/s3/MultiPartUploader.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { EventEmitter, once } from 'events';
import type S3 from 'aws-sdk/clients/s3';
import { E_CANCELED, Semaphore } from 'async-mutex';
import {
EventLoop,
Logger, pDelay, sortBy, toHumanTime
} from '@terascope/utils';
import {
Expand Down Expand Up @@ -58,21 +56,14 @@ export class MultiPartUploader {
*/
private finishing = false;

/**
* this is used to control the concurrency of the
* part upload requests
*/
private readonly readSemaphore: Semaphore;
private readonly events: EventEmitter;

constructor(
readonly client: S3,
readonly bucket: string,
readonly key: string,
readonly concurrency: number,
readonly logger: Logger
) {
this.readSemaphore = new Semaphore(this.concurrency);
this.events = new EventEmitter();
// just so we don't get warnings set this to a higher number
this.events.setMaxListeners(1000);
Expand Down Expand Up @@ -101,21 +92,21 @@ export class MultiPartUploader {
// adding this here will ensure that
// we give the event loop some time
// to start the upload
await pDelay(10);
await pDelay(0);
}

/**
* Used to wait until the background request for start is finished.
* If that failed, this should throw
*/
private async _waitForStart(ctx: string): Promise<void> {
private _waitForStart(ctx: string): Promise<void>|void {
if (!this.started) {
throw Error('Expected MultiPartUploader->start to have been finished');
}

if (this.uploadId == null) {
this.logger.debug(`${ctx} waiting for upload to start`);
await once(this.events, Events.StartDone);
return once(this.events, Events.StartDone).then(() => undefined);
}

if (this.startError != null) {
Expand All @@ -138,27 +129,23 @@ export class MultiPartUploader {
}

this.pendingParts++;
this.readSemaphore.runExclusive(async () => {
await this._waitForStart(`part #${partNumber}`);
await this._uploadPart(body, partNumber);
}).catch((err) => {
if (err === E_CANCELED) {
this.logger.debug(`upload part #${partNumber} canceled`);
return;
}

this.partUploadErrors.set(String(err), err);
this.readSemaphore.cancel();
}).finally(() => {
this.pendingParts--;
this.events.emit(Events.PartDone);
});
// run this in the background
Promise.resolve()
.then(() => this._waitForStart(`part #${partNumber}`))
.then(() => this._uploadPart(body, partNumber))
.catch((err) => {
this.partUploadErrors.set(String(err), err);
})
.finally(() => {
this.pendingParts--;
this.events.emit(Events.PartDone);
});

if (!this.uploadId) {
if (this.pendingParts > 0 || !this.uploadId) {
// adding this here will ensure that
// we give the event loop some time
// to start the upload
await EventLoop.wait();
await pDelay(this.pendingParts);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/file-asset-apis/src/s3/s3-sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class S3Sender extends ChunkedFileSender implements RouteSenderAPI {

if (chunk.has_more) {
uploader = new MultiPartUploader(
this.client, Bucket, Key, this.concurrency, this.logger
this.client, Bucket, Key, this.logger
);
await uploader.start();
} else {
Expand Down
14 changes: 4 additions & 10 deletions packages/file-asset-apis/test/base/ChunkGenerator-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,23 +289,17 @@ describe('ChunkGenerator', () => {
});

it('should return a more than one chunk', async () => {
const wholeBuffer = await gen.compressor.compress(
gen.formatter.format(input)
);
const str = gen.formatter.format(input);
// we just need to ensure that our test will work
expect(wholeBuffer.length).toBeGreaterThan(CHUNK_SIZE);
expect(str.length).toBeGreaterThan(CHUNK_SIZE);

const expected: TestChunk[] = [{
index: 0,
data: wholeBuffer
.subarray(0, CHUNK_SIZE)
.toString(),
data: str.slice(0, CHUNK_SIZE),
has_more: true,
}, {
index: 1,
data: wholeBuffer
.subarray(CHUNK_SIZE, CHUNK_SIZE * 2)
.toString(),
data: str.slice(CHUNK_SIZE, CHUNK_SIZE * 2),
has_more: false,
}];
await expect(toArray(gen)).resolves.toEqual(expected);
Expand Down
12 changes: 0 additions & 12 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1377,13 +1377,6 @@ astral-regex@^2.0.0:
resolved "https://registry.yarnpkg.com/astral-regex/-/astral-regex-2.0.0.tgz#483143c567aeed4785759c0865786dc77d7d2e31"
integrity sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==

async-mutex@^0.3.1:
version "0.3.1"
resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.3.1.tgz#7033af665f1c7cebed8b878267a43ba9e77c5f67"
integrity sha512-vRfQwcqBnJTLzVQo72Sf7KIUbcSUP5hNchx6udI1U6LuPQpfePgdjJzlCe76yFZ8pxlLjn9lwcl/Ya0TSOv0Tw==
dependencies:
tslib "^2.1.0"

asynckit@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
Expand Down Expand Up @@ -5845,11 +5838,6 @@ tslib@^1.8.1:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==

tslib@^2.1.0:
version "2.3.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.0.tgz#803b8cdab3e12ba581a4ca41c8839bbb0dacb09e"
integrity sha512-N82ooyxVNm6h1riLCoyS9e3fuJ3AMG2zIZs2Gd1ATcSFjSA23Q0fzjjZeh0jbJvWVDZ0cJT8yaNNaaXHzueNjg==

tsutils@^3.21.0:
version "3.21.0"
resolved "https://registry.yarnpkg.com/tsutils/-/tsutils-3.21.0.tgz#b48717d394cea6c1e096983eed58e9d61715b623"
Expand Down

0 comments on commit 93f76d8

Please sign in to comment.