Skip to content

Commit

Permalink
Update to new version of the connector. Add new Spark SQL example
Browse files Browse the repository at this point in the history
  • Loading branch information
doanduyhai committed Jan 24, 2015
1 parent ea54538 commit e3ee5b5
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 29 deletions.
9 changes: 5 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ scalaVersion := "2.10.4"


libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.1.0" withSources() withJavadoc(),
"org.apache.spark" %% "spark-sql" % "1.1.0" withSources() withJavadoc(),
"org.apache.spark" %% "spark-streaming-twitter" % "1.1.0" withSources() withJavadoc(),
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-alpha2" withSources() withJavadoc()
"org.apache.spark" %% "spark-core" % "1.2.0" withSources() withJavadoc(),
"org.apache.spark" %% "spark-sql" % "1.2.0" withSources() withJavadoc(),
"org.apache.spark" %% "spark-streaming-twitter" % "1.2.0" withSources() withJavadoc(),
"com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1" withSources() withJavadoc(),
"com.github.nscala-time" %% "nscala-time" % "1.6.0" withSources() withJavadoc()
)


Expand Down
4 changes: 2 additions & 2 deletions src/main/data/weather_data_schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ CREATE TABLE IF NOT EXISTS weather_station (
);

TRUNCATE weather_station;
CREATE INDEX weather_station_country ON weather_station (country_code);
//COPY weather_station (id, name, country_code, state_code, call_sign, lat, long, elevation) FROM 'stations.csv';
//CREATE INDEX weather_station_country ON weather_station (country_code);
COPY weather_station (id, name, country_code, state_code, call_sign, lat, long, elevation) FROM 'stations.csv';

/*
Raw weather readings from a single station, hourly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object FromCSVCaseClassToCassandra {

val conf = new SparkConf(true)
.setAppName("write_csv_to_cassandra")
.setMaster("local[4]")
.setMaster("local")
.set("spark.cassandra.connection.host", "localhost")

val sc = new SparkContext(conf)
Expand All @@ -44,5 +44,7 @@ object FromCSVCaseClassToCassandra {
lines(8).toInt, lines(9).toDouble, lines(10).toInt)
}}
.saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)

sc.stop()
}
}
6 changes: 4 additions & 2 deletions src/main/scala/us/unemployment/demo/FromCSVToCassandra.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object FromCSVToCassandra {

val conf = new SparkConf(true)
.setAppName("write_csv_to_cassandra")
.setMaster("local[4]")
.setMaster("local")
.set("spark.cassandra.connection.host", "localhost")

val sc = new SparkContext(conf)
Expand All @@ -64,8 +64,10 @@ object FromCSVToCassandra {
lines(8).toInt, lines(9).toDouble, lines(10).toInt)
}}
.saveToCassandra(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE, SomeColumns(TABLE_COLUMNS:_*))

sc.stop()
}


}

}
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package us.unemployment.demo

import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.CassandraRDD
import org.apache.spark.{SparkConf, SparkContext}
import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}

object FromCassandraToCaseClass {

def main (args: Array[String]) {

val conf = new SparkConf(true)
.setAppName("read_csv_from_cassandra_into_case_class")
.setMaster("local[4]")
.setMaster("local")
.set("spark.cassandra.connection.host", "localhost")

val sc = new SparkContext(conf)

sc.cassandraTable[UsUnemployment](UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)
sc.cassandraTable[UsUnemployment](KEYSPACE, TABLE)
.foreach(println(_))

sc.stop()
}


Expand Down
17 changes: 15 additions & 2 deletions src/main/scala/us/unemployment/demo/FromCassandraToRow.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
package us.unemployment.demo

import org.apache.spark.SparkContext._
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.CassandraRDD
import org.apache.spark.{SparkConf, SparkContext}
import us.unemployment.demo.UsUnemploymentSchema.{TABLE, KEYSPACE}

object FromCassandraToRow {

def main (args: Array[String]) {

val conf = new SparkConf(true)
.setAppName("read_csv_from_cassandra_into_case_class")
.setMaster("local[4]")
.setMaster("local")
.set("spark.cassandra.connection.host", "localhost")


val sc = new SparkContext(conf)

sc.cassandraTable(UsUnemploymentSchema.KEYSPACE, UsUnemploymentSchema.TABLE)
sc.cassandraTable(KEYSPACE, TABLE)
.foreach( row => {
val year = row.getInt("year")
val unemployedPercentageToLabor = row.getDouble("unemployed_percentage_to_labor")
println(s"""Year($year), unemployment % : $unemployedPercentageToLabor""")
}
)

// or
val unemployment: CassandraRDD[(String, Double)] = sc.cassandraTable(KEYSPACE, TABLE)
.select("year", "unemployed_percentage_to_labor").as((_: String, _: Double))

println(" ------------Alternative ----------------- ")

unemployment
.sortByKey()
.collect().foreach{case (year,percentage) => println(s""" Year($year), unemployment % : $percentage""")}

sc.stop()
}
}
33 changes: 33 additions & 0 deletions src/main/scala/us/unemployment/demo/FromCassandraToSQL.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package us.unemployment.demo

import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}
import us.unemployment.demo.UsUnemploymentSchema.{KEYSPACE, TABLE}

/**
 * Demo: query US unemployment data stored in Cassandra through Spark SQL,
 * using the Spark Cassandra connector's [[CassandraSQLContext]].
 *
 * Prints, for every year where unemployment exceeded 8% of the labor force,
 * the year and the percentage, most recent year first.
 *
 * Requires a Cassandra node reachable on localhost with the
 * `UsUnemploymentSchema` keyspace/table already populated
 * (see the FromCSV*ToCassandra examples).
 */
