Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed rate limit error in bulk publish #1746

Draft
wants to merge 3 commits into
base: development
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 106 additions & 79 deletions packages/contentstack-bulk-publish/src/consumer/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ const path = require('path');
const { formatError } = require('../util');
const apiVersionForNRP = '3.2';
const nrpApiVersionWarning = `Provided apiVersion is invalid. ${apiVersionForNRP} is only supported value. Continuing with regular bulk-publish for now.`;

const { handleRateLimit, fetchBulkPublishLimit } = require('../util/common-utility');
const { getLoggerInstance, addLogs, getLogsDirPath } = require('../util/logger');
const { sanitizePath } = require('@contentstack/cli-utilities');
const { delay } = require('bluebird');
const logsDir = getLogsDirPath();

let logger;
Expand Down Expand Up @@ -236,6 +237,8 @@ async function performBulkPublish(data, _config, queue) {
let conf;
const bulkPublishObj = data.obj;
const stack = bulkPublishObj.stack;
let bulkPublishLimit = fetchBulkPublishLimit(stack?.org_uid);
let xRateLimitRemaining = bulkPublishLimit;
let payload = {};
switch (bulkPublishObj.Type) {
case 'entry':
Expand All @@ -261,6 +264,7 @@ async function performBulkPublish(data, _config, queue) {
.publish(payload)
.then((bulkPublishEntriesResponse) => {
if (!bulkPublishEntriesResponse.error_message) {
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
const sanitizedData = removePublishDetails(bulkPublishObj.entries);
console.log(
chalk.green(`Bulk entries sent for publish`),
Expand All @@ -277,10 +281,14 @@ async function performBulkPublish(data, _config, queue) {
throw bulkPublishEntriesResponse;
}
})
.catch((error) => {
.catch(async (error) => {
if (error.errorCode === 429 && data.retry < 2) {
data.retry++;
queue.Enqueue(data);
// Call the handleRateLimit function
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
if (delayApplied) {
queue.Enqueue(data);
}
} else {
delete bulkPublishObj.stack;
console.log(chalk.red(`Bulk entries failed to publish with error ${formatError(error)}`));
Expand Down Expand Up @@ -316,6 +324,7 @@ async function performBulkPublish(data, _config, queue) {
.publish(payload)
.then((bulkPublishAssetsResponse) => {
if (!bulkPublishAssetsResponse.error_message) {
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
console.log(
chalk.green(
`Bulk assets sent for publish`,
Expand All @@ -334,10 +343,14 @@ async function performBulkPublish(data, _config, queue) {
throw bulkPublishAssetsResponse;
}
})
.catch((error) => {
.catch(async (error) => {
if (error.errorCode === 429 && data.retry < 2) {
data.retry++;
queue.Enqueue(data);
// Call the handleRateLimit function
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
if (delayApplied) {
queue.Enqueue(data);
}
} else {
delete bulkPublishObj.stack;
console.log(chalk.red(`Bulk assets failed to publish with error ${formatError(error)}`));
Expand Down Expand Up @@ -380,49 +393,55 @@ async function performBulkUnPublish(data, _config, queue) {
}
}
}
stack
.bulkOperation()
.unpublish(payload)
.then((bulkUnPublishEntriesResponse) => {
if (!bulkUnPublishEntriesResponse.error_message) {
delete bulkUnPublishObj.stack;

console.log(
chalk.green(
`Bulk entries sent for Unpublish`,
bulkUnPublishEntriesResponse.job_id
? chalk.yellow(`job_id: ${bulkUnPublishEntriesResponse.job_id}`)
: '',
),
);
let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
displayEntriesDetails(sanitizedData);
addLogs(
logger,
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
'info',
);
} else {
throw bulkUnPublishEntriesResponse;
}
})
.catch((error) => {
if (error.errorCode === 429 && data.retry < 2) {
data.retry++;
try {
const bulkUnPublishEntriesResponse = await stack.bulkOperation().unpublish(payload);

if (!bulkUnPublishEntriesResponse.error_message) {
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
delete bulkUnPublishObj.stack;
console.log(
chalk.green(
`Bulk entries sent for Unpublish`,
bulkUnPublishEntriesResponse.job_id
? chalk.yellow(`job_id: ${bulkUnPublishEntriesResponse.job_id}`)
: '',
),
);

let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
displayEntriesDetails(sanitizedData);
addLogs(logger, {
options: bulkUnPublishObj,
api_key: stack.stackHeaders.api_key,
alias: stack.alias,
host: stack.host,
}, 'info');
} else {
throw bulkUnPublishEntriesResponse;
}
} catch (error) {
if (error.errorCode === 429 && data.retry < 2) {
data.retry++;
// Call the handleRateLimit function
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
if (delayApplied) {
queue.Enqueue(data);
} else {
delete bulkUnPublishObj.stack;
console.log(chalk.red(`Bulk entries failed to Unpublish with error ${formatError(error)}`));
let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
displayEntriesDetails(sanitizedData);
addLogs(
}
} else {
delete bulkUnPublishObj.stack;
console.log(chalk.red(`Bulk entries failed to Unpublish with error ${formatError(error)}`));
let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
displayEntriesDetails(sanitizedData);
addLogs(
logger,
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
'error',
);
}
});
}
}
break;

case 'asset':
conf = {
assets: removePublishDetails(bulkUnPublishObj.assets),
Expand All @@ -433,52 +452,60 @@ async function performBulkUnPublish(data, _config, queue) {
if (bulkUnPublishObj.apiVersion) {
if (!isNaN(bulkUnPublishObj.apiVersion) && bulkUnPublishObj.apiVersion === apiVersionForNRP) {
payload['api_version'] = bulkUnPublishObj.apiVersion;
} else if (bulkUnPublishObj.apiVersion !== '3') {
console.log(chalk.yellow(nrpApiVersionWarning));
}
}

try {
const bulkUnPublishAssetsResponse = await stack.bulkOperation().unpublish(payload);

if (!bulkUnPublishAssetsResponse.error_message) {
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
delete bulkUnPublishObj.stack;
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
console.log(
chalk.green(
`Bulk assets sent for Unpublish`,
bulkUnPublishAssetsResponse.job_id ? chalk.yellow(`job_id: ${bulkUnPublishAssetsResponse.job_id}`) : '',
),
);

displayAssetsDetails(sanitizedData);
addLogs(logger, {
options: bulkUnPublishObj,
api_key: stack.stackHeaders.api_key,
alias: stack.alias,
host: stack.host,
}, 'info');
} else {
if (bulkUnPublishObj.apiVersion !== '3') {
// because 3 is the default value for api-version, and it exists for the purpose of display only
console.log(chalk.yellow(nrpApiVersionWarning));
}
throw bulkUnPublishAssetsResponse;
}
}
stack
.bulkOperation()
.unpublish(payload)
.then((bulkUnPublishAssetsResponse) => {
if (!bulkUnPublishAssetsResponse.error_message) {
delete bulkUnPublishObj.stack;
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
console.log(
chalk.green(
`Bulk assets sent for Unpublish`,
bulkUnPublishAssetsResponse.job_id ? chalk.yellow(`job_id: ${bulkUnPublishAssetsResponse.job_id}`) : '',
),
);
displayAssetsDetails(sanitizedData);
addLogs(
logger,
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
'info',
);
} else {
throw bulkUnPublishAssetsResponse;
}
})
.catch((error) => {
if (error.errorCode === 429 && data.retry < 2) {
data.retry++;
} catch (error) {
if (error.errorCode === 429 && data.retry < 2) {
data.retry++;
// Call the handleRateLimit function
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
if (delayApplied) {
queue.Enqueue(data);
} else {
delete bulkUnPublishObj.stack;
console.log(chalk.red(`Bulk assets failed to Unpublish with error ${formatError(error)}`));
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
displayAssetsDetails(sanitizedData);
addLogs(
logger,
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
'error',
);
}
});
} else {
delete bulkUnPublishObj.stack;
console.log(chalk.red(`Bulk assets failed to Unpublish with error ${formatError(error)}`));
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
displayAssetsDetails(sanitizedData);
addLogs(logger, {
options: bulkUnPublishObj,
api_key: stack.stackHeaders.api_key,
alias: stack.alias,
host: stack.host,
}, 'error');
}
}
break;
default:
console.log('No such type');
Expand Down
1 change: 1 addition & 0 deletions packages/contentstack-bulk-publish/src/util/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ async function getStack(data) {
const options = {
host: data.host,
branchName: data.branch,
headers: {includeResHeaders: true},
};
const stackOptions = {};
if (data.alias) {
Expand Down
25 changes: 24 additions & 1 deletion packages/contentstack-bulk-publish/src/util/common-utility.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,27 @@ function fetchBulkPublishLimit(orgUid) {
return bulkPublishLimit;
}

module.exports = { fetchBulkPublishLimit };
/**
* Handles the rate limit checking and adds delay if necessary.
* @param {Object} error - The error object containing the response headers.
* @param {Object} data - The data being processed, including the batch size.
* @param {Function} delay - The delay function to use for waiting.
* @param {number} xRateLimitRemaining - The xRateLimitRemaining containing the remaining balance.
* @returns {boolean} - Returns `true` if delay was applied, `false` otherwise.
*/
async function handleRateLimit(error, data, delay, xRateLimitRemaining) {
// Check if rate limit is exhausted or batch size exceeds remaining limit
if (xRateLimitRemaining === 0 || data.length > xRateLimitRemaining) {
cliux.print(
'Bulk rate limit reached or batch size exceeds remaining limit. Retrying in 2 seconds...',
{ color: 'yellow' },
);
await delay(2000); // Wait for 2 seconds before retrying
return true;
} else {
return false;
}
}


module.exports = { fetchBulkPublishLimit, handleRateLimit };
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class ManagementSDKInitiator {
if (!option.headers) option.headers = {};
option.headers['X-CS-CLI'] = this.analyticsInfo;
}
if (config.headers?.includeResHeaders) {
if (!option.headers) option.headers = {};
option.headers['includeResHeaders'] = true;
}
if (!config.management_token) {
const authorisationType = configStore.get('authorisationType');
if (authorisationType === 'BASIC') {
Expand Down
Loading