# airflow-ec2-2.0.2-RDS.yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: Apache Airflow v2.0.2 on an EC2 Amazon Linux 2 instance, backed by a PostgreSQL RDS database
Parameters:
AirflowUser:
NoEcho: "false"
Description: Airflow UI admin account username
Type: String
MinLength: "4"
MaxLength: "41"
AllowedPattern: "[a-zA-Z0-9]*"
ConstraintDescription: Must contain only alphanumeric characters
AirflowPassword:
NoEcho: "false"
Description: Airflow UI admin account password
Type: String
MinLength: "8"
MaxLength: "41"
AllowedPattern: "[a-zA-Z0-9]*"
ConstraintDescription: Must contain only alphanumeric characters
DBPassword:
NoEcho: "false"
Description: Airflow database admin account password
Type: String
MinLength: "8"
MaxLength: "41"
AllowedPattern: "[a-zA-Z0-9]*"
ConstraintDescription: Must contain only alphanumeric characters
# Mapping to find the Amazon Linux AMI in each region.
Mappings:
RegionMap:
ap-northeast-1:
AMI: "ami-09ebacdc178ae23b7"
ap-northeast-2:
AMI: "ami-0a0de518b1fc4524c"
ap-northeast-3:
AMI: "ami-0e787554e61105680"
ap-south-1:
AMI: "ami-04db49c0fb2215364"
ap-southeast-1:
AMI: "ami-0f511ead81ccde020"
ap-southeast-2:
AMI: "ami-0aab712d6363da7f9"
ca-central-1:
AMI: "ami-02f84cf47c23f1769"
eu-central-1:
AMI: "ami-0453cb7b5f2b7fca2"
eu-west-1:
AMI: "ami-02b4e72b17337d6c1"
eu-west-2:
AMI: "ami-0d26eb3972b7f8c96"
eu-west-3:
AMI: "ami-0d49cec198762b78c"
sa-east-1:
AMI: "ami-0f8243a5175208e08"
us-east-1:
AMI: "ami-0c2b8ca1dad447f8a"
us-east-2:
AMI: "ami-0443305dabd4be2bc"
us-west-1:
AMI: "ami-04b6c97b14c54de18"
us-west-2:
AMI: "ami-083ac7c7ecf9bb9b0"
Resources:
EC2Instance:
Type: AWS::EC2::Instance
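    # Hold stack creation until the cfn-signal at the end of UserData reports
    # success, or fail after the 10-minute timeout below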
CreationPolicy:
ResourceSignal:
Timeout: PT10M
Properties:
SecurityGroups: [!Ref "AirflowEC2SecurityGroup"]
InstanceType: "m4.xlarge"
IamInstanceProfile:
Ref: EC2InstanceProfile
Tags:
- Key: Name
Value: Airflow
ImageId: !FindInMap
- RegionMap
- !Ref "AWS::Region"
- AMI
UserData:
Fn::Base64: !Sub |
#!/bin/bash
set -x
          exec > >(tee /var/log/user-data.log | logger -t user-data) 2>&1
          ln -s /var/log/user-data.log /root/user-data.log
          # Install the CloudFormation helper scripts and pin pip to a version
          # compatible with the packages installed below
          yum install aws-cfn-bootstrap -y
          python3 -m pip install pip==20.2.4 --user
          # Run cfn-init to apply the AWS::CloudFormation::Init metadata defined below
/opt/aws/bin/cfn-init -v -c install --stack ${AWS::StackId} --resource EC2Instance --region ${AWS::Region}
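          # Remove the RPM-packaged docutils, which otherwise conflicts with pip-installed packages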
yum remove python3-docutils -y
echo "Installing s3fs"
python3 -m pip install --upgrade s3fs==0.4.2
python3 -m pip install psycopg2 wheel
# Upgrade sqlite
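          # Amazon Linux 2 ships SQLite 3.7.x; Airflow 2.x expects 3.15 or newer,
          # so build a current SQLite from source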
wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
-DSQLITE_ENABLE_FTS3_PARENTHESIS \
-DSQLITE_ENABLE_FTS4 \
-DSQLITE_ENABLE_FTS5 \
-DSQLITE_ENABLE_JSON1 \
-DSQLITE_ENABLE_LOAD_EXTENSION \
-DSQLITE_ENABLE_RTREE \
-DSQLITE_ENABLE_STAT4 \
-DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
-DSQLITE_SOUNDEX \
-DSQLITE_TEMP_STORE=3 \
-DSQLITE_USE_URI \
-O2 \
-fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
echo "Installing sagemaker sdk"
          python3 -m pip install sagemaker==1.72.0
# Install airflow using pip
echo "Installing Apache Airflow"
export AIRFLOW_GPL_UNIDECODE=yes
python3 -m pip install apache-airflow[crypto,postgres,amazon]==2.0.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt"
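          # The constraints file pins transitive dependencies to versions tested with Airflow 2.0.2 on Python 3.7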
          # Put user-local pip installs on PATH so the airflow CLI is found
          echo 'export PATH=/usr/local/bin:~/.local/bin:$PATH' >> ~/.bash_profile
          source ~/.bash_profile
          # Initialize Airflow; this also generates ~/airflow/airflow.cfg
          airflow db init
          # Create a Fernet key for encrypting stored connection credentials
          # and write it into the generated config
          export FERNET_KEY=`openssl rand -base64 32`
          sed -i "s|^fernet_key =.*|fernet_key = $FERNET_KEY|g" ~/airflow/airflow.cfg
# Update the RDS connection in the Airflow Config file
sed -i '/sql_alchemy_conn/s/^/#/g' ~/airflow/airflow.cfg
sed -i '/#sql_alchemy_conn/ a sql_alchemy_conn = postgresql://airflow:${DBPassword}@${DBInstance.Endpoint.Address}:${DBInstance.Endpoint.Port}/airflowdb' ~/airflow/airflow.cfg
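          # The RDS endpoint, port, and password above are resolved by CloudFormation's
          # !Sub before this script ever runs on the instance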
# Update the type of executor in the Airflow Config file
sed -i '/executor = SequentialExecutor/s/^/#/g' ~/airflow/airflow.cfg
sed -i '/executor = SequentialExecutor/ a executor = LocalExecutor' ~/airflow/airflow.cfg
sed -i 's/load_examples = True/load_examples = False/g' ~/airflow/airflow.cfg
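          # Re-initialize the metadata DB, now against the Postgres backend configured above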
airflow db init
airflow users create -e [email protected] -f admin -l airflow -p ${AirflowPassword} -r Admin -u ${AirflowUser}
          # Create the 'airflow-sagemaker' connection used by the DAG's AWS hook
cat >> /tmp/airflow_conn.py << EOF
from airflow import settings
from airflow.models import Connection
          # Create a connection object
extra = '{"region_name": "${AWS::Region}"}'
conn_id = 'airflow-sagemaker'
conn = Connection(conn_id=conn_id,conn_type='s3', extra=extra)
# get the session
session = settings.Session()
session.add(conn)
session.commit()
EOF
python3 /tmp/airflow_conn.py
# create directories
mkdir -p ~/airflow/dags/sm-ml-pipeline
# clone the git repository
cd ~
git clone https://github.com/aws-samples/sagemaker-ml-workflow-with-apache-airflow.git
mv ~/sagemaker-ml-workflow-with-apache-airflow ~/sm-ml-pipeline
cd ~/sm-ml-pipeline/src
          # Prepare the Airflow DAG definition for the SageMaker blog post: fill in bucket, region, and account
sed -i 's/<s3-bucket>/${S3BucketName}/g' ./*.*
sed -i 's/<region-name>/${AWS::Region}/g' ./*.*
sed -i 's/<accountid>/${AWS::AccountId}/g' ~/sm-ml-pipeline/src/config.py
sed -i 's/AirflowSageMakerExecutionRole/AirflowSageMakerExecutionRole-${AWS::StackName}/g' ~/sm-ml-pipeline/src/config.py
sed -i "s/hook = AwsHook(aws_conn_id='airflow-sagemaker')/hook = AwsHook(aws_conn_id='airflow-sagemaker', client_type='s3')/g" ~/sm-ml-pipeline/src/dag_ml_pipeline_amazon_video_reviews.py
sed -i '/provide_context=False/d' ~/sm-ml-pipeline/src/dag_ml_pipeline_amazon_video_reviews.py
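          # The SageMaker operators push objects to XCom that JSON cannot serialize,
          # so turn XCom pickling back on (off by default in Airflow 2.x)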
sed -i 's/enable_xcom_pickling = False/enable_xcom_pickling = True/g' ~/airflow/airflow.cfg
zip -r dag.zip *
cp dag.zip ~/airflow/dags/sm-ml-pipeline/dag.zip
cd -
# Run Airflow webserver and scheduler
airflow dags list
airflow webserver -D
airflow scheduler -D
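          # Signal success so the CreationPolicy on this instance completes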
/opt/aws/bin/cfn-signal --exit-code 0 --resource EC2Instance --region ${AWS::Region} --stack ${AWS::StackName}
Metadata:
AWS::CloudFormation::Init:
configSets:
install:
- installpackages
installpackages:
packages:
yum:
python3: []
python3-devel: []
gcc: []
gcc-c++: []
postgresql-devel: []
openssl-devel: []
git: []
DependsOn:
- DBInstance
- AirflowEC2SecurityGroup
DBInstance:
Type: AWS::RDS::DBInstance
DeletionPolicy: Delete
Properties:
DBName: airflowdb
Engine: postgres
MasterUsername: airflow
MasterUserPassword: !Ref "DBPassword"
DBInstanceClass: db.t3.small
AllocatedStorage: 5
DBSecurityGroups:
- Ref: DBSecurityGroup
AirflowEC2SecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: !Sub 'AirflowEC2SG-${AWS::StackName}'
GroupDescription: Enable HTTP access via port 8080
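      # Note: 0.0.0.0/0 exposes the Airflow UI to the internet; restrict this range in production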
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 8080
ToPort: 8080
CidrIp: 0.0.0.0/0
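  # Legacy (EC2-Classic-style) RDS security group: grants database access to members
  # of the Airflow EC2 security group above without opening a public port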
DBSecurityGroup:
Type: AWS::RDS::DBSecurityGroup
Properties:
GroupDescription: Frontend Access
DBSecurityGroupIngress:
EC2SecurityGroupName:
Ref: AirflowEC2SecurityGroup
EC2Role:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub 'AirflowInstanceRole-${AWS::StackName}'
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "ec2.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
- arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore
Policies:
- PolicyName: !Sub 'AirflowResourceAccess-${AWS::StackName}'
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:*
Resource:
- !Sub "arn:aws:s3:::${S3BucketName}"
- !Sub "arn:aws:s3:::${S3BucketName}/*"
- Effect: Allow
Action:
- iam:GetRole
Resource: "*"
EC2InstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
InstanceProfileName: !Sub 'AirflowInstanceProfile-${AWS::StackName}'
Roles:
- Ref: EC2Role
S3BucketName:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
AccessControl: BucketOwnerFullControl
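      # "airflow-sagemaker-" plus the first segment of the stack GUID from AWS::StackId
      # (arn:aws:cloudformation:<region>:<account>:stack/<name>/<guid>), unique per stack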
BucketName: !Join
- "-"
- - "airflow-sagemaker"
- !Select
- 0
- !Split
- "-"
- !Select
- 2
- !Split
- "/"
- !Ref "AWS::StackId"
AirflowSageMakerExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub 'AirflowSageMakerExecutionRole-${AWS::StackName}'
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "sagemaker.amazonaws.com"
Action:
- "sts:AssumeRole"
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
Path: "/service-role/"
Policies:
- PolicyName: !Sub 'SageMakerS3BucketAccess-${AWS::StackName}'
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:*
Resource:
- !Sub "arn:aws:s3:::${S3BucketName}"
- !Sub "arn:aws:s3:::${S3BucketName}/*"
Outputs:
AirflowEC2PublicDNSName:
Description: Public DNS Name of the Airflow EC2 instance
Value: !Join ["", ["http://", !GetAtt EC2Instance.PublicDnsName, ":8080"]]