object FromCassandraToSQL {

  def main(args: Array[String]) {

    val conf = new SparkConf(true)
      // Fixed app name: the original was copy-pasted from the case-class
      // example and mis-described this Spark SQL job.
      .setAppName("read_from_cassandra_with_spark_sql")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")

    val sc = new SparkContext(conf)

    val cc = new CassandraSQLContext(sc)

    // Unqualified table names in subsequent SQL statements resolve
    // against this keyspace.
    cc.setKeyspace(KEYSPACE)

    // Renamed from `row` to `resultRows`: the original name was shadowed
    // by the lambda parameter in the foreach below.
    val resultRows: SchemaRDD = cc.cassandraSql(s"SELECT year,unemployed_percentage_to_labor " +
      s"FROM $TABLE WHERE unemployed_percentage_to_labor > 8 " +
      s"ORDER BY year DESC")

    // collect() brings the (small) result set back to the driver for printing.
    resultRows.collect().foreach(row => println(s" Year(${row(0)}): ${row(1)}%"))

    sc.stop()
  }
}
44 changes: 31 additions & 13 deletions src/main/scala/weather/data/demo/WeatherDataFromCassandra.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package weather.data.demo

import java.util.Date

import com.datastax.spark.connector._
import org.apache.spark.SparkContext._
import com.github.nscala_time.time.Imports._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.Seconds
import weather.data.demo.WeatherDataSchema.{WEATHER_DATA_TABLE, WEATHER_STATION_TABLE, KEYSPACE}

import scala.collection.immutable.TreeMap
import scala.collection.{SortedMap, mutable, Map}

