diff --git a/dev/test.ts b/dev/test.ts
index d7f9494..bf19db6 100644
--- a/dev/test.ts
+++ b/dev/test.ts
@@ -10,6 +10,7 @@ import {
   TEST_BUCKET_OBJECT_PREFIX,
   TEST_BUCKET_WORKING_PREFIX,
 } from "./constants";
+import { makeObjectDictionaryCsv, makeTestObject } from "./tests-util";

 const discoveryClient = new ServiceDiscoveryClient({});
 const s3Client = new S3Client({});
@@ -18,31 +19,6 @@ const sfnClient = new SFNClient({});
 // generate a unique run folder for this execution of the entire test suite
 const uniqueFolder = randomBytes(8).toString("hex");

-/**
- * Put a list of objects as a CSV into an object.
- *
- * @param absoluteCsvKey the key of the CSV in the working folder
- * @param keysBucket the source bucket of the objects
- * @param keys the keys of the objects
- */
-async function makeObjectListCsv(
-  absoluteCsvKey: string,
-  keysBucket: string,
-  keys: string[],
-) {
-  let content = "";
-  for (const k of keys) {
-    content += `${keysBucket},"${k}"\n`;
-  }
-  const response = await s3Client.send(
-    new PutObjectCommand({
-      Bucket: TEST_BUCKET,
-      Key: absoluteCsvKey,
-      Body: content,
-    }),
-  );
-}
-
 function getPaths(testNumber: number) {
   const tsvName = `${testNumber}-objects-to-copy.tsv`;

@@ -53,7 +29,7 @@ function getPaths(testNumber: number) {
     testFolderObjectsTsvRelative: `${uniqueFolder}/${tsvName}`,
     testFolderObjectsTsvAbsolute: `${TEST_BUCKET_WORKING_PREFIX}${uniqueFolder}/${tsvName}`,

-    testFolderSrc: `${TEST_BUCKET_OBJECT_PREFIX}${uniqueFolder}/${testNumber}-src`,
+    testFolderSrc: `${TEST_BUCKET_OBJECT_PREFIX}${uniqueFolder}/${testNumber}-src/`,
     testFolderDest: `${TEST_BUCKET_OBJECT_PREFIX}${uniqueFolder}/${testNumber}-dest/`,
   };
 }
@@ -74,14 +50,12 @@ async function doTest1(stateMachineArn: string) {
   };

   for (const [n, stor] of Object.entries(sourceObjects)) {
-    await makeTestObject(n, 256 * 1024, stor);
+    await makeTestObject(TEST_BUCKET, n, 256 * 1024, stor);
   }

