From 8a3d9d2914e1dd9f3bbcd22e8edcb9ea5d5daf5d Mon Sep 17 00:00:00 2001 From: andrewpatto Date: Fri, 1 Dec 2023 16:42:20 +1100 Subject: [PATCH] New distributed map construct from internet Working with thawing and copying --- dev/test.ts | 2 +- .../thaw-objects-lambda.ts | 14 +- .../src/can-write-lambda-step-construct.ts | 14 +- .../aws-copy-out-sharer/src/copy-out-stack.ts | 7 - .../src/copy-out-state-machine-construct.ts | 74 +++------ .../src/copy-out-state-machine-input.ts | 11 ++ .../src/rclone-map-construct.ts | 90 +++++++++++ ...struct.ts => rclone-run-task-construct.ts} | 8 +- .../src/s3-csv-distributed-map.ts | 152 ++++++++++++++++++ .../src/thaw-objects-lambda-step-construct.ts | 32 ++-- .../src/thaw-objects-map-construct.ts | 82 ++++++++++ 11 files changed, 377 insertions(+), 109 deletions(-) create mode 100644 packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts create mode 100644 packages/aws-copy-out-sharer/src/rclone-map-construct.ts rename packages/aws-copy-out-sharer/src/{fargate-run-task-construct.ts => rclone-run-task-construct.ts} (96%) create mode 100644 packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts create mode 100644 packages/aws-copy-out-sharer/src/thaw-objects-map-construct.ts diff --git a/dev/test.ts b/dev/test.ts index 27b5009..21b6b95 100644 --- a/dev/test.ts +++ b/dev/test.ts @@ -51,7 +51,7 @@ async function makeTestObject( async function createTestData() { const sourceObjects = { - [`${testFolderSrc}/1.bin`]: StorageClass.DEEP_ARCHIVE, + [`${testFolderSrc}/1.bin`]: StorageClass.GLACIER_IR, [`${testFolderSrc}/2.bin`]: StorageClass.STANDARD, [`${testFolderSrc}/3.bin`]: StorageClass.GLACIER, }; diff --git a/packages/aws-copy-out-sharer/lambda/thaw-objects-lambda/thaw-objects-lambda.ts b/packages/aws-copy-out-sharer/lambda/thaw-objects-lambda/thaw-objects-lambda.ts index 3f7be51..a24254b 100644 --- a/packages/aws-copy-out-sharer/lambda/thaw-objects-lambda/thaw-objects-lambda.ts +++ b/packages/aws-copy-out-sharer/lambda/thaw-objects-lambda/thaw-objects-lambda.ts @@ -12,10 +12,6 @@ interface ThawObjectsEvent { }[]; BatchInput: { - // ... - // there are other fields in the batch input that we are not interested in - we pass them on - // ... - glacierFlexibleRetrievalThawDays: number; glacierFlexibleRetrievalThawSpeed: string; @@ -148,13 +144,5 @@ export async function handler(event: ThawObjectsEvent) { `${isThawing}/${event.Items.length} are in the process of thawing`, ); - return { - // Note we are converting the objects to rclone format here - // A better spot would be in the Fargate run task but the JsonPath - // for that would be a nightmare - Items: event.Items.map((a) => ({ - rcloneSource: `s3:${a.bucket}/${a.key}`, - })), - BatchInput: event.BatchInput, - }; + return {}; } diff --git a/packages/aws-copy-out-sharer/src/can-write-lambda-step-construct.ts b/packages/aws-copy-out-sharer/src/can-write-lambda-step-construct.ts index 1236c01..e684ae2 100644 --- a/packages/aws-copy-out-sharer/src/can-write-lambda-step-construct.ts +++ b/packages/aws-copy-out-sharer/src/can-write-lambda-step-construct.ts @@ -3,15 +3,11 @@ import { Effect, PolicyStatement } from "aws-cdk-lib/aws-iam"; import { Duration, Stack } from "aws-cdk-lib"; import { LambdaInvoke } from "aws-cdk-lib/aws-stepfunctions-tasks"; import { Runtime } from "aws-cdk-lib/aws-lambda"; -import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2"; import { JsonPath } from "aws-cdk-lib/aws-stepfunctions"; import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs"; import { join } from "path"; type CanWriteLambdaStepProps = { - vpc: IVpc; - vpcSubnetSelection: SubnetType; - requiredRegion: string; /** @@ -37,7 +33,6 @@ export class CanWriteLambdaStepConstruct extends Construct { super(scope, id); const canWriteLambda = new NodejsFunction(this, "CanWriteFunction", { - vpc: props.vpc, entry: join( __dirname, "..", @@ -45,18 +40,11 @@ export class CanWriteLambdaStepConstruct extends Construct { "can-write-lambda", "can-write-lambda.ts", ), - // by specifying the precise runtime - the bundler knows exactly what packages are already in - // the base image - and for us can skip bundling @aws-sdk - // if we need to move this forward beyond node 18 - then we may need to revisit this - runtime: Runtime.NODEJS_18_X, + runtime: Runtime.NODEJS_20_X, handler: "handler", bundling: { - externalModules: ["aws-sdk"], minify: false, }, - vpcSubnets: { - subnetType: props.vpcSubnetSelection, - }, // this seems like plenty of seconds to do a few API calls to S3 timeout: Duration.seconds(30), }); diff --git a/packages/aws-copy-out-sharer/src/copy-out-stack.ts b/packages/aws-copy-out-sharer/src/copy-out-stack.ts index 9068949..7d177d4 100644 --- a/packages/aws-copy-out-sharer/src/copy-out-stack.ts +++ b/packages/aws-copy-out-sharer/src/copy-out-stack.ts @@ -1,7 +1,6 @@ import { Stack } from "aws-cdk-lib"; import { Construct } from "constructs"; import { CopyOutStackProps } from "./copy-out-stack-props"; -import { Cluster } from "aws-cdk-lib/aws-ecs"; import { CopyOutStateMachineConstruct } from "./copy-out-state-machine-construct"; import { Service } from "aws-cdk-lib/aws-servicediscovery"; import { InfrastructureClient } from "@elsa-data/aws-infrastructure"; @@ -20,11 +19,6 @@ export class CopyOutStack extends Stack { const namespace = infraClient.getNamespaceFromLookup(this); - const cluster = new Cluster(this, "FargateCluster", { - vpc: vpc, - enableFargateCapacityProviders: true, - }); - const service = new Service(this, "Service", { namespace: namespace, name: "CopyOut", @@ -34,7 +28,6 @@ export class CopyOutStack extends Stack { const copyOut = new CopyOutStateMachineConstruct(this, "CopyOut", { vpc: vpc, vpcSubnetSelection: props.infrastructureSubnetSelection, - fargateCluster: cluster, aggressiveTimes: props.isDevelopment, allowWriteToThisAccount: props.isDevelopment, }); diff --git a/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts b/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts index 5410263..cd3bb7d 100644 --- a/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts +++ b/packages/aws-copy-out-sharer/src/copy-out-state-machine-construct.ts @@ -10,20 +10,17 @@ import { WaitTime, } from "aws-cdk-lib/aws-stepfunctions"; import { Arn, ArnFormat, Duration, Stack } from "aws-cdk-lib"; -import { ICluster } from "aws-cdk-lib/aws-ecs"; import { CanWriteLambdaStepConstruct } from "./can-write-lambda-step-construct"; import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2"; -import { DistributedMapStepConstruct } from "./distributed-map-step-construct"; -import { FargateRunTaskConstruct } from "./fargate-run-task-construct"; -import { ThawObjectsLambdaStepConstruct } from "./thaw-objects-lambda-step-construct"; +import { ThawObjectsMapConstruct } from "./thaw-objects-map-construct"; +import { CopyOutStateMachineInput } from "./copy-out-state-machine-input"; +import { RcloneMapConstruct } from "./rclone-map-construct"; export type CopyOutStateMachineProps = { vpc: IVpc; vpcSubnetSelection: SubnetType; - fargateCluster: ICluster; - /** * Whether the stack should use duration/timeouts that are more suited * to demonstration/development. i.e. minutes rather than hours for wait times, @@ -47,56 +44,16 @@ export class CopyOutStateMachineConstruct extends Construct { this, "CanWrite", { - vpc: props.vpc, - vpcSubnetSelection: props.vpcSubnetSelection, requiredRegion: Stack.of(this).region, allowWriteToThisAccount: props.allowWriteToThisAccount, }, ); - const thawObjectsLambdaStep = new ThawObjectsLambdaStepConstruct( - this, - "ThawObjects", - { - vpc: props.vpc, - vpcSubnetSelection: props.vpcSubnetSelection, - }, - ); - - thawObjectsLambdaStep.invocableLambda.addRetry({ - errors: ["IsThawingError"], - interval: Duration.minutes(1), - backoffRate: 1, - maxAttempts: 15, - }); - - const rcloneRunTask = new FargateRunTaskConstruct( - this, - "RcloneFargateTask", - { - fargateCluster: props.fargateCluster, - vpcSubnetSelection: props.vpcSubnetSelection, - }, - ).ecsRunTask; - - // our task is an idempotent copy operation so we can retry if we happen to get killed - // (possible given we are using Spot fargate) - rcloneRunTask.addRetry({ - errors: ["States.TaskFailed"], - maxAttempts: 3, + const rcloneMap = new RcloneMapConstruct(this, "RcloneMap", { + vpc: props.vpc, + vpcSubnetSelection: props.vpcSubnetSelection, }); - const distributedStepsChain = - thawObjectsLambdaStep.invocableLambda.next(rcloneRunTask); - - const distributedMapStep = new DistributedMapStepConstruct( - this, - "MapStep", - { - task: distributedStepsChain, //rcloneRunTask, - }, - ).distributedMapStep; - const canWriteStep = canWriteLambdaStep.invocableLambda; const waitStep = new Wait(this, "Wait X Minutes", { @@ -105,12 +62,14 @@ export class CopyOutStateMachineConstruct extends Construct { ), }); + const defaults: Partial = { + maxItemsPerBatch: 1, + requiredRegion: Stack.of(this).region, + destinationKey: "", + }; + const defineDefaults = new Pass(this, "Define Defaults", { - parameters: { - maxItemsPerBatch: 1, - requiredRegion: Stack.of(this).region, - destinationKey: "", - }, + parameters: defaults, resultPath: "$.inputDefaults", }); @@ -134,11 +93,14 @@ export class CopyOutStateMachineConstruct extends Construct { canWriteStep.addCatch(fail, { errors: ["WrongRegionError"] }); + const thawObjectsMap = new ThawObjectsMapConstruct(this, "ThawObjects", {}); + const definition = ChainDefinitionBody.fromChainable( defineDefaults .next(applyDefaults) .next(canWriteStep) - .next(distributedMapStep) + .next(thawObjectsMap.distributedMap) + .next(rcloneMap.distributedMap) .next(success), ); @@ -151,6 +113,8 @@ export class CopyOutStateMachineConstruct extends Construct { definitionBody: definition, }); + thawObjectsMap.distributedMap.grantNestedPermissions(this._stateMachine); + // this is needed to support distributed map - once there is a native CDK for this I presume this goes this._stateMachine.addToRolePolicy( new PolicyStatement({ diff --git a/packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts b/packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts new file mode 100644 index 0000000..482c362 --- /dev/null +++ b/packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts @@ -0,0 +1,11 @@ +export type CopyOutStateMachineInput = { + maxItemsPerBatch: number; + requiredRegion: string; + + sourceFilesCsvBucket: string; + sourceFilesCsvKey: string; + + destinationKey: string; +}; + +export type CopyOutStateMachineInputKeys = keyof CopyOutStateMachineInput; diff --git a/packages/aws-copy-out-sharer/src/rclone-map-construct.ts b/packages/aws-copy-out-sharer/src/rclone-map-construct.ts new file mode 100644 index 0000000..e7a658b --- /dev/null +++ b/packages/aws-copy-out-sharer/src/rclone-map-construct.ts @@ -0,0 +1,90 @@ +import { Construct } from "constructs"; +import { JsonPath } from "aws-cdk-lib/aws-stepfunctions"; +import { S3CsvDistributedMap } from "./s3-csv-distributed-map"; +import { RcloneRunTaskConstruct } from "./rclone-run-task-construct"; +import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2"; +import { Cluster } from "aws-cdk-lib/aws-ecs"; + +type Props = { + vpc: IVpc; + vpcSubnetSelection: SubnetType; +}; + +export class RcloneMapConstruct extends Construct { + public readonly distributedMap: S3CsvDistributedMap; + + constructor(scope: Construct, id: string, props: Props) { + super(scope, id); + + const cluster = new Cluster(this, "FargateCluster", { + vpc: props.vpc, + enableFargateCapacityProviders: true, + }); + + const rcloneRunTask = new RcloneRunTaskConstruct( + this, + "RcloneFargateTask", + { + fargateCluster: cluster, + vpcSubnetSelection: props.vpcSubnetSelection, + }, + ).ecsRunTask; + + // our task is an idempotent copy operation so we can retry if we happen to get killed + // (possible given we are using Spot fargate) + rcloneRunTask.addRetry({ + errors: ["States.TaskFailed"], + maxAttempts: 3, + }); + + // these names are internal only - but we pull out as a const to make sure + // they are consistent + const bucketColumnName = "b"; + const keyColumnName = "k"; + + // { + // "BatchInput": { + // "rcloneDestination": "s3:cpg-cardiac-flagship-transfer/optionalpath" + // }, + // "Items": [ + // { + // "rcloneSource": "s3:bucket/1.fastq.gz" + // }, + // { + // "rcloneSource": "s3:bucket/2.fastq.gz" + // }, + // } + + this.distributedMap = new S3CsvDistributedMap(this, "RcloneMap", { + toleratedFailurePercentage: 25, + itemReaderCsvHeaders: [bucketColumnName, keyColumnName], + itemReader: { + "Bucket.$": "$.sourceFilesCsvBucket", + "Key.$": "$.sourceFilesCsvKey", + }, + itemSelector: { + "rcloneSource.$": JsonPath.format( + // note: this is not an s3:// URL, it is the peculiar syntax used by rclone + "s3:{}/{}", + JsonPath.stringAt(`$$.Map.Item.Value.${bucketColumnName}`), + JsonPath.stringAt(`$$.Map.Item.Value.${keyColumnName}`), + ), + }, + batchInput: { + "rcloneDestination.$": JsonPath.format( + "s3:{}/{}", + JsonPath.stringAt(`$.destinationBucket`), + JsonPath.stringAt("$.destinationKey"), + ), + }, + iterator: rcloneRunTask, + resultWriter: { + "Bucket.$": "$.sourceFilesCsvBucket", + "Prefix.$": JsonPath.format( + "{}-results", + JsonPath.stringAt("$.sourceFilesCsvKey"), + ), + }, + }); + } +} diff --git a/packages/aws-copy-out-sharer/src/fargate-run-task-construct.ts b/packages/aws-copy-out-sharer/src/rclone-run-task-construct.ts similarity index 96% rename from packages/aws-copy-out-sharer/src/fargate-run-task-construct.ts rename to packages/aws-copy-out-sharer/src/rclone-run-task-construct.ts index 4d71376..3c0f3a6 100644 --- a/packages/aws-copy-out-sharer/src/fargate-run-task-construct.ts +++ b/packages/aws-copy-out-sharer/src/rclone-run-task-construct.ts @@ -37,7 +37,7 @@ type Props = { //allowWriteToThisAccount?: boolean; WIP NEED TO IMPLEMENT }; -export class FargateRunTaskConstruct extends Construct { +export class RcloneRunTaskConstruct extends Construct { public readonly ecsRunTask: EcsRunTask; constructor(scope: Construct, id: string, props: Props) { @@ -88,6 +88,8 @@ export class FargateRunTaskConstruct extends Construct { streamPrefix: "elsa-data-copy-out", logRetention: RetentionDays.ONE_WEEK, }), + // eg the equivalent of + // RCLONE_CONFIG_S3_TYPE=s3 RCLONE_CONFIG_S3_PROVIDER=AWS RCLONE_CONFIG_S3_ENV_AUTH=true RCLONE_CONFIG_S3_REGION=ap-southeast-2 rclone copy src dest environment: { RCLONE_CONFIG_S3_TYPE: "s3", RCLONE_CONFIG_S3_PROVIDER: "AWS", @@ -98,12 +100,10 @@ export class FargateRunTaskConstruct extends Construct { }, }); - // RCLONE_CONFIG_S3_TYPE=s3 RCLONE_CONFIG_S3_PROVIDER=AWS RCLONE_CONFIG_S3_ENV_AUTH=true RCLONE_CONFIG_S3_REGION=ap-southeast-2 rclone copy src dest - // https://github.com/aws/aws-cdk/issues/20013 this.ecsRunTask = new EcsRunTask(this, "Copy File with Rclone", { + // we use task tokens as we want to return rclone stats/results integrationPattern: IntegrationPattern.WAIT_FOR_TASK_TOKEN, - // .RUN_JOB, cluster: props.fargateCluster, taskDefinition: taskDefinition, launchTarget: new EcsFargateSpotOnlyLaunchTarget({ diff --git a/packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts b/packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts new file mode 100644 index 0000000..ec3ec2a --- /dev/null +++ b/packages/aws-copy-out-sharer/src/s3-csv-distributed-map.ts @@ -0,0 +1,152 @@ +import { + Chain, + IChainable, + INextable, + JsonPath, + State, + StateGraph, + StateMachine, +} from "aws-cdk-lib/aws-stepfunctions"; +import { Effect, Policy, PolicyStatement } from "aws-cdk-lib/aws-iam"; +import { Construct } from "constructs"; + +export interface S3CsvDistributedMapProps { + readonly iterator: State; + readonly toleratedFailurePercentage: number; + readonly itemReaderCsvHeaders: string[]; + readonly itemReader: { + readonly Bucket?: JsonPath | string; + readonly "Bucket.$"?: string; + readonly Key?: JsonPath | string; + readonly "Key.$"?: string; + }; + readonly resultWriter?: { + readonly Bucket?: JsonPath | string; + readonly "Bucket.$"?: string; + readonly Prefix?: JsonPath | string; + readonly "Prefix.$"?: string; + }; + readonly itemSelector?: Readonly>; + + readonly batchMaxItems?: number; + readonly batchMaxItemsPath?: JsonPath | string; + readonly batchInput?: Readonly>; + + readonly maxConcurrency?: number; + readonly maxConcurrencyPath?: JsonPath | string; + + readonly resultPath?: JsonPath | string; + readonly label?: string; +} + +export class S3CsvDistributedMap + extends State + implements IChainable, INextable +{ + public readonly endStates: INextable[]; + private readonly props: S3CsvDistributedMapProps; + + private readonly graph: StateGraph; + private readonly policy: Policy; + + constructor(scope: Construct, id: string, props: S3CsvDistributedMapProps) { + super(scope, id, {}); + this.props = props; + + if (props.batchMaxItems && props.batchMaxItemsPath) + throw Error("Only one of batchMaxItems or batchMaxItemsPath can be set"); + + if (props.maxConcurrency && props.maxConcurrencyPath) + throw Error( + "Only one of maxConcurrency or maxConcurrencyPath can be set", + ); + + this.graph = new StateGraph(props.iterator, `Map ${this.stateId} Iterator`); + this.policy = new Policy(this, "IamRole"); + + this.endStates = [this]; + } + + grantNestedPermissions(stateMachine: StateMachine) { + // this grants the autogenerated permissions in the distributed maps substates to the state machine + // (e.g) dynamodb, lambda invoke, etc + this.graph.policyStatements.forEach((s) => stateMachine.addToRolePolicy(s)); + + // this grants the permissions to the state machine to start, stop, and describe the map + // NB: we can't add the statement directly to the state machine or it creates a circular + // reference which the CDK objects too. By using a policy we get around this limitation. + + this.policy.addStatements( + new PolicyStatement({ + effect: Effect.ALLOW, + actions: [ + "states:StartExecution", + "states:DescribeExecution", + "states:StopExecution", + ], + resources: [stateMachine.stateMachineArn], + }), + ); + + this.policy.attachToRole(stateMachine.role); + } + + protected makeNext(next: State) { + super.makeNext(next); + next.bindToGraph(this.graph); + } + + public next(next: IChainable): Chain { + super.makeNext(next.startState); + return Chain.sequence(this, next); + } + + override toStateJson(): object { + const stateJson = { + Type: "Map", + ToleratedFailurePercentage: this.props.toleratedFailurePercentage, + ItemProcessor: { + ProcessorConfig: { + Mode: "DISTRIBUTED", + ExecutionType: "STANDARD", + }, + ...this.graph.toGraphJson(), + }, + ItemReader: { + Resource: "arn:aws:states:::s3:getObject", + ReaderConfig: { + InputType: "CSV", + CSVHeaderLocation: "GIVEN", + CSVHeaders: this.props.itemReaderCsvHeaders, + }, + Parameters: this.props.itemReader, + }, + ItemSelector: this.props.itemSelector, + ItemBatcher: + this.props.batchMaxItemsPath || + this.props.batchMaxItems || + this.props.batchInput + ? { + MaxItemsPerBatch: this.props.batchMaxItems, + MaxItemsPerBatchPath: this.props.batchMaxItemsPath, + BatchInput: this.props.batchInput, + } + : undefined, + MaxConcurrency: this.props.maxConcurrency, + MaxConcurrencyPath: this.props.maxConcurrencyPath, + Label: this.props.label, + ResultWriter: this.props.resultWriter + ? { + Resource: "arn:aws:states:::s3:putObject", + Parameters: this.props.resultWriter, + } + : undefined, + ResultPath: this.props.resultPath, + }; + + return { + ...this.renderNextEnd(), + ...stateJson, + }; + } +} diff --git a/packages/aws-copy-out-sharer/src/thaw-objects-lambda-step-construct.ts b/packages/aws-copy-out-sharer/src/thaw-objects-lambda-step-construct.ts index d84d1ec..4b08464 100644 --- a/packages/aws-copy-out-sharer/src/thaw-objects-lambda-step-construct.ts +++ b/packages/aws-copy-out-sharer/src/thaw-objects-lambda-step-construct.ts @@ -3,29 +3,29 @@ import { Effect, PolicyStatement } from "aws-cdk-lib/aws-iam"; import { Duration } from "aws-cdk-lib"; import { LambdaInvoke } from "aws-cdk-lib/aws-stepfunctions-tasks"; import { Runtime } from "aws-cdk-lib/aws-lambda"; -import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2"; import { JsonPath } from "aws-cdk-lib/aws-stepfunctions"; import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs"; import { join } from "path"; -type ThawObjectsLambdaStepProps = { - vpc: IVpc; - vpcSubnetSelection: SubnetType; -}; +type Props = {}; /** - * A construct for a Steps function that tests whether an S3 - * bucket exists, is in the correct region, and is writeable - * by us. Throws an exception if any of these conditions is not met. + * A construct for a Steps function that tests if a set + * of input files are available for copying (i.e. in active + * storage) and if not, triggers a restore on them. Whenever + * any input file is not available - this lambda throws a + * IsThawingError (at the end of processing all of them). + * + * It can then be used in a loop waiting + * for thawing to finish - by Retry/Catching this error. */ export class ThawObjectsLambdaStepConstruct extends Construct { public readonly invocableLambda; - constructor(scope: Construct, id: string, props: ThawObjectsLambdaStepProps) { + constructor(scope: Construct, id: string, _props: Props) { super(scope, id); const thawObjectsLambda = new NodejsFunction(this, "ThawObjectsFunction", { - vpc: props.vpc, entry: join( __dirname, "..", @@ -36,19 +36,19 @@ export class ThawObjectsLambdaStepConstruct extends Construct { runtime: Runtime.NODEJS_20_X, handler: "handler", bundling: { + // for a small method it is sometimes easier if it can be viewed + // in the AWS console un-minified minify: false, }, - vpcSubnets: { - subnetType: props.vpcSubnetSelection, - }, - // this seems like plenty of seconds to do a few API calls to S3 - timeout: Duration.seconds(300), + // we can theoretically need to loop through 1000s of objects + // so we give ourselves plenty of time + timeout: Duration.seconds(60 * 5), }); thawObjectsLambda.addToRolePolicy( new PolicyStatement({ effect: Effect.ALLOW, - actions: ["s3:*"], + actions: ["s3:GetObject", "s3:RestoreObject"], resources: ["*"], }), ); diff --git a/packages/aws-copy-out-sharer/src/thaw-objects-map-construct.ts b/packages/aws-copy-out-sharer/src/thaw-objects-map-construct.ts new file mode 100644 index 0000000..c5d305a --- /dev/null +++ b/packages/aws-copy-out-sharer/src/thaw-objects-map-construct.ts @@ -0,0 +1,82 @@ +import { Construct } from "constructs"; +import { JsonPath } from "aws-cdk-lib/aws-stepfunctions"; +import { S3CsvDistributedMap } from "./s3-csv-distributed-map"; +import { ThawObjectsLambdaStepConstruct } from "./thaw-objects-lambda-step-construct"; +import { Duration } from "aws-cdk-lib"; +import { CopyOutStateMachineInputKeys } from "./copy-out-state-machine-input"; + +type Props = {}; + +export class ThawObjectsMapConstruct extends Construct { + public readonly distributedMap: S3CsvDistributedMap; + + constructor(scope: Construct, id: string, _props: Props) { + super(scope, id); + + const thawObjectsLambdaStep = new ThawObjectsLambdaStepConstruct( + this, + "LambdaStep", + {}, + ); + + thawObjectsLambdaStep.invocableLambda.addRetry({ + errors: ["IsThawingError"], + interval: Duration.minutes(1), + backoffRate: 1, + maxAttempts: 15, + }); + + // these names are internal only - but we pull out as a const to make sure + // they are consistent + const bucketColumnName = "b"; + const keyColumnName = "k"; + + // this odd construct just makes sure that the JSON paths we specify + // here correspond with fields in the master "input" schema for the + // overall Steps function + const bucketKeyName: CopyOutStateMachineInputKeys = "sourceFilesCsvBucket"; + const keyKeyName: CopyOutStateMachineInputKeys = "sourceFilesCsvKey"; + + this.distributedMap = new S3CsvDistributedMap(this, "ThawObjectsMap", { + // we do not expect any failures of these functions and if we + // do - we are fully prepared for us to move onto the rclone + // steps where we will get proper error messages if the copies fail + toleratedFailurePercentage: 100, + itemReaderCsvHeaders: [bucketColumnName, keyColumnName], + itemReader: { + "Bucket.$": `$.${bucketKeyName}`, + "Key.$": `$.${keyKeyName}`, + }, + itemSelector: { + "bucket.$": JsonPath.stringAt(`$$.Map.Item.Value.${bucketColumnName}`), + "key.$": JsonPath.stringAt(`$$.Map.Item.Value.${keyColumnName}`), + }, + batchInput: { + glacierFlexibleRetrievalThawDays: 1, + glacierFlexibleRetrievalThawSpeed: "Expedited", + glacierDeepArchiveThawDays: 1, + glacierDeepArchiveThawSpeed: "Standard", + intelligentTieringArchiveThawDays: 1, + intelligentTieringArchiveThawSpeed: "Standard", + intelligentTieringDeepArchiveThawDays: 1, + intelligentTieringDeepArchiveThawSpeed: "Standard", + }, + iterator: thawObjectsLambdaStep.invocableLambda, + resultPath: JsonPath.DISCARD, + }); + } +} + +/* +{\"BatchInput\":{\"rcloneDestination\":" + +"\"s3:elsa-data-tmp/8e467a3e27e1e0b73fcd15b5c419e53c-dest\"}," + +"\"Items\":[" + +"{\"rcloneSource\":\"s3:elsa-data-tmp/8e467a3e27e1e0b73fcd15b5c419e53c-src/1.bin\"}," + +"{\"rcloneSource\":\"s3:elsa-data-tmp/8e467a3e27e1e0b73fcd15b5c419e53c-src/2.bin\"}," + +"{\"rcloneSource\":\"s3:elsa-data-tmp/8e467a3e27e1e0b73fcd15b5c419e53c-src/3.bin\"}]," + +"\"rcloneResult\":[" + +"{\"bytes\":0,\"checks\":0,\"deletedDirs\":0,\"deletes\":0,\"elapsedTime\":0.355840879,\"errors\":0,\"eta\":null,\"fatalError\":false,\"renames\":0,\"retryError\":false,\"serverSideCopies\":1,\"serverSideCopyBytes\":9,\"serverSideMoveBytes\":0,\"serverSideMoves\":0,\"source\":\"s3:elsa-data-tmp/8e467a3e27e1e0b73fcd15b5c419e53c-src/1.bin\",\"speed\":0,\"totalBytes\":0,\"totalChecks\":0,\"totalTransfers\":1,\"transferTime\":0.057049542,\"transfers\":1}," + +"{\"bytes\":0,\"checks\":0,\"deletedDirs\":0,\"deletes\":0,\"elapsedTime\":0.368602993,\"errors\":0,\"eta\":null,\"fatalError\":false,\"renames\":0,\"retryError\":false,\"serverSideCopies\":1,\"serverSideCopyBytes\":9,\"serverSideMoveBytes\":0,\"serverSideMoves\":0,\"source\":\"s3:elsa-data-tmp/8e467a3e27e1e0b73fcd15b5c419e53c-src/2.bin\",\"speed\":0,\"totalBytes\":0,\"totalChecks\":0,\"totalTransfers\":1,\"transferTime\":0.044628996,\"transfers\":1}," + +"{\"bytes\":0,\"checks\":0,\"deletedDirs\":0,\"deletes\":0,\"elapsedTime\":0.385300283,\"errors\":0,\"eta\":null,\"fatalError\":false,\"renames\":0,\"retryError\":false,\"serverSideCopies\":1,\"serverSideCopyBytes\":9,\"serverSideMoveBytes\":0,\"serverSideMoves\":0,\"source\":\"s3:elsa-data-tmp/8e467a3e27e1e0b73fcd15b5c419e53c-src/3.bin\",\"speed\":0,\"totalBytes\":0,\"totalChecks\":0,\"totalTransfers\":1,\"transferTime\":0.05217003,\"transfers\":1}]} + + */