From 9e8d018191bff88cafd10434eea586b2636be874 Mon Sep 17 00:00:00 2001 From: Nathaniel Rindlaub Date: Wed, 7 Aug 2024 12:37:02 -0700 Subject: [PATCH 1/3] Reduce concurrency when copying image files to S3, add 1 GB memory --- ingest-delete/task.js | 6 +++--- ingest-zip/index.js | 32 ++++++++++++++++++++++---------- serverless.yml | 2 +- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/ingest-delete/task.js b/ingest-delete/task.js index ed5a85a..f6898e3 100644 --- a/ingest-delete/task.js +++ b/ingest-delete/task.js @@ -41,7 +41,7 @@ export async function handler(event) { new SSM.GetParametersCommand({ Names: [`/api/url-${STAGE}`], WithDecryption: true, - }), + }) ) ).Parameters) { console.log(`ok - setting ${param.Name}`); @@ -117,7 +117,7 @@ async function scheduledDelete(stage) { const res = await cf.send( new CloudFormation.ListStacksCommand({ NextToken: nextToken, - }), + }) ); stacks.push(...res.StackSummaries); @@ -142,7 +142,7 @@ async function scheduledDelete(stage) { const alarm = await cw.send( new CW.DescribeAlarmsCommand({ AlarmNames: [`${stack.StackName}-sqs-empty`], - }), + }) ); if (alarm.MetricAlarms[0].StateValue === 'INSUFFICIENT_DATA') { diff --git a/ingest-zip/index.js b/ingest-zip/index.js index 66daaea..7f76c66 100644 --- a/ingest-zip/index.js +++ b/ingest-zip/index.js @@ -46,7 +46,7 @@ export default async function handler() { new SSM.GetParametersCommand({ Names: [`/api/url-${STAGE}`], WithDecryption: true, - }), + }) ) ).Parameters) { console.log(`ok - setting ${param.Name}`); @@ -59,10 +59,10 @@ export default async function handler() { new S3.GetObjectCommand({ Bucket: task.Bucket, Key: task.Key, - }), + }) ) ).Body, - fs.createWriteStream(path.resolve(os.tmpdir(), 'input.zip')), + fs.createWriteStream(path.resolve(os.tmpdir(), 'input.zip')) ); // Preparse Zip to get a general sense of how many items are in the zip @@ -116,7 +116,7 @@ export default async function handler() { ParameterValue: `s3://${task.Bucket}/${task.Key}`, }, ], - }), + }) ); await monitor(StackName); @@ -143,20 +143,29 @@ export default async function handler() { images.push(entry); } - for await (const ms of asyncPool(1000, images, async (entry) => { + console.time('copying images to S3 @ 100 concurrency'); + + let maxMemoryUsed = 0; + for await (const ms of asyncPool(500, images, async (entry) => { const parsed = path.parse(entry.name); - if (!parsed.ext) return; - if (parsed.base[0] === '.') return; - if (!SUPPORTED_FILE_TYPES.includes(parsed.ext.toLowerCase())) return; + if (!parsed.ext) return `not ok - no extension: ${entry.name}`; + if (parsed.base[0] === '.') return `not ok - hidden file: ${entry.name}`; + if (!SUPPORTED_FILE_TYPES.includes(parsed.ext.toLowerCase())) + return `not ok - unsupported file type: ${entry.name}`; const data = await prezip.entryData(entry); + // NOTE: just for logging memory usage + if (process.memoryUsage().rss > maxMemoryUsed) { + maxMemoryUsed = process.memoryUsage().rss; + } + await s3.send( new S3.PutObjectCommand({ Bucket: task.Bucket, Key: `${batch}/${entry.name}`, Body: data, - }), + }) ); return `ok - written: ${batch}/${entry.name}`; @@ -164,13 +173,16 @@ export default async function handler() { console.log(ms); } + console.timeEnd('copying images to S3 @ 100 concurrency'); + console.log(`max memory used: ${maxMemoryUsed / 1024 / 1024} MB`); + prezip.close(); await s3.send( new S3.DeleteObjectCommand({ Bucket: task.Bucket, Key: task.Key, - }), + }) ); console.log('ok - extraction complete'); diff --git a/serverless.yml b/serverless.yml index 90abf8d..aaa617f 100644 --- a/serverless.yml +++ b/serverless.yml @@ -297,7 +297,7 @@ resources: - Type: 'VCPU' Value: 1 - Type: 'MEMORY' - Value: 4096 + Value: 5120 ExecutionRoleArn: Fn::GetAtt: - BatchExecRole From d8d3d4bdef7f6740ebb2f7acae332aea65b6bb40 Mon Sep 17 00:00:00 2001 From: Nathaniel Rindlaub Date: Wed, 7 Aug 2024 13:36:45 -0700 Subject: [PATCH 2/3] Reduce concurrency to 100 --- ingest-zip/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest-zip/index.js b/ingest-zip/index.js index 7f76c66..b0cc0a0 100644 --- a/ingest-zip/index.js +++ b/ingest-zip/index.js @@ -146,7 +146,7 @@ export default async function handler() { console.time('copying images to S3 @ 100 concurrency'); let maxMemoryUsed = 0; - for await (const ms of asyncPool(500, images, async (entry) => { + for await (const ms of asyncPool(100, images, async (entry) => { const parsed = path.parse(entry.name); if (!parsed.ext) return `not ok - no extension: ${entry.name}`; if (parsed.base[0] === '.') return `not ok - hidden file: ${entry.name}`; From 94f0e7c7f8466e5441075cf47da009f169806e93 Mon Sep 17 00:00:00 2001 From: Nathaniel Rindlaub Date: Thu, 8 Aug 2024 09:51:34 -0700 Subject: [PATCH 3/3] Fix lint errors --- ingest-delete/test/basic.test.js | 228 ++++++++++---------- ingest-zip/lib/cfstream.js | 213 +++++++++---------- ingest-zip/lib/stack.js | 343 +++++++++++++++++-------------- ingest-zip/test/basic.test.js | 254 ++++++++++++----------- 4 files changed, 542 insertions(+), 496 deletions(-) diff --git a/ingest-delete/test/basic.test.js b/ingest-delete/test/basic.test.js index 7795915..b9a3cc3 100644 --- a/ingest-delete/test/basic.test.js +++ b/ingest-delete/test/basic.test.js @@ -6,131 +6,135 @@ import { MockAgent, setGlobalDispatcher } from 'undici'; import SSM from '@aws-sdk/client-ssm'; test('Basic - CloudWatch Alarm', async (t) => { - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - - const mockPool = mockAgent.get('http://example.com'); - - mockPool.intercept({ - path: '/', - method: 'POST' - }).reply(200, { message: 'posted' }); - - const order = []; - Sinon.stub(SSM.SSMClient.prototype, 'send').callsFake((command) => { - if (command instanceof SSM.GetParametersCommand) { - order.push('SSM:GetParametersCommand'); - - t.deepEquals(command.input, { - Names: ['/api/url-test'], - WithDecryption: true - }); - - return Promise.resolve({ - Parameters: [{ - Name: '/api/url-test', - Value: 'http://example.com' - }] - }); - } else { - t.fail('Unexpected Command'); - } - }); - - Sinon.stub(CloudFormation.CloudFormationClient.prototype, 'send').callsFake((command) => { - if (command instanceof CloudFormation.DeleteStackCommand) { - order.push('CloudFormation:DeleteStack'); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + + const mockPool = mockAgent.get('http://example.com'); + + mockPool + .intercept({ + path: '/', + method: 'POST', + }) + .reply(200, { message: 'posted' }); + + const order = []; + Sinon.stub(SSM.SSMClient.prototype, 'send').callsFake((command) => { + if (command instanceof SSM.GetParametersCommand) { + order.push('SSM:GetParametersCommand'); + + t.deepEquals(command.input, { + Names: ['/api/url-test'], + WithDecryption: true, + }); + + return Promise.resolve({ + Parameters: [ + { + Name: '/api/url-test', + Value: 'http://example.com', + }, + ], + }); + } else { + t.fail('Unexpected Command'); + } + }); - t.ok(command.input.StackName.startsWith('test-stack-batch')); + Sinon.stub(CloudFormation.CloudFormationClient.prototype, 'send').callsFake((command) => { + if (command instanceof CloudFormation.DeleteStackCommand) { + order.push('CloudFormation:DeleteStack'); - return Promise.resolve({}); - } else { - t.fail('Unexpected Command'); - } - }); + t.ok(command.input.StackName.startsWith('test-stack-batch')); - try { - process.env.STAGE = 'test'; - await IngestDelete({ - Records: [{ - Sns: { - Message: JSON.stringify({ - AlarmName: 'test-stack-batch-847400c8-8e54-4a40-ab5c-0d593939c883-sqs-empty' - }) - } - }] - }); - } catch (err) { - t.error(err); + return Promise.resolve({}); + } else { + t.fail('Unexpected Command'); } + }); + + try { + process.env.STAGE = 'test'; + await IngestDelete({ + Records: [ + { + Sns: { + Message: JSON.stringify({ + AlarmName: 'test-stack-batch-847400c8-8e54-4a40-ab5c-0d593939c883-sqs-empty', + }), + }, + }, + ], + }); + } catch (err) { + t.error(err); + } - t.deepEquals(order, [ - 'SSM:GetParametersCommand', - 'CloudFormation:DeleteStack' - ]); + t.deepEquals(order, ['SSM:GetParametersCommand', 'CloudFormation:DeleteStack']); - Sinon.restore(); - t.end(); + Sinon.restore(); + t.end(); }); test('Basic - StopBatch API', async (t) => { - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - - const mockPool = mockAgent.get('http://example.com'); - - mockPool.intercept({ - path: '/', - method: 'POST' - }).reply(200, { message: 'posted' }); - - const order = []; - Sinon.stub(SSM.SSMClient.prototype, 'send').callsFake((command) => { - if (command instanceof SSM.GetParametersCommand) { - order.push('SSM:GetParametersCommand'); - - t.deepEquals(command.input, { - Names: ['/api/url-test'], - WithDecryption: true - }); - - return Promise.resolve({ - Parameters: [{ - Name: '/api/url-test', - Value: 'http://example.com' - }] - }); - } else { - t.fail('Unexpected Command'); - } - }); - - Sinon.stub(CloudFormation.CloudFormationClient.prototype, 'send').callsFake((command) => { - if (command instanceof CloudFormation.DeleteStackCommand) { - order.push('CloudFormation:DeleteStack'); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + + const mockPool = mockAgent.get('http://example.com'); + + mockPool + .intercept({ + path: '/', + method: 'POST', + }) + .reply(200, { message: 'posted' }); + + const order = []; + Sinon.stub(SSM.SSMClient.prototype, 'send').callsFake((command) => { + if (command instanceof SSM.GetParametersCommand) { + order.push('SSM:GetParametersCommand'); + + t.deepEquals(command.input, { + Names: ['/api/url-test'], + WithDecryption: true, + }); + + return Promise.resolve({ + Parameters: [ + { + Name: '/api/url-test', + Value: 'http://example.com', + }, + ], + }); + } else { + t.fail('Unexpected Command'); + } + }); - t.ok(command.input.StackName.startsWith('animl-ingest-test-batch-')); + Sinon.stub(CloudFormation.CloudFormationClient.prototype, 'send').callsFake((command) => { + if (command instanceof CloudFormation.DeleteStackCommand) { + order.push('CloudFormation:DeleteStack'); - return Promise.resolve({}); - } else { - t.fail('Unexpected Command'); - } - }); + t.ok(command.input.StackName.startsWith('animl-ingest-test-batch-')); - try { - process.env.STAGE = 'test'; - await IngestDelete({ - batch: 'batch-847400c8-8e54-4a40-ab5c-0d593939c883' - }); - } catch (err) { - t.error(err); + return Promise.resolve({}); + } else { + t.fail('Unexpected Command'); } + }); + + try { + process.env.STAGE = 'test'; + await IngestDelete({ + batch: 'batch-847400c8-8e54-4a40-ab5c-0d593939c883', + }); + } catch (err) { + t.error(err); + } - t.deepEquals(order, [ - 'SSM:GetParametersCommand', - 'CloudFormation:DeleteStack' - ]); + t.deepEquals(order, ['SSM:GetParametersCommand', 'CloudFormation:DeleteStack']); - Sinon.restore(); - t.end(); + Sinon.restore(); + t.end(); }); diff --git a/ingest-zip/lib/cfstream.js b/ingest-zip/lib/cfstream.js index bd128c7..37accc4 100644 --- a/ingest-zip/lib/cfstream.js +++ b/ingest-zip/lib/cfstream.js @@ -1,112 +1,117 @@ import { Readable } from 'stream'; import CloudFormation from '@aws-sdk/client-cloudformation'; -export default function(stackName, options) { - if (!options) options = {}; - const cfn = new CloudFormation.CloudFormationClient(options); - - const stream = new Readable({ objectMode: true }), - pollInterval = options.pollInterval || 10000, - seen = new Map(), - push = stream.push.bind(stream); - - let describing = false, - complete = false, - stackId = stackName, - events = []; - - - if (options.lastEventId) { - seen.set(options.lastEventId, true); - } - - stream._read = function() { - if (describing || complete) return; - describeStack(); - }; - - async function describeEvents(nextToken) { - if (describing) return; - describing = true; - // Describe stacks using stackId (ARN) as CF stacks are actually - // not unique by name. - try { - const data = await cfn.send(new CloudFormation.DescribeStackEventsCommand({ - StackName: stackId, - NextToken: nextToken - })); - describing = false; - - let i; - for (i = 0; i < (data.StackEvents || []).length; i++) { - const event = data.StackEvents[i]; - - // Assuming StackEvents are in strictly reverse chronological order, - // stop reading events once we reach one we've seen already. - if (seen.has(event.EventId)) - break; - - // Collect new events in an array and mark them as "seen". - events.push(event); - seen.set(event.EventId, true); - - // If we reach a user initiated event assume this event is the - // initiating event the caller intends to monitor. - if (event.LogicalResourceId === stackName && - event.ResourceType === 'AWS::CloudFormation::Stack' && - event.ResourceStatusReason === 'User Initiated') { - break; - } - } - - // If we did not find an event on this page we had already seen, paginate. - if (i === (data.StackEvents || []).length && data.NextToken) { - describeEvents(data.NextToken); - } - - // We know that the update is complete, whatever we have in the events - // array represents the last few events to stream. - else if (complete) { - events.reverse().forEach(push); - push(null); - } - - // The update is not complete, and there aren't any new events or more - // pages to scan. DescribeStack in order to check again to see if the - // update has completed. - else { - setTimeout(describeStack, pollInterval); - events.reverse().forEach(push); - events = []; - } - } catch (err) { - return stream.emit('error', err); +export default function (stackName, options) { + if (!options) options = {}; + const cfn = new CloudFormation.CloudFormationClient(options); + + const stream = new Readable({ objectMode: true }), + pollInterval = options.pollInterval || 10000, + seen = new Map(), + push = stream.push.bind(stream); + + let describing = false, + complete = false, + stackId = stackName, + events = []; + + if (options.lastEventId) { + seen.set(options.lastEventId, true); + } + + stream._read = function () { + if (describing || complete) return; + describeStack(); + }; + + async function describeEvents(nextToken) { + if (describing) return; + describing = true; + // Describe stacks using stackId (ARN) as CF stacks are actually + // not unique by name. + try { + const data = await cfn.send( + new CloudFormation.DescribeStackEventsCommand({ + StackName: stackId, + NextToken: nextToken, + }) + ); + describing = false; + + let i; + for (i = 0; i < (data.StackEvents || []).length; i++) { + const event = data.StackEvents[i]; + + // Assuming StackEvents are in strictly reverse chronological order, + // stop reading events once we reach one we've seen already. + if (seen.has(event.EventId)) break; + + // Collect new events in an array and mark them as "seen". + events.push(event); + seen.set(event.EventId, true); + + // If we reach a user initiated event assume this event is the + // initiating event the caller intends to monitor. + if ( + event.LogicalResourceId === stackName && + event.ResourceType === 'AWS::CloudFormation::Stack' && + event.ResourceStatusReason === 'User Initiated' + ) { + break; } + } + + // If we did not find an event on this page we had already seen, paginate. + if (i === (data.StackEvents || []).length && data.NextToken) { + describeEvents(data.NextToken); + } + + // We know that the update is complete, whatever we have in the events + // array represents the last few events to stream. + else if (complete) { + events.reverse().forEach(push); + push(null); + } + + // The update is not complete, and there aren't any new events or more + // pages to scan. DescribeStack in order to check again to see if the + // update has completed. + else { + setTimeout(describeStack, pollInterval); + events.reverse().forEach(push); + events = []; + } + } catch (err) { + return stream.emit('error', err); } - - async function describeStack() { - if (describing) return; - describing = true; - try { - const data = await cfn.send(new CloudFormation.DescribeStacksCommand({ - StackName: stackId - })); - describing = false; - - if (!data.Stacks.length) return stream.emit('error', new Error('Could not describe stack: ' + stackName)); - - stackId = data.Stacks[0].StackId; - - if (/COMPLETE$/.test(data.Stacks[0].StackStatus)) { - complete = true; - describeEvents(); - } else { - setTimeout(describeEvents, pollInterval); - } - } catch (err) { - return stream.emit('error', err); - } + } + + async function describeStack() { + if (describing) return; + describing = true; + try { + const data = await cfn.send( + new CloudFormation.DescribeStacksCommand({ + StackName: stackId, + }) + ); + describing = false; + + if (!data.Stacks.length) + return stream.emit('error', new Error('Could not describe stack: ' + stackName)); + + stackId = data.Stacks[0].StackId; + + if (/COMPLETE$/.test(data.Stacks[0].StackStatus)) { + complete = true; + describeEvents(); + } else { + setTimeout(describeEvents, pollInterval); + } + } catch (err) { + return stream.emit('error', err); } + } - return stream; + return stream; } diff --git a/ingest-zip/lib/stack.js b/ingest-zip/lib/stack.js index c781668..c04ac33 100644 --- a/ingest-zip/lib/stack.js +++ b/ingest-zip/lib/stack.js @@ -1,168 +1,193 @@ import cf from '@openaddresses/cloudfriend'; export default class Stack { - static generate(parent, task, stage) { - return { - AWSTemplateFormatVersion : '2010-09-09', - Parameters: { - BatchID: { - Type: 'String', - Description: 'The unique ID of the Batch Task' - }, - S3URL: { - Type: 'String', - Description: 'The S3 URL of the object that triggered the Batch Task', - Default: `s3://${task.bucket}/${task.key}` - } + static generate(parent, task, stage) { + return { + AWSTemplateFormatVersion: '2010-09-09', + Parameters: { + BatchID: { + Type: 'String', + Description: 'The unique ID of the Batch Task', + }, + S3URL: { + Type: 'String', + Description: 'The S3 URL of the object that triggered the Batch Task', + Default: `s3://${task.bucket}/${task.key}`, + }, + }, + Resources: { + PredQueue: { + Type: 'AWS::SQS::Queue', + Properties: { + QueueName: cf.stackName, + VisibilityTimeout: 720, + RedrivePolicy: { + deadLetterTargetArn: cf.getAtt('PredDLQ', 'Arn'), + maxReceiveCount: 5, }, - Resources: { - PredQueue: { - Type: 'AWS::SQS::Queue', - Properties: { - QueueName: cf.stackName, - VisibilityTimeout: 720, - RedrivePolicy: { - deadLetterTargetArn: cf.getAtt('PredDLQ', 'Arn'), - maxReceiveCount: 5 - } - } + }, + }, + PredDLQ: { + Type: 'AWS::SQS::Queue', + Properties: { + QueueName: cf.join([cf.stackName, '-dlq']), + }, + }, + PredInference: { + Type: 'AWS::Lambda::EventSourceMapping', + Properties: { + BatchSize: 10, + Enabled: true, + EventSourceArn: cf.getAtt('PredQueue', 'Arn'), + FunctionName: `animl-api-${stage}-batchinference`, + FunctionResponseTypes: ['ReportBatchItemFailures'], + }, + }, + PredSQSAlarm: { + Type: 'AWS::CloudWatch::Alarm', + DependsOn: ['PredQueue', 'PredDLQ'], + Properties: { + AlarmName: cf.join([cf.stackName, '-sqs-empty']), + AlarmDescription: 'Set an alarm to breach when SQS list is at 0', + ActionsEnabled: true, + OKActions: [], + AlarmActions: [ + cf.join([ + 'arn:', + cf.partition, + ':sns:', + cf.region, + ':', + cf.accountId, + `:${parent}-delete`, + ]), + ], + InsufficientDataActions: [], + Dimensions: [], + EvaluationPeriods: 10, + DatapointsToAlarm: 10, + Threshold: 0, + ComparisonOperator: 'LessThanOrEqualToThreshold', + TreatMissingData: 'missing', + Metrics: [ + { + Id: 'total', + Label: 'TotalSQS', + ReturnData: true, + Expression: 'SUM(METRICS())', + }, + { + Id: 'm1', + ReturnData: false, + MetricStat: { + Metric: { + Namespace: 'AWS/SQS', + MetricName: 'ApproximateNumberOfMessagesNotVisible', + Dimensions: [ + { + Name: 'QueueName', + Value: cf.getAtt('PredQueue', 'QueueName'), + }, + ], + }, + Period: 60, + Stat: 'Maximum', }, - PredDLQ: { - Type: 'AWS::SQS::Queue', - Properties: { - QueueName: cf.join([cf.stackName, '-dlq']) - } + }, + { + Id: 'm2', + ReturnData: false, + MetricStat: { + Metric: { + Namespace: 'AWS/SQS', + MetricName: 'ApproximateNumberOfMessagesVisible', + Dimensions: [ + { + Name: 'QueueName', + Value: cf.getAtt('PredQueue', 'QueueName'), + }, + ], + }, + Period: 60, + Stat: 'Maximum', }, - PredInference: { - Type: 'AWS::Lambda::EventSourceMapping', - Properties: { - BatchSize: 10, - Enabled: true, - EventSourceArn: cf.getAtt('PredQueue', 'Arn'), - FunctionName: `animl-api-${stage}-batchinference`, - FunctionResponseTypes: ['ReportBatchItemFailures'] - } + }, + { + Id: 'm3', + ReturnData: false, + MetricStat: { + Metric: { + Namespace: 'AWS/SQS', + MetricName: 'ApproximateNumberOfMessagesDelayed', + Dimensions: [ + { + Name: 'QueueName', + Value: cf.getAtt('PredQueue', 'QueueName'), + }, + ], + }, + Period: 60, + Stat: 'Maximum', }, - PredSQSAlarm: { - Type: 'AWS::CloudWatch::Alarm', - DependsOn: [ - 'PredQueue', - 'PredDLQ' + }, + { + Id: 'm4', + ReturnData: false, + MetricStat: { + Metric: { + Namespace: 'AWS/SQS', + MetricName: 'ApproximateNumberOfMessagesNotVisible', + Dimensions: [ + { + Name: 'QueueName', + Value: cf.join([cf.stackName, '-dlq']), + }, ], - Properties: { - AlarmName: cf.join([cf.stackName, '-sqs-empty']), - AlarmDescription: 'Set an alarm to breach when SQS list is at 0', - ActionsEnabled: true, - OKActions: [], - AlarmActions: [ - cf.join(['arn:', cf.partition, ':sns:', cf.region, ':', cf.accountId, `:${parent}-delete`]) - ], - InsufficientDataActions: [], - Dimensions: [], - EvaluationPeriods: 10, - DatapointsToAlarm: 10, - Threshold: 0, - ComparisonOperator: 'LessThanOrEqualToThreshold', - TreatMissingData: 'missing', - Metrics: [{ - Id: 'total', - Label: 'TotalSQS', - ReturnData: true, - Expression: 'SUM(METRICS())' - },{ - Id: 'm1', - ReturnData: false, - MetricStat: { - Metric: { - Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesNotVisible', - Dimensions: [{ - Name: 'QueueName', - Value: cf.getAtt('PredQueue', 'QueueName') - }] - }, - Period: 60, - Stat: 'Maximum' - } - },{ - Id: 'm2', - ReturnData: false, - MetricStat: { - Metric: { - Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesVisible', - Dimensions: [{ - Name: 'QueueName', - Value: cf.getAtt('PredQueue', 'QueueName') - }] - }, - Period: 60, - Stat: 'Maximum' - } - },{ - Id: 'm3', - ReturnData: false, - MetricStat: { - Metric: { - Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesDelayed', - Dimensions: [{ - Name: 'QueueName', - Value: cf.getAtt('PredQueue', 'QueueName') - }] - }, - Period: 60, - Stat: 'Maximum' - } - },{ - Id: 'm4', - ReturnData: false, - MetricStat: { - Metric: { - Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesNotVisible', - Dimensions: [{ - Name: 'QueueName', - Value: cf.join([cf.stackName, '-dlq']) - }] - }, - Period: 60, - Stat: 'Maximum' - } - },{ - Id: 'm5', - ReturnData: false, - MetricStat: { - Metric: { - Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesVisible', - Dimensions: [{ - Name: 'QueueName', - Value: cf.getAtt('PredDLQ', 'QueueName') - }] - }, - Period: 60, - Stat: 'Maximum' - } - },{ - Id: 'm6', - ReturnData: false, - MetricStat: { - Metric: { - Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesDelayed', - Dimensions: [{ - Name: 'QueueName', - Value: cf.getAtt('PredDLQ', 'QueueName') - }] - }, - Period: 60, - Stat: 'Maximum' - } - }] - } - } - } - }; - } + }, + Period: 60, + Stat: 'Maximum', + }, + }, + { + Id: 'm5', + ReturnData: false, + MetricStat: { + Metric: { + Namespace: 'AWS/SQS', + MetricName: 'ApproximateNumberOfMessagesVisible', + Dimensions: [ + { + Name: 'QueueName', + Value: cf.getAtt('PredDLQ', 'QueueName'), + }, + ], + }, + Period: 60, + Stat: 'Maximum', + }, + }, + { + Id: 'm6', + ReturnData: false, + MetricStat: { + Metric: { + Namespace: 'AWS/SQS', + MetricName: 'ApproximateNumberOfMessagesDelayed', + Dimensions: [ + { + Name: 'QueueName', + Value: cf.getAtt('PredDLQ', 'QueueName'), + }, + ], + }, + Period: 60, + Stat: 'Maximum', + }, + }, + ], + }, + }, + }, + }; + } } diff --git a/ingest-zip/test/basic.test.js b/ingest-zip/test/basic.test.js index 27d3e80..bc44110 100644 --- a/ingest-zip/test/basic.test.js +++ b/ingest-zip/test/basic.test.js @@ -8,127 +8,139 @@ import SSM from '@aws-sdk/client-ssm'; import fs from 'node:fs'; test('Basic', async (t) => { - const mockAgent = new MockAgent(); - setGlobalDispatcher(mockAgent); - - const mockPool = mockAgent.get('http://example.com'); - - mockPool.intercept({ - path: '/', - method: 'POST' - }).reply(200, { message: 'posted' }); - mockPool.intercept({ - path: '/', - method: 'POST' - }).reply(200, { message: 'posted' }); - mockPool.intercept({ - path: '/', - method: 'POST' - }).reply(200, { message: 'posted' }); - - const order = []; - Sinon.stub(SSM.SSMClient.prototype, 'send').callsFake((command) => { - if (command instanceof SSM.GetParametersCommand) { - order.push('SSM:GetParametersCommand'); - - t.deepEquals(command.input, { - Names: ['/api/url-test'], - WithDecryption: true - }); - - return Promise.resolve({ - Parameters: [{ - Name: '/api/url-test', - Value: 'http://example.com' - }] - }); - } else { - t.fail('Unexpected Command'); - } - }); - - Sinon.stub(S3.S3Client.prototype, 'send').callsFake((command) => { - if (command instanceof S3.GetObjectCommand) { - order.push(`S3:GetObjectCommand:${command.input.Key}`); - - t.deepEquals(command.input, { - Bucket: 'example-bucket', - Key: 'batch-example-key' - }); - - return Promise.resolve({ - Body: fs.createReadStream(new URL('./fixtures/test.zip', import.meta.url)) - }); - } else if (command instanceof S3.PutObjectCommand) { - order.push('S3:PutObjectCommand'); - - t.equals(command.input.Bucket, 'example-bucket'); - - return Promise.resolve({}); - } else if (command instanceof S3.DeleteObjectCommand) { - order.push(`S3:DeleteObjectCommand:${command.input.Key}`); - - t.equals(command.input.Bucket, 'example-bucket'); - - return Promise.resolve({}); - } else { - t.fail('Unexpected Command'); - } - }); - - Sinon.stub(CloudFormation.CloudFormationClient.prototype, 'send').callsFake((command) => { - if (command instanceof CloudFormation.CreateStackCommand) { - order.push('CloudFormation:CreateStack'); - t.ok(command.input.StackName.startsWith('test-stack-batch')); - return Promise.resolve({}); - } else if (command instanceof CloudFormation.DescribeStacksCommand) { - order.push('CloudFormation:DescribeStacksCommand'); - t.ok(command.input.StackName.startsWith('test-stack-batch')); - return Promise.resolve({ - Stacks: [{ - StackId: 'test-stack-batch-fd75f86c-0991-45ae-9efd-9635a51e5af2', - StackStatus: 'UPDATE_COMPLETE' - }] - }); - } else if (command instanceof CloudFormation.DescribeStackEventsCommand) { - order.push('CloudFormation:DescribeStackEvents'); - t.ok(command.input.StackName.startsWith('test-stack-batch')); - return Promise.resolve({ - StackEvents: [{ - EventId: 1 - }] - }); - } else { - t.fail('Unexpected Command'); - } - }); - - try { - process.env.TASK = JSON.stringify({ Bucket: 'example-bucket', Key: 'batch-example-key' }); - process.env.STAGE = 'test'; - process.env.StackName = 'test-stack'; - await IngestZip(); - } catch (err) { - t.error(err); + const mockAgent = new MockAgent(); + setGlobalDispatcher(mockAgent); + + const mockPool = mockAgent.get('http://example.com'); + + mockPool + .intercept({ + path: '/', + method: 'POST', + }) + .reply(200, { message: 'posted' }); + mockPool + .intercept({ + path: '/', + method: 'POST', + }) + .reply(200, { message: 'posted' }); + mockPool + .intercept({ + path: '/', + method: 'POST', + }) + .reply(200, { message: 'posted' }); + + const order = []; + Sinon.stub(SSM.SSMClient.prototype, 'send').callsFake((command) => { + if (command instanceof SSM.GetParametersCommand) { + order.push('SSM:GetParametersCommand'); + + t.deepEquals(command.input, { + Names: ['/api/url-test'], + WithDecryption: true, + }); + + return Promise.resolve({ + Parameters: [ + { + Name: '/api/url-test', + Value: 'http://example.com', + }, + ], + }); + } else { + t.fail('Unexpected Command'); } + }); - t.deepEquals(order, [ - 'SSM:GetParametersCommand', - 'S3:GetObjectCommand:batch-example-key', - 'CloudFormation:CreateStack', - 'CloudFormation:DescribeStacksCommand', - 'CloudFormation:DescribeStackEvents', - 'S3:PutObjectCommand', - 'S3:PutObjectCommand', - 'S3:PutObjectCommand', - 'S3:PutObjectCommand', - 'S3:PutObjectCommand', - 'S3:PutObjectCommand', - 'S3:PutObjectCommand', - 'S3:PutObjectCommand', - 'S3:DeleteObjectCommand:batch-example-key' - ]); - - Sinon.restore(); - t.end(); + Sinon.stub(S3.S3Client.prototype, 'send').callsFake((command) => { + if (command instanceof S3.GetObjectCommand) { + order.push(`S3:GetObjectCommand:${command.input.Key}`); + + t.deepEquals(command.input, { + Bucket: 'example-bucket', + Key: 'batch-example-key', + }); + + return Promise.resolve({ + Body: fs.createReadStream(new URL('./fixtures/test.zip', import.meta.url)), + }); + } else if (command instanceof S3.PutObjectCommand) { + order.push('S3:PutObjectCommand'); + + t.equals(command.input.Bucket, 'example-bucket'); + + return Promise.resolve({}); + } else if (command instanceof S3.DeleteObjectCommand) { + order.push(`S3:DeleteObjectCommand:${command.input.Key}`); + + t.equals(command.input.Bucket, 'example-bucket'); + + return Promise.resolve({}); + } else { + t.fail('Unexpected Command'); + } + }); + + Sinon.stub(CloudFormation.CloudFormationClient.prototype, 'send').callsFake((command) => { + if (command instanceof CloudFormation.CreateStackCommand) { + order.push('CloudFormation:CreateStack'); + t.ok(command.input.StackName.startsWith('test-stack-batch')); + return Promise.resolve({}); + } else if (command instanceof CloudFormation.DescribeStacksCommand) { + order.push('CloudFormation:DescribeStacksCommand'); + t.ok(command.input.StackName.startsWith('test-stack-batch')); + return Promise.resolve({ + Stacks: [ + { + StackId: 'test-stack-batch-fd75f86c-0991-45ae-9efd-9635a51e5af2', + StackStatus: 'UPDATE_COMPLETE', + }, + ], + }); + } else if (command instanceof CloudFormation.DescribeStackEventsCommand) { + order.push('CloudFormation:DescribeStackEvents'); + t.ok(command.input.StackName.startsWith('test-stack-batch')); + return Promise.resolve({ + StackEvents: [ + { + EventId: 1, + }, + ], + }); + } else { + t.fail('Unexpected Command'); + } + }); + + try { + process.env.TASK = JSON.stringify({ Bucket: 'example-bucket', Key: 'batch-example-key' }); + process.env.STAGE = 'test'; + process.env.StackName = 'test-stack'; + await IngestZip(); + } catch (err) { + t.error(err); + } + + t.deepEquals(order, [ + 'SSM:GetParametersCommand', + 'S3:GetObjectCommand:batch-example-key', + 'CloudFormation:CreateStack', + 'CloudFormation:DescribeStacksCommand', + 'CloudFormation:DescribeStackEvents', + 'S3:PutObjectCommand', + 'S3:PutObjectCommand', + 'S3:PutObjectCommand', + 'S3:PutObjectCommand', + 'S3:PutObjectCommand', + 'S3:PutObjectCommand', + 'S3:PutObjectCommand', + 'S3:PutObjectCommand', + 'S3:DeleteObjectCommand:batch-example-key', + ]); + + Sinon.restore(); + t.end(); });