diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..0a41085 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,52 @@ +name: Data Engineering CI +on: + push: + branches: + - main + - feat/* + - fix/* + +jobs: + run-ci: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.10.12 + + - name: Install dependencies + run: | + pip install mypy==1.7.1 pylint==3.0.2 pyspark==3.3.0 coverage + + - name: Type check + run: mypy ./ --ignore-missing-imports + + - name: Lint + run: pylint app/ main.py setup.py --output pylint-report.txt + + - name: Testing + run: coverage run --source=app -m unittest discover -s app/tests/ + + - name: Test coverage report + run: coverage xml + + - name: SonarQube Scan + uses: sonarsource/sonarqube-scan-action@master + with: + args: > + -Dsonar.scm.revision=${{ github.event.pull_request.head.sha }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }} + + - uses: sonarsource/sonarqube-quality-gate-action@master + timeout-minutes: 5 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }} diff --git a/.gitignore b/.gitignore index 2eea525..0fff62c 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -.env \ No newline at end of file +.env +.coverage +app/__pycache__ +app/tests/__pycache__ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..4b60116 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,32 @@ +repos: + - repo: https://github.com/psf/black + rev: 22.3.0 + hooks: + - id: black + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.5.0 + hooks: + - id: check-ast + - id: trailing-whitespace + - id: check-merge-conflict + - id: debug-statements + - id: detect-private-key + - id: end-of-file-fixer + + - repo: local + hooks: + - id: mypy + name: Type Checking + entry: mypy ./ --ignore-missing-imports + language: system + always_run: true + types: [python3] + stages: [commit] + + - id: lint + name: Linting + entry: pylint app/ main.py setup.py + language: system + always_run: true + types: [python3] + stages: [commit] \ No newline at end of file diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..09c3809 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,9 @@ +[MASTER] +disable=C0114,C0103,C0116,R0914,E0401,W0511,W0108,R0911,R0801,C0115 + +[FORMAT] +max-line-length=150 +max-module-lines=350 + +[SIMILARITIES] +min-similarity-lines=4 diff --git a/README.md b/README.md index 61b97ea..bbfceff 100644 --- a/README.md +++ b/README.md @@ -19,16 +19,19 @@ To run the same ETL code in multiple cloud services based on your preference, th 1. Clone this repo in your own repo. 2. For local IDE: Open configs file and write your own keys & storage accout name. -For Databricks job: Configure job parameters, then call them using ```dbutils.widgets.get(param_name)``` +For Databricks job: Add your parameters in a json file in a already mounted container, then read it in a dictionary. 3. Change your path for read & write in Extraction.py file, you can also change Extraction logic to use other sources. -4. (Optional) If you are connecting local IDE then uncomment the following line in the main file: +4. 
(Optional) If you are NOT connecting local IDE then comment the following line in the main file: ``` spark, dbutils = cd.init_databricks() ``` This fucntions connects to your cloud running cluster. - Also note that the ```dbutils.widgets.get(param_name)``` method only works in databricks jobs, so for other environments you have to load the keys directly or use an abstraction method like ```load_env()``` + + Also you need to make a Databricks CLI profile, refer: [Configure Databricks Workspace CLI](https://www.youtube.com/watch?v=h4L064NfMV0&ab_channel=WafaStudies) + + Also note that the ```keys.json``` file which i'm reading is in an already mounted container, so for other environments you have to load the keys directly or use an abstraction method like ```load_env()``` 5. Just run on the local IDE to develop & test. @@ -88,4 +91,12 @@ Steps: 5. Upload this wheel file in your S3 bucket then paste it's uri path in "Python Library path" in Glue Job details. Refer: [Glue Programming libraries](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html) - + +## Run Tests & Coverage Report + +To run tests in the root of the directory use: + + coverage run --source=app -m unittest discover -s app/tests/ + coverage report + +Note that awsglue libraries are not availabe to download, so use AWS Glue 4 Docker container. diff --git a/app/Extraction.py b/app/Extraction.py index ecd9c11..c78bad8 100644 --- a/app/Extraction.py +++ b/app/Extraction.py @@ -1,11 +1,12 @@ -import subprocess import os -os.system('pip install kaggle') -import kaggle + +os.system("pip install kaggle") +import kaggle # pylint: disable=wrong-import-position def extract_from_kaggle(flag: bool): if flag: + print("WRONG BIT RAN!!!!") read_path = "/dbfs/mnt/rawdata/" write_path = "/mnt/transformed/" else: @@ -14,11 +15,13 @@ def extract_from_kaggle(flag: bool): api = kaggle.KaggleApi() api.authenticate() - api.dataset_download_cli("mastmustu/insurance-claims-fraud-data", unzip=True, path=read_path) + api.dataset_download_cli( + "mastmustu/insurance-claims-fraud-data", unzip=True, path=read_path + ) if flag: read_path = read_path[5:] else: read_path = "s3://glue-bucket-vighnesh/rawdata/" - return read_path, write_path \ No newline at end of file + return read_path, write_path diff --git a/app/SparkWrapper.py b/app/SparkWrapper.py index 020008e..2a0d98c 100644 --- a/app/SparkWrapper.py +++ b/app/SparkWrapper.py @@ -1,6 +1,5 @@ from pyspark.sql import SparkSession, DataFrame -from pyspark.sql import functions as F -from pyspark.sql import Window +from pyspark.sql import Window, WindowSpec def create_frame(sc: SparkSession, path: str): @@ -9,15 +8,28 @@ def create_frame(sc: SparkSession, path: str): def rename_columns(df: DataFrame, names: dict) -> DataFrame: + if not isinstance(names, dict): + raise TypeError("WRONG DATATYPE: column names should be dictionary") + + columns = df.columns renamed_df = df for old_col, new_col in names.items(): + if old_col not in columns: + raise ValueError( + f"COLUMN DOESN'T EXIST: Column '{old_col}' does not exist in the DataFrame" + ) renamed_df = renamed_df.withColumnRenamed(old_col, new_col) + return renamed_df def value_counts(df: DataFrame, column: str) -> DataFrame: - return df.groupBy(column).count().orderBy('count', ascending=False) + return df.groupBy(column).count().orderBy(["count", column], ascending=False) -def make_window(partition: str, order: str, range_from: int, range_to: int) -> Window: - return 
Window.partitionBy(partition).orderBy(order).rangeBetween(range_from, range_to) \ No newline at end of file +def make_window( + partition: str, order: str, range_from: int, range_to: int +) -> WindowSpec: + return ( + Window.partitionBy(partition).orderBy(order).rangeBetween(range_from, range_to) + ) diff --git a/app/configs.py b/app/configs.py deleted file mode 100644 index c865f8b..0000000 --- a/app/configs.py +++ /dev/null @@ -1,5 +0,0 @@ -import os - -storage_account_name = os.environ['storage_account_name'] - -datalake_access_key = os.environ['datalake_access_key'] diff --git a/app/connect_databricks.py b/app/connect_databricks.py index c0cd9c6..74a1f98 100644 --- a/app/connect_databricks.py +++ b/app/connect_databricks.py @@ -1,11 +1,11 @@ +import os from databricks.connect import DatabricksSession from databricks.sdk import WorkspaceClient -storage_name = os.environ['storage_account_name'] -storage_key = os.environ['datalake_access_key'] - def init_databricks(): + os.system("cp /dbfs/mnt/config/databricks-connect.txt ~/.databrickscfg") + spark = DatabricksSession.builder.getOrCreate() dbutils = WorkspaceClient().dbutils @@ -14,27 +14,29 @@ def init_databricks(): def create_mount(dbutils, container_name, mount_path): + storage_name = os.environ["storage_account_name"] + storage_key = os.environ["datalake_access_key"] + mounts = [x.mountPoint for x in dbutils.fs.mounts()] - try: - if mount_path not in [x.mountPoint for x in dbutils.fs.mounts()]: - dbutils.fs.mount( - source = f'wasbs://{container_name}@{storage_name}.blob.core.windows.net/', - mount_point = mount_path, - extra_configs = {f'fs.azure.account.key.{storage_name}.blob.core.windows.net': storage_key}) - print(f"{mount_path} Mount Successfull") - else: - dbutils.fs.refreshMounts() - print(f"{mount_path} Already mounted") - except Exception as e: - print(f"{mount_path} Error: " + e) + if mount_path not in mounts: + dbutils.fs.mount( + source=f"wasbs://{container_name}@{storage_name}.blob.core.windows.net/", + mount_point=mount_path, + extra_configs={ + f"fs.azure.account.key.{storage_name}.blob.core.windows.net": storage_key + }, + ) + print(f"{mount_path} Mount Successfull") + else: + dbutils.fs.refreshMounts() + print(f"{mount_path} Already mounted") def unmount(dbutils, mount_path): try: dbutils.fs.unmount(mount_path) print("Unmount Successful") - except Exception as e: - print("Error: " + e) - - - + except FileNotFoundError: + print(f"Error: Path not found - {mount_path}") + except Exception as e: # pylint: disable=W0718 + print(f"Error: {e}") diff --git a/app/connect_glue.py b/app/connect_glue.py index c9c889a..d18b928 100644 --- a/app/connect_glue.py +++ b/app/connect_glue.py @@ -1,6 +1,3 @@ -import sys -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job @@ -11,5 +8,5 @@ def init_glue(): glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) - - return glueContext, spark, job \ No newline at end of file + + return glueContext, spark, job diff --git a/app/tests/__init__.py b/app/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tests/connect_glue_test.py b/app/tests/connect_glue_test.py new file mode 100644 index 0000000..16a837a --- /dev/null +++ b/app/tests/connect_glue_test.py @@ -0,0 +1,52 @@ +import unittest +from unittest.mock import patch, MagicMock +from app.connect_glue import init_glue + + +class 
TestInitGlue(unittest.TestCase): + @patch("app.connect_glue.SparkContext") + @patch("app.connect_glue.GlueContext") + @patch("app.connect_glue.Job") + def test_init_glue(self, mock_job, mock_glue_context, mock_spark_context): + # Mock the SparkContext, GlueContext, and Job + mock_spark_context_instance = MagicMock() + mock_glue_context_instance = MagicMock() + mock_job_instance = MagicMock() + + # Set up the behavior of the mock instances + mock_spark_context.return_value = mock_spark_context_instance + mock_glue_context.return_value = mock_glue_context_instance + mock_job.return_value = mock_job_instance + + # Call the function to test + glue_context, spark, job = init_glue() + + # Assertions + mock_spark_context.assert_called_once() + mock_glue_context.assert_called_once_with(mock_spark_context_instance) + mock_job.assert_called_once_with(mock_glue_context_instance) + + # Check if the returned values are correct + self.assertEqual(glue_context, mock_glue_context_instance) + self.assertEqual(spark, mock_glue_context_instance.spark_session) + self.assertEqual(job, mock_job_instance) + + @patch("app.connect_glue.SparkContext") + @patch("app.connect_glue.GlueContext") + @patch("app.connect_glue.Job") + def test_init_glue_failure(self, mock_job, mock_glue_context, mock_spark_context): + # Simulate a ValueError during SparkContext initialization + error_statement = "Simulated SparkContext initialization failure" + mock_spark_context.side_effect = ValueError(error_statement) + + # Call the function to test + with self.assertRaises(ValueError) as context: + init_glue() + + # Assertions + mock_spark_context.assert_called_once() + mock_glue_context.assert_not_called() # GlueContext should not be called if SparkContext initialization fails + mock_job.assert_not_called() # Job should not be called if SparkContext initialization fails + + # Check if the error displayed correctly + self.assertEqual(str(context.exception), error_statement) diff --git a/app/tests/mock/sample.csv b/app/tests/mock/sample.csv new file mode 100644 index 0000000..0220c10 --- /dev/null +++ b/app/tests/mock/sample.csv @@ -0,0 +1,16 @@ +stock_name,market,close_price,date +ABC Corp,NYSE,100.25,2023-01-01 +DEF Ltd,LSE,50.75,2023-01-01 +XYZ Inc,NASDAQ,75.50,2023-01-01 +ABC Corp,NYSE,95.20,2023-01-11 +DEF Ltd,LSE,55.40,2023-01-11 +XYZ Inc,NASDAQ,80.10,2023-01-11 +ABC Corp,NYSE,105.80,2023-01-21 +DEF Ltd,LSE,60.20,2023-01-21 +XYZ Inc,NASDAQ,92.40,2023-01-21 +ABC Corp,NYSE,110.50,2023-01-31 +DEF Ltd,LSE,68.75,2023-01-31 +XYZ Inc,NASDAQ,102.60,2023-01-31 +ABC Corp,NYSE,115.75,2023-02-10 +DEF Ltd,LSE,75.30,2023-02-10 +XYZ Inc,NASDAQ,112.20,2023-02-10 diff --git a/app/tests/test_SparkWrapper.py b/app/tests/test_SparkWrapper.py new file mode 100644 index 0000000..eb2fe87 --- /dev/null +++ b/app/tests/test_SparkWrapper.py @@ -0,0 +1,107 @@ +from unittest import TestCase +from pyspark.sql import SparkSession +from pyspark.sql import functions as F +from app.SparkWrapper import value_counts, rename_columns, create_frame, make_window + + +class TestSparkWrapper(TestCase): + def setUp(self) -> None: + self.spark = ( + SparkSession.builder.master("local").appName("Testing").getOrCreate() + ) + self.df = self.spark.read.csv( + "app/tests/mock/sample.csv", inferSchema=True, header=True + ) + super().setUp() + + def tearDown(self) -> None: + self.spark.stop() + super().tearDown() + + def test_value_counts(self): + df = value_counts(self.df, "market") + data = df.collect() + + expected_data = [ + {"market": "NYSE", "count": 5}, + {"market": 
"NASDAQ", "count": 5}, + {"market": "LSE", "count": 5}, + ] + + for actual, expected in zip(data, expected_data): + for col_name in expected.keys(): + self.assertEqual(actual[col_name], expected[col_name]) + + def test_rename_columns(self): + df = rename_columns( + self.df, {"stock_name": "stock", "market": "Market", "date": "Date"} + ) + actual_columns = df.columns + + expected_columns = ["stock", "Market", "close_price", "Date"] + + self.assertListEqual(actual_columns, expected_columns) + + def test_create_frame(self): + path = "app/tests/mock/sample.csv" + df = create_frame(self.spark, path).drop("date") + actual_data = df.collect() + + expected_data = [ + {"stock_name": "ABC Corp", "market": "NYSE", "close_price": 100.25}, + {"stock_name": "DEF Ltd", "market": "LSE", "close_price": 50.75}, + {"stock_name": "XYZ Inc", "market": "NASDAQ", "close_price": 75.5}, + {"stock_name": "ABC Corp", "market": "NYSE", "close_price": 95.2}, + {"stock_name": "DEF Ltd", "market": "LSE", "close_price": 55.4}, + {"stock_name": "XYZ Inc", "market": "NASDAQ", "close_price": 80.1}, + {"stock_name": "ABC Corp", "market": "NYSE", "close_price": 105.8}, + {"stock_name": "DEF Ltd", "market": "LSE", "close_price": 60.2}, + {"stock_name": "XYZ Inc", "market": "NASDAQ", "close_price": 92.4}, + {"stock_name": "ABC Corp", "market": "NYSE", "close_price": 110.5}, + {"stock_name": "DEF Ltd", "market": "LSE", "close_price": 68.75}, + {"stock_name": "XYZ Inc", "market": "NASDAQ", "close_price": 102.6}, + {"stock_name": "ABC Corp", "market": "NYSE", "close_price": 115.75}, + {"stock_name": "DEF Ltd", "market": "LSE", "close_price": 75.3}, + {"stock_name": "XYZ Inc", "market": "NASDAQ", "close_price": 112.2}, + ] + + for actual, expected in zip(actual_data, expected_data): + for col_name in expected.keys(): + self.assertEqual(actual[col_name], expected[col_name]) + + def test_make_window(self): + sub = self.df.withColumn("date", F.unix_timestamp("date", "yyyy-MM-dd") / 86400) + + window_spec = make_window("market", "date", -20, -1) + + df = ( + sub.withColumn( + "last_20_days_close_avg", F.avg("close_price").over(window_spec) + ) + .orderBy(["date", "stock_name"]) + .select("close_price", "last_20_days_close_avg") + ) + + actual_data = df.collect() + + expected_data = [ + {"close_price": 100.25, "last_20_days_close_avg": None}, + {"close_price": 50.75, "last_20_days_close_avg": None}, + {"close_price": 75.5, "last_20_days_close_avg": None}, + {"close_price": 95.2, "last_20_days_close_avg": 100.25}, + {"close_price": 55.4, "last_20_days_close_avg": 50.75}, + {"close_price": 80.1, "last_20_days_close_avg": 75.5}, + {"close_price": 105.8, "last_20_days_close_avg": 97.725}, + {"close_price": 60.2, "last_20_days_close_avg": 53.075}, + {"close_price": 92.4, "last_20_days_close_avg": 77.8}, + {"close_price": 110.5, "last_20_days_close_avg": 100.5}, + {"close_price": 68.75, "last_20_days_close_avg": 57.8}, + {"close_price": 102.6, "last_20_days_close_avg": 86.25}, + {"close_price": 115.75, "last_20_days_close_avg": 108.15}, + {"close_price": 75.3, "last_20_days_close_avg": 64.475}, + {"close_price": 112.2, "last_20_days_close_avg": 97.5}, + ] + + for actual, expected in zip(actual_data, expected_data): + for col_name in expected.keys(): + self.assertEqual(actual[col_name], expected[col_name]) diff --git a/app/tests/test_SparkWrapperFailure.py b/app/tests/test_SparkWrapperFailure.py new file mode 100644 index 0000000..e2c453e --- /dev/null +++ b/app/tests/test_SparkWrapperFailure.py @@ -0,0 +1,74 @@ +import re +from unittest 
import TestCase +from pyspark.sql import SparkSession +from pyspark.sql import functions as F +from pyspark.sql import utils as U +from app.SparkWrapper import value_counts, rename_columns, create_frame, make_window + + +class TestSparkWrapper(TestCase): + def setUp(self) -> None: + self.spark = ( + SparkSession.builder.appName("Testing").master("local[*]").getOrCreate() + ) + self.df = self.spark.read.csv( + "app/tests/mock/sample.csv", inferSchema=True, header=True + ) + super().setUp() + + def tearDown(self) -> None: + self.spark.stop() + super().tearDown() + + def test_value_counts_invalid_column(self): + with self.assertRaises(U.AnalysisException) as context: + value_counts(self.df, "nonexistent_column") + + expected_error_message = re.compile("Column '.+' does not exist") + actual_error_message = str(context.exception) + + self.assertTrue(expected_error_message.search(actual_error_message)) + + def test_create_frame_invalid_path(self): + with self.assertRaises(U.AnalysisException) as context: + create_frame(self.spark, "nonexistent_path/sample.csv") + + expected_error_message = "Path does not exist" + actual_error_message = str(context.exception) + + self.assertTrue(expected_error_message in actual_error_message) + + def test_make_window_invalid_window_spec(self): + with self.assertRaises(U.AnalysisException) as context: + window_spec = make_window("invalid_column", "date", -20, -1) + self.df.withColumn("literal_1", F.lit(1).over(window_spec)) + + expected_error_message = re.compile("Column '.+' does not exist") + actual_error_message = str(context.exception) + + self.assertTrue(expected_error_message.search(actual_error_message)) + + def test_make_window_invalid_range(self): + with self.assertRaises(U.AnalysisException) as context: + window_spec = make_window("market", "date", 5, 2) + self.df.withColumn("literal_1", F.lit(1).over(window_spec)) + + expected_error_message = "The lower bound of a window frame must be less than or equal to the upper bound" + actual_error_message = str(context.exception) + self.assertTrue(expected_error_message in actual_error_message) + + def test_rename_column_invalid_column(self): + with self.assertRaises(ValueError) as context: + rename_columns(self.df, {"invalid_col": "myname"}) + + expected_error_message = "COLUMN DOESN'T EXIST" + actual_error_message = str(context.exception) + self.assertTrue(expected_error_message in actual_error_message) + + def test_rename_column_invalid_datatype(self): + with self.assertRaises(TypeError) as context: + rename_columns(self.df, ["invalid_col", "myname"]) + + expected_error_message = "WRONG DATATYPE" + actual_error_message = str(context.exception) + self.assertTrue(expected_error_message in actual_error_message) diff --git a/main.py b/main.py index 70a0659..b730912 100644 --- a/main.py +++ b/main.py @@ -1,136 +1,173 @@ # Databricks notebook source import os -os.system('pip install python-dotenv') -from dotenv import load_dotenv +import subprocess + +from pyspark.sql.functions import when, col +from pyspark.sql.functions import round as sp_round +from pyspark.sql import Window +import pyspark.sql.functions as F + +import app.SparkWrapper as sw + +os.system("pip install python-dotenv") +import dotenv # pylint: disable=wrong-import-position, disable=wrong-import-order # COMMAND ---------- try: - flag = dbutils.widgets.get('flag') -except: - flag = 'False' + import app.connect_databricks as cd # pylint: disable=ungrouped-imports + import json + # Comment the following line if running directly in cloud notebook + 
spark, dbutils = cd.init_databricks() -if flag == 'True': - flag = True -else: - flag = False + with open("/dbfs/mnt/config/keys.json", encoding="utf-8") as file: + keys = json.load(file) + + flag = keys["flag"] +except: # pylint: disable=bare-except + flag = "False" + + +flag = bool(flag == "True") # COMMAND ---------- if flag: + os.environ["KAGGLE_USERNAME"] = keys["kaggle_username"] - os.environ['KAGGLE_USERNAME'] = dbutils.widgets.get('kaggle_username') + os.environ["KAGGLE_KEY"] = keys["kaggle_token"] - os.environ['KAGGLE_KEY'] = dbutils.widgets.get('kaggle_token') + os.environ["storage_account_name"] = keys["storage_account_name"] - os.environ['storage_account_name'] = dbutils.widgets.get('storage_account_name') - - os.environ['datalake_access_key'] = dbutils.widgets.get('datalake_access_key') + os.environ["datalake_access_key"] = keys["datalake_access_key"] # COMMAND ---------- if flag: - import app.connect_databricks as cd - - # # Uncomment below line if working in local ide (i.e. vscode, pycharm, etc) - # spark, dbutils = cd.init_databricks() - # creating mounts cd.create_mount(dbutils, "zipdata", "/mnt/zipdata/") cd.create_mount(dbutils, "rawdata", "/mnt/rawdata/") cd.create_mount(dbutils, "transformed", "/mnt/transformed/") - + else: import app.connect_glue as cg from awsglue.utils import getResolvedOptions import sys - + # initiating glue spark try: print("Setting up params...") - args = getResolvedOptions(sys.argv, ['JOB_NAME', 'KAGGLE_USERNAME', 'KAGGLE_KEY', 'FLAG']) - except Exception as e: - print(f"ERROR: {e}") - args = {'JOB_NAME': 'local'} + args = getResolvedOptions( + sys.argv, ["JOB_NAME", "KAGGLE_USERNAME", "KAGGLE_KEY", "FLAG"] + ) + except: # pylint: disable=bare-except + args = {"JOB_NAME": "local"} glueContext, spark, job = cg.init_glue() job.init("sample") - if args['JOB_NAME'] == 'local': - load_dotenv() - flag = os.environ['FLAG'] + if args["JOB_NAME"] == "local": + dotenv.load_dotenv() else: - flag = args['FLAG'] - os.environ['KAGGLE_USERNAME'] = args['KAGGLE_USERNAME'] - os.environ['KAGGLE_KEY'] = args['KAGGLE_KEY'] - + os.environ["KAGGLE_USERNAME"] = args["KAGGLE_USERNAME"] + os.environ["KAGGLE_KEY"] = args["KAGGLE_KEY"] # COMMAND ---------- -import app.SparkWrapper as sw -from app.Extraction import extract_from_kaggle +from app.Extraction import extract_from_kaggle # pylint: disable=wrong-import-position # COMMAND ---------- -import subprocess - read_path, write_path = extract_from_kaggle(flag) -if flag==False: +if flag is False: copy_command = f"aws s3 cp temp/ {read_path} --recursive" - result = subprocess.run(copy_command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + result = subprocess.run( + copy_command, + shell=True, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) print("Output:", result.stdout) # COMMAND ---------- # reading data in different frames -employee = sw.create_frame(spark, read_path + 'employee_data.csv') - -employee = sw.rename_columns(employee, {'ADDRESS_LINE1': 'AGENT_ADDRESS_LINE1', 'ADDRESS_LINE2': 'AGENT_ADDRESS_LINE2',\ - 'CITY': 'AGENT_CITY', 'STATE': 'AGENT_STATE', 'POSTAL_CODE': 'AGENT_POSTAL_CODE'}) +employee = sw.create_frame(spark, read_path + "employee_data.csv") + +employee = sw.rename_columns( + employee, + { + "ADDRESS_LINE1": "AGENT_ADDRESS_LINE1", + "ADDRESS_LINE2": "AGENT_ADDRESS_LINE2", + "CITY": "AGENT_CITY", + "STATE": "AGENT_STATE", + "POSTAL_CODE": "AGENT_POSTAL_CODE", + }, +) # COMMAND ---------- -insurance = sw.create_frame(spark, 
read_path + 'insurance_data.csv') - -insurance = sw.rename_columns(insurance, {'ADDRESS_LINE1': 'CUSTOMER_ADDRESS_LINE1', \ - 'ADDRESS_LINE2': 'CUSTOMER_ADDRESS_LINE2', 'CITY': 'CUSTOMER_CITY', 'STATE': 'CUSTOMER_STATE', \ - 'POSTAL_CODE': 'CUSTOMER_POSTAL_CODE'}) +insurance = sw.create_frame(spark, read_path + "insurance_data.csv") + +insurance = sw.rename_columns( + insurance, + { + "ADDRESS_LINE1": "CUSTOMER_ADDRESS_LINE1", + "ADDRESS_LINE2": "CUSTOMER_ADDRESS_LINE2", + "CITY": "CUSTOMER_CITY", + "STATE": "CUSTOMER_STATE", + "POSTAL_CODE": "CUSTOMER_POSTAL_CODE", + }, +) # COMMAND ---------- -vendor = sw.create_frame(spark, read_path + 'vendor_data.csv') - -vendor = sw.rename_columns(vendor, {'ADDRESS_LINE1': 'VENDOR_ADDRESS_LINE1', 'ADDRESS_LINE2': 'VENDOR_ADDRESS_LINE2', \ - 'CITY': 'VENDOR_CITY', 'STATE': 'VENDOR_STATE', 'POSTAL_CODE': 'VENDOR_POSTAL_CODE'}) +vendor = sw.create_frame(spark, read_path + "vendor_data.csv") + +vendor = sw.rename_columns( + vendor, + { + "ADDRESS_LINE1": "VENDOR_ADDRESS_LINE1", + "ADDRESS_LINE2": "VENDOR_ADDRESS_LINE2", + "CITY": "VENDOR_CITY", + "STATE": "VENDOR_STATE", + "POSTAL_CODE": "VENDOR_POSTAL_CODE", + }, +) # COMMAND ---------- # task 1: creating one view -insurance_employee = insurance.join(employee, on='AGENT_ID', how='left') +insurance_employee = insurance.join(employee, on="AGENT_ID", how="left") -df = insurance_employee.join(vendor, on='VENDOR_ID', how='left') +df = insurance_employee.join(vendor, on="VENDOR_ID", how="left") print("Task 1 Done") # COMMAND ---------- # task 2: create new column 'colocation' -from pyspark.sql.functions import when, col, mean -cond = ((col('CUSTOMER_STATE') == col('INCIDENT_STATE')) & (col('AGENT_STATE') == col('INCIDENT_STATE'))) +cond = (col("CUSTOMER_STATE") == col("INCIDENT_STATE")) & ( + col("AGENT_STATE") == col("INCIDENT_STATE") +) -df = df.withColumn('COLOCATION', when(cond, 1).otherwise(0)) +df = df.withColumn("COLOCATION", when(cond, 1).otherwise(0)) print("Task 2 Done") -# task 3: -cond = ((col('AUTHORITY_CONTACTED') != 'Police') & (col('POLICE_REPORT_AVAILABLE') == 1)) +# task 3: +cond = (col("AUTHORITY_CONTACTED") != "Police") & (col("POLICE_REPORT_AVAILABLE") == 1) -df = df.withColumn('AUTHORITY_CONTACTED', when(cond, 'Police').otherwise(col('AUTHORITY_CONTACTED'))) +df = df.withColumn( + "AUTHORITY_CONTACTED", when(cond, "Police").otherwise(col("AUTHORITY_CONTACTED")) +) print("Task 3 Done") @@ -138,106 +175,134 @@ # task 4: create new column claim_deviation -from pyspark.sql import Window -import pyspark.sql.functions as F +sub = df.select( + "TRANSACTION_ID", "INSURANCE_TYPE", "TXN_DATE_TIME", "CLAIM_AMOUNT" +).withColumn("TXN_DATE_TIME", F.unix_timestamp("TXN_DATE_TIME")) -sub = df.select('TRANSACTION_ID', 'INSURANCE_TYPE', 'TXN_DATE_TIME', 'CLAIM_AMOUNT')\ - .withColumn("TXN_DATE_TIME", F.unix_timestamp('TXN_DATE_TIME')) +window_spec = sw.make_window("INSURANCE_TYPE", "TXN_DATE_TIME", -30 * 86400, -1 * 86400) -window_spec = sw.make_window('INSURANCE_TYPE', 'TXN_DATE_TIME', -30 * 86400, -1 * 86400) +sub = sub.withColumn( + "AVG_30DAYS_CLAIM_AMOUNT", F.round(F.avg("CLAIM_AMOUNT").over(window_spec), 2) +) -sub = sub.withColumn('AVG_30DAYS_CLAIM_AMOUNT', F.round(F.avg('CLAIM_AMOUNT').over(window_spec), 2)) +window_prev = sw.make_window( + "INSURANCE_TYPE", "TXN_DATE_TIME", Window.unboundedPreceding, 0 +) -window_prev = sw.make_window('INSURANCE_TYPE', 'TXN_DATE_TIME', Window.unboundedPreceding, 0) +sub = sub.withColumn("MIN_TXN_DATE_TIME", F.min("TXN_DATE_TIME").over(window_prev)) -sub = 
sub.withColumn('MIN_TXN_DATE_TIME', F.min('TXN_DATE_TIME').over(window_prev)) +sub = sub.withColumn( + "DAYS_WITH_HISTORY", + F.datediff(F.from_unixtime("TXN_DATE_TIME"), F.from_unixtime("MIN_TXN_DATE_TIME")), +) -sub = sub.withColumn('DAYS_WITH_HISTORY', F.datediff(F.from_unixtime('TXN_DATE_TIME'), F.from_unixtime('MIN_TXN_DATE_TIME'))) +sub = sub.withColumn( + "DEVIATION", + F.round( + F.coalesce(F.col("AVG_30DAYS_CLAIM_AMOUNT"), F.lit(0)) / F.col("CLAIM_AMOUNT"), + 2, + ), +) -sub = sub.withColumn('DEVIATION', F.round(F.coalesce(F.col('AVG_30DAYS_CLAIM_AMOUNT'), F.lit(0)) / F.col('CLAIM_AMOUNT'), 2)) +cond1 = (F.col("DAYS_WITH_HISTORY") >= 30) & (F.col("DEVIATION") < 0.5) +cond2 = (F.col("DAYS_WITH_HISTORY") >= 30) & (F.col("DEVIATION") >= 0.5) -cond1 = ((F.col('DAYS_WITH_HISTORY') >= 30) & (F.col('DEVIATION') < 0.5)) -cond2 = ((F.col('DAYS_WITH_HISTORY') >= 30) & (F.col('DEVIATION') >= 0.5)) +sub = sub.withColumn("CLAIM_DEVIATION", F.when(cond1, 1).when(cond2, 0).otherwise(-1)) -sub = sub.withColumn('CLAIM_DEVIATION', F.when(cond1, 1).when(cond2, 0).otherwise(-1)) +claim_deviation = sub.select("TRANSACTION_ID", "CLAIM_DEVIATION") -claim_deviation = sub.select('TRANSACTION_ID', 'CLAIM_DEVIATION') - -df = df.join(claim_deviation, on = 'TRANSACTION_ID', how = 'left') +df = df.join(claim_deviation, on="TRANSACTION_ID", how="left") print("Task 4 Done") # COMMAND ---------- # task 5: apply discounts & increments in claim_amount -from pyspark.sql.functions import round as sp_round -def get_cond(type1, type2): - return ((col('INSURANCE_TYPE') == type1) | (col('INSURANCE_TYPE') == type2)) +def get_cond(type1, type2): + return (col("INSURANCE_TYPE") == type1) | (col("INSURANCE_TYPE") == type2) -df = df.withColumn('NEW_PREMIUM', \ - when(get_cond('Mobile', 'Travel'), sp_round(col('PREMIUM_AMOUNT') * 0.9, 2))\ - .when(get_cond('Health', 'Property'), sp_round(col('PREMIUM_AMOUNT') * 1.07, 2))\ - .when(get_cond('Life', 'Motor'), sp_round(col('PREMIUM_AMOUNT') * 1.02, 2)).otherwise('PREMIUM_AMOUNT')) +df = df.withColumn( + "NEW_PREMIUM", + when(get_cond("Mobile", "Travel"), sp_round(col("PREMIUM_AMOUNT") * 0.9, 2)) + .when(get_cond("Health", "Property"), sp_round(col("PREMIUM_AMOUNT") * 1.07, 2)) + .when(get_cond("Life", "Motor"), sp_round(col("PREMIUM_AMOUNT") * 1.02, 2)) + .otherwise("PREMIUM_AMOUNT"), +) print("Task 5 Done") # COMMAND ---------- -# task 6: create new column 'eligible_for_discount' -cond = (col('TENURE') > 60) & (col('EMPLOYMENT_STATUS') == 'N') & (col('NO_OF_FAMILY_MEMBERS') >= 4) +# task 6: create new column 'eligible_for_discount' +cond = ( + (col("TENURE") > 60) + & (col("EMPLOYMENT_STATUS") == "N") + & (col("NO_OF_FAMILY_MEMBERS") >= 4) +) -df = df.withColumn('ELIGIBLE_FOR_DISCOUNT', when(cond, 1).otherwise(0)) +df = df.withColumn("ELIGIBLE_FOR_DISCOUNT", when(cond, 1).otherwise(0)) print("Task 6 Done") # task 7: create new column 'claim_velocity' -sub = df.select('TRANSACTION_ID', 'INSURANCE_TYPE', 'TXN_DATE_TIME')\ - .withColumn('TXN_DATE_TIME', F.unix_timestamp('TXN_DATE_TIME')) +sub = df.select("TRANSACTION_ID", "INSURANCE_TYPE", "TXN_DATE_TIME").withColumn( + "TXN_DATE_TIME", F.unix_timestamp("TXN_DATE_TIME") +) -window_30_days = sw.make_window('INSURANCE_TYPE', 'TXN_DATE_TIME', -30 * 86400, -1 * 86400) +window_30_days = sw.make_window( + "INSURANCE_TYPE", "TXN_DATE_TIME", -30 * 86400, -1 * 86400 +) -sub = sub.withColumn('30_days_count', F.count('TRANSACTION_ID').over(window_30_days)) +sub = sub.withColumn("30_days_count", F.count("TRANSACTION_ID").over(window_30_days)) 
-window_3_days = sw.make_window('INSURANCE_TYPE', 'TXN_DATE_TIME', -3 * 86400, -1 * 86400) +window_3_days = sw.make_window( + "INSURANCE_TYPE", "TXN_DATE_TIME", -3 * 86400, -1 * 86400 +) -sub = sub.withColumn('3_days_count', F.count('TRANSACTION_ID').over(window_3_days)) +sub = sub.withColumn("3_days_count", F.count("TRANSACTION_ID").over(window_3_days)) -sub = sub.withColumn('CLAIM_VELOCITY', F.round(F.col('30_days_count') / F.col('3_days_count'), 2)) +sub = sub.withColumn( + "CLAIM_VELOCITY", F.round(F.col("30_days_count") / F.col("3_days_count"), 2) +) -claim_velocity = sub.select('TRANSACTION_ID', 'CLAIM_VELOCITY') +claim_velocity = sub.select("TRANSACTION_ID", "CLAIM_VELOCITY") -df = df.join(claim_velocity, on = 'TRANSACTION_ID', how = 'left') +df = df.join(claim_velocity, on="TRANSACTION_ID", how="left") print("Task 7 Done") # COMMAND ---------- # task 8: find all suspicious employees -cond = ((col('CLAIM_STATUS') == 'A') & (col('RISK_SEGMENTATION') == 'H') & (col('INCIDENT_SEVERITY') == 'Major Loss') & (col('CLAIM_AMOUNT') > 15000)) +cond = ( + (col("CLAIM_STATUS") == "A") + & (col("RISK_SEGMENTATION") == "H") + & (col("INCIDENT_SEVERITY") == "Major Loss") + & (col("CLAIM_AMOUNT") > 15000) +) -df = df.withColumn('SUSPICIOUS', when(cond, 1).otherwise(0)) +df = df.withColumn("SUSPICIOUS", when(cond, 1).otherwise(0)) print("Task 8 Done") # COMMAND ---------- # finally writting the data in transformed container -df.coalesce(1).write.csv(write_path + 'final_data.csv', header=True, mode="overwrite") +df.coalesce(1).write.csv(write_path + "final_data.csv", header=True, mode="overwrite") # COMMAND ---------- -if flag==False: +if flag is False: job.commit() print("Execution Complete") - diff --git a/setup.py b/setup.py index 7f50012..939da4d 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,3 @@ from setuptools import setup -# setup -setup( - packages=["app"], - name="app", - version="0.9", - install_requires=[] -) +setup(packages=["app"], name="app", version="0.9", install_requires=[]) diff --git a/sonar-project.properties b/sonar-project.properties new file mode 100644 index 0000000..f772ff1 --- /dev/null +++ b/sonar-project.properties @@ -0,0 +1,15 @@ +sonar.projectKey=wednesday-solutions_multi-cloud-etl-pipeline_AYxH5ljnvUodY4XtbCsH +sonar.projectName=Multi Cloud ETL Pipeline +sonar.sources=. +sonar.language=py +sonar.sourceEncoding=UTF-8 +sonar.python.pylint.reportPath=pylint-report.txt +sonar.python.coverage.reportPaths=*coverage.xml +sonar.python.pylint_config=.pylintrc +sonar.python.pylint=/usr/local/bin/pylint +sonar.inclusions=**/app/**,**/main.py +sonar.exclusions=**/app/**/tests/test_*.py,**/__init__.py,**/src/**/*.csv,**/app/tests/mock/*.* +sonar.test.exclusions=**/app/**/tests/test_*.py,**/__init__.py,**/app/**/*.csv,**/app/tests/mock/*.* +sonar.coverage.exclusions=**/app/**/tests/test_*.py,**/__init__.py,**/app/**/*.csv,**/app/tests/mock/*.* +sonar.text.excluded.file.suffixes=csv +sonar.python.version=3.7
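The Databricks branch of `main.py` reads all of its job parameters from `/dbfs/mnt/config/keys.json` (see the `try` block at the top of that file and step 2 of the README). A minimal sketch of that file, assuming only the five keys the script actually looks up; every value below is a placeholder, not a real credential:

```json
{
  "flag": "True",
  "kaggle_username": "<your-kaggle-username>",
  "kaggle_token": "<your-kaggle-api-token>",
  "storage_account_name": "<your-storage-account-name>",
  "datalake_access_key": "<your-datalake-access-key>"
}
```

Note that `flag` is compared against the string `"True"` in `main.py` (`flag = bool(flag == "True")`), so it should be a JSON string rather than a boolean.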
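Tasks 4 and 7 in `main.py` both build trailing windows with `sw.make_window("INSURANCE_TYPE", "TXN_DATE_TIME", -30 * 86400, -1 * 86400)`, i.e. a `rangeBetween` frame over epoch seconds that spans from 30 days before the current row up to one day before it. A minimal, self-contained sketch of that behaviour, using toy data rather than the insurance dataset and recreating the window inline instead of importing `app.SparkWrapper`:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").appName("window-demo").getOrCreate()

# Toy claims: one partition ("Health"), three transaction dates.
rows = [
    ("Health", "2023-01-01", 100.0),
    ("Health", "2023-01-10", 200.0),
    ("Health", "2023-02-15", 400.0),
]
df = spark.createDataFrame(rows, ["INSURANCE_TYPE", "TXN_DATE_TIME", "CLAIM_AMOUNT"])

# Same shape as sw.make_window("INSURANCE_TYPE", "TXN_DATE_TIME", -30 * 86400, -1 * 86400):
# order by the transaction time in epoch seconds, then average CLAIM_AMOUNT over the
# frame running from 30 days before to 1 day before the current row.
df = df.withColumn("TXN_TS", F.unix_timestamp("TXN_DATE_TIME", "yyyy-MM-dd"))
w = (
    Window.partitionBy("INSURANCE_TYPE")
    .orderBy("TXN_TS")
    .rangeBetween(-30 * 86400, -1 * 86400)
)

df.withColumn(
    "AVG_30DAYS_CLAIM_AMOUNT", F.round(F.avg("CLAIM_AMOUNT").over(w), 2)
).show()
# 2023-01-01 -> null (no earlier rows), 2023-01-10 -> 100.0 (only 2023-01-01 is in range),
# 2023-02-15 -> null (both earlier rows are more than 30 days back).
```

The same frame with `F.count` in place of `F.avg` is what drives the `30_days_count` used for `CLAIM_VELOCITY` in task 7.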