Gen3 ETL is designed to translate data from a graph data model stored in a PostgreSQL database into flattened indices in Elasticsearch (ES), which support efficient querying from the front end.
Choosing the transformer is the most important decision in an ETL process, because the transformer dictates the specific format of the input and output data. For our use case, Spark is one of the most advanced data processing technologies, because its distributed architecture allows:
- processing data in parallel across horizontally scalable memory.
- processing data iteratively in multiple steps without reloading it from data storage (disk).
- streaming and integrating incremental data into an existing data source.
Hence, we chose Spark as the data transformer for fast and scalable data processing.
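As a concrete illustration of those properties, here is a minimal PySpark sketch (not the actual Tube code; the toy records are invented for the example). The cached DataFrame stays in executor memory, so the two aggregation steps that follow reuse it without another read from disk.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("etl-sketch").getOrCreate()

# Toy records standing in for rows extracted from the graph model.
samples = spark.createDataFrame(
    [("case-1", "blood"), ("case-1", "tissue"), ("case-2", "blood")],
    ["case_id", "sample_type"],
)

# Data is partitioned across executors and cached in memory once ...
samples.cache()

# ... so multiple transformation steps run in parallel on the cached
# partitions without reloading anything from disk.
counts_per_case = samples.groupBy("case_id").count()
counts_per_type = samples.groupBy("sample_type").count()

counts_per_case.show()
counts_per_type.show()
```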
As discussed previously, there are multiple ways to extract data from the database and load it into Spark. One is to generate multiple SQL queries, execute them in parallel, and load the results directly into Spark's memory; another is to dump the whole dataset to intermediate storage such as HDFS and then load the text data stored in HDFS into Spark in parallel.
Having reviewed the options that one of our collaborators, OICR, tried (posted here), we decided to go with a similar strategy: dump PostgreSQL to HDFS and load the HDFS files into Spark RDDs. We use SQOOP to dump the PostgreSQL database to HDFS; to perform the dump, SQOOP calls CopyManager.
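The loading half of that strategy (reading the HDFS dump back into Spark in parallel) can be sketched roughly as follows; the HDFS path and the tab delimiter are assumptions for illustration, not necessarily the exact settings Tube uses:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("load-hdfs-dump").getOrCreate()

# SQOOP writes each table as a set of delimited part-files in HDFS.
# Spark reads those part-files in parallel: each HDFS block maps to at
# least one Spark partition.
node_df = (
    spark.read
    .option("delimiter", "\t")               # assumed field separator
    .csv("hdfs:///sqoop/node_case/part-*")   # hypothetical dump location
)

print(node_df.rdd.getNumPartitions())  # shows the parallelism of the load
```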
Finally, we decided to use Python instead of Scala because the CDIS dev teams are much more comfortable with Python. Since all the computation is done in Spark and no data manipulation happens at the Python level, the performance difference is not significant.
Every ETL process is defined by a translation from the original dataset to the expected one. Gen3-ETL provides a neutral way with which you can:
- `aggregate`/`collect` data from multiple nodes in the original dataset into an individual node in the target dataset,
- `embed` some fields of a high-level node into lower-level nodes, or
- `extract` particular fields from any specific node.
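In Spark terms, these mapping operations correspond roughly to the sketch below; the node and field names are hypothetical, and in practice the behaviour is configured in `etlMapping.yaml` rather than hand-written like this:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("mapping-sketch").getOrCreate()

cases = spark.createDataFrame(
    [("case-1", "open"), ("case-2", "restricted")],
    ["case_id", "access_level"],
)
samples = spark.createDataFrame(
    [("s-1", "case-1"), ("s-2", "case-1"), ("s-3", "case-2")],
    ["sample_id", "case_id"],
)

# aggregate/collect: roll several child nodes up into one target record.
aggregated = samples.groupBy("case_id").agg(
    F.count("sample_id").alias("_samples_count"),
    F.collect_list("sample_id").alias("sample_ids"),
)

# embed: copy a field from the higher-level node into the lower-level one.
embedded = samples.join(cases, on="case_id", how="left")

# extract: keep only the fields the target index needs.
extracted = embedded.select("sample_id", "case_id", "access_level")

aggregated.show()
extracted.show()
```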
- define the mapping file and save it as `etlMapping.yaml` in the gitops repo. The format of the mapping file can be found here
- run `gen3 kube-setup-secrets` to create the new configmap. If the configmap already exists, you must delete it first by running `kubectl delete configmap etl-mapping`
- run `gen3 roll spark`
- run `gen3 roll tube`
- wait for the `tube` pod to be ready and running, then run `kubectl exec -it {tube-pod-name} -- bash`
- inside the `tube` pod, run `python run_import.py` to import data from PostgreSQL into HDFS in the `spark` pod, then run `python run_spark.py` to transform the data and push the extracted data from HDFS to Elasticsearch
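After `run_spark.py` finishes, one way to sanity-check the result is to query Elasticsearch directly; the host, port, and index name below are assumptions for illustration and depend on your deployment:

```python
from elasticsearch import Elasticsearch

# Host/port and index name are assumptions; adjust to your deployment
# (for example, port-forward the ES service and point the client at it).
es = Elasticsearch(["http://localhost:9200"])

print(es.cat.indices())            # list the indices that exist
print(es.count(index="etl_case"))  # document count for one (assumed) index
```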