-  await makeObjectListCsv(
-    testFolderObjectsTsvAbsolute,
-    TEST_BUCKET,
-    Object.keys(sourceObjects),
-  );
+  await makeObjectDictionaryCsv(TEST_BUCKET, testFolderObjectsTsvAbsolute, {
+    [TEST_BUCKET]: Object.keys(sourceObjects),
+  });

   await sfnClient.send(
     new StartExecutionCommand({
@@ -98,15 +72,54 @@ async function doTest1(stateMachineArn: string) {

 async function doTest2(stateMachineArn: string) {
   const {
+    testFolderSrc,
     testFolderDest,
     testFolderObjectsTsvAbsolute,
     testFolderObjectsTsvRelative,
   } = getPaths(1);

-  await makeObjectListCsv(testFolderObjectsTsvAbsolute, "umccr-10g-data-dev", [
-    "HG00096/HG00096.hard-filtered.vcf.gz",
-    "HG00097/HG00097.hard-filtered.vcf.gz",
-  ]);
+  // we are going to make objects that exist in both the src *and* the destination
+  // this will let us test our "checksum skipping"

+  // same name and same content
+  await makeTestObject(
+    TEST_BUCKET,
+    `${testFolderSrc}existing-a.bin`,
+    256 * 1024,
+  );
+  await makeTestObject(
+    TEST_BUCKET,
+    `${testFolderDest}existing-a.bin`,
+    256 * 1024,
+  );
+
+  // same name and different content - the result should be that rclone *does* copy this
+  await makeTestObject(
+    TEST_BUCKET,
+    `${testFolderSrc}existing-b.bin`,
+    64 * 1024,
+  );
+  await makeTestObject(
+    TEST_BUCKET,
+    `${testFolderDest}existing-b.bin`,
+    64 * 1024,
+    "STANDARD",
+    1,
+  );
+
+  await makeObjectDictionaryCsv(TEST_BUCKET, testFolderObjectsTsvAbsolute, {
+    "umccr-10g-data-dev": [
+      "HG00096/HG00096.hard-filtered.vcf.gz",
+      "HG00097/HG00097.hard-filtered.vcf.gz",
+      // this does not exist
+      "HG000XX/HG000XX.hard-filtered.vcf.gz",
+    ],
+    "not-a-bucket-that-exists": ["a-file-that-also-does-not-exist.bam"],
["a-file-that-also-does-not-exist.bam"], + [TEST_BUCKET]: [ + `${testFolderSrc}existing-a.bin`, + `${testFolderSrc}existing-b.bin`, + ], + }); await sfnClient.send( new StartExecutionCommand({ @@ -115,7 +128,7 @@ async function doTest2(stateMachineArn: string) { sourceFilesCsvKey: testFolderObjectsTsvRelative, destinationBucket: TEST_BUCKET, destinationPrefixKey: testFolderDest, - maxItemsPerBatch: 1, + maxItemsPerBatch: 2, }), }), ); @@ -133,14 +146,12 @@ async function doTest3(stateMachineArn: string) { }; for (const [n, stor] of Object.entries(sourceObjects)) { - await makeTestObject(n, 1000, stor); + await makeTestObject(TEST_BUCKET, n, 1000, stor); } - await makeObjectListCsv( - `${testFolderSrc}/objects-to-copy.tsv`, - TEST_BUCKET, - Object.keys(sourceObjects), - ); + //await makeObjectDictionaryCsv(TEST_BUCKET, testFolderObjectsTsvAbsolute, { + // TEST_BUCKET: Object.keys(sourceObjects), + //}); await sfnClient.send( new StartExecutionCommand({ @@ -155,23 +166,6 @@ async function doTest3(stateMachineArn: string) { ); } -async function makeTestObject( - key: string, - sizeInBytes: number, - storageClass: StorageClass = "STANDARD", -) { - const response = await s3Client.send( - new PutObjectCommand({ - Bucket: TEST_BUCKET, - Key: key, - Body: Buffer.alloc(sizeInBytes, 13), - StorageClass: storageClass, - }), - ); -} - -async function createTestData() {} - (async () => { console.log(`Working folder = ${TEST_BUCKET}:${uniqueFolder}`); diff --git a/dev/tests-util.ts b/dev/tests-util.ts new file mode 100644 index 0000000..1940305 --- /dev/null +++ b/dev/tests-util.ts @@ -0,0 +1,68 @@ +import { PutObjectCommand, S3Client, StorageClass } from "@aws-sdk/client-s3"; + +const s3Client = new S3Client({}); + +/** + * Put a dictionary of objects as a two column CSV into an S3 object. + * + * @param csvBucket + * @param csvAbsoluteKey the key of the CSV in the working folder + * @param objects a dictionary of buckets->key[] + */ +export async function makeObjectDictionaryCsv( + csvBucket: string, + csvAbsoluteKey: string, + objects: Record, +) { + let content = ""; + + // for each bucket + for (const b of Object.keys(objects)) { + // for each key + for (const k of objects[b]) content += `${b},"${k}"\n`; + } + + // now save the CSV to S3 + const response = await s3Client.send( + new PutObjectCommand({ + Bucket: csvBucket, + Key: csvAbsoluteKey, + Body: content, + }), + ); +} + +/** + * Makes an S3 object of a certain size and storage class - and + * filled with basically blank data + * + * @param bucket the bucket of the object + * @param key the key of the object + * @param sizeInBytes the size in bytes of the object to make + * @param storageClass the storage class for the object, defaults to STANDARD + * @param forceContentByte force a content byte if the default needs to be overridden + * @returns the byte value that is the content of the created file + */ +export async function makeTestObject( + bucket: string, + key: string, + sizeInBytes: number, + storageClass: StorageClass = "STANDARD", + forceContentByte: number | undefined = undefined, +) { + const contentByte = + forceContentByte === undefined ? sizeInBytes % 256 : forceContentByte; + const response = await s3Client.send( + new PutObjectCommand({ + Bucket: bucket, + Key: key, + // so rather than make every file filled with 0s - we fill + // with a value that depends on the size... 
+      // that depends on the size... this gives us a simple way to assert
+      // that content has been successfully copied by looking at the
+      // destination content after the copy
+      Body: Buffer.alloc(sizeInBytes, contentByte),
+      StorageClass: storageClass,
+    }),
+  );
+  return contentByte;
+}
diff --git a/packages/aws-copy-out-sharer/docker/rclone-batch-docker-image/rclone-batch.go b/packages/aws-copy-out-sharer/docker/rclone-batch-docker-image/rclone-batch.go
index 198b03e..481c92b 100644
--- a/packages/aws-copy-out-sharer/docker/rclone-batch-docker-image/rclone-batch.go
+++ b/packages/aws-copy-out-sharer/docker/rclone-batch-docker-image/rclone-batch.go
@@ -122,6 +122,9 @@ func main() {
 		"--stats", "10000h",
 		// normally no bandwidth limiting ("0") - but can institute bandwidth limit if asked
 		"--bwlimit", If(debugBandwidthOk, debugBandwidth, "0"),
+		// because we are transferring between S3 buckets - which have a consistent idea of checksums
+		// at src and destination - we enable this option
+		"--checksum",
 		"copy", source, destination)

 	// we are only interested in stderr
