Added sessionisation, wait times by borough
maximelovino committed May 11, 2019
1 parent 4bc4089 commit 97dc9ec
Showing 3 changed files with 59 additions and 21 deletions.
22 changes: 17 additions & 5 deletions src/main/scala/SavedSpark.scala
@@ -1,5 +1,4 @@
import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* @author Maxime Lovino
* @date 2019-05-11
@@ -23,9 +22,22 @@ object SavedSpark extends App {

println(s"Number of rides total: ${df.count()}")

analytics.displayRateCodeStats(df)
analytics.statsByRateCode(df).show()

analytics.statsByBoroughPairs(df).show(100)

analytics.topDrivers(df).show(20)


val sessions = analytics.sessionise(df)

sessions.cache()

val waitTimes = analytics.waitTimesByBorough(sessions)

waitTimes.write.mode(SaveMode.Overwrite).parquet("./src/main/resources/wait.df")

waitTimes.show(100, truncate = false)

analytics.displayTipStatsByBorough(df)

analytics.topDrivers(df)
}
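
Note on the driver above: `sessions` feeds two actions (the Parquet write and the `show`), so `sessions.cache()` keeps Spark from re-running the repartition-and-sort behind `sessionise` for each one. As a minimal sketch, the saved wait-time stats can be read back in a separate job; the `ReadWaitTimes` object name, app name, and local master below are illustrative assumptions, and only the path comes from the commit.

import org.apache.spark.sql.SparkSession

// Hypothetical companion job: inspect the wait-time stats that
// SavedSpark persisted with SaveMode.Overwrite.
object ReadWaitTimes extends App {
  val spark = SparkSession.builder()
    .appName("read-wait-times") // assumption: name not in the commit
    .master("local[*]")         // assumption: local dev run
    .getOrCreate()

  // Path taken verbatim from the diff; Overwrite means each run of
  // SavedSpark replaces this dataset.
  val waitTimes = spark.read.parquet("./src/main/resources/wait.df")
  waitTimes.orderBy("dropoff_borough").show(truncate = false)

  spark.stop()
}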
52 changes: 39 additions & 13 deletions src/main/scala/TaxiAnalytics.scala
@@ -1,5 +1,7 @@
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.sql.Timestamp

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
* @author Maxime Lovino
@@ -18,22 +20,20 @@ class TaxiAnalytics(spark: SparkSession) {
df.groupBy($"dropoff_borough").count().show()
}

def displayRateCodeStats(df: DataFrame): Unit = {
df.groupBy($"rate_code").count().show()
def statsByRateCode(df: DataFrame): DataFrame = {
df.groupBy($"rate_code").count()
}

def displayTipStatsByBorough(df: DataFrame): Unit = {
df.groupBy($"pickup_borough").avg("tip_amount").show()
df.groupBy($"dropoff_borough").avg("tip_amount").show()
val avgTip = df.groupBy($"pickup_borough", $"dropoff_borough").avg("tip_amount")
def statsByBoroughPairs(df: DataFrame): DataFrame = {
val avg = df.groupBy($"pickup_borough", $"dropoff_borough").avg("tip_amount", "trip_time_in_secs", "trip_distance_km", "taxi_revenue")
val sum = df.groupBy($"pickup_borough", $"dropoff_borough").sum("trip_time_in_secs", "trip_distance_km", "taxi_revenue")
val count = df.groupBy($"pickup_borough", $"dropoff_borough").count()
avgTip.join(count, Seq("pickup_borough", "dropoff_borough"))
avg.join(count, Seq("pickup_borough", "dropoff_borough")).join(sum, Seq("pickup_borough", "dropoff_borough"))
.orderBy("pickup_borough", "dropoff_borough")
.show(100, truncate = false)
}


def topDrivers(df: DataFrame, count: Int = 20) = {
def topDrivers(df: DataFrame, count: Int = 20): DataFrame = {
val groupByLicense = df.groupBy("hack_license").count()
val avgByLicense = df.groupBy("hack_license").avg("average_speed_kmh")
.withColumnRenamed("avg(average_speed_kmh)", "Average speed in km/h")
@@ -48,12 +48,38 @@ class TaxiAnalytics(spark: SparkSession) {
.withColumnRenamed("sum(fare_amount)", "FARES")
.withColumnRenamed("sum(trip_time_in_secs)", "DURATION")

val aggByLicense = groupByLicense
groupByLicense
.join(sumByLicense, "hack_license")
.join(avgByLicense, "hack_license")
.orderBy(desc("$$$$$"))
}

def sessionise(df: DataFrame): DataFrame = {
df.repartition($"hack_license")
.sortWithinPartitions($"hack_license", $"pickup_datetime")
}

def waitTimesByBorough(sessionisedDf: DataFrame): DataFrame = {
def waitingTime(r1: Row, r2: Row) = {
(r2.getAs[Timestamp]("pickup_datetime").getTime - r1.getAs[Timestamp]("dropoff_datetime").getTime) / 1000
}

val boroughDurations: DataFrame = sessionisedDf.mapPartitions(trips => {
val iter = trips.sliding(2)

val viter = iter
.filter(_.size == 2)
.filter(p => p.head.getAs("hack_license") == p.tail.head.getAs("hack_license"))

viter.map(p => (p.head.getAs[String]("dropoff_borough"), waitingTime(p.head, p.tail.head)))
}).toDF("dropoff_borough", "wait_seconds")

aggByLicense.show(count)
boroughDurations
.where("wait_seconds > 0 AND wait_seconds < 60*60*4")
.groupBy("dropoff_borough")
.agg(avg("wait_seconds"), stddev("wait_seconds"))
.withColumnRenamed("avg(wait_seconds)", "average_wait")
.withColumnRenamed("stddev_samp(wait_seconds)", "stdDev_wait")
}

}
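
The sessionisation added here is the classic secondary-sort pattern: `repartition($"hack_license")` routes all of one driver's trips to the same partition, and `sortWithinPartitions` orders them by pickup time, so `mapPartitions` can pair consecutive trips with `sliding(2)`. The `_.size == 2` filter drops a trailing incomplete window, the license-equality filter keeps a pair from straddling two drivers, and each wait is the next pickup minus the previous dropoff, converted to seconds. The final `where` then discards negative gaps (overlapping records) and gaps over four hours (end of shift rather than waiting). Below is a Spark-free sketch of the same pairing logic; the `Trip` case class and timestamps (plain seconds) are illustrative.

// Pairing consecutive trips per driver, as waitTimesByBorough does,
// but on plain Scala values. Names and numbers are made up.
case class Trip(license: String, pickup: Long, dropoff: Long, dropoffBorough: String)

val trips = Seq(
  Trip("A", pickup = 0L,   dropoff = 600L,  dropoffBorough = "Manhattan"),
  Trip("A", pickup = 900L, dropoff = 1500L, dropoffBorough = "Brooklyn"),
  Trip("B", pickup = 100L, dropoff = 700L,  dropoffBorough = "Queens")
).sortBy(t => (t.license, t.pickup)) // mirrors the repartition + sort

val waits = trips.iterator
  .sliding(2)
  .filter(_.size == 2)                         // drop a trailing singleton window
  .filter(p => p.head.license == p(1).license) // never pair across drivers
  .map(p => (p.head.dropoffBorough, p(1).pickup - p.head.dropoff))
  .toList
// waits == List(("Manhattan", 300)): driver A waited 300s after the Manhattan dropoff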
6 changes: 3 additions & 3 deletions src/main/scala/YellowSpark.scala
@@ -33,10 +33,10 @@ object YellowSpark extends App {

println(s"Number of rides total: ${finalDf.count()}")

analytics.displayRateCodeStats(finalDf)
analytics.statsByRateCode(finalDf).show()

analytics.displayTipStatsByBorough(finalDf)
analytics.statsByBoroughPairs(finalDf).show(100)

analytics.topDrivers(finalDf)
analytics.topDrivers(finalDf).show(20)

}
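
The YellowSpark changes mirror the refactor in TaxiAnalytics: helpers now return DataFrames instead of calling `show()` internally, so the caller decides whether to display, persist, or test a result. A sketch of what that buys, as a fragment that assumes YellowSpark's scope (`analytics`, `finalDf`) plus an import of `SaveMode`; the output path is hypothetical.

val rateStats = analytics.statsByRateCode(finalDf) // pure transformation, no side effects
rateStats.show()                                   // display at the call site
rateStats.write.mode(SaveMode.Overwrite)
  .parquet("./src/main/resources/rate_stats.df")   // hypothetical output path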
