Skip to content

Commit

Permalink
split the batch size and try again if maxpayload error recieved
Browse files Browse the repository at this point in the history
  • Loading branch information
Rakhi Mundhada authored and Rakhi Mundhada committed Dec 28, 2023
1 parent 95467fe commit ebbf16b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@alertlogic/paws-collector",
"version": "2.2.0",
"version": "2.2.1",
"license": "MIT",
"description": "Alert Logic AWS based API Poll Log Collector Library",
"repository": {
Expand Down
47 changes: 28 additions & 19 deletions paws_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const DDB_DELETE_BATCH_OPTIONS = {
maxBatchSizeBytes: 16 * 1024 * 1024
};
const MAX_ERROR_RETRIES = 5;
const MAX_LOG_BATCH_SIZE = 10000;
let MAX_LOG_BATCH_SIZE = 10000;
const REMAINING_CONTEXT_TIME_IN_MS = 10 * 1000;
function getPawsParamStoreParam(){
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -573,28 +573,37 @@ class PawsCollector extends AlAwsCollector {
*/
batchLogProcess(logs, privCollectorState, nextInvocationTimeout, callback) {
let collector = this;
let indexArray = [];
const batches = Math.ceil(logs.length / MAX_LOG_BATCH_SIZE);
for (let i = 0; i < batches; i++) {
indexArray.push({ start: MAX_LOG_BATCH_SIZE * i, stop: MAX_LOG_BATCH_SIZE * (i + 1) });
}

let promises = indexArray.map((logpart) => {
return new Promise((resolve, reject) => {
collector.processLog(logs.slice(logpart.start, logpart.stop), collector.pawsFormatLog.bind(collector), null, (err, res) => {
if (err) {
collector.handleDeDupIngestError(err, logs.slice(logpart.start, logpart.stop), (error) => {
reject(error);
});
}
else {
resolve(res);
}
function processBatch(logs) {
let indexArray = [];
const batches = Math.ceil(logs.length / MAX_LOG_BATCH_SIZE);
for (let i = 0; i < batches; i++) {
indexArray.push({ start: MAX_LOG_BATCH_SIZE * i, stop: MAX_LOG_BATCH_SIZE * (i + 1) });
}
let promises = indexArray.map((logpart) => {
return new Promise((resolve, reject) => {
collector.processLog(logs.slice(logpart.start, logpart.stop), collector.pawsFormatLog.bind(collector), null, (err, res) => {
if (err) {
if (typeof err === 'string' && err.includes("Maximum payload size exceeded")) {
let logsSlice = logs.slice(logpart.start, logpart.stop)
MAX_LOG_BATCH_SIZE = Math.ceil(logsSlice.length / 2);
return processBatch(logsSlice).then(resolve(null)).catch(reject(err));
} else {
collector.handleDeDupIngestError(err, logs.slice(logpart.start, logpart.stop), (error) => {
reject(error);
});
}
}
else {
resolve(res);
}
});
});
});
});
return Promise.all(promises);
}

Promise.all(promises).then((res) => {
processBatch(logs).then((res) => {
return callback(null, privCollectorState, nextInvocationTimeout);
}).catch((error) => {
collector.reportErrorToIngestApi(error, () => {
Expand Down

0 comments on commit ebbf16b

Please sign in to comment.