Skip to content

Commit

Permalink
New distributed map construct from internet
Browse files Browse the repository at this point in the history
Working with thawing and copying
  • Loading branch information
andrewpatto committed Dec 1, 2023
1 parent 35af4c2 commit 8a3d9d2
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 109 deletions.
2 changes: 1 addition & 1 deletion dev/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async function makeTestObject(

async function createTestData() {
const sourceObjects = {
[`${testFolderSrc}/1.bin`]: StorageClass.DEEP_ARCHIVE,
[`${testFolderSrc}/1.bin`]: StorageClass.GLACIER_IR,
[`${testFolderSrc}/2.bin`]: StorageClass.STANDARD,
[`${testFolderSrc}/3.bin`]: StorageClass.GLACIER,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ interface ThawObjectsEvent {
}[];

BatchInput: {
// ...
// there are other fields in the batch input that we are not interested in - we pass them on
// ...

glacierFlexibleRetrievalThawDays: number;
glacierFlexibleRetrievalThawSpeed: string;

Expand Down Expand Up @@ -148,13 +144,5 @@ export async function handler(event: ThawObjectsEvent) {
`${isThawing}/${event.Items.length} are in the process of thawing`,
);

return {
// Note we are converting the objects to rclone format here
// A better spot would be in the Fargate run task but the JsonPath
// for that would be a nightmare
Items: event.Items.map((a) => ({
rcloneSource: `s3:${a.bucket}/${a.key}`,
})),
BatchInput: event.BatchInput,
};
return {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ import { Effect, PolicyStatement } from "aws-cdk-lib/aws-iam";
import { Duration, Stack } from "aws-cdk-lib";
import { LambdaInvoke } from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Runtime } from "aws-cdk-lib/aws-lambda";
import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2";
import { JsonPath } from "aws-cdk-lib/aws-stepfunctions";
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
import { join } from "path";

type CanWriteLambdaStepProps = {
vpc: IVpc;
vpcSubnetSelection: SubnetType;

requiredRegion: string;

/**
Expand All @@ -37,26 +33,18 @@ export class CanWriteLambdaStepConstruct extends Construct {
super(scope, id);

const canWriteLambda = new NodejsFunction(this, "CanWriteFunction", {
vpc: props.vpc,
entry: join(
__dirname,
"..",
"lambda",
"can-write-lambda",
"can-write-lambda.ts",
),
// by specifying the precise runtime - the bundler knows exactly what packages are already in
// the base image - and for us can skip bundling @aws-sdk
// if we need to move this forward beyond node 18 - then we may need to revisit this
runtime: Runtime.NODEJS_18_X,
runtime: Runtime.NODEJS_20_X,
handler: "handler",
bundling: {
externalModules: ["aws-sdk"],
minify: false,
},
vpcSubnets: {
subnetType: props.vpcSubnetSelection,
},
// this seems like plenty of seconds to do a few API calls to S3
timeout: Duration.seconds(30),
});
Expand Down
7 changes: 0 additions & 7 deletions packages/aws-copy-out-sharer/src/copy-out-stack.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Stack } from "aws-cdk-lib";
import { Construct } from "constructs";
import { CopyOutStackProps } from "./copy-out-stack-props";
import { Cluster } from "aws-cdk-lib/aws-ecs";
import { CopyOutStateMachineConstruct } from "./copy-out-state-machine-construct";
import { Service } from "aws-cdk-lib/aws-servicediscovery";
import { InfrastructureClient } from "@elsa-data/aws-infrastructure";
Expand All @@ -20,11 +19,6 @@ export class CopyOutStack extends Stack {

const namespace = infraClient.getNamespaceFromLookup(this);

const cluster = new Cluster(this, "FargateCluster", {
vpc: vpc,
enableFargateCapacityProviders: true,
});

const service = new Service(this, "Service", {
namespace: namespace,
name: "CopyOut",
Expand All @@ -34,7 +28,6 @@ export class CopyOutStack extends Stack {
const copyOut = new CopyOutStateMachineConstruct(this, "CopyOut", {
vpc: vpc,
vpcSubnetSelection: props.infrastructureSubnetSelection,
fargateCluster: cluster,
aggressiveTimes: props.isDevelopment,
allowWriteToThisAccount: props.isDevelopment,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@ import {
WaitTime,
} from "aws-cdk-lib/aws-stepfunctions";
import { Arn, ArnFormat, Duration, Stack } from "aws-cdk-lib";
import { ICluster } from "aws-cdk-lib/aws-ecs";
import { CanWriteLambdaStepConstruct } from "./can-write-lambda-step-construct";
import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2";
import { DistributedMapStepConstruct } from "./distributed-map-step-construct";
import { FargateRunTaskConstruct } from "./fargate-run-task-construct";
import { ThawObjectsLambdaStepConstruct } from "./thaw-objects-lambda-step-construct";
import { ThawObjectsMapConstruct } from "./thaw-objects-map-construct";
import { CopyOutStateMachineInput } from "./copy-out-state-machine-input";
import { RcloneMapConstruct } from "./rclone-map-construct";

export type CopyOutStateMachineProps = {
vpc: IVpc;

vpcSubnetSelection: SubnetType;

fargateCluster: ICluster;

/**
* Whether the stack should use duration/timeouts that are more suited
* to demonstration/development. i.e. minutes rather than hours for wait times,
Expand All @@ -47,56 +44,16 @@ export class CopyOutStateMachineConstruct extends Construct {
this,
"CanWrite",
{
vpc: props.vpc,
vpcSubnetSelection: props.vpcSubnetSelection,
requiredRegion: Stack.of(this).region,
allowWriteToThisAccount: props.allowWriteToThisAccount,
},
);

const thawObjectsLambdaStep = new ThawObjectsLambdaStepConstruct(
this,
"ThawObjects",
{
vpc: props.vpc,
vpcSubnetSelection: props.vpcSubnetSelection,
},
);

thawObjectsLambdaStep.invocableLambda.addRetry({
errors: ["IsThawingError"],
interval: Duration.minutes(1),
backoffRate: 1,
maxAttempts: 15,
});

const rcloneRunTask = new FargateRunTaskConstruct(
this,
"RcloneFargateTask",
{
fargateCluster: props.fargateCluster,
vpcSubnetSelection: props.vpcSubnetSelection,
},
).ecsRunTask;

// our task is an idempotent copy operation so we can retry if we happen to get killed
// (possible given we are using Spot fargate)
rcloneRunTask.addRetry({
errors: ["States.TaskFailed"],
maxAttempts: 3,
const rcloneMap = new RcloneMapConstruct(this, "RcloneMap", {
vpc: props.vpc,
vpcSubnetSelection: props.vpcSubnetSelection,
});

const distributedStepsChain =
thawObjectsLambdaStep.invocableLambda.next(rcloneRunTask);

const distributedMapStep = new DistributedMapStepConstruct(
this,
"MapStep",
{
task: distributedStepsChain, //rcloneRunTask,
},
).distributedMapStep;

const canWriteStep = canWriteLambdaStep.invocableLambda;

const waitStep = new Wait(this, "Wait X Minutes", {
Expand All @@ -105,12 +62,14 @@ export class CopyOutStateMachineConstruct extends Construct {
),
});

const defaults: Partial<CopyOutStateMachineInput> = {
maxItemsPerBatch: 1,
requiredRegion: Stack.of(this).region,
destinationKey: "",
};

const defineDefaults = new Pass(this, "Define Defaults", {
parameters: {
maxItemsPerBatch: 1,
requiredRegion: Stack.of(this).region,
destinationKey: "",
},
parameters: defaults,
resultPath: "$.inputDefaults",
});

Expand All @@ -134,11 +93,14 @@ export class CopyOutStateMachineConstruct extends Construct {

canWriteStep.addCatch(fail, { errors: ["WrongRegionError"] });

const thawObjectsMap = new ThawObjectsMapConstruct(this, "ThawObjects", {});

const definition = ChainDefinitionBody.fromChainable(
defineDefaults
.next(applyDefaults)
.next(canWriteStep)
.next(distributedMapStep)
.next(thawObjectsMap.distributedMap)
.next(rcloneMap.distributedMap)
.next(success),
);

Expand All @@ -151,6 +113,8 @@ export class CopyOutStateMachineConstruct extends Construct {
definitionBody: definition,
});

thawObjectsMap.distributedMap.grantNestedPermissions(this._stateMachine);

// this is needed to support distributed map - once there is a native CDK for this I presume this goes
this._stateMachine.addToRolePolicy(
new PolicyStatement({
Expand Down
11 changes: 11 additions & 0 deletions packages/aws-copy-out-sharer/src/copy-out-state-machine-input.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
 * The input shape accepted by the copy-out state machine.
 *
 * A "Define Defaults" Pass state in the state machine merges caller-supplied
 * input over defaults (maxItemsPerBatch = 1, requiredRegion = the stack's
 * region, destinationKey = "") before any steps run.
 */
export type CopyOutStateMachineInput = {
  // how many CSV rows are grouped into a single distributed-map batch (defaulted to 1)
  maxItemsPerBatch: number;

  // the region the destination must be in — a mismatch fails the state
  // machine via WrongRegionError in the CanWrite step
  requiredRegion: string;

  // location of the CSV manifest listing the objects to copy; the map's
  // result writer also writes a "<key>-results" prefix back into this bucket
  sourceFilesCsvBucket: string;
  sourceFilesCsvKey: string;

  // optional key/prefix portion of the rclone destination ("" = bucket root, the default)
  destinationKey: string;

  // NOTE(review): the rclone map also reads $.destinationBucket from the
  // execution input, but it is not declared here — confirm whether that
  // field belongs in this type as well
};

/** Union of the property names of CopyOutStateMachineInput (useful for keyed defaulting). */
export type CopyOutStateMachineInputKeys = keyof CopyOutStateMachineInput;
90 changes: 90 additions & 0 deletions packages/aws-copy-out-sharer/src/rclone-map-construct.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { Construct } from "constructs";
import { JsonPath } from "aws-cdk-lib/aws-stepfunctions";
import { S3CsvDistributedMap } from "./s3-csv-distributed-map";
import { RcloneRunTaskConstruct } from "./rclone-run-task-construct";
import { IVpc, SubnetType } from "aws-cdk-lib/aws-ec2";
import { Cluster } from "aws-cdk-lib/aws-ecs";

/** Construction properties for RcloneMapConstruct. */
type Props = {
  // the VPC in which the Fargate cluster hosting the rclone copy tasks is created
  vpc: IVpc;

  // which subnet type within that VPC the rclone tasks are placed into
  vpcSubnetSelection: SubnetType;
};

export class RcloneMapConstruct extends Construct {
public readonly distributedMap: S3CsvDistributedMap;

constructor(scope: Construct, id: string, props: Props) {
super(scope, id);

const cluster = new Cluster(this, "FargateCluster", {
vpc: props.vpc,
enableFargateCapacityProviders: true,
});

const rcloneRunTask = new RcloneRunTaskConstruct(
this,
"RcloneFargateTask",
{
fargateCluster: cluster,
vpcSubnetSelection: props.vpcSubnetSelection,
},
).ecsRunTask;

// our task is an idempotent copy operation so we can retry if we happen to get killed
// (possible given we are using Spot fargate)
rcloneRunTask.addRetry({
errors: ["States.TaskFailed"],
maxAttempts: 3,
});

// these names are internal only - but we pull out as a const to make sure
// they are consistent
const bucketColumnName = "b";
const keyColumnName = "k";

// {
// "BatchInput": {
// "rcloneDestination": "s3:cpg-cardiac-flagship-transfer/optionalpath"
// },
// "Items": [
// {
// "rcloneSource": "s3:bucket/1.fastq.gz"
// },
// {
// "rcloneSource": "s3:bucket/2.fastq.gz"
// },
// }

this.distributedMap = new S3CsvDistributedMap(this, "RcloneMap", {
toleratedFailurePercentage: 25,
itemReaderCsvHeaders: [bucketColumnName, keyColumnName],
itemReader: {
"Bucket.$": "$.sourceFilesCsvBucket",
"Key.$": "$.sourceFilesCsvKey",
},
itemSelector: {
"rcloneSource.$": JsonPath.format(
// note: this is not an s3:// URL, it is the peculiar syntax used by rclone
"s3:{}/{}",
JsonPath.stringAt(`$$.Map.Item.Value.${bucketColumnName}`),
JsonPath.stringAt(`$$.Map.Item.Value.${keyColumnName}`),
),
},
batchInput: {
"rcloneDestination.$": JsonPath.format(
"s3:{}/{}",
JsonPath.stringAt(`$.destinationBucket`),
JsonPath.stringAt("$.destinationKey"),
),
},
iterator: rcloneRunTask,
resultWriter: {
"Bucket.$": "$.sourceFilesCsvBucket",
"Prefix.$": JsonPath.format(
"{}-results",
JsonPath.stringAt("$.sourceFilesCsvKey"),
),
},
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Props = {
//allowWriteToThisAccount?: boolean; WIP NEED TO IMPLEMENT
};

export class FargateRunTaskConstruct extends Construct {
export class RcloneRunTaskConstruct extends Construct {
public readonly ecsRunTask: EcsRunTask;

constructor(scope: Construct, id: string, props: Props) {
Expand Down Expand Up @@ -88,6 +88,8 @@ export class FargateRunTaskConstruct extends Construct {
streamPrefix: "elsa-data-copy-out",
logRetention: RetentionDays.ONE_WEEK,
}),
// eg the equivalent of
// RCLONE_CONFIG_S3_TYPE=s3 RCLONE_CONFIG_S3_PROVIDER=AWS RCLONE_CONFIG_S3_ENV_AUTH=true RCLONE_CONFIG_S3_REGION=ap-southeast-2 rclone copy src dest
environment: {
RCLONE_CONFIG_S3_TYPE: "s3",
RCLONE_CONFIG_S3_PROVIDER: "AWS",
Expand All @@ -98,12 +100,10 @@ export class FargateRunTaskConstruct extends Construct {
},
});

// RCLONE_CONFIG_S3_TYPE=s3 RCLONE_CONFIG_S3_PROVIDER=AWS RCLONE_CONFIG_S3_ENV_AUTH=true RCLONE_CONFIG_S3_REGION=ap-southeast-2 rclone copy src dest

// https://github.com/aws/aws-cdk/issues/20013
this.ecsRunTask = new EcsRunTask(this, "Copy File with Rclone", {
// we use task tokens as we want to return rclone stats/results
integrationPattern: IntegrationPattern.WAIT_FOR_TASK_TOKEN,
// .RUN_JOB,
cluster: props.fargateCluster,
taskDefinition: taskDefinition,
launchTarget: new EcsFargateSpotOnlyLaunchTarget({
Expand Down
Loading

0 comments on commit 8a3d9d2

Please sign in to comment.