diff --git a/ingest-delete/task.js b/ingest-delete/task.js index 006fc1a..ed5a85a 100644 --- a/ingest-delete/task.js +++ b/ingest-delete/task.js @@ -3,53 +3,59 @@ import SSM from '@aws-sdk/client-ssm'; import CW from '@aws-sdk/client-cloudwatch'; import moment from 'moment'; -const APIKEY = process.env.APIKEY; +const API_KEY = process.env.API_KEY; export async function handler(event) { - if (!event) throw new Error('Event not populated'); - - console.log(`ok - event: ${JSON.stringify(event)}`); - - const STAGE = process.env.STAGE || 'dev'; - const params = new Map(); - - let StackName = null; - let batch = null; - if (event.Records && event.Records[0] && event.Records[0].Sns) { - const alarm = JSON.parse(event.Records[0].Sns.Message).AlarmName; - StackName = alarm.replace('-sqs-empty', ''); - batch = `batch-${StackName.replace(/^.*-batch-/, '')}`; - } else if (event.batch) { - StackName = `animl-ingest-${STAGE}-${event.batch}`; - batch = event.batch; - } else if (event.source) { - return await scheduledDelete(STAGE); - } else { - throw new Error('Unknown Event Type'); + if (!event) throw new Error('Event not populated'); + + console.log(`ok - event: ${JSON.stringify(event)}`); + + const STAGE = process.env.STAGE || 'dev'; + const params = new Map(); + + let StackName = null; + let batch = null; + if (event.Records && event.Records[0] && event.Records[0].Sns) { + const alarm = JSON.parse(event.Records[0].Sns.Message).AlarmName; + StackName = alarm.replace('-sqs-empty', ''); + batch = `batch-${StackName.replace(/^.*-batch-/, '')}`; + } else if (event.batch) { + StackName = `animl-ingest-${STAGE}-${event.batch}`; + batch = event.batch; + } else if (event.source) { + return await scheduledDelete(STAGE); + } else { + throw new Error('Unknown Event Type'); + } + + if (!StackName) throw new Error('StackName could not be determined'); + + try { + const cf = new CloudFormation.CloudFormationClient({ + region: process.env.AWS_DEFAULT_REGION || 'us-west-2', + }); + const ssm = new SSM.SSMClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); + + for (const param of ( + await ssm.send( + new SSM.GetParametersCommand({ + Names: [`/api/url-${STAGE}`], + WithDecryption: true, + }), + ) + ).Parameters) { + console.log(`ok - setting ${param.Name}`); + params.set(param.Name, param.Value); } - if (!StackName) throw new Error('StackName could not be determined'); - - try { - const cf = new CloudFormation.CloudFormationClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); - const ssm = new SSM.SSMClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); - - for (const param of (await ssm.send(new SSM.GetParametersCommand({ - Names: [`/api/url-${STAGE}`], - WithDecryption: true - }))).Parameters) { - console.log(`ok - setting ${param.Name}`); - params.set(param.Name, param.Value); - } - - console.log(`ok - deleting: ${StackName}`); + console.log(`ok - deleting: ${StackName}`); - await cf.send(new CloudFormation.DeleteStackCommand({ StackName })); + await cf.send(new CloudFormation.DeleteStackCommand({ StackName })); - console.log(`ok - batch: ${batch}`); + console.log(`ok - batch: ${batch}`); - await fetcher(params.get(`/api/url-${STAGE}`), { - query: ` + await fetcher(params.get(`/api/url-${STAGE}`), { + query: ` mutation UpdateBatch($input: UpdateBatchInput!){ updateBatch(input: $input) { batch { @@ -61,21 +67,21 @@ export async function handler(event) { } } `, - variables: { - input: { - _id: batch, - processingEnd: new Date() - } - } - }); + variables: { + input: { + _id: batch, + processingEnd: new Date(), + }, + }, + }); - console.log('ok - stack deletion complete'); - } catch (err) { - console.error(err); + console.log('ok - stack deletion complete'); + } catch (err) { + console.error(err); - if (params.has(`/api/url-${STAGE}`)) { - await fetcher(params.get(`/api/url-${STAGE}`), { - query: ` + if (params.has(`/api/url-${STAGE}`)) { + await fetcher(params.get(`/api/url-${STAGE}`), { + query: ` mutation CreateBatchError($input: CreateBatchErrorInput!) { createBatchError(input: $input) { _id @@ -85,92 +91,100 @@ export async function handler(event) { } } `, - variables: { - input: { - error: err.message, - batch: batch - } - } - }); - } else { - console.error('not ok - Failed to post to CreateBatchError'); - } - - throw err; + variables: { + input: { + error: err.message, + batch: batch, + }, + }, + }); + } else { + console.error('not ok - Failed to post to CreateBatchError'); } + + throw err; + } } async function scheduledDelete(stage) { - const cf = new CloudFormation.CloudFormationClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); - - let stacks = []; - let nextToken = undefined; - do { - const res = await cf.send(new CloudFormation.ListStacksCommand({ - NextToken: nextToken - })); - - stacks.push(...res.StackSummaries); - nextToken = res.NextToken; - } while (nextToken); - - stacks = stacks.filter((stack) => { - return stack.StackName.startsWith(`animl-ingest-${stage}-batch-`); - }).filter((stack) => { - return stack.StackStatus !== 'DELETE_COMPLETE'; - }).filter((stack) => { - return moment(stack.CreationTime).isSameOrBefore(moment().subtract(24, 'hours')); + const cf = new CloudFormation.CloudFormationClient({ + region: process.env.AWS_DEFAULT_REGION || 'us-west-2', + }); + + let stacks = []; + let nextToken = undefined; + do { + const res = await cf.send( + new CloudFormation.ListStacksCommand({ + NextToken: nextToken, + }), + ); + + stacks.push(...res.StackSummaries); + nextToken = res.NextToken; + } while (nextToken); + + stacks = stacks + .filter((stack) => { + return stack.StackName.startsWith(`animl-ingest-${stage}-batch-`); + }) + .filter((stack) => { + return stack.StackStatus !== 'DELETE_COMPLETE'; + }) + .filter((stack) => { + return moment(stack.CreationTime).isSameOrBefore(moment().subtract(24, 'hours')); }); - const cw = new CW.CloudWatchClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); - console.log(`ok - ${stacks.length} candidate stacks for deletion`); - for (const stack of stacks) { - console.log(`ok - checking alarm state ${stack.StackName}`); - const alarm = await cw.send(new CW.DescribeAlarmsCommand({ - AlarmNames: [`${stack.StackName}-sqs-empty`] - })); - - if (alarm.MetricAlarms[0].StateValue === 'INSUFFICIENT_DATA') { - console.log(`ok - deleting ${stack.StackName}`); - await cf.send(new CloudFormation.DeleteStackCommand({ StackName: stack.StackName })); - } + const cw = new CW.CloudWatchClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); + console.log(`ok - ${stacks.length} candidate stacks for deletion`); + for (const stack of stacks) { + console.log(`ok - checking alarm state ${stack.StackName}`); + const alarm = await cw.send( + new CW.DescribeAlarmsCommand({ + AlarmNames: [`${stack.StackName}-sqs-empty`], + }), + ); + + if (alarm.MetricAlarms[0].StateValue === 'INSUFFICIENT_DATA') { + console.log(`ok - deleting ${stack.StackName}`); + await cf.send(new CloudFormation.DeleteStackCommand({ StackName: stack.StackName })); } + } } async function fetcher(url, body) { - console.log('Posting metadata to API'); - const res = await fetch(url, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': APIKEY - }, - body: JSON.stringify(body) - }); - - if (!res.ok) { - const texterr = await res.text(); - let jsonerr; - try { - jsonerr = JSON.parse(texterr); - } catch (err) { - throw new Error(texterr); - } - - if (jsonerr.message) throw new Error(jsonerr.message); - throw new Error(texterr); + console.log('Posting metadata to API'); + const res = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': API_KEY, + }, + body: JSON.stringify(body), + }); + + if (!res.ok) { + const texterr = await res.text(); + let jsonerr; + try { + jsonerr = JSON.parse(texterr); + } catch (err) { + throw new Error(texterr); } - const json = await res.json(); + if (jsonerr.message) throw new Error(jsonerr.message); + throw new Error(texterr); + } - if (json && Array.isArray(json.errors) && json.errors.length) { - throw new Error(json.errors[0].message); - } + const json = await res.json(); - return json; -} + if (json && Array.isArray(json.errors) && json.errors.length) { + throw new Error(json.errors[0].message); + } + return json; +} if (import.meta.url === `file://${process.argv[1]}`) { - await handler(process.env.EVENT ? JSON.parse(process.env.EVENT) : {}); + await handler(process.env.EVENT ? JSON.parse(process.env.EVENT) : {}); } diff --git a/ingest-image/task.js b/ingest-image/task.js index c3a2be5..86a8fc5 100644 --- a/ingest-image/task.js +++ b/ingest-image/task.js @@ -24,7 +24,7 @@ const region = process.env.AWS_DEFAULT_REGION || 'us-west-2'; const IngestType = new Enum(['NONE', 'IMAGE', 'BATCH'], 'IngestType'); -const APIKEY = process.env.APIKEY; +const API_KEY = process.env.API_KEY; const SHARP_CONFIG = { failOn: 'none' }; @@ -440,7 +440,7 @@ async function fetcher(url, body) { signal: controller.signal, headers: { 'Content-Type': 'application/json', - 'x-api-key': APIKEY, + 'x-api-key': API_KEY, }, body: JSON.stringify(body), }); diff --git a/ingest-zip/index.js b/ingest-zip/index.js index c04d24c..66daaea 100644 --- a/ingest-zip/index.js +++ b/ingest-zip/index.js @@ -11,7 +11,7 @@ import StreamZip from 'node-stream-zip'; import Stack from './lib/stack.js'; import asyncPool from 'tiny-async-pool'; -const APIKEY = process.env.APIKEY; +const API_KEY = process.env.API_KEY; // If this is changed also update ingest-image const SUPPORTED_FILE_TYPES = ['.jpg', '.jpeg', '.png']; @@ -27,151 +27,169 @@ const UPDATE_BATCH_QUERY = ` `; export default async function handler() { - const task = JSON.parse(process.env.TASK); - const batch = task.Key.replace('.zip', ''); - const StackName = `${process.env.StackName}-${batch}`; - const STAGE = process.env.STAGE || 'dev'; + const task = JSON.parse(process.env.TASK); + const batch = task.Key.replace('.zip', ''); + const StackName = `${process.env.StackName}-${batch}`; + const STAGE = process.env.STAGE || 'dev'; - const params = new Map(); + const params = new Map(); - try { - const s3 = new S3.S3Client({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); - const cf = new CloudFormation.CloudFormationClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); - const ssm = new SSM.SSMClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); - - for (const param of (await ssm.send(new SSM.GetParametersCommand({ - Names: [`/api/url-${STAGE}`], - WithDecryption: true - }))).Parameters) { - console.log(`ok - setting ${param.Name}`); - params.set(param.Name, param.Value); - } - - await pipeline( - (await s3.send(new S3.GetObjectCommand({ - Bucket: task.Bucket, - Key: task.Key - }))).Body, - fs.createWriteStream(path.resolve(os.tmpdir(), 'input.zip')) - ); - - // Preparse Zip to get a general sense of how many items are in the zip - // and if it is empty ignore it - const zip = new StreamZip.async({ - file: path.resolve(os.tmpdir(), 'input.zip'), - skipEntryNameValidation: true - }); - - let total = 0; - const entries = await zip.entries(); - for (const entrykey in entries) { - const entry = entries[entrykey]; - const parsed = path.parse(entry.name); - if (!parsed.ext) continue; - if (parsed.base[0] === '.') continue; - if (!SUPPORTED_FILE_TYPES.includes(parsed.ext.toLowerCase())) continue; - total++; - } - - zip.close(); - - const now = new Date(); - const input = { - _id: batch, - total: total, - uploadComplete: now - }; - - if (total === 0) { - console.log('ok - no image files to process'); - input.processingStart = now, - input.processingEnd = now; - } - - await fetcher(params.get(`/api/url-${STAGE}`), { - query: UPDATE_BATCH_QUERY, - variables: { input } - }); - - await cf.send(new CloudFormation.CreateStackCommand({ - StackName, - TemplateBody: JSON.stringify(Stack.generate(process.env.StackName, task, STAGE)), - Parameters: [{ - ParameterKey: 'BatchID', - ParameterValue: batch - },{ - ParameterKey: 'S3URL', - ParameterValue: `s3://${task.Bucket}/${task.Key}` - }] - })); - - await monitor(StackName); - - await fetcher(params.get(`/api/url-${STAGE}`), { - query: UPDATE_BATCH_QUERY, - variables: { - input: { - _id: batch, - processingStart: new Date() - } - } - }); - - const prezip = new StreamZip.async({ - file: path.resolve(os.tmpdir(), 'input.zip'), - skipEntryNameValidation: true - }); - - const preentries = await prezip.entries(); - const images = []; - for (const entrykey in preentries) { - const entry = preentries[entrykey]; - images.push(entry); - } - - for await (const ms of asyncPool(1000, images, async (entry) => { - const parsed = path.parse(entry.name); - if (!parsed.ext) return; - if (parsed.base[0] === '.') return; - if (!SUPPORTED_FILE_TYPES.includes(parsed.ext.toLowerCase())) return; - - const data = await prezip.entryData(entry); - - await s3.send(new S3.PutObjectCommand({ - Bucket: task.Bucket, - Key: `${batch}/${entry.name}`, - Body: data - })); - - return `ok - written: ${batch}/${entry.name}`; - })) { - console.log(ms); - } - - prezip.close(); - - await s3.send(new S3.DeleteObjectCommand({ + try { + const s3 = new S3.S3Client({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); + const cf = new CloudFormation.CloudFormationClient({ + region: process.env.AWS_DEFAULT_REGION || 'us-west-2', + }); + const ssm = new SSM.SSMClient({ region: process.env.AWS_DEFAULT_REGION || 'us-west-2' }); + + for (const param of ( + await ssm.send( + new SSM.GetParametersCommand({ + Names: [`/api/url-${STAGE}`], + WithDecryption: true, + }), + ) + ).Parameters) { + console.log(`ok - setting ${param.Name}`); + params.set(param.Name, param.Value); + } + + await pipeline( + ( + await s3.send( + new S3.GetObjectCommand({ Bucket: task.Bucket, - Key: task.Key - })); - - console.log('ok - extraction complete'); - - await fetcher(params.get(`/api/url-${STAGE}`), { - query: UPDATE_BATCH_QUERY, - variables: { - input: { - _id: batch, - ingestionComplete: new Date() - } - } - }); - } catch (err) { - console.error(err); + Key: task.Key, + }), + ) + ).Body, + fs.createWriteStream(path.resolve(os.tmpdir(), 'input.zip')), + ); + + // Preparse Zip to get a general sense of how many items are in the zip + // and if it is empty ignore it + const zip = new StreamZip.async({ + file: path.resolve(os.tmpdir(), 'input.zip'), + skipEntryNameValidation: true, + }); + + let total = 0; + const entries = await zip.entries(); + for (const entrykey in entries) { + const entry = entries[entrykey]; + const parsed = path.parse(entry.name); + if (!parsed.ext) continue; + if (parsed.base[0] === '.') continue; + if (!SUPPORTED_FILE_TYPES.includes(parsed.ext.toLowerCase())) continue; + total++; + } + + zip.close(); + + const now = new Date(); + const input = { + _id: batch, + total: total, + uploadComplete: now, + }; + + if (total === 0) { + console.log('ok - no image files to process'); + (input.processingStart = now), (input.processingEnd = now); + } + + await fetcher(params.get(`/api/url-${STAGE}`), { + query: UPDATE_BATCH_QUERY, + variables: { input }, + }); + + await cf.send( + new CloudFormation.CreateStackCommand({ + StackName, + TemplateBody: JSON.stringify(Stack.generate(process.env.StackName, task, STAGE)), + Parameters: [ + { + ParameterKey: 'BatchID', + ParameterValue: batch, + }, + { + ParameterKey: 'S3URL', + ParameterValue: `s3://${task.Bucket}/${task.Key}`, + }, + ], + }), + ); + + await monitor(StackName); + + await fetcher(params.get(`/api/url-${STAGE}`), { + query: UPDATE_BATCH_QUERY, + variables: { + input: { + _id: batch, + processingStart: new Date(), + }, + }, + }); + + const prezip = new StreamZip.async({ + file: path.resolve(os.tmpdir(), 'input.zip'), + skipEntryNameValidation: true, + }); + + const preentries = await prezip.entries(); + const images = []; + for (const entrykey in preentries) { + const entry = preentries[entrykey]; + images.push(entry); + } + + for await (const ms of asyncPool(1000, images, async (entry) => { + const parsed = path.parse(entry.name); + if (!parsed.ext) return; + if (parsed.base[0] === '.') return; + if (!SUPPORTED_FILE_TYPES.includes(parsed.ext.toLowerCase())) return; + + const data = await prezip.entryData(entry); + + await s3.send( + new S3.PutObjectCommand({ + Bucket: task.Bucket, + Key: `${batch}/${entry.name}`, + Body: data, + }), + ); + + return `ok - written: ${batch}/${entry.name}`; + })) { + console.log(ms); + } + + prezip.close(); + + await s3.send( + new S3.DeleteObjectCommand({ + Bucket: task.Bucket, + Key: task.Key, + }), + ); - if (params.has(`/api/url-${STAGE}`)) { - await fetcher(params.get(`/api/url-${STAGE}`), { - query: ` + console.log('ok - extraction complete'); + + await fetcher(params.get(`/api/url-${STAGE}`), { + query: UPDATE_BATCH_QUERY, + variables: { + input: { + _id: batch, + ingestionComplete: new Date(), + }, + }, + }); + } catch (err) { + console.error(err); + + if (params.has(`/api/url-${STAGE}`)) { + await fetcher(params.get(`/api/url-${STAGE}`), { + query: ` mutation CreateBatchError($input: CreateBatchErrorInput!) { createBatchError(input: $input) { _id @@ -181,85 +199,83 @@ export default async function handler() { } } `, - variables: { - input: { - error: err.message, - batch: batch - } - } - }); - } else { - console.error('not ok - Failed to post to CreateBatchError'); - } - - throw err; + variables: { + input: { + error: err.message, + batch: batch, + }, + }, + }); + } else { + console.error('not ok - Failed to post to CreateBatchError'); } -} + throw err; + } +} function monitor(StackName) { - const region = process.env.AWS_DEFAULT_REGION || 'us-west-2'; - - return new Promise((resolve, reject) => { - const events = EventStream(StackName, { region }) - .on('error', (err) => { - return reject(err); - }); - - const stringify = new stream.Transform({ objectMode: true }); - stringify._transform = (event, enc, cb) => { - let msg = event.ResourceStatus + ' ' + event.LogicalResourceId; - if (event.ResourceStatusReason) msg += ': ' + event.ResourceStatusReason; - cb(null, currentTime() + ' ' + region + ': ' + msg + '\n'); - }; - - events.pipe(stringify).pipe(process.stdout); - stringify.on('end', resolve); + const region = process.env.AWS_DEFAULT_REGION || 'us-west-2'; + + return new Promise((resolve, reject) => { + const events = EventStream(StackName, { region }).on('error', (err) => { + return reject(err); }); + + const stringify = new stream.Transform({ objectMode: true }); + stringify._transform = (event, enc, cb) => { + let msg = event.ResourceStatus + ' ' + event.LogicalResourceId; + if (event.ResourceStatusReason) msg += ': ' + event.ResourceStatusReason; + cb(null, currentTime() + ' ' + region + ': ' + msg + '\n'); + }; + + events.pipe(stringify).pipe(process.stdout); + stringify.on('end', resolve); + }); } function currentTime() { - const now = new Date(); - const hour = ('00' + now.getUTCHours()).slice(-2); - const min = ('00' + now.getUTCMinutes()).slice(-2); - const sec = ('00' + now.getUTCSeconds()).slice(-2); - return [hour, min, sec].join(':') + 'Z'; + const now = new Date(); + const hour = ('00' + now.getUTCHours()).slice(-2); + const min = ('00' + now.getUTCMinutes()).slice(-2); + const sec = ('00' + now.getUTCSeconds()).slice(-2); + return [hour, min, sec].join(':') + 'Z'; } async function fetcher(url, body) { - console.log('Posting metadata to API', JSON.stringify(body)); - const res = await fetch(url, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': APIKEY - }, - body: JSON.stringify(body) - }); - - if (!res.ok) { - console.error('ERROR: Headers:', JSON.stringify(Object.fromEntries(res.headers))); - - const texterr = await res.text(); - let jsonerr; - try { - jsonerr = JSON.parse(texterr); - } catch (err) { - throw new Error(texterr); - } - - console.error('ERROR: Body:', JSON.stringify(jsonerr)); - if (jsonerr.message) throw new Error(jsonerr.message); - throw new Error(texterr); + console.log('Posting metadata to API', JSON.stringify(body)); + const res = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': API_KEY, + }, + body: JSON.stringify(body), + }); + + if (!res.ok) { + console.error('ERROR: Headers:', JSON.stringify(Object.fromEntries(res.headers))); + + const texterr = await res.text(); + let jsonerr; + try { + jsonerr = JSON.parse(texterr); + } catch (err) { + throw new Error(texterr); } - const json = await res.json(); + console.error('ERROR: Body:', JSON.stringify(jsonerr)); + if (jsonerr.message) throw new Error(jsonerr.message); + throw new Error(texterr); + } + + const json = await res.json(); - if (json && Array.isArray(json.errors) && json.errors.length) { - throw new Error(json.errors[0].message); - } + if (json && Array.isArray(json.errors) && json.errors.length) { + throw new Error(json.errors[0].message); + } - return json; + return json; } if (import.meta.url === `file://${process.argv[1]}`) handler(); diff --git a/serverless.yml b/serverless.yml index aa1d9a3..90abf8d 100644 --- a/serverless.yml +++ b/serverless.yml @@ -94,7 +94,7 @@ provider: environment: STAGE: ${opt:stage, self:provider.stage, 'dev'} - APIKEY: '{{resolve:secretsmanager:api-key-${self:provider.stage}:SecretString:apikey}}' + API_KEY: '{{resolve:secretsmanager:api-key-${self:provider.stage}:SecretString:apikey}}' functions: IngestZip: @@ -291,7 +291,7 @@ resources: Value: !Ref 'AWS::StackName' - Name: 'STAGE' Value: ${opt:stage, self:provider.stage, 'dev'} - - Name: 'APIKEY' + - Name: 'API_KEY' Value: '{{resolve:secretsmanager:api-key-${self:provider.stage}:SecretString:apikey}}' ResourceRequirements: - Type: 'VCPU'