object WeatherDataFromCassandra {

Expand All @@ -11,38 +20,47 @@ object WeatherDataFromCassandra {

val conf = new SparkConf(true)
.setAppName("write_csv_to_cassandra")
.setMaster("local[4]")
.setMaster("local[2]")
.set("spark.cassandra.input.page.row.size","10000")
.set("spark.cassandra.input.split.size","1000000")
.set("spark.cassandra.connection.host", "localhost")

val sc = new SparkContext(conf)

// Fetch only FRENCH weather stations
val frenchWeatherStations = sc.cassandraTable[WeatherStation](WeatherDataSchema.KEYSPACE, WeatherDataSchema.WEATHER_STATION_TABLE)
val frenchWeatherStations = sc.cassandraTable[WeatherStation](KEYSPACE, WEATHER_STATION_TABLE)
.filter(_.countryCode == "FR").map(station => (station.id, station.name)).collectAsMap()

val stations: Broadcast[Map[String, String]] = sc.broadcast(frenchWeatherStations)

val startTime = DateTime.now

// Calculate average daily temperature & pressure over all French stations, between March 2014 and June 2014
val frenchSamples = sc.cassandraTable[MonthlySampleData](WeatherDataSchema.KEYSPACE, WeatherDataSchema.WEATHER_DATA_TABLE)
.select("weather_station","year","month","temperature","pressure")
.filter(instance => frenchWeatherStations.contains(instance.weatherStation)) //only French stations
val frenchSamples: Map[String, Sample] = sc.cassandraTable[MonthlySampleData](KEYSPACE, WEATHER_DATA_TABLE)
.select("weather_station", "year", "month", "temperature", "pressure")
.filter(instance => stations.value.contains(instance.weatherStation)) //only French stations
.filter(instance => instance.month >= 3 && instance.month <= 6) // between March 2014 and June 2014
.map(data => {
val date = new StringBuilder().append(data.year).append("-").append(data.month.formatted("%02d")).toString()
(date, Sample(data.temperature, data.pressure))
})
.mapValues{case sample => (sample,1)} // add counter. (date, sample) becomes (date, (sample,1))
.reduceByKey{ case ((sample1,count1),(sample2,count2)) => (sample1+sample2,count1+count2)} // sum temperature,pressure,occurence
.mapValues{case (sample,totalCount) => sample.avg(totalCount)} // average on temperature & pressure
.sortByKey()
val date = new mutable.StringBuilder().append(data.year).append("-").append(data.month.formatted("%02d")).toString()
(date, Sample(data.temperature, data.pressure))
})
.mapValues { case sample => (sample, 1)} // add counter. (date, sample) becomes (date, (sample,1))
.reduceByKey { case ((sample1, count1), (sample2, count2)) => (sample1 + sample2, count1 + count2)} // sum temperature,pressure,occurence
.mapValues { case (sample, totalCount) => sample.avg(totalCount)} // average on temperature & pressure
.collectAsMap()



println("")
println("/************************************ RESULT *******************************/")
println("")
println(frenchSamples)
println((SortedMap.empty[String, Sample] ++ frenchSamples))
println("")
println("/***************************************************************************/")
val endTime = DateTime.now

println(s"Job start time : $startTime, end time : $endTime")

}

case class Sample(temperature:Float,pressure:Float) {
Expand Down
18 changes: 15 additions & 3 deletions src/main/scala/weather/data/demo/WeatherDataIntoCassandra.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package weather.data.demo

import com.datastax.spark.connector.{SomeColumns, _}
import com.github.nscala_time.time.Imports._
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.Period

object WeatherDataIntoCassandra {

val WEATHER_2014_CSV: String = "/path/to/2014.csv"
val WEATHER_2014_CSV: String = "/Users/archinnovinfo/perso/spark_data/Weather Data 2014.csv"
val TABLE_COLUMNS = Seq("weather_station", "year", "month", "day", "hour",
"temperature", "dewpoint", "pressure", "wind_direction", "wind_speed",
"sky_condition", "sky_condition_text", "one_hour_precip", "six_hour_precip")
Expand All @@ -15,14 +17,17 @@ object WeatherDataIntoCassandra {

val conf = new SparkConf(true)
.setAppName("write_csv_to_cassandra")
.setMaster("local[4]")
.setMaster("local[2]")
.set("spark.cassandra.connection.host", "localhost")

val sc = new SparkContext(conf)

val skyConditions = sc.cassandraTable[SkyCondition](WeatherDataSchema.KEYSPACE, WeatherDataSchema.SKY_CONDITION_TABLE).collect()
.map(instance => (instance.code, instance.condition)).toMap;

val skyConditionBc = sc.broadcast(skyConditions)

val startTime = DateTime.now

// Example of raw data
// 010010:99999,
Expand All @@ -36,11 +41,18 @@ object WeatherDataIntoCassandra {

(lines(0), lines(1).toInt, lines(2).toInt, lines(3).toInt, lines(4).toInt,
lines(5).toFloat, lines(6).toFloat, lines(7).toFloat, lines(8).toFloat.toInt, lines(9).toFloat,
skyConditionCode, skyConditions.get(skyConditionCode), lines(11).toFloat, lines(12).toFloat)
skyConditionCode, skyConditionBc.value.get(skyConditionCode), lines(11).toFloat, lines(12).toFloat)
}}
.saveToCassandra(WeatherDataSchema.KEYSPACE, WeatherDataSchema.WEATHER_DATA_TABLE, SomeColumns(TABLE_COLUMNS:_*))

val endTime = DateTime.now

val period: Period = new Period(endTime, startTime)

println(s"Job duration (sec) : ${period.getSeconds}")
}


case class SkyCondition(code:Int, condition:String)

}

0 comments on commit e3ee5b5

Please sign in to comment.