@@ -225,13 +228,13 @@ func main() {
 		case 143:
 			results[which] = map[string]any{
 				"errors":    1,
-				"lastError": "Interrupted by SIGTERM",
+				"lastError": "interrupted by SIGTERM",
 				"source":    source}
 			resultErrorCount++
 		default:
 			results[which] = map[string]any{
 				"errors":      1,
-				"lastError":   fmt.Sprintf("Exit of rclone with code %v but no JSON statistics block generated", runExitErr.ExitCode()),
+				"lastError":   fmt.Sprintf("exit of rclone with code %v but no JSON statistics block generated", runExitErr.ExitCode()),
 				"systemError": fmt.Sprintf("%#v", runExitErr),
 				"source":      source}
 			resultErrorCount++
@@ -245,7 +248,7 @@ func main() {
 			// create a fake "compatible" stats block
 			results[which] = map[string]any{
 				"errors":    1,
-				"lastError": "Skipped due to previous SIGTERM received",
+				"lastError": "skipped due to previous SIGTERM received",
 				"source":    source}
 			resultErrorCount++
 		}
diff --git a/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package-lock.json b/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package-lock.json
index e84c5e8..dbd2b9a 100644
--- a/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package-lock.json
+++ b/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package-lock.json
@@ -9,7 +9,8 @@
       "version": "1.0.0",
       "dependencies": {
         "@aws-sdk/client-s3": "3.405.0",
-        "@types/aws-lambda": "8.10.93"
+        "@types/aws-lambda": "8.10.93",
+        "csv-stringify": "6.4.4"
       }
     },
     "node_modules/@aws-crypto/crc32": {
@@ -1395,6 +1396,11 @@
       "resolved": "https://registry.npmjs.org/bowser/-/bowser-2.11.0.tgz",
       "integrity": "sha512-AlcaJBi/pqqJBIQ8U9Mcpc9i8Aqxn88Skv5d+xBX006BY5u8N3mGLHa5Lgppa7L/HfwgwLgZ6NYs+Ag6uUmJRA=="
     },
