From bcf1344b6524af3c4f46f7c90434c6369ecbe742 Mon Sep 17 00:00:00 2001 From: Sanjay-Krishnan93 Date: Thu, 3 Jan 2019 19:09:40 +0530 Subject: [PATCH 1/2] Issue #SC-231 chore: change default value of first access in Device summary model --- .../org/ekstep/analytics/model/DeviceSummaryModel.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala b/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala index 23a55d7f0..6466957dd 100644 --- a/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala +++ b/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala @@ -16,7 +16,7 @@ import org.ekstep.analytics.util.Constants case class DeviceIndex(device_id: String, channel: String) case class DialStats(total_count: Long, success_count: Long, failure_count: Long) case class DeviceInput(index: DeviceIndex, wfsData: Option[Buffer[DerivedEvent]], rawData: Option[Buffer[V3Event]]) extends AlgoInput -case class DeviceSummary(device_id: String, channel: String, total_ts: Double, total_launches: Long, contents_played: Long, unique_contents_played: Long, content_downloads: Long, dial_stats: DialStats, dt_range: DtRange, syncts: Long, firstAccess: Long = 0L) extends AlgoOutput +case class DeviceSummary(device_id: String, channel: String, total_ts: Double, total_launches: Long, contents_played: Long, unique_contents_played: Long, content_downloads: Long, dial_stats: DialStats, dt_range: DtRange, syncts: Long, firstAccess: Long) extends AlgoOutput object DeviceSummaryModel extends IBatchModelTemplate[String, DeviceInput, DeviceSummary, MeasuredEvent] with Serializable { @@ -65,13 +65,13 @@ object DeviceSummaryModel extends IBatchModelTemplate[String, DeviceInput, Devic val dial_success = dialcodes_events.filter(f => f.edata.size > 0).length val dial_failure = dialcodes_events.filter(f => f.edata.size == 0).length val content_downloads = raw.filter(f => "INTERACT".equals(f.eid)).length - ((index.device_id, index.channel),DeviceSummary(index.device_id, index.channel, CommonUtil.roundDouble(total_ts, 2), total_launches, contents_played, unique_contents_played, content_downloads, DialStats(dial_count, dial_success, dial_failure), DtRange(startTimestamp, endTimestamp), syncts)) + (index, DeviceSummary(index.device_id, index.channel, CommonUtil.roundDouble(total_ts, 2), total_launches, contents_played, unique_contents_played, content_downloads, DialStats(dial_count, dial_success, dial_failure), DtRange(startTimestamp, endTimestamp), syncts, startTimestamp)) } val firstAccessFromCassandra = summary.map{ x => x._1} .joinWithCassandraTable[Long](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access") .on(SomeColumns("device_id", "channel")) summary.leftOuterJoin(firstAccessFromCassandra) - .map{ x => x._2._1.copy(firstAccess = x._2._2.getOrElse(x._2._1.dt_range.from))} + .map{ x => x._2._1.copy(firstAccess = x._2._2.getOrElse(x._2._1.firstAccess))} } override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = { From 822debcbb890bcda5fa56d3fd499daaf99777c76 Mon Sep 17 00:00:00 2001 From: Sanjay-Krishnan93 Date: Thu, 10 Jan 2019 16:43:07 +0530 Subject: [PATCH 2/2] Issue #SC-231 chore: Fix null value in first access --- .../analytics/model/DeviceSummaryModel.scala | 11 +++++++--- .../model/TestDeviceSummaryModel.scala | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala b/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala index 6466957dd..36b8775e7 100644 --- a/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala +++ b/platform-modules/batch-models/src/main/scala/org/ekstep/analytics/model/DeviceSummaryModel.scala @@ -68,13 +68,18 @@ object DeviceSummaryModel extends IBatchModelTemplate[String, DeviceInput, Devic (index, DeviceSummary(index.device_id, index.channel, CommonUtil.roundDouble(total_ts, 2), total_launches, contents_played, unique_contents_played, content_downloads, DialStats(dial_count, dial_success, dial_failure), DtRange(startTimestamp, endTimestamp), syncts, startTimestamp)) } val firstAccessFromCassandra = summary.map{ x => x._1} - .joinWithCassandraTable[Long](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access") + .joinWithCassandraTable[Option[Long]](Constants.DEVICE_KEY_SPACE_NAME, Constants.DEVICE_PROFILE_TABLE).select("first_access") .on(SomeColumns("device_id", "channel")) summary.leftOuterJoin(firstAccessFromCassandra) - .map{ x => x._2._1.copy(firstAccess = x._2._2.getOrElse(x._2._1.firstAccess))} + .map{ x => + val firstAccessValue = x._2._2.getOrElse(Option(0L)) + if(firstAccessValue.getOrElse(0L) != 0) + x._2._1.copy(firstAccess = firstAccessValue.getOrElse(x._2._1.firstAccess)) + else x._2._1 + } } - override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = { + override def postProcess(data: RDD[DeviceSummary], config: Map[String, AnyRef])(implicit sc: SparkContext): RDD[MeasuredEvent] = { data.map { x => val mid = CommonUtil.getMessageId("ME_DEVICE_SUMMARY", x.device_id, "DAY", x.dt_range, "NA", None, Option(x.channel)) val measures = Map( diff --git a/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala b/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala index fc24fae49..9478aeb34 100644 --- a/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala +++ b/platform-modules/batch-models/src/test/scala/org/ekstep/analytics/model/TestDeviceSummaryModel.scala @@ -93,4 +93,25 @@ class TestDeviceSummaryModel extends SparkSpec(null) { } } + it should "update the value of first_access with dt_range.from if null is returned from Cassandra" in { + CassandraConnector(sc.getConf).withSessionDo { session => + session.execute("TRUNCATE " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE) + session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE +"(device_id, channel)" + + "VALUES('49edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d')") + session.execute("INSERT INTO " + Constants.DEVICE_KEY_SPACE_NAME + "." + Constants.DEVICE_PROFILE_TABLE +"(device_id, channel, first_access)" + + "VALUES('88edda82418a1e916e9906a2fd7942cb','b00bc992ef25f1a9a8d63291e20efc8d', 1536909035000)") + } + val rdd = loadFile[String]("src/test/resources/device-summary/test_data1.log") + val measuredEvent = DeviceSummaryModel.execute(rdd, None) + measuredEvent.collect().foreach{ x => + val summary = JSONUtils.deserialize[DeviceSummary](JSONUtils.serialize(x.edata.eks)) + if(x.dimensions.did.get.equals("49edda82418a1e916e9906a2fd7942cb")) + summary.firstAccess should be(1537550355883L) + else if(x.dimensions.did.get.equals("88edda82418a1e916e9906a2fd7942cb")) + summary.firstAccess should be(1536909035000L) + else + summary.firstAccess should be(1537550355883L) + } + } + } \ No newline at end of file