Skip to content

Commit

Permalink
Added save to parquet, file reading from parquet for faster analysis,…
Browse files Browse the repository at this point in the history
… more modularisation
  • Loading branch information
maximelovino committed May 11, 2019
1 parent 8fdb754 commit 4bc4089
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 42 deletions.
31 changes: 31 additions & 0 deletions src/main/scala/SavedSpark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import org.apache.spark.sql.SparkSession

/**
* @author Maxime Lovino
* @date 2019-05-11
* @package
* @project YellowSpark
*/

/**
 * Entry point that re-runs the taxi analytics against a previously saved
 * Parquet dataset (written by the main YellowSpark job), skipping the slow
 * CSV parsing step.
 *
 * Usage: pass the Parquet path as the first argument, or rely on the
 * default location under src/main/resources.
 */
object SavedSpark {

  // Explicit main instead of the App trait: App's delayed initialization
  // has well-known ordering pitfalls and is discouraged for entry points.
  def main(args: Array[String]): Unit = {
    // Path to the saved rides dataset; overridable via the first CLI argument.
    val parquetPath = args.headOption.getOrElse("./src/main/resources/rides.df")

    val spark = SparkSession.builder()
      .appName("Spark Taxi saved")
      .master("local[*]")
      .getOrCreate()

    try {
      val df = spark.read.parquet(parquetPath)

      df.printSchema()

      val analytics = new TaxiAnalytics(spark)

      analytics.displayBoroughStats(df)

      println(s"Number of rides total: ${df.count()}")

      analytics.displayRateCodeStats(df)

      analytics.displayTipStatsByBorough(df)

      analytics.topDrivers(df)
    } finally {
      // Release local Spark resources even if an analysis step fails.
      spark.stop()
    }
  }
}
59 changes: 59 additions & 0 deletions src/main/scala/TaxiAnalytics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* @author Maxime Lovino
* @date 2019-05-11
* @package
* @project YellowSpark
*/

/**
 * Reusable console analytics over the parsed taxi-rides DataFrame.
 *
 * All methods print their results via `show()`; none of them mutate or
 * return the input DataFrame. The expected columns (pickup_borough,
 * dropoff_borough, rate_code, tip_amount, hack_license, ...) are those
 * produced by TaxiReader — TODO confirm against the reader's schema.
 *
 * @param spark the active session, needed for the `$` column interpolator
 */
class TaxiAnalytics(spark: SparkSession) {

  import spark.implicits._
  // Qualified alias so the aggregate helpers (count, sum, avg) never clash
  // with local identifiers such as the `count` parameter of topDrivers.
  import org.apache.spark.sql.{functions => F}

  /** Prints ride counts grouped by pickup borough, then by dropoff borough. */
  def displayBoroughStats(df: DataFrame): Unit = {
    df.groupBy($"pickup_borough").count().show()
    df.groupBy($"dropoff_borough").count().show()
  }

  /** Prints ride counts per rate code. */
  def displayRateCodeStats(df: DataFrame): Unit = {
    df.groupBy($"rate_code").count().show()
  }

  /**
   * Prints average tip by pickup borough, by dropoff borough, and then the
   * average tip together with the ride count for every (pickup, dropoff)
   * borough pair.
   */
  def displayTipStatsByBorough(df: DataFrame): Unit = {
    df.groupBy($"pickup_borough").avg("tip_amount").show()
    df.groupBy($"dropoff_borough").avg("tip_amount").show()
    // Single grouped pass computing both aggregates at once, instead of two
    // separate groupBys joined back together. Aliases reproduce the column
    // names the previous avg+count join produced.
    df.groupBy($"pickup_borough", $"dropoff_borough")
      .agg(
        F.avg("tip_amount").as("avg(tip_amount)"),
        F.count(F.lit(1)).as("count"))
      .orderBy("pickup_borough", "dropoff_borough")
      .show(100, truncate = false)
  }

