Skip to content

Commit

Permalink
Merge pull request #1172 from salesforcecli/cd/fix-bulk-upsert
Browse files Browse the repository at this point in the history
fix(bulk-v1): properly handle +1 job batches
  • Loading branch information
soridalac authored Jan 23, 2025
2 parents 6f1445b + a14468f commit 89e1e5e
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/onRelease.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
uses: salesforcecli/github-workflows/.github/workflows/npmPublish.yml@main
needs: [getDistTag]
with:
ctc: true
# ctc: true
sign: true
tag: ${{ needs.getDistTag.outputs.tag || 'latest' }}
githubTag: ${{ github.event.release.tag_name || inputs.tag }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
- 'yarn test:nuts:data:record'
- 'yarn test:nuts:data:search'
- 'yarn test:nuts:data:tree'
- 'yarn test:nuts:force:data:bulk-upsert-delete-status'
fail-fast: false
with:
os: ${{ matrix.os }}
Expand Down
2 changes: 1 addition & 1 deletion messages/batcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Batch Status

Will poll the batch statuses every %s seconds.
To fetch the status on your own, press CTRL+C and use the command:
sf force data bulk status -i %s -b [<batchId>]
sf force data bulk status -i %s -b %s

# ExternalIdRequired

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@
"test:nuts:data:search": "nyc mocha \"./test/commands/data/search.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20",
"test:nuts:data:create": "nyc mocha \"./test/commands/data/create/*.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20",
"test:nuts:data:bulk-upsert-delete": "nyc mocha \"./test/commands/data/dataBulk.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20",
"test:nuts:force:data:bulk-upsert-delete-status": "nyc mocha \"./test/commands/force/data/bulk/dataBulk.nut.ts\" --slow 4500 --timeout 600000 --parallel --jobs 20",
"test:only": "wireit",
"version": "oclif readme"
},
"dependencies": {
"@jsforce/jsforce-node": "^3.6.3",
"@jsforce/jsforce-node": "^3.6.4",
"@oclif/multi-stage-output": "^0.8.5",
"@salesforce/core": "^8.6.1",
"@salesforce/kit": "^3.2.2",
Expand Down
39 changes: 16 additions & 23 deletions src/batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { ReadStream } from 'node:fs';
import { EOL } from 'node:os';
import { Connection, Messages, SfError } from '@salesforce/core';
import { Ux } from '@salesforce/sf-plugins-core';
import type { Schema } from '@jsforce/jsforce-node';
Expand All @@ -31,7 +32,7 @@ const messages = Messages.loadMessages('@salesforce/plugin-data', 'batcher');
type BatchEntry = Record<string, string>;
type Batches = BatchEntry[][];

type BulkResult = {
export type BulkResult = {
$: {
xmlns: string;
};
Expand All @@ -58,19 +59,12 @@ export class Batcher {
* @param jobId {string}
* @param doneCallback
*/
public async fetchAndDisplayJobStatus(
jobId: string,
doneCallback?: (...args: [{ job: JobInfo }]) => void
): Promise<JobInfo> {
public async fetchAndDisplayJobStatus(jobId: string): Promise<JobInfo> {
const job = this.conn.bulk.job(jobId);
const jobInfo = await job.check();

this.bulkStatus(jobInfo, undefined, undefined, true);

if (doneCallback) {
doneCallback({ job: jobInfo });
}

return jobInfo;
}

Expand Down Expand Up @@ -120,8 +114,7 @@ export class Batcher {
records: ReadStream,
sobjectType: string,
wait?: number
): Promise<BulkResult[] | JobInfo[]> {
const batchesCompleted = 0;
): Promise<BulkResult[] | JobInfo[] | undefined> {
let batchesQueued = 0;
const overallInfo = false;

Expand All @@ -130,7 +123,10 @@ export class Batcher {
// The error handling for this gets quite tricky when there are multiple batches
// Currently, we bail out early by calling an Error.exit
// But, we might want to actually continue to the next batch.
return (await Promise.all(
//
// async: batches are created and an array of batch info is returned immediately.
// sync: batches are created, then we wait for all of them to finish; Promise.all resolves to `undefined`.
const batchInfos = (await Promise.all(
batches.map(
async (batch: Array<Record<string, string>>, i: number): Promise<BulkResult | BatchInfo | void | JobInfo> => {
const newBatch = job.createBatch();
Expand Down Expand Up @@ -186,14 +182,16 @@ export class Batcher {
}
);
} else {
resolve(this.waitForCompletion(newBatch, batchesCompleted, overallInfo, i + 1, batches.length, wait));
resolve(this.waitForCompletion(newBatch, overallInfo, i + 1, wait));
}

void newBatch.execute(batch);
});
}
)
)) as BulkResult[];
)) as BulkResult[] | undefined;

return wait ? [await this.fetchAndDisplayJobStatus(job.id!)] : batchInfos;
}

/**
Expand Down Expand Up @@ -229,12 +227,10 @@ export class Batcher {
*/
private async waitForCompletion<J extends Schema, T extends BulkOperation>(
newBatch: Batch<J, T>,
batchesCompleted: number,
overallInfo: boolean,
batchNum: number,
totalNumBatches: number,
waitMins: number
): Promise<JobInfo> {
): Promise<void> {
return new Promise((resolve, reject) => {
void newBatch.on(
'queue',
Expand All @@ -245,10 +241,10 @@ export class Batcher {
if (result.state === 'Failed') {
reject(result.stateMessage);
} else if (!overallInfo) {
this.ux.log(messages.getMessage('PollingInfo', [POLL_FREQUENCY_MS / 1000, batchInfo.jobId]));
this.ux.log(messages.getMessage('PollingInfo', [POLL_FREQUENCY_MS / 1000, batchInfo.jobId, batchInfo.id]));
overallInfo = true;
}
this.ux.log(messages.getMessage('BatchQueued', [batchNum, batchInfo.id]));
this.ux.log(messages.getMessage('BatchQueued', [batchNum, batchInfo.id]), EOL);
newBatch.poll(POLL_FREQUENCY_MS, waitMins * 60_000);
}
);
Expand All @@ -257,10 +253,7 @@ export class Batcher {
void newBatch.on('response', async (results: BulkIngestBatchResult): Promise<void> => {
const summary: BatchInfo = await newBatch.check();
this.bulkStatus(summary, results, batchNum);
batchesCompleted++;
if (batchesCompleted === totalNumBatches) {
resolve(await this.fetchAndDisplayJobStatus(summary.jobId));
}
resolve();
});
});
}
Expand Down
1 change: 1 addition & 0 deletions test/commands/batcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ let ux: Ux;
describe('batcher', () => {
const $$ = sinon.createSandbox();
describe('bulkBatchStatus', () => {
// @ts-ignore this test doesn't need all JobInfo props.
const summary: JobInfo = { id: '123', operation: 'upsert', state: 'Closed', object: 'Account' };
beforeEach(() => {
const conn = $$.stub(Connection.prototype);
Expand Down
4 changes: 2 additions & 2 deletions test/commands/data/tree/dataTreeMoreThan200.nut.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe('data:tree commands with more than 200 records are batches in safe grou
ensureExitCode: 0,
}
);
expect(importResult.jsonOutput?.result.length).to.equal(10000, 'Expected 10000 records to be imported');
expect(importResult.jsonOutput?.result.length).to.equal(10_000, 'Expected 10000 records to be imported');

execCmd(
`data:export:tree --query "${query}" --prefix ${prefix} --output-dir ${path.join(
Expand Down Expand Up @@ -75,7 +75,7 @@ describe('data:tree commands with more than 200 records are batches in safe grou
}).jsonOutput;

expect(queryResults?.result.totalSize).to.equal(
10000,
10_000,
`Expected 10000 Account objects returned by the query to org: ${importAlias}`
);
});
Expand Down
57 changes: 56 additions & 1 deletion test/commands/force/data/bulk/dataBulk.nut.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import fs from 'node:fs';
import { expect } from 'chai';
import { execCmd, TestSession } from '@salesforce/cli-plugins-testkit';
import { sleep } from '@salesforce/kit';
import { BatcherReturnType } from '../../../../../src/batcher.js';
import { JobInfo } from '@jsforce/jsforce-node/lib/api/bulk.js';
import { BatcherReturnType, BulkResult } from '../../../../../src/batcher.js';
import { StatusResult } from '../../../../../src/types.js';
import { QueryResult } from '../../../data/query/query.nut.js';
import { DataExportBulkResult } from '../../../../../src/commands/data/export/bulk.js';

let testSession: TestSession;

Expand Down Expand Up @@ -121,6 +123,55 @@ describe('force:data:bulk commands', () => {
checkBulkStatusJsonResponse(bulkDeleteResult.jobId, bulkDeleteResult.id);
});

// Bulk v1 batch limit is 10K records so this NUT ensures we handle multiple batches correctly.
// The fixture drives 76,380 records through the batcher, which must split them into 8 batches.
// NOTE(review): the title says "60K accounts" but the assertions expect 76,380 records — confirm which is intended.
it('should upsert, query and delete 60K accounts (sync)', async () => {
// bulk v1 upsert; --wait 10 makes the command poll synchronously until the job completes
const cmdRes = execCmd<BatcherReturnType>(
`force:data:bulk:upsert --sobject Account --file ${path.join(
'.',
'data',
'bulkUpsertLarge.csv'
)} --externalid Id --wait 10 --json`,
{ ensureExitCode: 0 }
).jsonOutput?.result;
// with --wait, the command should return a single job summary rather than per-batch results
assert.equal(cmdRes?.length, 1);
// guaranteed by the assertion, done for ts
const upsertJobResult = cmdRes[0];

if (isBulkJob(upsertJobResult)) {
// 76,380 records at the 10K batch limit => 8 batches, all expected to succeed
// (jsforce reports these JobInfo counters as strings, hence the quoted numbers)
assert.equal(upsertJobResult.numberBatchesCompleted, '8');
assert.equal(upsertJobResult.numberBatchesFailed, '0');
assert.equal(upsertJobResult.numberRecordsProcessed, '76380');
} else {
assert.fail('upsertJobResult does not contain bulk job info.');
}

// bulk v2 export to get IDs of accounts to delete
const outputFile = 'export-accounts.csv';
const result = execCmd<DataExportBulkResult>(
`data export bulk -q "select id from account where phone = '415-555-0000'" --output-file ${outputFile} --wait 10 --json`,
{ ensureExitCode: 0 }
).jsonOutput?.result;
// presumably every row in bulkUpsertLarge.csv carries this phone number, so the
// export should return all 76,380 IDs — verify against the fixture if this flakes
expect(result?.totalSize).to.equal(76_380);
expect(result?.filePath).to.equal(outputFile);

// bulk v1 delete, fed by the exported ID file, exercising the same multi-batch path
const cmdDeleteRes = execCmd<BatcherReturnType>(
`force:data:bulk:delete --sobject Account --file ${outputFile} --wait 10 --json`,
{ ensureExitCode: 0 }
).jsonOutput?.result;
assert.equal(cmdDeleteRes?.length, 1);
// guaranteed by the assertion, done for ts
const deleteJobResult = cmdDeleteRes[0];
if (isBulkJob(deleteJobResult)) {
// same record count through the same batcher => same 8-batch split
assert.equal(deleteJobResult.numberBatchesCompleted, '8');
assert.equal(deleteJobResult.numberBatchesFailed, '0');
assert.equal(deleteJobResult.numberRecordsProcessed, '76380');
} else {
assert.fail('deleteJobResult does not contain bulk job info.');
}
});

it('should upsert, query, and delete 10 accounts all serially', async () => {
const bulkUpsertResult = bulkInsertAccounts();
await isCompleted(
Expand Down Expand Up @@ -195,3 +246,7 @@ const bulkInsertAccounts = () => {
assert('jobId' in bulkUpsertResult);
return bulkUpsertResult;
};

/**
 * Type guard distinguishing a bulk v1 JobInfo from a batch BulkResult.
 *
 * JobInfo carries `numberBatchesCompleted` while the BulkResult XML envelope
 * does not, so a defined value of that property is the discriminator.
 *
 * @param info - either a job summary or a raw batch result
 * @returns true when `info` is a JobInfo
 */
function isBulkJob(info: JobInfo | BulkResult): info is JobInfo {
  // `in` narrows the union without an unchecked `as JobInfo` assertion;
  // the explicit undefined check preserves the original semantics for
  // objects that declare the key but leave it unset.
  return 'numberBatchesCompleted' in info && info.numberBatchesCompleted !== undefined;
}
32 changes: 6 additions & 26 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -939,22 +939,14 @@
"@smithy/types" "^3.7.2"
tslib "^2.6.2"

"@aws-sdk/[email protected]":
"@aws-sdk/[email protected]", "@aws-sdk/types@^3.222.0":
version "3.723.0"
resolved "https://registry.yarnpkg.com/@aws-sdk/types/-/types-3.723.0.tgz#f0c5a6024a73470421c469b6c1dd5bc4b8fb851b"
integrity sha512-LmK3kwiMZG1y5g3LGihT9mNkeNOmwEyPk6HGcJqh0wOSV4QpWoKu2epyKE4MLQNUUlz2kOVbVbOrwmI6ZcteuA==
dependencies:
"@smithy/types" "^4.0.0"
tslib "^2.6.2"

"@aws-sdk/types@^3.222.0":
version "3.692.0"
resolved "https://registry.yarnpkg.com/@aws-sdk/types/-/types-3.692.0.tgz#c8f6c75b6ad659865b72759796d4d92c1b72069b"
integrity sha512-RpNvzD7zMEhiKgmlxGzyXaEcg2khvM7wd5sSHVapOcrde1awQSOMGI4zKBQ+wy5TnDfrm170ROz/ERLYtrjPZA==
dependencies:
"@smithy/types" "^3.7.0"
tslib "^2.6.2"

"@aws-sdk/[email protected]":
version "3.723.0"
resolved "https://registry.yarnpkg.com/@aws-sdk/util-arn-parser/-/util-arn-parser-3.723.0.tgz#e9bff2b13918a92d60e0012101dad60ed7db292c"
Expand Down Expand Up @@ -1503,12 +1495,7 @@
"@inquirer/type" "^3.0.2"
yoctocolors-cjs "^2.1.2"

"@inquirer/figures@^1.0.5", "@inquirer/figures@^1.0.6":
version "1.0.8"
resolved "https://registry.yarnpkg.com/@inquirer/figures/-/figures-1.0.8.tgz#d9e414a1376a331a0e71b151fea27c48845788b0"
integrity sha512-tKd+jsmhq21AP1LhexC0pPwsCxEhGgAkg28byjJAd+xhmIs8LUX8JbUc3vBf3PhLxWiB5EvyBE5X7JSPAqMAqg==

"@inquirer/figures@^1.0.9":
"@inquirer/figures@^1.0.5", "@inquirer/figures@^1.0.6", "@inquirer/figures@^1.0.9":
version "1.0.9"
resolved "https://registry.yarnpkg.com/@inquirer/figures/-/figures-1.0.9.tgz#9d8128f8274cde4ca009ca8547337cab3f37a4a3"
integrity sha512-BXvGj0ehzrngHTPTDqUoDT3NXL8U0RxUk2zJm2A66RhCEIWdtU1v6GuUqNAgArW4PQ9CinqIWyHdQgdwOj06zQ==
Expand Down Expand Up @@ -1699,10 +1686,10 @@
"@jridgewell/resolve-uri" "^3.1.0"
"@jridgewell/sourcemap-codec" "^1.4.14"

"@jsforce/jsforce-node@^3.6.1", "@jsforce/jsforce-node@^3.6.3":
version "3.6.3"
resolved "https://registry.yarnpkg.com/@jsforce/jsforce-node/-/jsforce-node-3.6.3.tgz#a5c984b6deffac01ddabc3f4b48374408c5cd194"
integrity sha512-sNUeBzfUv57uH0AiYuAOO8yjBP7lNY33mWybrjvBud8gMFVWozY6UAWU1DUk/dpqZ0+FK3iqB++nOQRczj1nSg==
"@jsforce/jsforce-node@^3.6.1", "@jsforce/jsforce-node@^3.6.4":
version "3.6.4"
resolved "https://registry.yarnpkg.com/@jsforce/jsforce-node/-/jsforce-node-3.6.4.tgz#c1055e2064a501633e9d86f6f2fe1b287c6ce9ce"
integrity sha512-9IZL5lFDE1nUnPYnzOleH0xaEE3Sc9sQcLKwx1LQeSyAI/KvnxySadlIpZAdTrJap4hDRFQkXiEFiJ3oFwj/zg==
dependencies:
"@sindresorhus/is" "^4"
base64url "^3.0.1"
Expand Down Expand Up @@ -2654,13 +2641,6 @@
"@smithy/util-stream" "^4.0.1"
tslib "^2.6.2"

"@smithy/types@^3.7.0":
version "3.7.1"
resolved "https://registry.yarnpkg.com/@smithy/types/-/types-3.7.1.tgz#4af54c4e28351e9101996785a33f2fdbf93debe7"
integrity sha512-XKLcLXZY7sUQgvvWyeaL/qwNPp6V3dWcUjqrQKjSb+tzYiCy340R/c64LV5j+Tnb2GhmunEX0eou+L+m2hJNYA==
dependencies:
tslib "^2.6.2"

"@smithy/types@^3.7.2":
version "3.7.2"
resolved "https://registry.yarnpkg.com/@smithy/types/-/types-3.7.2.tgz#05cb14840ada6f966de1bf9a9c7dd86027343e10"
Expand Down

0 comments on commit 89e1e5e

Please sign in to comment.