Skip to content

Commit

Permalink
Issue #SC-231 merge: Merge pull request #37 from Sanjay-Krishnan93/da…
Browse files Browse the repository at this point in the history
…ily-device-summary-enhancement

Issue #SC-231 chore: change default value of first access in Device s…
  • Loading branch information
sowmya-dixit authored Jan 10, 2019
2 parents 544a31c + 822debc commit 1d0df3f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.ekstep.analytics.util.Constants
case class DeviceIndex(device_id: String, channel: String)
case class DialStats(total_count: Long, success_count: Long, failure_count: Long)
case class DeviceInput(index: DeviceIndex, wfsData: Option[Buffer[DerivedEvent]], rawData: Option[Buffer[V3Event]]) extends AlgoInput
case class DeviceSummary(device_id: String, channel: String, total_ts: Double, total_launches: Long, contents_played: Long, unique_contents_played: Long, content_downloads: Long, dial_stats: DialStats, dt_range: DtRange, syncts: Long, firstAccess: Long = 0L) extends AlgoOutput
case class DeviceSummary(device_id: String, channel: String, total_ts: Double, total_launches: Long, contents_played: Long, unique_contents_played: Long, content_downloads: Long, dial_stats: DialStats, dt_range: DtRange, syncts: Long, firstAccess: Long) extends AlgoOutput

object DeviceSummaryModel extends IBatchModelTemplate[String, DeviceInput, DeviceSummary, MeasuredEvent] with Serializable {

Expand Down Expand Up @@ -65,16 +65,21 @@ object DeviceSummaryModel extends IBatchModelTemplate[String, DeviceInput, Devic
val dial_success = dialcodes_events.filter(f => f.edata.size > 0).length
val dial_failure = dialcodes_events.filter(f => f.edata.size == 0).length
val content_downloads = raw.filter(f => "INTERACT".equals(f.eid)).length
((index.device_id, index.channel),DeviceSummary(index.device_id, index.channel, CommonUtil.roundDouble(total_ts, 2), total_launches, contents_played, unique_contents_played, content_downloads, DialStats(dial_count, dial_success, dial_failure), DtRange(startTimestamp, endTimestamp), syncts))
(index, DeviceSummary(index.device_id, index.channel, CommonUtil.roundDouble(total_ts, 2), total_launches, contents_played, unique_contents_played, content_downloads, DialStats(dial_count, dial_success, dial_failure), DtRange(startTimestamp, endTimestamp), syncts, startTimestamp))
}
val firstAccessFromCassandra = summary.map{ x => x._1}
.joinWithCassandraTable[Long](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access")
.joinWithCassandraTable[Option[Long]](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access")
.on(SomeColumns("device_id", "channel"))
summary.leftOuterJoin(firstAccessFromCassandra)
.map{ x => x._2._1.copy(firstAccess = x._2._2.getOrElse(x._2._1.dt_range.from))}
.map{ x =>
val firstAccessValue = x._2._2.getOrElse(Option(0L))
if(firstAccessValue.getOrElse(0L) != 0)
x._2._1.copy(firstAccess = firstAccessValue.getOrElse(x._2._1.firstAccess))
else x._2._1
}
}

override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = {
override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = {
data.map { x =>
val mid = CommonUtil.getMessageId("ME_DEVICE_SUMMARY", x.device_id, "DAY", x.dt_range, "NA", None, Option(x.channel))
val measures = Map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,25 @@ class TestDeviceSummaryModel extends SparkSpec(null) {
}
}

it should "update the value of first_access with dt_range.from if null is returned from Cassandra" in {
CassandraConnector(sc.getConf).withSessionDo { session =>
session.execute("TRUNCATE " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE)
session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE +"(device_id, channel)" +
"VALUES('49edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d')")
session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE +"(device_id, channel, first_access)" +
"VALUES('88edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d', 1536909035000)")
}
val rdd = loadFile[String]("src/test/resources/device-summary/test_data1.log")
val measuredEvent = DeviceSummaryModel.execute(rdd, None)
measuredEvent.collect().foreach{ x =>
val summary = JSONUtils.deserialize[DeviceSummary](JSONUtils.serialize(x.edata.eks))
if(x.dimensions.did.get.equals("49edda82418a1e916e9906a2fd7942cb"))
summary.firstAccess should be(1537550355883L)
else if(x.dimensions.did.get.equals("88edda82418a1e916e9906a2fd7942cb"))
summary.firstAccess should be(1536909035000L)
else
summary.firstAccess should be(1537550355883L)
}
}

}

0 comments on commit 1d0df3f

Please sign in to comment.