  /**
   * Prints the top `count` drivers ranked by total revenue, with per-licence
   * ride count, totals (passengers, distance, revenue, tips, fares, duration)
   * and average speed.
   *
   * @param df    the rides DataFrame
   * @param count number of rows to display (default 20)
   */
  def topDrivers(df: DataFrame, count: Int = 20): Unit = {
    // One grouped pass instead of three separate groupBys (count, sum, avg)
    // joined back on hack_license — same columns, same order, one shuffle.
    val statsByLicense = df.groupBy("hack_license")
      .agg(
        F.count(F.lit(1)).as("count"),
        F.sum("passenger_count").as("total_passengers"),
        F.sum("trip_distance_km").as("total_distance_km"),
        F.sum("taxi_revenue").as("$$$$$"),
        F.sum("tip_amount").as("TIPS"),
        F.sum("fare_amount").as("FARES"),
        F.sum("trip_time_in_secs").as("DURATION"),
        F.avg("average_speed_kmh").as("Average speed in km/h"))
      .orderBy(desc("$$$$$"))

    statsByLicense.show(count)
  }

}
49 changes: 7 additions & 42 deletions src/main/scala/YellowSpark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,58 +20,23 @@ object YellowSpark extends App {
import spark.implicits._


private def displayBoroughStats(df: DataFrame): Unit = {
df.groupBy($"pickup_borough").count().show()
df.groupBy($"dropoff_borough").count().show()
}

private def displayRateCodeStats(df: DataFrame): Unit = {
df.groupBy($"rate_code").count().show()
}

private def displayTipStatsByBorough(df: DataFrame): Unit = {
df.groupBy($"pickup_borough").avg("tip_amount").show()
df.groupBy($"dropoff_borough").avg("tip_amount").show()
}


private def topDrivers(df: DataFrame, count: Int = 20) = {
val groupByLicense = df.groupBy("hack_license").count()
val avgByLicense = df.groupBy("hack_license").avg("average_speed_kmh")
.withColumnRenamed("avg(average_speed_kmh)", "Average speed in km/h")


val sumByLicense = df.groupBy("hack_license")
.sum("passenger_count", "trip_distance_km", "taxi_revenue", "tip_amount", "fare_amount", "trip_time_in_secs")
.withColumnRenamed("sum(passenger_count)", "total_passengers")
.withColumnRenamed("sum(trip_distance_km)", "total_distance_km")
.withColumnRenamed("sum(taxi_revenue)", "$$$$$")
.withColumnRenamed("sum(tip_amount)", "TIPS")
.withColumnRenamed("sum(fare_amount)", "FARES")
.withColumnRenamed("sum(trip_time_in_secs)", "DURATION")

val aggByLicense = groupByLicense
.join(sumByLicense, "hack_license")
.join(avgByLicense, "hack_license")
.orderBy(desc("$$$$$"))

aggByLicense.show(count)
}

val boroughs = GeoReader.parseBoroughs()
val finalDf = TaxiReader.parseTaxiData(spark, boroughs)

finalDf.printSchema()
finalDf.show(2)
finalDf.write.parquet("./src/main/resources/rides.df")

val analytics = new TaxiAnalytics(spark)

displayBoroughStats(finalDf)
analytics.displayBoroughStats(finalDf)

println(s"Number of rides total: ${finalDf.count()}")

displayRateCodeStats(finalDf)
analytics.displayRateCodeStats(finalDf)

displayTipStatsByBorough(finalDf)
analytics.displayTipStatsByBorough(finalDf)

topDrivers(finalDf)
analytics.topDrivers(finalDf)

}
4 changes: 4 additions & 0 deletions src/main/scala/readers/TaxiReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ object TaxiReader {
.withColumn("dropoff_latitude", $"dropoff_latitude".cast(DoubleType))
.withColumn("pickup_borough", boroughUDF($"pickup_longitude", $"pickup_latitude"))
.withColumn("dropoff_borough", boroughUDF($"dropoff_longitude", $"dropoff_latitude"))
.drop("vendor_id")

faresDf.printSchema()
tripsDf.printSchema()

val df = tripsDf.join(faresDf, Seq("medallion", "hack_license", "pickup_datetime"), "inner")

Expand Down

0 comments on commit 4bc4089

Please sign in to comment.