diff --git a/src/main/scala/YellowSparkCongestion.scala b/src/main/scala/YellowSparkCongestion.scala
new file mode 100644
index 0000000..2634da0
--- /dev/null
+++ b/src/main/scala/YellowSparkCongestion.scala
@@ -0,0 +1,74 @@
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.evaluation.RegressionEvaluator
+import org.apache.spark.ml.feature._
+import org.apache.spark.ml.regression.RandomForestRegressor
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+  * @author Maxime Lovino
+  * @date 2019-05-17
+  * @package
+  * @project YellowSpark
+  */
+
+object YellowSparkCongestion extends App {
+  val spark = SparkSession.builder()
+    .appName("Spark Taxi Congestion")
+    .master("local[*]")
+    .getOrCreate()
+
+  import spark.implicits._
+
+  val df = spark.read.parquet("./src/main/resources/rides.df")
+    .where("rate_code = 1")
+    .select("pickup_datetime", "dropoff_datetime", "trip_time_in_secs", "trip_distance_km", "average_speed_kmh", "pickup_borough", "dropoff_borough")
+    .where("average_speed_kmh > 0.0")
+    .where("average_speed_kmh < 100.0")
+    .where("trip_distance_km > 1")
+    .where("trip_time_in_secs > 30")
+    .withColumn("pickup_weekday", dayofweek($"pickup_datetime"))
+    .withColumn("pickup_hour", hour($"pickup_datetime"))
+    .withColumn("dropoff_weekday", dayofweek($"dropoff_datetime"))
+    .withColumn("dropoff_hour", hour($"dropoff_datetime"))
+    .drop("pickup_datetime", "dropoff_datetime")
+
+
+  val Array(train, test) = df.randomSplit(Array(0.7, 0.3))
+
+
+  val pickupIndexer = new StringIndexer()
+    .setInputCol("pickup_borough")
+    .setOutputCol("pickup_borough_index")
+
+  val dropoffIndexer = new StringIndexer()
+    .setInputCol("dropoff_borough")
+    .setOutputCol("dropoff_borough_index")
+
+  val encoder = new OneHotEncoderEstimator()
+    .setInputCols(Array("pickup_weekday", "pickup_hour", "dropoff_weekday", "dropoff_hour", "pickup_borough_index", "dropoff_borough_index"))
+    .setOutputCols(Array("pickup_weekday_vec", "pickup_hour_vec", "dropoff_weekday_vec", "dropoff_hour_vec", "pickup_borough_vec", "dropoff_borough_vec"))
+
+  val rFormula = new RFormula().setFormula("average_speed_kmh ~ pickup_weekday_vec + pickup_hour_vec + dropoff_weekday_vec + dropoff_hour_vec + pickup_borough_vec + dropoff_borough_vec")
+
+
+  val regressor = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("features")
+
+
+  val pipeline = new Pipeline()
+    .setStages(Array(pickupIndexer, dropoffIndexer, encoder, rFormula, regressor))
+
+
+  val fittedModel = pipeline.fit(train)
+
+
+  val predictions = fittedModel.transform(test)
+  predictions.show(10)
+
+  val evaluator = new RegressionEvaluator()
+    .setLabelCol("label")
+    .setPredictionCol("prediction")
+    .setMetricName("rmse")
+  val rmse = evaluator.evaluate(predictions)
+  println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
+}
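
A minimal usage sketch for the congestion pipeline above (not part of the commit): it assumes `fittedModel` and `import spark.implicits._` from YellowSparkCongestion are in scope; the save path, weekday/hour values, and borough names are illustrative only, and the boroughs must have been seen by the StringIndexers during fit(). Because RFormula only generates a label when the label column is present, an unlabeled sample can be scored directly.

import org.apache.spark.ml.PipelineModel

// Persist the fitted pipeline (indexers, encoder, formula, forest) and
// reload it, so scoring does not require refitting on the full data.
fittedModel.write.overwrite().save("./models/congestion") // hypothetical path
val reloaded = PipelineModel.load("./models/congestion")

// One made-up ride: a Wednesday (dayofweek = 4, with Sunday = 1),
// 8h pickup in Manhattan, 9h dropoff in Brooklyn.
val sample = Seq((4, 8, 4, 9, "Manhattan", "Brooklyn"))
  .toDF("pickup_weekday", "pickup_hour", "dropoff_weekday", "dropoff_hour",
    "pickup_borough", "dropoff_borough")

// `prediction` is the estimated average_speed_kmh for that slot of the week.
reloaded.transform(sample).select("prediction").show()
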
diff --git a/src/main/scala/YellowSparkRegression.scala b/src/main/scala/YellowSparkRegression.scala
index 7a96eee..dbd2019 100644
--- a/src/main/scala/YellowSparkRegression.scala
+++ b/src/main/scala/YellowSparkRegression.scala
@@ -1,6 +1,7 @@
 import org.apache.spark.ml.evaluation.RegressionEvaluator
 import org.apache.spark.ml.feature.RFormula
-import org.apache.spark.ml.regression.LinearRegression
+import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
+import org.apache.spark.ml.{Pipeline, PipelineModel}
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -17,26 +18,53 @@ object YellowSparkRegression extends App {
     .getOrCreate()
 
   val df =
     spark.read.parquet("./src/main/resources/rides.df")
+  df.printSchema()
+
+  val preparedDF = df
+    .select("fare_amount", "rate_code", "trip_time_in_secs", "trip_distance_km", "surcharge", "total_amount", "tolls_amount", "tip_amount")
     .withColumnRenamed("fare_amount", "cost")
     .withColumnRenamed("trip_time_in_secs", "duration")
     .withColumnRenamed("trip_distance_km", "distance")
 
-  df.printSchema()
+  preparedDF.printSchema()
+  val models = preparedDF.select("rate_code").distinct().where("rate_code BETWEEN 1 AND 5").collect().map(row => {
+    val rateCode = row.getAs[Int]("rate_code")
+
+    println(s"Training for rate code: $rateCode")
+
+    val rateCodeSet = preparedDF.where(s"rate_code = $rateCode")
+
+    val Array(train, test) = rateCodeSet.randomSplit(Array(0.7, 0.3))
+    val formula = new RFormula().setFormula("cost ~ duration + distance")
+
+    val reg = new LinearRegression()
+      .setLabelCol("label")
+      .setFeaturesCol("features")
+
+    val stages = Array(formula, reg)
+    val pipeline = new Pipeline().setStages(stages)
+
+    val evaluator = new RegressionEvaluator().setMetricName("mse").setPredictionCol("prediction").setLabelCol("label")
 
-  org.apache.spark.ml.regression.LinearRegression
-  val rateCode1 = df.where("rate_code = 1")
-  val formula = new RFormula().setFormula("cost ~ duration + distance")
-  val fitFormula = formula.fit(rateCode1)
-  val preparedDF = fitFormula.transform(rateCode1)
+    val fittedModel: PipelineModel = pipeline.fit(train)
 
-  val Array(train, test) = preparedDF.randomSplit(Array(0.7, 0.3))
-  val reg = new LinearRegression().setLabelCol("label").setFeaturesCol("features")
-  val fittedReg = reg.fit(train)
+    val testPredict = fittedModel.transform(test)
+    val mse = evaluator.evaluate(testPredict)
 
-  println(s"Coefficients: ${fittedReg.coefficients} Intercept: ${fittedReg.intercept}")
+    testPredict.show(10, truncate = false)
+    (rateCode, mse, fittedModel)
+  })
 
-  val evaluator = new RegressionEvaluator().setMetricName("mse").setPredictionCol("prediction").setLabelCol("label")
-  println(s"MSE: ${evaluator.evaluate(fittedReg.transform(test))}")
+  models.foreach {
+    case (rateCode, mse, fittedPipelineModel) => {
+      println(s"Model for rate code $rateCode:")
+      val fittedLr = fittedPipelineModel.stages.last.asInstanceOf[LinearRegressionModel]
+      val coefficients = fittedLr.coefficients
+      val intercept = fittedLr.intercept
+      println(s"MSE: $mse")
+      println(s"Formula: ${coefficients.apply(0)} * seconds + ${coefficients.apply(1)} * km + $intercept")
+    }
+  }
 }
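
A minimal sketch of applying the per-rate-code models collected above to a new trip (not part of the commit): it assumes the `models` array from YellowSparkRegression is in scope and that `spark.implicits._` has been imported; the trip values are made up. As above, RFormula skips label generation when the `cost` column is absent, so unlabeled trips can be scored.

// Index the fitted pipelines by rate code.
val byRateCode = models.map { case (rateCode, _, model) => rateCode -> model }.toMap

// A made-up 15-minute (900 s), 5 km trip on the standard rate (rate_code = 1).
val trip = Seq((900.0, 5.0)).toDF("duration", "distance")

// `prediction` is the estimated `cost` (fare_amount) for that trip.
byRateCode(1).transform(trip).select("prediction").show()
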