-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
54 lines (46 loc) · 2.13 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""Running the Xetra ETL application"""
import argparse
import logging
import logging.config
import yaml
from source.common.s3 import S3BucketConnector
from source.transformers.xetra_transformer import XetraETL, XetraSourceConfig, XetraTargetConfig
# entry point to run the xetra ETL job.
def main():
    """Run the Xetra ETL job configured by a YAML file.

    The path of the configuration file is read from the single positional
    command-line argument ``config``. The loaded configuration is accessed
    through the top-level sections ``logging``, ``s3``, ``source``,
    ``target`` and ``meta`` (the latter via its ``meta_key`` entry).

    Raises:
        OSError: If the configuration file cannot be opened.
        KeyError: If one of the sections or keys read below is missing
            from the loaded configuration mapping.
    """
    # Parsing the command-line arguments
    parser = argparse.ArgumentParser(description='Run the Xetra ETL Job.')
    parser.add_argument('config', help='A configuration file in YAML format.')
    args = parser.parse_args()
    # Parsing YML file. Only the path given on the command line is used; the
    # context manager closes the file handle as soon as it has been loaded.
    with open(args.config) as config_file:
        config = yaml.safe_load(config_file)
    # configure logging
    log_config = config['logging']
    logging.config.dictConfig(log_config)
    # reading s3 configuration
    s3_config = config['s3']
    # creating the S3BucketConnector classes for source and target;
    # both share the same credentials entries but use their own
    # endpoint URL and bucket entries
    s3_bucket_src = S3BucketConnector(access_key=s3_config['access_key'],
                                      secret_key=s3_config['secret_key'],
                                      endpoint_url=s3_config['src_endpoint_url'],
                                      bucket=s3_config['src_bucket'])
    s3_bucket_trg = S3BucketConnector(access_key=s3_config['access_key'],
                                      secret_key=s3_config['secret_key'],
                                      endpoint_url=s3_config['trg_endpoint_url'],
                                      bucket=s3_config['trg_bucket'])
    # reading source configuration
    source_config = XetraSourceConfig(**config['source'])
    # reading target configuration
    target_config = XetraTargetConfig(**config['target'])
    # reading meta file configuration
    meta_config = config['meta']
    # creating XetraETL class; the logger is requested only after
    # dictConfig has been applied above
    logger = logging.getLogger(__name__)
    logger.info('ETL job started.')
    xetra_etl = XetraETL(s3_bucket_src, s3_bucket_trg,
                         meta_config['meta_key'], source_config, target_config)
    # running etl job for xetra report 1
    xetra_etl.etl_report1()
    logger.info('ETL job finished.')
# Start the ETL job only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()