Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Device Register & profile APIs to work with obsrv-2.0 #104

Open
wants to merge 2 commits into
base: release-obsrv-2.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
machine:
image: ubuntu-2004:202008-01
environment:
CLOUD_STORE_VERSION: "1.4.0"
CLOUD_STORE_VERSION: "1.4.6"
CLOUD_STORE_ARTIFACT_ID: "cloud-store-sdk_2.12"
CLOUD_STORE_GROUP_ID: "org.sunbird"
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,9 @@ case class ReportResponse(reportId: String, reportDescription: String, createdBy
case class ReportFilter(request: ListReportFilter)
case class ListReportFilter(filters: Map[String,List[String]])

case class DateRange(from: String, to: String)
case class DateRange(from: String, to: String)

case class DeviceProfileRecord(device_id: String, devicespec: String, uaspec: String, fcm_token: String, producer: String, user_declared_state: String,
user_declared_district: String, geonameId: Int, continentName: String, countryCode: String, countryName: String,
stateCode: String, state: String, subDivsion2: String, city: String,
stateCustom: String, stateCodeCustom: String, districtCustom: String, first_access: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import scala.collection.JavaConverters._

case class DeviceProfileRequest(did: String, headerIP: String)

class DeviceProfileService @Inject() (config: Config, redisUtil: RedisUtil) extends Actor {
class DeviceProfileService @Inject() (config: Config, postgresDBUtil: PostgresDBUtil) extends Actor {

implicit val className: String = "DeviceProfileService"
val deviceDatabaseIndex: Int = config.getInt("redis.deviceIndex")
// val deviceDatabaseIndex: Int = config.getInt("redis.deviceIndex")

override def preStart { println("starting DeviceProfileService") }

override def postStop {
redisUtil.closePool()
println("DeviceProfileService stopped successfully")
}

Expand Down Expand Up @@ -59,15 +58,10 @@ class DeviceProfileService @Inject() (config: Config, redisUtil: RedisUtil) exte
APILogger.log("", Option(Map("comments" -> s"IP Location is not resolved for $did")), "getDeviceProfile")
}

val jedisConnection: Jedis = redisUtil.getConnection(deviceDatabaseIndex)
val deviceLocation = try {
Option(jedisConnection.hgetAll(did).asScala.toMap)
} finally {
jedisConnection.close()
}
val deviceLocation = postgresDBUtil.readDeviceLocation(did)

val userDeclaredLoc = if (deviceLocation.nonEmpty && deviceLocation.get.getOrElse("user_declared_state", "").nonEmpty) {
Option(Location(deviceLocation.get("user_declared_state"), deviceLocation.get("user_declared_district")))
val userDeclaredLoc = if (deviceLocation.nonEmpty && deviceLocation.get.user_declared_state.nonEmpty) {
Option(Location(deviceLocation.get.user_declared_state, deviceLocation.get.user_declared_district))
} else None

userDeclaredLoc.foreach { declaredLocation =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import com.google.common.net.InetAddresses
import com.google.common.primitives.UnsignedInts
import com.typesafe.config.Config
import is.tagomor.woothee.Classifier

import javax.inject.{Inject, Named}
import org.apache.logging.log4j.LogManager
import org.ekstep.analytics.api.DeviceProfileRecord
import org.ekstep.analytics.api.util._
import org.joda.time.{DateTime, DateTimeZone}
import redis.clients.jedis.Jedis

import java.sql.Timestamp
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -25,7 +28,7 @@ case object DeviceRegisterSuccesfulAck extends DeviceRegisterStatus
case object DeviceRegisterFailureAck extends DeviceRegisterStatus

class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsActor: ActorRef, config: Config,
redisUtil: RedisUtil, kafkaUtil: KafkaUtil) extends Actor {
postgresDBUtil: PostgresDBUtil, kafkaUtil: KafkaUtil) extends Actor {

implicit val className: String = "DeviceRegisterService"
val metricsActor: ActorRef = saveMetricsActor
Expand All @@ -37,7 +40,7 @@ class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsA
override def preStart { println("Starting DeviceRegisterService") }

override def postStop {
redisUtil.closePool()
// redisUtil.closePool()
kafkaUtil.close()
println("DeviceRegisterService stopped successfully")
}
Expand Down Expand Up @@ -83,16 +86,16 @@ class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsA
case None => Map()
}

// Add device profile to redis cache
val deviceProfileMap = getDeviceProfileMap(registrationDetails, location)
val jedisConnection: Jedis = redisUtil.getConnection(deviceDatabaseIndex)
try {
Option(jedisConnection.hmset(registrationDetails.did, deviceProfileMap.asJava))
} finally {
jedisConnection.close()
// Add device profile to postgres
val deviceProfileRecord = getDeviceProfileRecord(registrationDetails, location)
val previousDeviceDetails = postgresDBUtil.readDeviceLocation(deviceProfileRecord.device_id)
if (previousDeviceDetails.isEmpty) {
postgresDBUtil.saveDeviceData(deviceProfileRecord)
} else {
postgresDBUtil.updateDeviceData(deviceProfileRecord)
}

APILogger.log(s"Redis-cache updated for did: ${registrationDetails.did}", None, "registerDevice")
APILogger.log(s"Postgres updated for did: ${registrationDetails.did}", None, "registerDevice")

val deviceProfileLog = DeviceProfileLog(registrationDetails.did, location, Option(deviceSpec),
registrationDetails.uaspec, registrationDetails.fcmToken, registrationDetails.producer, registrationDetails.first_access,
Expand Down Expand Up @@ -155,18 +158,14 @@ class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsA
JSONUtils.serialize(deviceProfile)
}

def getDeviceProfileMap(registrationDetails: RegisterDevice, deviceLocation: DeviceLocation): Map[String, String] = {
// skipping firstaccess - handled in samza job
val dataMap =
Map(
"devicespec" -> registrationDetails.dspec.getOrElse(""),
"uaspec" -> parseUserAgent(registrationDetails.uaspec).getOrElse(""),
"fcm_token" -> registrationDetails.fcmToken.getOrElse(""),
"producer" -> registrationDetails.producer.getOrElse(""),
"user_declared_state" -> registrationDetails.user_declared_state.getOrElse(""),
"user_declared_district" -> registrationDetails.user_declared_district.getOrElse(""))

(dataMap ++ deviceLocation.toMap()).filter(data => data._2 != null && data._2.nonEmpty)
def getDeviceProfileRecord(registrationDetails: RegisterDevice, deviceLocation: DeviceLocation): DeviceProfileRecord = {
val uaspec = parseUserAgent(registrationDetails.uaspec).getOrElse("")
DeviceProfileRecord(registrationDetails.did, registrationDetails.dspec.getOrElse(""), uaspec,
registrationDetails.fcmToken.getOrElse(""), registrationDetails.producer.getOrElse(""),
registrationDetails.user_declared_state.getOrElse(""), registrationDetails.user_declared_district.getOrElse(""),
deviceLocation.geonameId, deviceLocation.continentName, deviceLocation.countryCode, deviceLocation.countryName,
deviceLocation.stateCode, deviceLocation.state, deviceLocation.subDivsion2, deviceLocation.city,
deviceLocation.stateCustom, deviceLocation.stateCodeCustom, deviceLocation.districtCustom, registrationDetails.first_access.getOrElse(new DateTime().getMillis))
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.ekstep.analytics.api.util

import org.ekstep.analytics.api.{DatasetConfig, JobConfig, ReportRequest}
import org.ekstep.analytics.api.{DatasetConfig, DeviceProfileRecord, JobConfig, ReportRequest}
import org.joda.time.DateTime
import scalikejdbc._

Expand Down Expand Up @@ -39,6 +39,62 @@ class PostgresDBUtil {
SQL(sqlString).map(rs => GeoLocationRange(rs)).list().apply()
}

def readDeviceLocation(deviceId: String): Option[DeviceUserDeclaredLocation] = {
sql"select user_declared_state, user_declared_district from ${DeviceUserDeclaredLocation.table} where device_id = ${deviceId};".map(rs => DeviceUserDeclaredLocation(rs)).first().apply()
}

def saveDeviceData(deviceProfileRecord: DeviceProfileRecord) = {
val table = DeviceUserDeclaredLocation.tableName
val insertQry = s"INSERT INTO $table (device_id, api_last_updated_on, city, country, country_code, device_spec, district_custom, fcm_token, first_access, last_access, producer_id, state, state_code, state_code_custom, state_custom, uaspec, updated_date, user_declared_district, user_declared_state, user_declared_on) values (?, ?, ?, ?, ?, ?::json, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?, ?, ?, ?)";
val pstmt: PreparedStatement = dbc.prepareStatement(insertQry);
pstmt.setString(1, deviceProfileRecord.device_id);
pstmt.setTimestamp(2, new Timestamp(new DateTime().getMillis));
pstmt.setString(3, deviceProfileRecord.city);
pstmt.setString(4, deviceProfileRecord.countryName);
pstmt.setString(5, deviceProfileRecord.countryCode);
pstmt.setString(6, if(deviceProfileRecord.devicespec.nonEmpty) deviceProfileRecord.devicespec else "{}");
pstmt.setString(7, deviceProfileRecord.districtCustom);
pstmt.setString(8, deviceProfileRecord.fcm_token);
pstmt.setTimestamp(9, new Timestamp(deviceProfileRecord.first_access)); // first access
pstmt.setTimestamp(10, new Timestamp(new DateTime().getMillis)); // last access
pstmt.setString(11, deviceProfileRecord.producer);
pstmt.setString(12, deviceProfileRecord.state);
pstmt.setString(13, deviceProfileRecord.stateCode);
pstmt.setString(14, deviceProfileRecord.stateCodeCustom);
pstmt.setString(15, deviceProfileRecord.stateCustom);
pstmt.setString(16, if(deviceProfileRecord.uaspec.nonEmpty) deviceProfileRecord.uaspec else "{}");
pstmt.setTimestamp(17, new Timestamp(new DateTime().getMillis));
pstmt.setString(18, deviceProfileRecord.user_declared_district);
pstmt.setString(19, deviceProfileRecord.user_declared_state);
pstmt.setTimestamp(20, new Timestamp(new DateTime().getMillis));
pstmt.execute()
}

def updateDeviceData(deviceProfileRecord: DeviceProfileRecord) = {
val table = DeviceUserDeclaredLocation.tableName
val insertQry = s"update $table set api_last_updated_on = ?, city = ?, country = ?, country_code = ?, device_spec = ?::JSON , district_custom = ? , fcm_token = ?, last_access = ?, producer_id = ?, state = ?, state_code = ?, state_code_custom = ? state_custom = ?, uaspec = ?::JSON, updated_date = ?, user_declared_district = ?, user_declared_state = ?, user_declared_on = ? where device_id = ?";
val pstmt: PreparedStatement = dbc.prepareStatement(insertQry);
pstmt.setTimestamp(1, new Timestamp(new DateTime().getMillis));
pstmt.setString(2, deviceProfileRecord.city);
pstmt.setString(3, deviceProfileRecord.countryName);
pstmt.setString(4, deviceProfileRecord.countryCode);
pstmt.setString(5, if(deviceProfileRecord.devicespec.nonEmpty) deviceProfileRecord.devicespec else "{}");
pstmt.setString(6, deviceProfileRecord.districtCustom);
pstmt.setString(7, deviceProfileRecord.fcm_token);
pstmt.setTimestamp(8, new Timestamp(new DateTime().getMillis)); // last access
pstmt.setString(9, deviceProfileRecord.producer);
pstmt.setString(10, deviceProfileRecord.state);
pstmt.setString(11, deviceProfileRecord.stateCode);
pstmt.setString(12, deviceProfileRecord.stateCodeCustom);
pstmt.setString(13, deviceProfileRecord.stateCustom);
pstmt.setString(14, if(deviceProfileRecord.uaspec.nonEmpty) deviceProfileRecord.uaspec else "{}");
pstmt.setTimestamp(15, new Timestamp(new DateTime().getMillis));
pstmt.setString(16, deviceProfileRecord.user_declared_district);
pstmt.setString(17, deviceProfileRecord.user_declared_state);
pstmt.setTimestamp(18, new Timestamp(new DateTime().getMillis));
pstmt.setString(19, deviceProfileRecord.device_id);
pstmt.execute()
}

def saveReportConfig(reportRequest: ReportRequest) = {
val config = JSONUtils.serialize(reportRequest.config)
Expand Down Expand Up @@ -354,6 +410,19 @@ object DeviceLocation extends SQLSyntaxSupport[DeviceLocation] {
rs.string("district_custom"))
}

case class DeviceUserDeclaredLocation(user_declared_state: String, user_declared_district: String) {
def this() = this("", "")

def toMap() = Map("user_declared_state" -> user_declared_state, "user_declared_district" -> user_declared_district)
}

object DeviceUserDeclaredLocation extends SQLSyntaxSupport[DeviceUserDeclaredLocation] {
override val tableName = AppConfig.getString("postgres.table.device_profile.name")
def apply(rs: WrappedResultSet) = new DeviceUserDeclaredLocation(
rs.string("user_declared_state"),
rs.string("user_declared_district"))
}

case class GeoLocationCity(geoname_id: Int, subdivision_1_name: String, subdivision_2_custom_name: String) {
def this() = this(0, "", "")
}
Expand Down
1 change: 1 addition & 0 deletions analytics-api-core/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ postgres.table.report_config.name="report_config"
postgres.table.job_request.name="job_request"
postgres.table.experiment_definition.name="experiment_definition"
postgres.table.dataset_metadata.name="dataset_metadata"
postgres.table.device_profile.name="device_profile"

channel {
data_exhaust {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,64 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{TestActorRef, TestProbe}
import com.typesafe.config.Config
import org.ekstep.analytics.api.BaseSpec
import org.ekstep.analytics.api.util.{DeviceStateDistrict, RedisUtil}
import org.ekstep.analytics.api.util.{APILogger, CacheUtil, DeviceLocation, DeviceStateDistrict, EmbeddedPostgresql, IPLocationCache, KafkaUtil, PostgresDBUtil, RedisUtil}
import org.mockito.Mockito.{times, verify, when}
import org.ekstep.analytics.api.util.CacheUtil
import org.ekstep.analytics.api.util.IPLocationCache
import org.ekstep.analytics.api.util.DeviceLocation
import de.sciss.fingertree.RangedSeq

import scala.math.Ordering
import redis.embedded.RedisServer;
import redis.embedded.RedisServer

import scala.collection.JavaConverters._
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatestplus.mockito.MockitoSugar
import com.typesafe.config.ConfigFactory
import org.ekstep.analytics.api.util.KafkaUtil
import redis.clients.jedis.exceptions.JedisConnectionException
import org.ekstep.analytics.api.util.APILogger

class TestDeviceProfileService extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {

implicit val config = ConfigFactory.load()
val deviceProfileServiceMock: DeviceProfileService = mock[DeviceProfileService]
private implicit val system: ActorSystem = ActorSystem("device-register-test-actor-system", config)
private val configMock = mock[Config]
private val redisUtil = new RedisUtil();
val redisIndex: Int = 2
private val postgresUtil = new PostgresDBUtil
// val redisIndex: Int = 2
implicit val executor = scala.concurrent.ExecutionContext.global
val kafkaUtil = new KafkaUtil()
val saveMetricsActor = TestActorRef(new SaveMetricsActor(kafkaUtil))
val metricsActorProbe = TestProbe()
when(configMock.getInt("redis.deviceIndex")).thenReturn(2)
when(configMock.getInt("redis.port")).thenReturn(6379)
// when(configMock.getInt("redis.deviceIndex")).thenReturn(2)
// when(configMock.getInt("redis.port")).thenReturn(6379)
when(configMock.getString("postgres.table.device_profile.name")).thenReturn("device_profile")
when(configMock.getString("postgres.table.geo_location_city.name")).thenReturn("geo_location_city")
when(configMock.getString("postgres.table.geo_location_city_ipv4.name")).thenReturn("geo_location_city_ipv4")
when(configMock.getString("postgres.table.device_profile.name")).thenReturn("device_profile")
when(configMock.getBoolean("device.api.enable.debug.log")).thenReturn(true)
private val deviceProfileServiceActorRef = TestActorRef(new DeviceProfileService(configMock, redisUtil) {
private val deviceProfileServiceActorRef = TestActorRef(new DeviceProfileService(configMock, postgresUtil) {

})
val geoLocationCityIpv4TableName = config.getString("postgres.table.geo_location_city_ipv4.name")
val geoLocationCityTableName = config.getString("postgres.table.geo_location_city.name")
private var redisServer:RedisServer = _;
// private var redisServer:RedisServer = _;


override def beforeAll() {
super.beforeAll()
redisServer = new RedisServer(6379);
redisServer.start();
val jedis = redisUtil.getConnection(redisIndex);
jedis.hmset("device-001", Map("user_declared_state" -> "Karnataka", "user_declared_district" -> "Tumkur").asJava);
jedis.close();
EmbeddedPostgresql.start()
EmbeddedPostgresql.createTables()
EmbeddedPostgresql.execute(
s"""insert into device_profile("device_id", "user_declared_district",
"user_declared_state") values ('device-001', 'Tumkur', 'Karnataka');""")
// redisServer = new RedisServer(6379);
// redisServer.start();
// val jedis = redisUtil.getConnection(redisIndex);
// jedis.hmset("device-001", Map("user_declared_state" -> "Karnataka", "user_declared_district" -> "Tumkur").asJava);
// jedis.close();
}

override def afterAll() {
super.afterAll()
redisServer.stop();
// redisServer.stop();
EmbeddedPostgresql.close()
deviceProfileServiceActorRef.restart(new Exception() {})
deviceProfileServiceActorRef.stop();
}
Expand Down Expand Up @@ -122,11 +127,11 @@ class TestDeviceProfileService extends FlatSpec with Matchers with BeforeAndAfte
intercept[Exception] {
deviceProfileServiceActorRef.receive(DeviceProfileRequest("device-001", "xyz"))
}
intercept[JedisConnectionException] {
redisServer.stop();
deviceProfileServiceActorRef.receive(DeviceProfileRequest("device-001", "106.51.74.185"))
redisServer.start();
}
// intercept[JedisConnectionException] {
// redisServer.stop();
// deviceProfileServiceActorRef.receive(DeviceProfileRequest("device-001", "106.51.74.185"))
// redisServer.start();
// }
}

}
Loading