Commit
Cleaned up regression to do it for all rate codes, started working on congestion model
maximelovino committed May 17, 2019
1 parent 62bf73c commit 413d0ad
Showing 2 changed files with 115 additions and 13 deletions.
74 changes: 74 additions & 0 deletions src/main/scala/YellowSparkCongestion.scala
@@ -0,0 +1,74 @@
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
* @author Maxime Lovino
* @date 2019-05-17
* @package
* @project YellowSpark
*/

object YellowSparkCongestion extends App {
val spark = SparkSession.builder()
.appName("Spark Taxi Congestion")
.master("local[*]")
.getOrCreate()

import spark.implicits._

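  // Load the prepared rides, keep standard-rate (rate_code 1) trips with plausible speeds, distances and durations, then derive weekday/hour features from the pickup/dropoff timestamps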
val df = spark.read.parquet("./src/main/resources/rides.df")
.where("rate_code = 1")
.select("pickup_datetime", "dropoff_datetime", "trip_time_in_secs", "trip_distance_km", "average_speed_kmh", "pickup_borough", "dropoff_borough")
.where("average_speed_kmh > 0.0")
.where("average_speed_kmh < 100.0")
.where("trip_distance_km > 1")
.where("trip_time_in_secs > 30")
.withColumn("pickup_weekday", dayofweek($"pickup_datetime"))
.withColumn("pickup_hour", hour($"pickup_datetime"))
.withColumn("dropoff_weekday", dayofweek($"dropoff_datetime"))
.withColumn("dropoff_hour", hour($"dropoff_datetime"))
.drop("pickup_datetime", "dropoff_datetime")


val Array(train, test) = df.randomSplit(Array(0.7, 0.3))


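  // Map the pickup/dropoff borough names to numeric category indices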
val pickupIndexer = new StringIndexer()
.setInputCol("pickup_borough")
.setOutputCol("pickup_borough_index")

val dropoffIndexer = new StringIndexer()
.setInputCol("dropoff_borough")
.setOutputCol("dropoff_borough_index")

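  // One-hot encode the weekday, hour and borough categories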
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("pickup_weekday", "pickup_hour", "dropoff_weekday", "dropoff_hour", "pickup_borough_index", "dropoff_borough_index"))
.setOutputCols(Array("pickup_weekday_vec", "pickup_hour_vec", "dropoff_weekday_vec", "dropoff_hour_vec", "pickup_borough_vec", "dropoff_borough_vec"))

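  // Assemble the encoded vectors into a single features column, with average_speed_kmh as the label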
val rFormula = new RFormula().setFormula("average_speed_kmh ~ pickup_weekday_vec + pickup_hour_vec + dropoff_weekday_vec + dropoff_hour_vec + pickup_borough_vec + dropoff_borough_vec")


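  // Random forest regression of average trip speed on time and location, as a proxy for congestion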
val regressor = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("features")


val pipeline = new Pipeline()
.setStages(Array(pickupIndexer, dropoffIndexer, encoder, rFormula, regressor))


val fittedModel = pipeline.fit(train)


val predictions = fittedModel.transform(test)
predictions.show(10)

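  // Report RMSE of the predicted average speed on the held-out test set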
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
}
54 changes: 41 additions & 13 deletions src/main/scala/YellowSparkRegression.scala
@@ -1,6 +1,7 @@
 import org.apache.spark.ml.evaluation.RegressionEvaluator
 import org.apache.spark.ml.feature.RFormula
-import org.apache.spark.ml.regression.LinearRegression
+import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
+import org.apache.spark.ml.{Pipeline, PipelineModel}
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -17,26 +18,53 @@ object YellowSparkRegression extends App {
     .getOrCreate()
 
   val df = spark.read.parquet("./src/main/resources/rides.df")
+  df.printSchema()
+
+  val preparedDF = df
     .select("fare_amount", "rate_code", "trip_time_in_secs", "trip_distance_km", "surcharge", "total_amount", "tolls_amount", "tip_amount")
     .withColumnRenamed("fare_amount", "cost")
     .withColumnRenamed("trip_time_in_secs", "duration")
     .withColumnRenamed("trip_distance_km", "distance")
 
-  df.printSchema()
+  preparedDF.printSchema()
+  val models = preparedDF.select("rate_code").distinct().where("rate_code BETWEEN 1 and 5").collect().map(row => {
+    val rateCode = row.getAs[Int]("rate_code")
 
-  org.apache.spark.ml.regression.LinearRegression
-  val rateCode1 = df.where("rate_code = 1")
-  val formula = new RFormula().setFormula("cost ~ duration + distance")
+    println(s"Training for ratecode: $rateCode")
 
-  val fitFormula = formula.fit(rateCode1)
-  val preparedDF = fitFormula.transform(rateCode1)
+    val rateCodeSet = preparedDF.where(s"rate_code = $rateCode")
 
-  val Array(train, test) = preparedDF.randomSplit(Array(0.7, 0.3))
+    val Array(train, test) = rateCodeSet.randomSplit(Array(0.7, 0.3))
+    val formula = new RFormula().setFormula("cost ~ duration + distance")
 
-  val reg = new LinearRegression().setLabelCol("label").setFeaturesCol("features")
-  val fittedReg = reg.fit(train)
+    val reg = new LinearRegression()
+      .setLabelCol("label")
+      .setFeaturesCol("features")
 
-  println(s"Coefficients: ${fittedReg.coefficients} Intercept: ${fittedReg.intercept}")
+    val stages = Array(formula, reg)
+    val pipeline = new Pipeline().setStages(stages)
 
-  val evaluator = new RegressionEvaluator().setMetricName("mse").setPredictionCol("prediction").setLabelCol("label")
-  println(s"MSE: ${evaluator.evaluate(fittedReg.transform(test))}")
+    val evaluator = new RegressionEvaluator().setMetricName("mse").setPredictionCol("prediction").setLabelCol("label")
+
+    val fittedModel: PipelineModel = pipeline.fit(train)
+
+    val testPredict = fittedModel.transform(test)
+    val mse = evaluator.evaluate(testPredict)
+
+    testPredict.show(10, truncate = false)
+    (rateCode, mse, fittedModel)
+  })
+
+  models.foreach {
+    case (rateCode, mse, fittedPipelineModel) => {
+      println(s"Model for rate code $rateCode:")
+      val fittedLr = fittedPipelineModel.stages.last.asInstanceOf[LinearRegressionModel]
+      val coefficients = fittedLr.coefficients
+      val intercept = fittedLr.intercept
+      println(s"MSE: $mse")
+      println(s"Formula: ${coefficients.apply(0)} * seconds + ${coefficients.apply(1)} * km + $intercept")
+    }
+  }
 }
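
For reference, a minimal sketch of how the fitted per-rate-code pipelines collected in models could score unseen rides. newRides is a hypothetical DataFrame assumed to carry the duration and distance columns the formula expects:

// Hypothetical usage: pick the fitted standard-rate (rate_code 1) pipeline and predict fares.
models.find { case (rateCode, _, _) => rateCode == 1 }.foreach {
  case (_, _, standardRateModel) =>
    standardRateModel.transform(newRides)
      .select("duration", "distance", "prediction")
      .show(5)
}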