+    "node_modules/csv-stringify": {
+      "version": "6.4.4",
+      "resolved": "https://registry.npmjs.org/csv-stringify/-/csv-stringify-6.4.4.tgz",
+      "integrity": "sha512-NDshLupGa7gp4UG4sSNIqwYJqgSwvds0SvENntxoVoVvTzXcrHvd5gG2MWpbRpSNvk59dlmIe1IwNvSxN4IVmg=="
+    },
     "node_modules/fast-xml-parser": {
       "version": "4.2.5",
       "resolved": "https://registry.npmjs.org/fast-xml-parser/-/fast-xml-parser-4.2.5.tgz",
diff --git a/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package.json b/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package.json
index 764d461..5df6d4a 100644
--- a/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package.json
+++ b/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/package.json
@@ -4,6 +4,7 @@
   "main": "summarise-copy-lambda.ts",
   "dependencies": {
     "@aws-sdk/client-s3": "3.405.0",
-    "@types/aws-lambda": "8.10.93"
"@types/aws-lambda": "8.10.93" + "@types/aws-lambda": "8.10.93", + "csv-stringify": "6.4.4" } } diff --git a/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/summarise-copy-lambda.ts b/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/summarise-copy-lambda.ts index cab1e99..a44ba9d 100644 --- a/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/summarise-copy-lambda.ts +++ b/packages/aws-copy-out-sharer/lambda/summarise-copy-lambda/summarise-copy-lambda.ts @@ -1,98 +1,197 @@ -import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3"; +import { + GetObjectCommand, + PutObjectCommand, + S3Client, +} from "@aws-sdk/client-s3"; import { basename } from "path/posix"; +import { stringify } from "csv-stringify/sync"; interface InvokeEvent { - Bucket: string; - Key: string; + rcloneResults: { + manifestAbsoluteKey: string; + }; + destinationBucket: string; + destinationPrefixKey: string; + destinationEndCopyRelativeKey: string; } +/** + * A handler that process the Steps CSV result (which should have a bunch + * of rclone stats) - and converts that into a CSV report for the + * copy out destination. + * + * @param event + */ export async function handler(event: InvokeEvent) { - console.log(JSON.stringify(event, null, 2)); + // debug input event + console.debug(JSON.stringify(event, null, 2)); const client = new S3Client({}); + // the manifest.json is generated by an AWS Steps DISTRIBUTED map and shows the results + // of all the individual map run parts const getManifestCommand = new GetObjectCommand({ - Bucket: event.Bucket, - Key: event.Key, + Bucket: event.destinationBucket, + Key: event.rcloneResults.manifestAbsoluteKey, }); const getManifestResult = await client.send(getManifestCommand); const getManifestContent = await getManifestResult.Body.transformToString(); - - const manifest = JSON.parse(getManifestContent); - + // A sample manifest // {"DestinationBucket":"elsa-data-tmp", - // "MapRunArn":"arn:aws:states:ap-southeast-2:843407916570:mapRun:CopyOutStateMachineF3E7B017-3KDB9FinLIMl/4474d22f-4056-30e3-978c-027016edac90:0c17ffd6-e8ad-44c0-a65b-a8b721007241", + // "MapRunArn":"arn:aws:states:ap-southeast-2:12345678:mapRun:CopyOutStateMachineABCD/4474d22f-4056-30e3-978c-027016edac90:0c17ffd6-e8ad-44c0-a65b-a8b721007241", // "ResultFiles":{ // "FAILED":[], // "PENDING":[], - // "SUCCEEDED":[{"Key":"copy-out-test-working/a6faea86c066cd90/1-objects-to-copy.tsv/0c17ffd6-e8ad-44c0-a65b-a8b721007241/SUCCEEDED_0.json","Size":2887}]}} - if (manifest["ResultFiles"]) { - const rf = manifest["ResultFiles"]; - - if (rf["FAILED"] && rf["FAILED"].length > 0) - throw new Error("Copy is meant to succeed - but it had failed results"); - - if (rf["PENDING"] && rf["PENDING"].length > 0) - throw new Error("Copy is meant to succeed - but it had pending results"); - - if (!rf["SUCCEEDED"]) throw new Error("Copy is meant to succeed"); - - const fileResults = {}; - - for (const s of rf["SUCCEEDED"]) { - const getSuccessCommand = new GetObjectCommand({ - Bucket: event.Bucket, - Key: s["Key"], - }); - - const getSuccessResult = await client.send(getSuccessCommand); - const getSuccessContent = await getSuccessResult.Body.transformToString(); - - for (const row of JSON.parse(getSuccessContent)) { - if (row["Output"]) { - const rowOutput = JSON.parse(row["Output"]); - - // { "bytes": 0, - // "checks": 0, - // "deletedDirs": 0, - // "deletes": 0, - // "elapsedTime": 0.2928195, - // "errors": 0, - // "eta": null, - // "fatalError": false, - // "renames": 0, - // "retryError": false, - // 
"serverSideCopies": 1, - // "serverSideCopyBytes": 9, - // "serverSideMoveBytes": 0, - // "serverSideMoves": 0, - // "source": "s3:elsa-data-tmp/copy-out-test-objects/d76848c9ae316e13/1-src/1.bin", - // "speed": 0, - // "totalBytes": 0, - // "totalChecks": 0, - // "totalTransfers": 1, - // "transferTime": 0.046778609, - // "transfers": 1 } - for (const rcloneRow of rowOutput["rcloneResult"]) { - const s = rcloneRow["source"]; - const b = basename(s); - - const copiedBytes = rcloneRow["serverSideCopyBytes"]; - const copySeconds = rcloneRow["elapsedTime"]; + // "SUCCEEDED":[{"Key":"copy-out-test-working/a6faea86c066cd90/1-objects-to-copy.tsv/0c17ffd6-e8ad-44c0-a65b-a8b721007241/SUCCEEDED_0.json", + // "Size":2887}]}} + const manifest = JSON.parse(getManifestContent); + + const rf = manifest["ResultFiles"]; + + if (!rf) + throw new Error( + "AWS Steps Distributed map manifest.json is missing ResultFiles", + ); + + const pending = rf["PENDING"]; + const failed = rf["FAILED"]; + const succeeded = rf["SUCCEEDED"]; + + if ( + !Array.isArray(pending) || + !Array.isArray(failed) || + !Array.isArray(succeeded) + ) + throw new Error( + "AWS Steps Distributed map manifest.json is missing an expected array for PENDING, FAILED or SUCCEEDED", + ); + + if (pending.length > 0) + throw new Error( + "AWS Steps Distributed map manifest.json indicates there are PENDING results which is not a state we are expecting", + ); + + if (failed.length > 0) + throw new Error("Copy is meant to succeed - but it had failed results"); + + const fileResults = {}; + + for (const s of succeeded) { + const getSuccessCommand = new GetObjectCommand({ + Bucket: event.destinationBucket, + Key: s["Key"], + }); + + const getSuccessResult = await client.send(getSuccessCommand); + const getSuccessContent = await getSuccessResult.Body.transformToString(); + + for (const row of JSON.parse(getSuccessContent)) { + if (row["Output"]) { + const rowOutput = JSON.parse(row["Output"]); + + // { "bytes": 0, + // "checks": 0, + // "deletedDirs": 0, + // "deletes": 0, + // "elapsedTime": 0.2928195, + // "errors": 0, + // "eta": null, + // "fatalError": false, + // "renames": 0, + // "retryError": false, + // "serverSideCopies": 1, + // "serverSideCopyBytes": 9, + // "serverSideMoveBytes": 0, + // "serverSideMoves": 0, + // "source": "s3:elsa-data-tmp/copy-out-test-objects/d76848c9ae316e13/1-src/1.bin", + // "speed": 0, + // "totalBytes": 0, + // "totalChecks": 0, + // "totalTransfers": 1, + // "transferTime": 0.046778609, + // "transfers": 1 } + for (const rcloneRow of rowOutput["rcloneResult"]) { + console.log(JSON.stringify(rcloneRow, null, 2)); + + const s = rcloneRow["source"]; + const b = basename(s); + + // NOTE/WARNING: this behaviour is very dependent on rclone and our interpretation + // of rclone stats - so if things start breaking this is where I would start + // looking + const errors: number = rcloneRow["errors"]; + const lastError: number = rcloneRow["lastError"]; + const copiedBytes: number = rcloneRow["serverSideCopyBytes"]; + const copySeconds = rcloneRow["elapsedTime"]; + const totalTransfers = rcloneRow["totalTransfers"]; + const retryError = rcloneRow["retryError"]; + + // firstly if we have been signalled an error - we need to report that + if (errors > 0) { fileResults[b] = { name: b, - speedInMebibytesPerSecond: - copiedBytes / copySeconds / 1024 / 1024, + status: "ERROR", + speed: 0, + message: lastError, }; + } else { + // if we didn't end up transferring anything BUT there was no actual error AND + // we did a retry - 
+            if (totalTransfers < 1 && retryError) {
+              fileResults[b] = {
+                name: b,
+                status: "ERROR",
+                speed: 0,
+                message: "source file did not exist so nothing was transferred",
+              };
+            }
+            // if we didn't end up transferring anything BUT there was no actual error
+            // AND we didn't do any retries then chances are we skipped due to it already
+            // being at the destination
+            else if (totalTransfers < 1 && !retryError) {
+              fileResults[b] = {
+                name: b,
+                status: "ALREADYCOPIED",
+                speed: 0,
+                message:
+                  "destination file already exists with same checksum so nothing was transferred",
+              };
+            } else {
+              // if we did do a copy then copySeconds will normally be a value and we can compute a speed
+              if (copySeconds)
+                fileResults[b] = {
+                  name: b,
+                  status: "COPIED",
+                  speed: Math.floor(copiedBytes / copySeconds / 1024 / 1024),
+                  message: "",
+                };
+            }
           }
         }
       }
