From 97dc9ecf99eda2a7f79bb861bab2bdf4c7c37322 Mon Sep 17 00:00:00 2001
From: Maxime Lovino
Date: Sat, 11 May 2019 22:11:35 +0200
Subject: [PATCH] Added sessionisation, wait times by borough

---
Review notes (commentary placed here is ignored by `git am`):

* What the patch does: the `display*` helpers of TaxiAnalytics become
  `statsByRateCode` / `statsByBoroughPairs` and, like `topDrivers`, now return
  a DataFrame instead of printing; the callers in SavedSpark and YellowSpark
  call `.show(...)` themselves. It adds `sessionise` (repartition by
  `hack_license`, sort by pickup time) and `waitTimesByBorough`, whose result
  SavedSpark writes to ./src/main/resources/wait.df as parquet.
* `Row.getAs` is now called as `getAs[String]("hack_license")`. Without the
  type argument T is inferred as Nothing, which can fail at runtime with a
  ClassCastException instead of comparing the two licenses.
* `topDrivers` applies `.limit(count)`. Once `.show(count)` was removed the
  `count` parameter was silently ignored.
* `waitingTime` has an explicit `Long` result type, the trailing-dot call chain
  uses the leading-dot style of the rest of the file, and the new or changed
  methods carry Scaladoc.
* Leading indentation of context lines was reconstructed (2-space Scala
  style); use `git apply --ignore-whitespace` if a context line does not
  match. The post-image blob id on the TaxiAnalytics `index` line predates the
  edits listed above.

 src/main/scala/SavedSpark.scala    | 22 +++++--
 src/main/scala/TaxiAnalytics.scala | 94 +++++++++++++++++++++++++-----
 src/main/scala/YellowSpark.scala   |  6 +-
 3 files changed, 101 insertions(+), 21 deletions(-)

diff --git a/src/main/scala/SavedSpark.scala b/src/main/scala/SavedSpark.scala
index d773b95..3cb7157 100644
--- a/src/main/scala/SavedSpark.scala
+++ b/src/main/scala/SavedSpark.scala
@@ -1,5 +1,4 @@
-import org.apache.spark.sql.SparkSession
-
+import org.apache.spark.sql.{SaveMode, SparkSession}
 /**
   * @author Maxime Lovino
   * @date 2019-05-11
@@ -23,9 +22,22 @@ object SavedSpark extends App {
 
   println(s"Number of rides total: ${df.count()}")
 
-  analytics.displayRateCodeStats(df)
+  analytics.statsByRateCode(df).show()
+
+  analytics.statsByBoroughPairs(df).show(100)
+
+  analytics.topDrivers(df).show(20)
+
+
+  val sessions = analytics.sessionise(df)
+
+  sessions.cache()
+
+  val waitTimes = analytics.waitTimesByBorough(sessions)
+
+  waitTimes.write.mode(SaveMode.Overwrite).parquet("./src/main/resources/wait.df")
+
+  waitTimes.show(100, truncate = false)
 
-  analytics.displayTipStatsByBorough(df)
 
-  analytics.topDrivers(df)
 }
diff --git a/src/main/scala/TaxiAnalytics.scala b/src/main/scala/TaxiAnalytics.scala
index f3033b0..adb934e 100644
--- a/src/main/scala/TaxiAnalytics.scala
+++ b/src/main/scala/TaxiAnalytics.scala
@@ -1,5 +1,7 @@
-import org.apache.spark.sql.functions.desc
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import java.sql.Timestamp
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
 /**
   * @author Maxime Lovino
@@ -18,22 +20,41 @@ class TaxiAnalytics(spark: SparkSession) {
     df.groupBy($"dropoff_borough").count().show()
   }
 
-  def displayRateCodeStats(df: DataFrame): Unit = {
-    df.groupBy($"rate_code").count().show()
+  /**
+    * Counts rides per `rate_code`.
+    *
+    * @param df rides to aggregate; must contain a `rate_code` column
+    * @return one row per `rate_code` with a `count` column
+    */
+  def statsByRateCode(df: DataFrame): DataFrame = {
+    df.groupBy($"rate_code").count()
   }
 
