Merge pull request #2 from wednesday-solutions/feat/ci-integration
Feat: integrating ci pipeline
vighnesh-wednesday authored Dec 12, 2023
2 parents 1b583ba + b6bc7f5 commit 5152760
Showing 18 changed files with 590 additions and 151 deletions.
52 changes: 52 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,52 @@
name: Data Engineering CI
on:
  push:
    branches:
      - main
      - feat/*
      - fix/*

jobs:
  run-ci:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.10.12

      - name: Install dependencies
        run: |
          pip install mypy==1.7.1 pylint==3.0.2 pyspark==3.3.0 coverage
      - name: Type check
        run: mypy ./ --ignore-missing-imports

      - name: Lint
        run: pylint app/ main.py setup.py --output pylint-report.txt

      - name: Testing
        run: coverage run --source=app -m unittest discover -s app/tests/

      - name: Test coverage report
        run: coverage xml

      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        with:
          args: >
            -Dsonar.scm.revision=${{ github.event.pull_request.head.sha }}
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}

      - uses: sonarsource/sonarqube-quality-gate-action@master
        timeout-minutes: 5
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
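The workflow's "Testing" and "Test coverage report" steps can also be reproduced locally without GitHub Actions. A minimal sketch using the coverage API, assuming the same app/ and app/tests/ layout as above:

```python
# Local equivalent of the "Testing" + "Test coverage report" CI steps (sketch only).
# Assumes the repository layout used in this PR: source under app/, tests under app/tests/.
import unittest

import coverage

cov = coverage.Coverage(source=["app"])
cov.start()

# Discover and run the same test suite the CI job runs.
suite = unittest.defaultTestLoader.discover("app/tests/")
result = unittest.TextTestRunner(verbosity=2).run(suite)

cov.stop()
cov.save()
cov.xml_report(outfile="coverage.xml")  # same XML report format the SonarQube scan consumes

raise SystemExit(0 if result.wasSuccessful() else 1)
```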
5 changes: 4 additions & 1 deletion .gitignore
@@ -1 +1,4 @@
.env
.env
.coverage
app/__pycache__
app/tests/__pycache__
32 changes: 32 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,32 @@
repos:
  - repo: https://github.com/psf/black
    rev: 22.3.0
    hooks:
      - id: black
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v2.5.0
    hooks:
      - id: check-ast
      - id: trailing-whitespace
      - id: check-merge-conflict
      - id: debug-statements
      - id: detect-private-key
      - id: end-of-file-fixer

  - repo: local
    hooks:
      - id: mypy
        name: Type Checking
        entry: mypy ./ --ignore-missing-imports
        language: system
        always_run: true
        types: [python3]
        stages: [commit]

      - id: lint
        name: Linting
        entry: pylint app/ main.py setup.py
        language: system
        always_run: true
        types: [python3]
        stages: [commit]
9 changes: 9 additions & 0 deletions .pylintrc
@@ -0,0 +1,9 @@
[MASTER]
disable=C0114,C0103,C0116,R0914,E0401,W0511,W0108,R0911,R0801,C0115

[FORMAT]
max-line-length=150
max-module-lines=350

[SIMILARITIES]
min-similarity-lines=4
19 changes: 15 additions & 4 deletions README.md
@@ -19,16 +19,19 @@ To run the same ETL code in multiple cloud services based on your preference, th
1. Clone this repo in your own repo.

2. For local IDE: Open configs file and write your own keys & storage account name.
For Databricks job: Configure job parameters, then call them using ```dbutils.widgets.get(param_name)```
For Databricks job: Add your parameters to a JSON file in an already mounted container, then read it into a dictionary.

3. Change your read & write paths in the Extraction.py file; you can also change the extraction logic to use other sources.

4. (Optional) If you are connecting local IDE then uncomment the following line in the main file:
4. (Optional) If you are NOT connecting a local IDE, comment out the following line in the main file:
```
spark, dbutils = cd.init_databricks()
```
This function connects to your running cloud cluster.
Also note that the ```dbutils.widgets.get(param_name)``` method only works in Databricks jobs, so for other environments you have to load the keys directly or use an abstraction method like ```load_env()```
You also need to create a Databricks CLI profile; refer to: [Configure Databricks Workspace CLI](https://www.youtube.com/watch?v=h4L064NfMV0&ab_channel=WafaStudies)
Also note that the ```keys.json``` file I'm reading is in an already mounted container, so for other environments you have to load the keys directly or use an abstraction method like ```load_env()```
5. Just run it in the local IDE to develop & test.
@@ -88,4 +91,12 @@ Steps:
5. Upload this wheel file to your S3 bucket, then paste its URI path in "Python Library path" in the Glue job details.
Refer: [Glue Programming libraries](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html)
## Run Tests & Coverage Report
To run tests, use the following from the root of the directory:
coverage run --source=app -m unittest discover -s app/tests/
coverage report
Note that the awsglue libraries are not available to download, so use the AWS Glue 4 Docker container.
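For the Databricks-job path described in step 2 of the README above, reading the parameters file into a dictionary could look like the sketch below; the keys.json path and key names are illustrative, not taken from this commit:

```python
# Illustrative only: read job parameters from a JSON file sitting in an already
# mounted container (step 2 above). The path and key names are hypothetical.
import json

with open("/dbfs/mnt/config/keys.json", "r", encoding="utf-8") as f:
    params = json.load(f)

storage_account_name = params["storage_account_name"]
datalake_access_key = params["datalake_access_key"]
```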
13 changes: 8 additions & 5 deletions app/Extraction.py
@@ -1,11 +1,12 @@
import subprocess
import os
os.system('pip install kaggle')
import kaggle

os.system("pip install kaggle")
import kaggle # pylint: disable=wrong-import-position


def extract_from_kaggle(flag: bool):
    if flag:
        print("WRONG BIT RAN!!!!")
        read_path = "/dbfs/mnt/rawdata/"
        write_path = "/mnt/transformed/"
    else:
@@ -14,11 +15,13 @@ def extract_from_kaggle(flag: bool):

    api = kaggle.KaggleApi()
    api.authenticate()
    api.dataset_download_cli("mastmustu/insurance-claims-fraud-data", unzip=True, path=read_path)
    api.dataset_download_cli(
        "mastmustu/insurance-claims-fraud-data", unzip=True, path=read_path
    )

    if flag:
        read_path = read_path[5:]
    else:
        read_path = "s3://glue-bucket-vighnesh/rawdata/"

    return read_path, write_path
    return read_path, write_path
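A hedged usage sketch for the function above (not part of the commit): flag=True resolves to the Databricks mount paths, flag=False to the Glue/S3 paths; Kaggle API credentials must already be configured.

```python
# Sketch only; requires Kaggle credentials (e.g. ~/.kaggle/kaggle.json) to be set up.
from app.Extraction import extract_from_kaggle

read_path, write_path = extract_from_kaggle(flag=True)
print(read_path, write_path)  # "/mnt/rawdata/" and "/mnt/transformed/" on Databricks
```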
22 changes: 17 additions & 5 deletions app/SparkWrapper.py
@@ -1,6 +1,5 @@
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql import Window, WindowSpec


def create_frame(sc: SparkSession, path: str):
@@ -9,15 +8,28 @@ def create_frame(sc: SparkSession, path: str):


def rename_columns(df: DataFrame, names: dict) -> DataFrame:
    if not isinstance(names, dict):
        raise TypeError("WRONG DATATYPE: column names should be dictionary")

    columns = df.columns
    renamed_df = df
    for old_col, new_col in names.items():
        if old_col not in columns:
            raise ValueError(
                f"COLUMN DOESN'T EXIST: Column '{old_col}' does not exist in the DataFrame"
            )
        renamed_df = renamed_df.withColumnRenamed(old_col, new_col)

    return renamed_df


def value_counts(df: DataFrame, column: str) -> DataFrame:
    return df.groupBy(column).count().orderBy('count', ascending=False)
    return df.groupBy(column).count().orderBy(["count", column], ascending=False)


def make_window(partition: str, order: str, range_from: int, range_to: int) -> Window:
    return Window.partitionBy(partition).orderBy(order).rangeBetween(range_from, range_to)
def make_window(
    partition: str, order: str, range_from: int, range_to: int
) -> WindowSpec:
    return (
        Window.partitionBy(partition).orderBy(order).rangeBetween(range_from, range_to)
    )
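A minimal local sketch of how these helpers fit together (not part of the commit); the CSV path and column names follow app/tests/mock/sample.csv, everything else is illustrative:

```python
# Demonstrates rename_columns, value_counts and make_window on the mock CSV.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from app.SparkWrapper import make_window, rename_columns, value_counts

spark = SparkSession.builder.master("local[*]").appName("wrapper-demo").getOrCreate()
df = spark.read.csv("app/tests/mock/sample.csv", header=True, inferSchema=True)

df = rename_columns(df, {"close_price": "close"})
value_counts(df, "market").show()

# Range-based window per stock: rows whose close lies within 10 below the current close.
win = make_window(partition="stock_name", order="close", range_from=-10, range_to=0)
df.withColumn("rolling_max", F.max("close").over(win)).show()

spark.stop()
```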
5 changes: 0 additions & 5 deletions app/configs.py

This file was deleted.

42 changes: 22 additions & 20 deletions app/connect_databricks.py
@@ -1,11 +1,11 @@
import os
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient

storage_name = os.environ['storage_account_name']
storage_key = os.environ['datalake_access_key']


def init_databricks():
    os.system("cp /dbfs/mnt/config/databricks-connect.txt ~/.databrickscfg")

    spark = DatabricksSession.builder.getOrCreate()

    dbutils = WorkspaceClient().dbutils
@@ -14,27 +14,29 @@ def init_glue():


def create_mount(dbutils, container_name, mount_path):
    storage_name = os.environ["storage_account_name"]
    storage_key = os.environ["datalake_access_key"]

    mounts = [x.mountPoint for x in dbutils.fs.mounts()]
    try:
        if mount_path not in [x.mountPoint for x in dbutils.fs.mounts()]:
            dbutils.fs.mount(
                source = f'wasbs://{container_name}@{storage_name}.blob.core.windows.net/',
                mount_point = mount_path,
                extra_configs = {f'fs.azure.account.key.{storage_name}.blob.core.windows.net': storage_key})
            print(f"{mount_path} Mount Successfull")
        else:
            dbutils.fs.refreshMounts()
            print(f"{mount_path} Already mounted")
    except Exception as e:
        print(f"{mount_path} Error: " + e)
    if mount_path not in mounts:
        dbutils.fs.mount(
            source=f"wasbs://{container_name}@{storage_name}.blob.core.windows.net/",
            mount_point=mount_path,
            extra_configs={
                f"fs.azure.account.key.{storage_name}.blob.core.windows.net": storage_key
            },
        )
        print(f"{mount_path} Mount Successfull")
    else:
        dbutils.fs.refreshMounts()
        print(f"{mount_path} Already mounted")


def unmount(dbutils, mount_path):
    try:
        dbutils.fs.unmount(mount_path)
        print("Unmount Successful")
    except Exception as e:
        print("Error: " + e)



    except FileNotFoundError:
        print(f"Error: Path not found - {mount_path}")
    except Exception as e:  # pylint: disable=W0718
        print(f"Error: {e}")
7 changes: 2 additions & 5 deletions app/connect_glue.py
@@ -1,6 +1,3 @@
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
@@ -11,5 +8,5 @@ def init_glue():
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    return glueContext, spark, job

    return glueContext, spark, job
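For context, a hedged sketch of how init_glue might be consumed (main.py is not shown in this diff view, so the call pattern below is an assumption); it requires the awsglue runtime, e.g. the AWS Glue 4 Docker container mentioned in the README:

```python
# Assumed call pattern, not taken from the commit.
from app.connect_glue import init_glue

glue_context, spark, job = init_glue()  # job is available for Glue bookkeeping if needed

# Example read using the S3 path that appears in app/Extraction.py.
df = spark.read.csv("s3://glue-bucket-vighnesh/rawdata/", header=True)
df.show(5)
```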
Empty file added app/tests/__init__.py
Empty file.
52 changes: 52 additions & 0 deletions app/tests/connect_glue_test.py
@@ -0,0 +1,52 @@
import unittest
from unittest.mock import patch, MagicMock
from app.connect_glue import init_glue


class TestInitGlue(unittest.TestCase):
    @patch("app.connect_glue.SparkContext")
    @patch("app.connect_glue.GlueContext")
    @patch("app.connect_glue.Job")
    def test_init_glue(self, mock_job, mock_glue_context, mock_spark_context):
        # Mock the SparkContext, GlueContext, and Job
        mock_spark_context_instance = MagicMock()
        mock_glue_context_instance = MagicMock()
        mock_job_instance = MagicMock()

        # Set up the behavior of the mock instances
        mock_spark_context.return_value = mock_spark_context_instance
        mock_glue_context.return_value = mock_glue_context_instance
        mock_job.return_value = mock_job_instance

        # Call the function to test
        glue_context, spark, job = init_glue()

        # Assertions
        mock_spark_context.assert_called_once()
        mock_glue_context.assert_called_once_with(mock_spark_context_instance)
        mock_job.assert_called_once_with(mock_glue_context_instance)

        # Check if the returned values are correct
        self.assertEqual(glue_context, mock_glue_context_instance)
        self.assertEqual(spark, mock_glue_context_instance.spark_session)
        self.assertEqual(job, mock_job_instance)

    @patch("app.connect_glue.SparkContext")
    @patch("app.connect_glue.GlueContext")
    @patch("app.connect_glue.Job")
    def test_init_glue_failure(self, mock_job, mock_glue_context, mock_spark_context):
        # Simulate a ValueError during SparkContext initialization
        error_statement = "Simulated SparkContext initialization failure"
        mock_spark_context.side_effect = ValueError(error_statement)

        # Call the function to test
        with self.assertRaises(ValueError) as context:
            init_glue()

        # Assertions
        mock_spark_context.assert_called_once()
        mock_glue_context.assert_not_called()  # GlueContext should not be called if SparkContext initialization fails
        mock_job.assert_not_called()  # Job should not be called if SparkContext initialization fails

        # Check if the error displayed correctly
        self.assertEqual(str(context.exception), error_statement)
16 changes: 16 additions & 0 deletions app/tests/mock/sample.csv
@@ -0,0 +1,16 @@
stock_name,market,close_price,date
ABC Corp,NYSE,100.25,2023-01-01
DEF Ltd,LSE,50.75,2023-01-01
XYZ Inc,NASDAQ,75.50,2023-01-01
ABC Corp,NYSE,95.20,2023-01-11
DEF Ltd,LSE,55.40,2023-01-11
XYZ Inc,NASDAQ,80.10,2023-01-11
ABC Corp,NYSE,105.80,2023-01-21
DEF Ltd,LSE,60.20,2023-01-21
XYZ Inc,NASDAQ,92.40,2023-01-21
ABC Corp,NYSE,110.50,2023-01-31
DEF Ltd,LSE,68.75,2023-01-31
XYZ Inc,NASDAQ,102.60,2023-01-31
ABC Corp,NYSE,115.75,2023-02-10
DEF Ltd,LSE,75.30,2023-02-10
XYZ Inc,NASDAQ,112.20,2023-02-10
