Wenliang Zhao ([email protected]), Courant Institute
Pipeline:
- Data reader: I use the Databricks [spark-csv] package to read the CSV data. Spark version: [Spark-2.1.0].
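  A minimal sketch of this reading step, assuming header rows and placeholder file paths (the real schemas and paths are defined in Parameters.scala):

    import org.apache.spark.sql.SparkSession

    object ReadExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("attribution-example")
          .master("local[*]")
          .getOrCreate()

        // In Spark 2.x the Databricks CSV reader ships built in as the "csv" format.
        // Path and options here are placeholders, not the project's configuration.
        val events = spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("data/events.csv")

        events.printSchema()
      }
    }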
- Remove duplicate timestamps in events: group by advertiser id, user id, and event type; sort by timestamp; then check each record sequentially to see whether it falls within 1 minute of the previous one.
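  A sketch of the per-group rule as a pure function; the name and signature are illustrative, not the actual methods in DeDuplication.scala:

    object DeDuplicationExample {
      // Keep a timestamp only if it is at least 60 seconds after the last kept
      // one. Input: the timestamps (epoch seconds) of one
      // (advertiser_id, user_id, event_type) group.
      def deduplicate(timestamps: Seq[Long]): Seq[Long] =
        timestamps.sorted.foldLeft(List.empty[Long]) {
          case (kept, ts) if kept.isEmpty || ts - kept.head >= 60 => ts :: kept
          case (kept, _)                                          => kept
        }.reverse
    }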
- Convert events and impressions to a generalized record format [GeneralRecord1].
- Merge events and impressions so that their timestamps can be compared together.
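  A sketch of what the generalized format and the merge might look like; GeneralRecord1's real fields are defined in Parameters.scala, so the fields below are assumptions:

    import org.apache.spark.sql.Dataset

    // Assumed shape: advertiser id, user id, event timestamp, and an
    // event-type tag (impressions tagged, e.g., as "impression").
    case class GeneralRecord1(aid: Int, uid: String, timestamp: Long, etype: String)

    object MergeExample {
      // One unioned dataset lets events and impressions be compared on a
      // single timestamp column downstream.
      def merge(events: Dataset[GeneralRecord1],
                impressions: Dataset[GeneralRecord1]): Dataset[GeneralRecord1] =
        events.union(impressions)
    }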
- Count attributions and generate a standardized attribution dataset [GeneralAttribute]: group by aid and uid; collect two lists, timestamps and event types; sort both by timestamp; then count attributions according to the rule.
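  A sketch of the per-(aid, uid) counting step. The exact rule lives in Attribution.scala; here I assume the common definition that an event is attributed when it occurs after an impression for the same advertiser and user:

    object AttributionExample {
      // records: (timestamp, event type) pairs for one (aid, uid) group.
      def countAttributions(records: Seq[(Long, String)]): Map[String, Int] = {
        val sorted = records.sortBy(_._1) // order both lists by timestamp
        var seenImpression = false
        val counts = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
        for ((_, etype) <- sorted)
          if (etype == "impression") seenImpression = true
          else if (seenImpression) counts(etype) += 1 // event after an impression
        counts.toMap
      }
    }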
- Count attributions for each advertiser and event type from the standardized attribution dataset: group by aid and etype, then sum all counts.
- Count unique users for each advertiser and event type from the standardized attribution dataset: group by aid and etype, collect uid into a list, and take its number of unique values (see the sketch below).
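  A sketch of both summary aggregations; the GeneralAttribute fields are assumed, and countDistinct stands in for the collect-then-deduplicate step:

    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.functions.{countDistinct, sum}

    // Assumed shape of [GeneralAttribute]; the real fields live in Parameters.scala.
    case class GeneralAttribute(aid: Int, uid: String, etype: String, count: Long)

    object SummaryExample {
      // Attribution count per (advertiser, event type): sum all counts.
      def attributionCounts(attrs: Dataset[GeneralAttribute]) =
        attrs.groupBy("aid", "etype").agg(sum("count"))

      // Unique users per (advertiser, event type).
      def uniqueUsers(attrs: Dataset[GeneralAttribute]) =
        attrs.groupBy("aid", "etype").agg(countDistinct("uid"))
    }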
Source Files:
- Summarize.scala: the main file; contains the whole pipeline.
- Parameters.scala: object storing the original dataset schemas, the input file paths, and the pool of case classes.
- DeDuplication.scala: object with methods for de-duplicating events.
- Attribution.scala: object with methods for computing attribution.
I include 2 kinds of testing:
- scalatest: used for unit tests of the utility functions, in the "FlatSpec + Matchers" style (a minimal example follows this list).
- spark-testing-base: used for the tests that exercise Spark dataset operations and the full pipeline.
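For illustration, a minimal "FlatSpec + Matchers" test against the hypothetical de-duplication helper sketched above (the real tests are in the files listed below):

    import org.scalatest.{FlatSpec, Matchers}

    class DeDuplicationExampleSpec extends FlatSpec with Matchers {
      "deduplicate" should "drop timestamps within 1 minute of the last kept one" in {
        // 30s after the first timestamp -> duplicate; 90s after -> kept.
        DeDuplicationExample.deduplicate(Seq(0L, 30L, 90L)) shouldEqual Seq(0L, 90L)
      }
    }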
Test Files:
- DeDuplicationMethodTest.scala: unit test for de-duplication methods.
- DeDuplicationDatasetTest.scala: unit test for de-duplication dataset operations.
- AttributionMethodTest.scala: unit test for attribution methods.
- AttributionDatasetTest.scala: unit test for attribution dataset operations.
- PipelineIntegrationTest.scala: integration test of the whole pipeline using the provided test files.
Coverage:
- Statement coverage: 84.54%.
- Branch coverage: 100%.
There are two ways to run the code:
- sbt: use runMain (or simply run) in sbt to run the code. Two parameters must be passed to the main function, event_file and impression_file (e.g., sbt "runMain <main class> <event_file> <impression_file>").
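  A hedged sketch of how the entry point might consume those two arguments (the actual Summarize.scala may differ):

    object Summarize {
      def main(args: Array[String]): Unit = {
        require(args.length == 2, "usage: Summarize <event_file> <impression_file>")
        val Array(eventFile, impressionFile) = args
        // ... build the SparkSession and run the pipeline on the two inputs ...
      }
    }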
- assembly jar: use sbt assembly to pack the source code into a jar file and run it directly. A Python script, run.py, automates everything in 5 steps:
  1. Remove results from the previous run (output/*, metastore_db).
  2. Clean files left over from the previous assembly.
  3. Pack the source code into a jar using assembly.
  4. Delete the unneeded META-INF files from the jar using zip.
  5. Run the code.
NOTICE: If you run on data other than the provided test files, update the paths in run.py accordingly.