-  def displayTipStatsByBorough(df: DataFrame): Unit = {
-    df.groupBy($"pickup_borough").avg("tip_amount").show()
-    df.groupBy($"dropoff_borough").avg("tip_amount").show()
-    val avgTip = df.groupBy($"pickup_borough", $"dropoff_borough").avg("tip_amount")
+  /**
+    * Aggregates rides per (`pickup_borough`, `dropoff_borough`) pair.
+    *
+    * @param df rides to aggregate
+    * @return per pair: averages of tip, trip time, distance and revenue, the ride
+    *         `count`, and sums of trip time, distance and revenue, ordered by pair
+    */
+  def statsByBoroughPairs(df: DataFrame): DataFrame = {
+    val avg = df.groupBy($"pickup_borough", $"dropoff_borough").avg("tip_amount", "trip_time_in_secs", "trip_distance_km", "taxi_revenue")
+    val sum = df.groupBy($"pickup_borough", $"dropoff_borough").sum("trip_time_in_secs", "trip_distance_km", "taxi_revenue")
     val count = df.groupBy($"pickup_borough", $"dropoff_borough").count()
 
-    avgTip.join(count, Seq("pickup_borough", "dropoff_borough"))
+    avg.join(count, Seq("pickup_borough", "dropoff_borough")).join(sum, Seq("pickup_borough", "dropoff_borough"))
       .orderBy("pickup_borough", "dropoff_borough")
-      .show(100, truncate = false)
   }
 
-  def topDrivers(df: DataFrame, count: Int = 20) = {
+  /**
+    * Per-driver (`hack_license`) statistics, highest `$$$$$` first.
+    *
+    * @param df    rides to aggregate
+    * @param count maximum number of drivers to return
+    * @return joined per-license aggregates ordered by `$$$$$` descending,
+    *         truncated to `count` rows
+    */
+  def topDrivers(df: DataFrame, count: Int = 20): DataFrame = {
     val groupByLicense = df.groupBy("hack_license").count()
     val avgByLicense = df.groupBy("hack_license").avg("average_speed_kmh")
       .withColumnRenamed("avg(average_speed_kmh)", "Average speed in km/h")
@@ -48,12 +69,59 @@ class TaxiAnalytics(spark: SparkSession) {
       .withColumnRenamed("sum(fare_amount)", "FARES")
       .withColumnRenamed("sum(trip_time_in_secs)", "DURATION")
 
-    val aggByLicense = groupByLicense
+    groupByLicense
       .join(sumByLicense, "hack_license")
       .join(avgByLicense, "hack_license")
       .orderBy(desc("$$$$$"))
+      .limit(count)
+  }
+
+  /**
+    * Groups each driver's rides together, in chronological order.
+    *
+    * Repartitions by `hack_license` and sorts every partition by
+    * (`hack_license`, `pickup_datetime`), so that consecutive rows of one driver
+    * are consecutive rides.
+    *
+    * @param df rides to sessionise
+    * @return the same rows, repartitioned and sorted within partitions
+    */
+  def sessionise(df: DataFrame): DataFrame = {
+    df.repartition($"hack_license")
+      .sortWithinPartitions($"hack_license", $"pickup_datetime")
+  }
+
+  /**
+    * Average and standard deviation of the wait between two consecutive rides of
+    * the same driver, grouped by the borough where the first ride ended.
+    *
+    * Only waits strictly between 0 and 4 hours (in seconds) are kept.
+    *
+    * @param sessonisedDf rides as returned by [[sessionise]]
+    * @return `dropoff_borough`, `average_wait` and `stdDev_wait` columns
+    */
+  def waitTimesByBorough(sessonisedDf: DataFrame): DataFrame = {
+    def waitingTime(r1: Row, r2: Row): Long = {
+      (r2.getAs[Timestamp]("pickup_datetime").getTime - r1.getAs[Timestamp]("dropoff_datetime").getTime) / 1000
+    }
+
+    val boroughDurations: DataFrame = sessonisedDf.mapPartitions(trips => {
+      val iter = trips.sliding(2)
+
+      val viter = iter
+        .filter(_.size == 2)
+        // getAs needs its type argument: without it T is inferred as Nothing
+        .filter(p => p.head.getAs[String]("hack_license") == p.tail.head.getAs[String]("hack_license"))
+
+      viter.map(p => (p.head.getAs[String]("dropoff_borough"), waitingTime(p.head, p.tail.head)))
+    }).toDF("dropoff_borough", "wait_seconds")
 
-    aggByLicense.show(count)
+    boroughDurations
+      .where("wait_seconds > 0 AND wait_seconds < 60*60*4")
+      .groupBy("dropoff_borough")
+      .agg(avg("wait_seconds"), stddev("wait_seconds"))
+      .withColumnRenamed("avg(wait_seconds)", "average_wait")
+      .withColumnRenamed("stddev_samp(wait_seconds)", "stdDev_wait")
   }
 
 }
diff --git a/src/main/scala/YellowSpark.scala b/src/main/scala/YellowSpark.scala
index f143c81..81cfc62 100644
--- a/src/main/scala/YellowSpark.scala
+++ b/src/main/scala/YellowSpark.scala
@@ -33,10 +33,10 @@ object YellowSpark extends App {
 
   println(s"Number of rides total: ${finalDf.count()}")
 
-  analytics.displayRateCodeStats(finalDf)
+  analytics.statsByRateCode(finalDf).show()
 
-  analytics.displayTipStatsByBorough(finalDf)
+  analytics.statsByBoroughPairs(finalDf).show(100)
 
-  analytics.topDrivers(finalDf)
+  analytics.topDrivers(finalDf).show(20)
 
 }