-
-      console.log(JSON.stringify(fileResults, null, 2));
     }
-  } else {
-    throw new Error("Missing result manifest");
+
+  // debug results before we make the CSV
+  console.debug(JSON.stringify(fileResults, null, 2));
+
+  const output = stringify(Object.values(fileResults), {
+    header: true,
+    columns: {
+      name: "OBJECTNAME",
+      status: "TRANSFERSTATUS",
+      speed: "MBPERSEC",
+      message: "MESSAGE",
+    },
+  });
+
+  const putCommand = new PutObjectCommand({
+    Bucket: event.destinationBucket,
+    Key: `${event.destinationPrefixKey}${event.destinationEndCopyRelativeKey}`,
+    Body: output,
+  });
+
+  await client.send(putCommand);
 }
}
diff --git a/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts b/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts
index d4161e5..64d57bd 100644
--- a/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts
+++ b/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts
@@ -45,7 +45,7 @@ export class CopyOutStateMachineConstruct extends Construct {

     // these are the default objects that will be created in the destination prefix area
     destinationStartCopyRelativeKey: "STARTED_COPY.txt",
-    destinationEndCopyRelativeKey: "ENDED_COPY.txt",
+    destinationEndCopyRelativeKey: "ENDED_COPY.csv",
   };

   const canWriteLambdaStep = new CanWriteLambdaStepConstruct(
diff --git a/packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts b/packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts
index c290bbf..81331c3 100644
--- a/packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts
+++ b/packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts
@@ -26,9 +26,12 @@ export type CopyOutStateMachineInputKeys = keyof CopyOutStateMachineInput;
 export const SOURCE_FILES_CSV_KEY_FIELD_NAME: CopyOutStateMachineInputKeys =
   "sourceFilesCsvKey";

+export const MAX_ITEMS_PER_BATCH_FIELD_NAME: CopyOutStateMachineInputKeys =
+  "maxItemsPerBatch";
+
 export const DESTINATION_BUCKET_FIELD_NAME: CopyOutStateMachineInputKeys =
   "destinationBucket";

-export const DESTINATION_KEY_FIELD_NAME: CopyOutStateMachineInputKeys =
+export const DESTINATION_PREFIX_KEY_FIELD_NAME: CopyOutStateMachineInputKeys =
   "destinationPrefixKey";

 export const DESTINATION_START_COPY_RELATIVE_KEY_FIELD_NAME: CopyOutStateMachineInputKeys =
diff --git a/packages/aws-copy-out-sharer/src/rclone-map-construct.ts b/packages/aws-copy-out-sharer/src/rclone-map-construct.ts
index a9d9608..8180e13 100644
--- a/packages/aws-copy-out-sharer/src/rclone-map-construct.ts
+++ b/packages/aws-copy-out-sharer/src/rclone-map-construct.ts
@@ -4,7 +4,12 @@ import { S3CsvDistributedMap } from "./s3-csv-distributed-map";
} from "./s3-csv-distributed-map"; import { RcloneRunTaskConstruct } from "./rclone-run-task-construct"; import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2"; import { Cluster } from "aws-cdk-lib/aws-ecs"; -import { SOURCE_FILES_CSV_KEY_FIELD_NAME } from "./copy-out-state-machine-input"; +import { + DESTINATION_BUCKET_FIELD_NAME, + DESTINATION_PREFIX_KEY_FIELD_NAME, + MAX_ITEMS_PER_BATCH_FIELD_NAME, + SOURCE_FILES_CSV_KEY_FIELD_NAME, +} from "./copy-out-state-machine-input"; type Props = { vpc: IVpc; @@ -61,6 +66,7 @@ export class RcloneMapConstruct extends Construct { this.distributedMap = new S3CsvDistributedMap(this, "RcloneMap", { toleratedFailurePercentage: 25, + batchMaxItemsPath: `$.${MAX_ITEMS_PER_BATCH_FIELD_NAME}`, itemReaderCsvHeaders: [bucketColumnName, keyColumnName], itemReader: { Bucket: props.workingBucket, @@ -81,8 +87,8 @@ export class RcloneMapConstruct extends Construct { batchInput: { "rcloneDestination.$": JsonPath.format( "s3:{}/{}", - JsonPath.stringAt(`$.destinationBucket`), - JsonPath.stringAt("$.destinationPrefixKey"), + JsonPath.stringAt(`$.${DESTINATION_BUCKET_FIELD_NAME}`), + JsonPath.stringAt(`$.${DESTINATION_PREFIX_KEY_FIELD_NAME}`), ), }, iterator: rcloneRunTask, @@ -94,6 +100,10 @@ export class RcloneMapConstruct extends Construct { JsonPath.stringAt(`$.${SOURCE_FILES_CSV_KEY_FIELD_NAME}`), ), }, + resultSelector: { + "manifestAbsoluteKey.$": "$.ResultWriterDetails.Key", + }, + resultPath: `$.rcloneResults`, }); } } diff --git a/packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts b/packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts index ec3ec2a..e7dc97e 100644 --- a/packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts +++ b/packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts @@ -36,6 +36,8 @@ export interface S3CsvDistributedMapProps { readonly maxConcurrencyPath?: JsonPath | string; readonly resultPath?: JsonPath | string; + readonly resultSelector?: Readonly>; + readonly label?: string; } @@ -102,6 +104,13 @@ export class S3CsvDistributedMap } override toStateJson(): object { + // if any of these are specified we want to put in an ItemBatcher + // block which will tell the DISTRIBUTED map to switch on batching + const useBatching = + !!this.props.batchMaxItemsPath || + !!this.props.batchMaxItems || + !!this.props.batchInput; + const stateJson = { Type: "Map", ToleratedFailurePercentage: this.props.toleratedFailurePercentage, @@ -122,16 +131,13 @@ export class S3CsvDistributedMap Parameters: this.props.itemReader, }, ItemSelector: this.props.itemSelector, - ItemBatcher: - this.props.batchMaxItemsPath || - this.props.batchMaxItems || - this.props.batchInput - ? { - MaxItemsPerBatch: this.props.batchMaxItems, - MaxItemsPerBatchPath: this.props.batchMaxItemsPath, - BatchInput: this.props.batchInput, - } - : undefined, + ItemBatcher: useBatching + ? 
{ + MaxItemsPerBatch: this.props.batchMaxItems, + MaxItemsPerBatchPath: this.props.batchMaxItemsPath, + BatchInput: this.props.batchInput, + } + : undefined, MaxConcurrency: this.props.maxConcurrency, MaxConcurrencyPath: this.props.maxConcurrencyPath, Label: this.props.label, @@ -142,6 +148,7 @@ export class S3CsvDistributedMap } : undefined, ResultPath: this.props.resultPath, + ResultSelector: this.props.resultSelector, }; return { diff --git a/packages/aws-copy-out-sharer/src/summarise-copy-lambda-step-construct.ts b/packages/aws-copy-out-sharer/src/summarise-copy-lambda-step-construct.ts index a182f8f..69103ed 100644 --- a/packages/aws-copy-out-sharer/src/summarise-copy-lambda-step-construct.ts +++ b/packages/aws-copy-out-sharer/src/summarise-copy-lambda-step-construct.ts @@ -93,7 +93,6 @@ export class SummariseCopyLambdaStepConstruct extends Construct { ); this.invocableLambda = new LambdaInvoke(this, `Summarise Copy Results`, { - inputPath: "$.ResultWriterDetails", lambdaFunction: summariseCopyLambda, resultPath: JsonPath.DISCARD, });