From eeddb8952782c252905980f9b0d7d6cdfed916be Mon Sep 17 00:00:00 2001 From: sunil-lakshman <104969541+sunil-lakshman@users.noreply.github.com> Date: Wed, 5 Feb 2025 23:03:28 +0530 Subject: [PATCH 1/2] Fixed rate limit error in bulk publish --- .../src/consumer/publish.js | 184 ++++++++++-------- .../src/util/client.js | 1 + .../src/util/common-utility.js | 25 ++- .../src/contentstack-management-sdk.ts | 4 + 4 files changed, 134 insertions(+), 80 deletions(-) diff --git a/packages/contentstack-bulk-publish/src/consumer/publish.js b/packages/contentstack-bulk-publish/src/consumer/publish.js index 1c05186ce1..926dc4775d 100644 --- a/packages/contentstack-bulk-publish/src/consumer/publish.js +++ b/packages/contentstack-bulk-publish/src/consumer/publish.js @@ -7,9 +7,10 @@ const path = require('path'); const { formatError } = require('../util'); const apiVersionForNRP = '3.2'; const nrpApiVersionWarning = `Provided apiVersion is invalid. ${apiVersionForNRP} is only supported value. Continuing with regular bulk-publish for now.`; - +const { handleRateLimit } = require('../util/common-utility'); const { getLoggerInstance, addLogs, getLogsDirPath } = require('../util/logger'); const { sanitizePath } = require('@contentstack/cli-utilities'); +const { delay } = require('bluebird'); const logsDir = getLogsDirPath(); let logger; @@ -234,6 +235,7 @@ async function performBulkPublish(data, _config, queue) { // add validation for user uid // if user not logged in, then user uid won't be available and NRP too won't work let conf; + let xRateLimitRemaining; const bulkPublishObj = data.obj; const stack = bulkPublishObj.stack; let payload = {}; @@ -261,6 +263,7 @@ async function performBulkPublish(data, _config, queue) { .publish(payload) .then((bulkPublishEntriesResponse) => { if (!bulkPublishEntriesResponse.error_message) { + xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10); const sanitizedData = removePublishDetails(bulkPublishObj.entries); console.log( chalk.green(`Bulk entries sent for publish`), @@ -277,10 +280,14 @@ async function performBulkPublish(data, _config, queue) { throw bulkPublishEntriesResponse; } }) - .catch((error) => { + .catch(async (error) => { if (error.errorCode === 429 && data.retry < 2) { data.retry++; - queue.Enqueue(data); + // Call the handleRateLimit function + const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining); + if (delayApplied) { + queue.Enqueue(data); + } } else { delete bulkPublishObj.stack; console.log(chalk.red(`Bulk entries failed to publish with error ${formatError(error)}`)); @@ -316,6 +323,7 @@ async function performBulkPublish(data, _config, queue) { .publish(payload) .then((bulkPublishAssetsResponse) => { if (!bulkPublishAssetsResponse.error_message) { + xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10); console.log( chalk.green( `Bulk assets sent for publish`, @@ -334,10 +342,14 @@ async function performBulkPublish(data, _config, queue) { throw bulkPublishAssetsResponse; } }) - .catch((error) => { + .catch(async (error) => { if (error.errorCode === 429 && data.retry < 2) { data.retry++; - queue.Enqueue(data); + // Call the handleRateLimit function + const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining); + if (delayApplied) { + queue.Enqueue(data); + } } else { delete bulkPublishObj.stack; console.log(chalk.red(`Bulk assets failed to publish with error ${formatError(error)}`)); @@ -380,49 +392,55 @@ async function performBulkUnPublish(data, _config, queue) { } } } - stack - .bulkOperation() - .unpublish(payload) - .then((bulkUnPublishEntriesResponse) => { - if (!bulkUnPublishEntriesResponse.error_message) { - delete bulkUnPublishObj.stack; - console.log( - chalk.green( - `Bulk entries sent for Unpublish`, - bulkUnPublishEntriesResponse.job_id - ? chalk.yellow(`job_id: ${bulkUnPublishEntriesResponse.job_id}`) - : '', - ), - ); - let sanitizedData = removePublishDetails(bulkUnPublishObj.entries); - displayEntriesDetails(sanitizedData); - addLogs( - logger, - { options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host }, - 'info', - ); - } else { - throw bulkUnPublishEntriesResponse; - } - }) - .catch((error) => { - if (error.errorCode === 429 && data.retry < 2) { - data.retry++; + try { + const bulkUnPublishEntriesResponse = await stack.bulkOperation().unpublish(payload); + + if (!bulkUnPublishEntriesResponse.error_message) { + xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10); + delete bulkUnPublishObj.stack; + console.log( + chalk.green( + `Bulk entries sent for Unpublish`, + bulkUnPublishEntriesResponse.job_id + ? chalk.yellow(`job_id: ${bulkUnPublishEntriesResponse.job_id}`) + : '', + ), + ); + + let sanitizedData = removePublishDetails(bulkUnPublishObj.entries); + displayEntriesDetails(sanitizedData); + addLogs(logger, { + options: bulkUnPublishObj, + api_key: stack.stackHeaders.api_key, + alias: stack.alias, + host: stack.host, + }, 'info'); + } else { + throw bulkUnPublishEntriesResponse; + } + } catch (error) { + if (error.errorCode === 429 && data.retry < 2) { + data.retry++; + // Call the handleRateLimit function + const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining); + if (delayApplied) { queue.Enqueue(data); - } else { - delete bulkUnPublishObj.stack; - console.log(chalk.red(`Bulk entries failed to Unpublish with error ${formatError(error)}`)); - let sanitizedData = removePublishDetails(bulkUnPublishObj.entries); - displayEntriesDetails(sanitizedData); - addLogs( + } + } else { + delete bulkUnPublishObj.stack; + console.log(chalk.red(`Bulk entries failed to Unpublish with error ${formatError(error)}`)); + let sanitizedData = removePublishDetails(bulkUnPublishObj.entries); + displayEntriesDetails(sanitizedData); + addLogs( logger, { options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host }, 'error', ); - } - }); + } + } break; + case 'asset': conf = { assets: removePublishDetails(bulkUnPublishObj.assets), @@ -433,52 +451,60 @@ async function performBulkUnPublish(data, _config, queue) { if (bulkUnPublishObj.apiVersion) { if (!isNaN(bulkUnPublishObj.apiVersion) && bulkUnPublishObj.apiVersion === apiVersionForNRP) { payload['api_version'] = bulkUnPublishObj.apiVersion; + } else if (bulkUnPublishObj.apiVersion !== '3') { + console.log(chalk.yellow(nrpApiVersionWarning)); + } + } + + try { + const bulkUnPublishAssetsResponse = await stack.bulkOperation().unpublish(payload); + + if (!bulkUnPublishAssetsResponse.error_message) { + xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10); + delete bulkUnPublishObj.stack; + let sanitizedData = removePublishDetails(bulkUnPublishObj.assets); + console.log( + chalk.green( + `Bulk assets sent for Unpublish`, + bulkUnPublishAssetsResponse.job_id ? chalk.yellow(`job_id: ${bulkUnPublishAssetsResponse.job_id}`) : '', + ), + ); + + displayAssetsDetails(sanitizedData); + addLogs(logger, { + options: bulkUnPublishObj, + api_key: stack.stackHeaders.api_key, + alias: stack.alias, + host: stack.host, + }, 'info'); } else { if (bulkUnPublishObj.apiVersion !== '3') { // because 3 is the default value for api-version, and it exists for the purpose of display only console.log(chalk.yellow(nrpApiVersionWarning)); } + throw bulkUnPublishAssetsResponse; } - } - stack - .bulkOperation() - .unpublish(payload) - .then((bulkUnPublishAssetsResponse) => { - if (!bulkUnPublishAssetsResponse.error_message) { - delete bulkUnPublishObj.stack; - let sanitizedData = removePublishDetails(bulkUnPublishObj.assets); - console.log( - chalk.green( - `Bulk assets sent for Unpublish`, - bulkUnPublishAssetsResponse.job_id ? chalk.yellow(`job_id: ${bulkUnPublishAssetsResponse.job_id}`) : '', - ), - ); - displayAssetsDetails(sanitizedData); - addLogs( - logger, - { options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host }, - 'info', - ); - } else { - throw bulkUnPublishAssetsResponse; - } - }) - .catch((error) => { - if (error.errorCode === 429 && data.retry < 2) { - data.retry++; + } catch (error) { + if (error.errorCode === 429 && data.retry < 2) { + data.retry++; + // Call the handleRateLimit function + const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining); + if (delayApplied) { queue.Enqueue(data); - } else { - delete bulkUnPublishObj.stack; - console.log(chalk.red(`Bulk assets failed to Unpublish with error ${formatError(error)}`)); - let sanitizedData = removePublishDetails(bulkUnPublishObj.assets); - displayAssetsDetails(sanitizedData); - addLogs( - logger, - { options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host }, - 'error', - ); } - }); + } else { + delete bulkUnPublishObj.stack; + console.log(chalk.red(`Bulk assets failed to Unpublish with error ${formatError(error)}`)); + let sanitizedData = removePublishDetails(bulkUnPublishObj.assets); + displayAssetsDetails(sanitizedData); + addLogs(logger, { + options: bulkUnPublishObj, + api_key: stack.stackHeaders.api_key, + alias: stack.alias, + host: stack.host, + }, 'error'); + } + } break; default: console.log('No such type'); diff --git a/packages/contentstack-bulk-publish/src/util/client.js b/packages/contentstack-bulk-publish/src/util/client.js index 22a73bcca7..828b63a8ef 100644 --- a/packages/contentstack-bulk-publish/src/util/client.js +++ b/packages/contentstack-bulk-publish/src/util/client.js @@ -6,6 +6,7 @@ async function getStack(data) { const options = { host: data.host, branchName: data.branch, + headers: {includeResHeaders: true}, }; const stackOptions = {}; if (data.alias) { diff --git a/packages/contentstack-bulk-publish/src/util/common-utility.js b/packages/contentstack-bulk-publish/src/util/common-utility.js index c51c0a40de..7a396c7159 100644 --- a/packages/contentstack-bulk-publish/src/util/common-utility.js +++ b/packages/contentstack-bulk-publish/src/util/common-utility.js @@ -27,4 +27,27 @@ function fetchBulkPublishLimit(orgUid) { return bulkPublishLimit; } -module.exports = { fetchBulkPublishLimit }; +/** + * Handles the rate limit checking and adds delay if necessary. + * @param {Object} error - The error object containing the response headers. + * @param {Object} data - The data being processed, including the batch size. + * @param {Function} delay - The delay function to use for waiting. + * @param {number} xRateLimitRemaining - The xRateLimitRemaining containing the remaining balance. + * @returns {boolean} - Returns `true` if delay was applied, `false` otherwise. + */ +async function handleRateLimit(error, data, delay, xRateLimitRemaining) { + // Check if rate limit is exhausted or batch size exceeds remaining limit + if (xRateLimitRemaining === 0 || data.length > xRateLimitRemaining) { + cliux.print( + 'Bulk rate limit reached or batch size exceeds remaining limit. Retrying in 2 seconds...', + { color: 'yellow' }, + ); + await delay(2000); // Wait for 2 seconds before retrying + return true; + } else { + return false; + } +} + + +module.exports = { fetchBulkPublishLimit, handleRateLimit }; diff --git a/packages/contentstack-utilities/src/contentstack-management-sdk.ts b/packages/contentstack-utilities/src/contentstack-management-sdk.ts index 1697f912dc..6ce0817172 100644 --- a/packages/contentstack-utilities/src/contentstack-management-sdk.ts +++ b/packages/contentstack-utilities/src/contentstack-management-sdk.ts @@ -85,6 +85,10 @@ class ManagementSDKInitiator { if (!option.headers) option.headers = {}; option.headers['X-CS-CLI'] = this.analyticsInfo; } + if (config.headers?.includeResHeaders) { + if (!option.headers) option.headers = {}; + option.headers['includeResHeaders'] = true; + } if (!config.management_token) { const authorisationType = configStore.get('authorisationType'); if (authorisationType === 'BASIC') { From df30090e92a754d449b3d2a8267b72bf9374e9c2 Mon Sep 17 00:00:00 2001 From: sunil-lakshman <104969541+sunil-lakshman@users.noreply.github.com> Date: Thu, 13 Feb 2025 10:47:17 +0530 Subject: [PATCH 2/2] Added default value for bulkPublishLimit --- packages/contentstack-bulk-publish/src/consumer/publish.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/contentstack-bulk-publish/src/consumer/publish.js b/packages/contentstack-bulk-publish/src/consumer/publish.js index 926dc4775d..b560c04087 100644 --- a/packages/contentstack-bulk-publish/src/consumer/publish.js +++ b/packages/contentstack-bulk-publish/src/consumer/publish.js @@ -7,7 +7,7 @@ const path = require('path'); const { formatError } = require('../util'); const apiVersionForNRP = '3.2'; const nrpApiVersionWarning = `Provided apiVersion is invalid. ${apiVersionForNRP} is only supported value. Continuing with regular bulk-publish for now.`; -const { handleRateLimit } = require('../util/common-utility'); +const { handleRateLimit, fetchBulkPublishLimit } = require('../util/common-utility'); const { getLoggerInstance, addLogs, getLogsDirPath } = require('../util/logger'); const { sanitizePath } = require('@contentstack/cli-utilities'); const { delay } = require('bluebird'); @@ -235,9 +235,10 @@ async function performBulkPublish(data, _config, queue) { // add validation for user uid // if user not logged in, then user uid won't be available and NRP too won't work let conf; - let xRateLimitRemaining; const bulkPublishObj = data.obj; const stack = bulkPublishObj.stack; + let bulkPublishLimit = fetchBulkPublishLimit(stack?.org_uid); + let xRateLimitRemaining = bulkPublishLimit; let payload = {}; switch (bulkPublishObj.Type) { case 'entry':