Repository to demonstrate a sample data engineering assignment.

In this project, we perform operations on raw retail datasets to produce results for analytic queries as part of the assignment.
- Background
- Assumptions
- Application Flow
- Data model diagram
- How to execute jobs
- Analytic queries with results
## Background

We have three datasets, listed below. Before diving into the data, let's look at the expected ingestion frequency for each; this will help us design the system/flow accordingly.
Name | Ingestion frequency | Format |
---|---|---|
customer | daily | csv |
products | daily | csv |
orders | hourly | csv |
Based on the above information, we need to provide results for the following queries:
- Top 10 countries with the most number of customers.
- Revenue distribution by country.
- Relationship between average unit price of products and their sales volume.
- Top 3 products with the maximum unit price drop in the last month.
## Assumptions

Before proceeding with the actual development of the scripts, let's go over a few assumptions I have made after looking at the given data.

- Assumptions for validating Customer data (sketched in Spark after this list)
  - Remove records whose CustomerID or Country value is null
  - For duplicate records, pick the last record when ordered by Country value, e.g. here the second record (Denmark) is selected:

    CustomerID | Country
    ---|---
    12394 | Belgium
    12394 | Denmark

    selected record:

    CustomerID | Country
    ---|---
    12394 | Denmark
- Assumptions for validating Products data (sketched after this list)
  - Remove records whose Description, UnitPrice, or StockCode value is null
  - Remove records whose UnitPrice value is less than or equal to 0
  - Remove all records whose Description is not entirely in uppercase
  - Remove all records whose StockCode value does not match either of the formats below
    - 5 digits, e.g. 84748
    - 5 digits + a code character, e.g. 10124A
  - In case of duplicate records:
    - For a duplicate StockCode, choose the record whose Description is not null and is in uppercase, e.g. here the first record is selected:

      StockCode | Description | UnitPrice
      ---|---|---
      85092 | CANDY SPOT TEA COSY | 1.56
      85092 | | 1.52

      selected record:

      StockCode | Description | UnitPrice
      ---|---|---
      85092 | CANDY SPOT TEA COSY | 1.56
    - For a duplicate Description, choose the record whose StockCode is the biggest among them, e.g. here the second record is selected:

      StockCode | Description | UnitPrice
      ---|---|---
      79190A | RETRO PLASTIC 70'S TRAY | 1.56
      79192A | RETRO PLASTIC 70'S TRAY | 1.52

      selected record:

      StockCode | Description | UnitPrice
      ---|---|---
      79192A | RETRO PLASTIC 70'S TRAY | 1.52
    - For duplicate StockCodes, select the record whose Description is the smallest among them, e.g. here the first record is selected:

      StockCode | Description | UnitPrice
      ---|---|---
      23236 | DOILEY BISCUIT TIN | 1.05
      23236 | DOILEY STORAGE TIN | 2.31

      selected record:

      StockCode | Description | UnitPrice
      ---|---|---
      23236 | DOILEY BISCUIT TIN | 1.05
- Assumptions for validating Orders data (sketched after this list)
  - Remove records whose CustomerID is null
  - Remove records whose Quantity value is invalid (e.g. missing, non-numeric, or not positive)
  - Remove records whose InvoiceNo appears under more than one CustomerID
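To make the customer rules concrete, here is a minimal Spark sketch (runnable in spark-shell, where `spark` is predefined); the bronze path and column names follow the tables in this README, and the repo's actual job may differ.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Raw customer data from the bronze staging area.
val rawCustomers = spark.read.option("header", "true").csv("/bronze/customer/")

// Keep the last record per CustomerID when ordered by Country value
// (equivalently: order descending and keep the first row).
val byCustomer = Window.partitionBy("CustomerID").orderBy(col("Country").desc)

val cleanCustomers = rawCustomers
  .filter(col("CustomerID").isNotNull && col("Country").isNotNull) // drop null keys
  .withColumn("rn", row_number().over(byCustomer))
  .filter(col("rn") === 1)
  .drop("rn")
```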
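Similarly, a sketch of the products rules. Note that the "non-null uppercase Description" dedup rule is already enforced by the filters (null and non-uppercase descriptions are removed first), so only the two ordering-based tie-breaks remain.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val rawProducts = spark.read.option("header", "true").csv("/bronze/products/")

val validProducts = rawProducts
  .filter(col("StockCode").isNotNull && col("Description").isNotNull && col("UnitPrice").isNotNull)
  .filter(col("UnitPrice").cast("double") > 0)              // positive prices only
  .filter(col("Description") === upper(col("Description"))) // uppercase descriptions only
  .filter(col("StockCode").rlike("^\\d{5}[A-Za-z]?$"))      // 5 digits + optional code character

// Dedup: smallest Description per StockCode, then biggest StockCode per Description.
val byStock = Window.partitionBy("StockCode").orderBy(col("Description").asc)
val byDesc  = Window.partitionBy("Description").orderBy(col("StockCode").desc)

val dedupedProducts = validProducts
  .withColumn("rn", row_number().over(byStock)).filter(col("rn") === 1).drop("rn")
  .withColumn("rn", row_number().over(byDesc)).filter(col("rn") === 1).drop("rn")
```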
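And a sketch of the orders rules; treating an "invalid" Quantity as non-numeric or non-positive is my reading, not something the assignment spells out.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val rawOrders = spark.read.option("header", "true").csv("/bronze/orders/")

val validOrders = rawOrders
  .filter(col("CustomerID").isNotNull)
  // Assumed meaning of "invalid": Quantity must parse as a positive integer.
  .filter(col("Quantity").cast("int").isNotNull && col("Quantity").cast("int") > 0)
  // Drop invoices that appear under more than one CustomerID.
  .withColumn("customers_per_invoice",
    size(collect_set("CustomerID").over(Window.partitionBy("InvoiceNo"))))
  .filter(col("customers_per_invoice") === 1)
  .drop("customers_per_invoice")
```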
## Application Flow

- All files are initially kept under the 'bronze' directory, which acts as the staging area.
- In the first stage, we filter, cleanse, and validate the data per the assumptions above, and enrich it with additional attributes as per the requirements.
- The processed data is then stored in the intermediate 'silver' directory.
- Intermediate data from the 'silver' directory acts as input to the final jobs, where aggregation and population of other metadata attributes take place.
- Finally, all processed data is put under the 'gold' directory; this layer acts as the final data model layer. (A sketch of this movement follows the table below.)
Name | bronze location and format | silver location and format | gold location and format |
---|---|---|---|
customer | '/bronze/customer/*.csv' | '/silver/customer/*.csv' | '/gold/customer_dim/*.parquet' |
products | '/bronze/products/*.csv' | '/silver/products/*.csv' | '/gold/products_dim/*.parquet' |
orders | '/bronze/orders/*.csv' | '/silver/orders/*.csv' | '/gold/orders_fact/*.parquet' |
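As a rough sketch of the movement described above, using the paths and formats from the table (the placeholder variable stands in for the actual transformations):

```scala
// Bronze → silver: read staged CSV, transform, write intermediate CSV.
val bronze = spark.read.option("header", "true").csv("/bronze/customer/")
val silver = bronze // placeholder for the cleansing/enrichment steps
silver.write.mode("overwrite").option("header", "true").csv("/silver/customer/")

// Silver → gold: read intermediate CSV, aggregate/enrich, write final Parquet.
val gold = spark.read.option("header", "true").csv("/silver/customer/")
gold.write.mode("overwrite").parquet("/gold/customer_dim/")
```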
## Data model diagram

Layer-wise attribute details:

- customer
  - bronze layer
    - customerid
    - country
  - silver layer
    - customerid
    - country
    - country_code
    - region
  - gold layer
    - customerid
    - country
    - country_code
    - region
    - insert_date
    - updated_date
- products
  - bronze layer
    - stockcode
    - description
    - unitprice
  - silver layer
    - stockcode
    - description
    - unitprice
  - gold layer
    - stockcode
    - product_code, e.g. if stockcode is '123A' then product_code is '123' (a derivation sketch follows this list)
    - product_category
    - description
    - unitprice
    - eff_start_date
    - eff_end_date
- orders (same attributes across all layers)
  - invoiceno
  - stockcode
  - quantity
  - invoicedate
  - customerid
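The gold-layer product_code derivation could look like the following sketch; `silverProducts` is a placeholder for the cleansed products DataFrame, and stripping trailing letters with a regex is my assumption about how the derivation is done.

```scala
import org.apache.spark.sql.functions._

// Strip a trailing code character from stockcode, e.g. '123A' -> '123'.
val goldProducts = silverProducts.withColumn(
  "product_code",
  regexp_replace(col("stockcode"), "[A-Za-z]+$", "")
)
```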
## How to execute jobs

- To get the required data into the final layer, the jobs must run in a specific order.
- Dimension data is processed first, as it is used in the validation step when loading the fact data.
- Let's look at the directory structure.
- As mentioned above, we need to process the customer data first.
- There are two jobs that need to be executed:
LoadCustomerDataToSilverSpark (package - main.spark.bronze_to_silver)
LoadCustomerDataToGoldSpark (package - main.spark.silver_to_gold)
- Note: since we are executing locally, you can use any IDE to check out the code and run the jobs.
- We can verify that data has been populated in the 'gold/customer_dim' directory.
- Next, we similarly need to execute the jobs that process the products data:
LoadProductsDataToSilverSpark (package - main.spark.bronze_to_silver)
LoadProductsDataToGoldSpark (package - main.spark.silver_to_gold)
- Then, finally, execute the jobs that process the orders data:
LoadOrdersDataToSilverSpark (package - main.spark.bronze_to_silver)
LoadOrdersDataToGoldSpark (package - main.spark.silver_to_gold)
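For orientation, here is a hypothetical skeleton of one of these jobs; the package and object names come from this README, but the body is an assumption about how such a Spark job is typically laid out.

```scala
package main.spark.bronze_to_silver

import org.apache.spark.sql.SparkSession

object LoadCustomerDataToSilverSpark {
  def main(args: Array[String]): Unit = {
    // Local execution, as noted above; on a cluster the master would differ.
    val spark = SparkSession.builder()
      .appName("LoadCustomerDataToSilverSpark")
      .master("local[*]")
      .getOrCreate()

    val bronze = spark.read.option("header", "true").csv("/bronze/customer/")
    // ... apply the customer assumptions (see the sketches in the Assumptions section) ...
    bronze.write.mode("overwrite").option("header", "true").csv("/silver/customer/")

    spark.stop()
  }
}
```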
## Analytic queries with results

Once data is populated in the final layer, i.e. 'gold', we can execute the Task jobs to produce results for the analytic queries listed above.

Execute the `Task1` job to see the results (package - spark.analytics).
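As an illustration, the first query (top 10 countries by number of customers) could be expressed roughly like this against the gold customer dimension; column names follow the data model above, and the actual `Task1` implementation may differ.

```scala
import org.apache.spark.sql.functions._

val customers = spark.read.parquet("/gold/customer_dim/")

// Count distinct customers per country and keep the ten largest.
customers
  .groupBy("country")
  .agg(countDistinct("customerid").as("num_customers"))
  .orderBy(col("num_customers").desc)
  .limit(10)
  .show()
```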