Skip to content

Commit

Permalink
#0 refactor: Refactor Device Register & profile APIs to work with obs…
Browse files Browse the repository at this point in the history
…rv-2.0.
  • Loading branch information
sowmya-dixit committed Nov 2, 2023
1 parent fc5578f commit a9e5827
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,9 @@ case class ReportResponse(reportId: String, reportDescription: String, createdBy
case class ReportFilter(request: ListReportFilter)
case class ListReportFilter(filters: Map[String,List[String]])

case class DateRange(from: String, to: String)
case class DateRange(from: String, to: String)

case class DeviceProfileRecord(device_id: String, devicespec: String, uaspec: String, fcm_token: String, producer: String, user_declared_state: String,
user_declared_district: String, geonameId: Int, continentName: String, countryCode: String, countryName: String,
stateCode: String, state: String, subDivsion2: String, city: String,
stateCustom: String, stateCodeCustom: String, districtCustom: String, first_access: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ import scala.collection.JavaConverters._

case class DeviceProfileRequest(did: String, headerIP: String)

class DeviceProfileService @Inject() (config: Config, redisUtil: RedisUtil) extends Actor {
class DeviceProfileService @Inject() (config: Config, postgresDBUtil: PostgresDBUtil) extends Actor {

implicit val className: String = "DeviceProfileService"
val deviceDatabaseIndex: Int = config.getInt("redis.deviceIndex")
// val deviceDatabaseIndex: Int = config.getInt("redis.deviceIndex")

override def preStart { println("starting DeviceProfileService") }

override def postStop {
redisUtil.closePool()
println("DeviceProfileService stopped successfully")
}

Expand Down Expand Up @@ -59,15 +58,10 @@ class DeviceProfileService @Inject() (config: Config, redisUtil: RedisUtil) exte
APILogger.log("", Option(Map("comments" -> s"IP Location is not resolved for $did")), "getDeviceProfile")
}

val jedisConnection: Jedis = redisUtil.getConnection(deviceDatabaseIndex)
val deviceLocation = try {
Option(jedisConnection.hgetAll(did).asScala.toMap)
} finally {
jedisConnection.close()
}
val deviceLocation = postgresDBUtil.readDeviceLocation(did)

val userDeclaredLoc = if (deviceLocation.nonEmpty && deviceLocation.get.getOrElse("user_declared_state", "").nonEmpty) {
Option(Location(deviceLocation.get("user_declared_state"), deviceLocation.get("user_declared_district")))
val userDeclaredLoc = if (deviceLocation.nonEmpty && deviceLocation.get.user_declared_state.nonEmpty) {
Option(Location(deviceLocation.get.user_declared_state, deviceLocation.get.user_declared_district))
} else None

userDeclaredLoc.foreach { declaredLocation =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import com.google.common.net.InetAddresses
import com.google.common.primitives.UnsignedInts
import com.typesafe.config.Config
import is.tagomor.woothee.Classifier

import javax.inject.{Inject, Named}
import org.apache.logging.log4j.LogManager
import org.ekstep.analytics.api.DeviceProfileRecord
import org.ekstep.analytics.api.util._
import org.joda.time.{DateTime, DateTimeZone}
import redis.clients.jedis.Jedis

import java.sql.Timestamp
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -25,7 +28,7 @@ case object DeviceRegisterSuccesfulAck extends DeviceRegisterStatus
case object DeviceRegisterFailureAck extends DeviceRegisterStatus

class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsActor: ActorRef, config: Config,
redisUtil: RedisUtil, kafkaUtil: KafkaUtil) extends Actor {
postgresDBUtil: PostgresDBUtil, kafkaUtil: KafkaUtil) extends Actor {

implicit val className: String = "DeviceRegisterService"
val metricsActor: ActorRef = saveMetricsActor
Expand All @@ -37,7 +40,7 @@ class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsA
override def preStart { println("Starting DeviceRegisterService") }

override def postStop {
redisUtil.closePool()
// redisUtil.closePool()
kafkaUtil.close()
println("DeviceRegisterService stopped successfully")
}
Expand Down Expand Up @@ -83,16 +86,16 @@ class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsA
case None => Map()
}

// Add device profile to redis cache
val deviceProfileMap = getDeviceProfileMap(registrationDetails, location)
val jedisConnection: Jedis = redisUtil.getConnection(deviceDatabaseIndex)
try {
Option(jedisConnection.hmset(registrationDetails.did, deviceProfileMap.asJava))
} finally {
jedisConnection.close()
// Add device profile to postgres
val deviceProfileRecord = getDeviceProfileRecord(registrationDetails, location)
val previousDeviceDetails = postgresDBUtil.readDeviceLocation(deviceProfileRecord.device_id)
if (previousDeviceDetails.isEmpty) {
postgresDBUtil.saveDeviceData(deviceProfileRecord)
} else {
postgresDBUtil.updateDeviceData(deviceProfileRecord)
}

APILogger.log(s"Redis-cache updated for did: ${registrationDetails.did}", None, "registerDevice")
APILogger.log(s"Postgres updated for did: ${registrationDetails.did}", None, "registerDevice")

val deviceProfileLog = DeviceProfileLog(registrationDetails.did, location, Option(deviceSpec),
registrationDetails.uaspec, registrationDetails.fcmToken, registrationDetails.producer, registrationDetails.first_access,
Expand Down Expand Up @@ -155,18 +158,14 @@ class DeviceRegisterService @Inject() (@Named("save-metrics-actor") saveMetricsA
JSONUtils.serialize(deviceProfile)
}

def getDeviceProfileMap(registrationDetails: RegisterDevice, deviceLocation: DeviceLocation): Map[String, String] = {
// skipping firstaccess - handled in samza job
val dataMap =
Map(
"devicespec" -> registrationDetails.dspec.getOrElse(""),
"uaspec" -> parseUserAgent(registrationDetails.uaspec).getOrElse(""),
"fcm_token" -> registrationDetails.fcmToken.getOrElse(""),
"producer" -> registrationDetails.producer.getOrElse(""),
"user_declared_state" -> registrationDetails.user_declared_state.getOrElse(""),
"user_declared_district" -> registrationDetails.user_declared_district.getOrElse(""))

(dataMap ++ deviceLocation.toMap()).filter(data => data._2 != null && data._2.nonEmpty)
def getDeviceProfileRecord(registrationDetails: RegisterDevice, deviceLocation: DeviceLocation): DeviceProfileRecord = {
val uaspec = parseUserAgent(registrationDetails.uaspec).getOrElse("")
DeviceProfileRecord(registrationDetails.did, registrationDetails.dspec.getOrElse(""), uaspec,
registrationDetails.fcmToken.getOrElse(""), registrationDetails.producer.getOrElse(""),
registrationDetails.user_declared_state.getOrElse(""), registrationDetails.user_declared_district.getOrElse(""),
deviceLocation.geonameId, deviceLocation.continentName, deviceLocation.countryCode, deviceLocation.countryName,
deviceLocation.stateCode, deviceLocation.state, deviceLocation.subDivsion2, deviceLocation.city,
deviceLocation.stateCustom, deviceLocation.stateCodeCustom, deviceLocation.districtCustom, registrationDetails.first_access.getOrElse(new DateTime().getMillis))
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.ekstep.analytics.api.util

import org.ekstep.analytics.api.{DatasetConfig, JobConfig, ReportRequest}
import org.ekstep.analytics.api.{DatasetConfig, DeviceProfileRecord, JobConfig, ReportRequest}
import org.joda.time.DateTime
import scalikejdbc._

Expand Down Expand Up @@ -39,6 +39,62 @@ class PostgresDBUtil {
SQL(sqlString).map(rs => GeoLocationRange(rs)).list().apply()
}

def readDeviceLocation(deviceId: String): Option[DeviceUserDeclaredLocation] = {
sql"select user_declared_state, user_declared_district from ${DeviceUserDeclaredLocation.table} where device_id = ${deviceId};".map(rs => DeviceUserDeclaredLocation(rs)).first().apply()
}

def saveDeviceData(deviceProfileRecord: DeviceProfileRecord) = {
val table = DeviceUserDeclaredLocation.tableName
val insertQry = s"INSERT INTO $table (device_id, api_last_updated_on, city, country, country_code, device_spec, district_custom, fcm_token, first_access, last_access, producer_id, state, state_code, state_code_custom, state_custom, uaspec, updated_date, user_declared_district, user_declared_state, user_declared_on) values (?, ?, ?, ?, ?, ?::json, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?, ?, ?, ?)";
val pstmt: PreparedStatement = dbc.prepareStatement(insertQry);
pstmt.setString(1, deviceProfileRecord.device_id);
pstmt.setTimestamp(2, new Timestamp(new DateTime().getMillis));
pstmt.setString(3, deviceProfileRecord.city);
pstmt.setString(4, deviceProfileRecord.countryName);
pstmt.setString(5, deviceProfileRecord.countryCode);
pstmt.setString(6, if(deviceProfileRecord.devicespec.nonEmpty) deviceProfileRecord.devicespec else "{}");
pstmt.setString(7, deviceProfileRecord.districtCustom);
pstmt.setString(8, deviceProfileRecord.fcm_token);
pstmt.setTimestamp(9, new Timestamp(deviceProfileRecord.first_access)); // first access
pstmt.setTimestamp(10, new Timestamp(new DateTime().getMillis)); // last access
pstmt.setString(11, deviceProfileRecord.producer);
pstmt.setString(12, deviceProfileRecord.state);
pstmt.setString(13, deviceProfileRecord.stateCode);
pstmt.setString(14, deviceProfileRecord.stateCodeCustom);
pstmt.setString(15, deviceProfileRecord.stateCustom);
pstmt.setString(16, if(deviceProfileRecord.uaspec.nonEmpty) deviceProfileRecord.uaspec else "{}");
pstmt.setTimestamp(17, new Timestamp(new DateTime().getMillis));
pstmt.setString(18, deviceProfileRecord.user_declared_district);
pstmt.setString(19, deviceProfileRecord.user_declared_state);
pstmt.setTimestamp(20, new Timestamp(new DateTime().getMillis));
pstmt.execute()
}

def updateDeviceData(deviceProfileRecord: DeviceProfileRecord) = {
val table = DeviceUserDeclaredLocation.tableName
val insertQry = s"update $table set api_last_updated_on = ?, city = ?, country = ?, country_code = ?, device_spec = ?::JSON , district_custom = ? , fcm_token = ?, last_access = ?, producer_id = ?, state = ?, state_code = ?, state_code_custom = ? state_custom = ?, uaspec = ?::JSON, updated_date = ?, user_declared_district = ?, user_declared_state = ?, user_declared_on = ? where device_id = ?";
val pstmt: PreparedStatement = dbc.prepareStatement(insertQry);
pstmt.setTimestamp(1, new Timestamp(new DateTime().getMillis));
pstmt.setString(2, deviceProfileRecord.city);
pstmt.setString(3, deviceProfileRecord.countryName);
pstmt.setString(4, deviceProfileRecord.countryCode);
pstmt.setString(5, if(deviceProfileRecord.devicespec.nonEmpty) deviceProfileRecord.devicespec else "{}");
pstmt.setString(6, deviceProfileRecord.districtCustom);
pstmt.setString(7, deviceProfileRecord.fcm_token);
pstmt.setTimestamp(8, new Timestamp(new DateTime().getMillis)); // last access
pstmt.setString(9, deviceProfileRecord.producer);
pstmt.setString(10, deviceProfileRecord.state);
pstmt.setString(11, deviceProfileRecord.stateCode);
pstmt.setString(12, deviceProfileRecord.stateCodeCustom);
pstmt.setString(13, deviceProfileRecord.stateCustom);
pstmt.setString(14, if(deviceProfileRecord.uaspec.nonEmpty) deviceProfileRecord.uaspec else "{}");
pstmt.setTimestamp(15, new Timestamp(new DateTime().getMillis));
pstmt.setString(16, deviceProfileRecord.user_declared_district);
pstmt.setString(17, deviceProfileRecord.user_declared_state);
pstmt.setTimestamp(18, new Timestamp(new DateTime().getMillis));
pstmt.setString(19, deviceProfileRecord.device_id);
pstmt.execute()
}

def saveReportConfig(reportRequest: ReportRequest) = {
val config = JSONUtils.serialize(reportRequest.config)
Expand Down Expand Up @@ -354,6 +410,19 @@ object DeviceLocation extends SQLSyntaxSupport[DeviceLocation] {
rs.string("district_custom"))
}

case class DeviceUserDeclaredLocation(user_declared_state: String, user_declared_district: String) {
def this() = this("", "")

def toMap() = Map("user_declared_state" -> user_declared_state, "user_declared_district" -> user_declared_district)
}

object DeviceUserDeclaredLocation extends SQLSyntaxSupport[DeviceUserDeclaredLocation] {
override val tableName = AppConfig.getString("postgres.table.device_profile.name")
def apply(rs: WrappedResultSet) = new DeviceUserDeclaredLocation(
rs.string("user_declared_state"),
rs.string("user_declared_district"))
}

case class GeoLocationCity(geoname_id: Int, subdivision_1_name: String, subdivision_2_custom_name: String) {
def this() = this(0, "", "")
}
Expand Down
1 change: 1 addition & 0 deletions analytics-api-core/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ postgres.table.report_config.name="report_config"
postgres.table.job_request.name="job_request"
postgres.table.experiment_definition.name="experiment_definition"
postgres.table.dataset_metadata.name="dataset_metadata"
postgres.table.device_profile.name="device_profile"

channel {
data_exhaust {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,64 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{TestActorRef, TestProbe}
import com.typesafe.config.Config
import org.ekstep.analytics.api.BaseSpec
import org.ekstep.analytics.api.util.{DeviceStateDistrict, RedisUtil}
import org.ekstep.analytics.api.util.{APILogger, CacheUtil, DeviceLocation, DeviceStateDistrict, EmbeddedPostgresql, IPLocationCache, KafkaUtil, PostgresDBUtil, RedisUtil}
import org.mockito.Mockito.{times, verify, when}
import org.ekstep.analytics.api.util.CacheUtil
import org.ekstep.analytics.api.util.IPLocationCache
import org.ekstep.analytics.api.util.DeviceLocation
import de.sciss.fingertree.RangedSeq

import scala.math.Ordering
import redis.embedded.RedisServer;
import redis.embedded.RedisServer

import scala.collection.JavaConverters._
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatestplus.mockito.MockitoSugar
import com.typesafe.config.ConfigFactory
import org.ekstep.analytics.api.util.KafkaUtil
import redis.clients.jedis.exceptions.JedisConnectionException
import org.ekstep.analytics.api.util.APILogger

class TestDeviceProfileService extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {

implicit val config = ConfigFactory.load()
val deviceProfileServiceMock: DeviceProfileService = mock[DeviceProfileService]
private implicit val system: ActorSystem = ActorSystem("device-register-test-actor-system", config)
private val configMock = mock[Config]
private val redisUtil = new RedisUtil();
val redisIndex: Int = 2
private val postgresUtil = new PostgresDBUtil
// val redisIndex: Int = 2
implicit val executor = scala.concurrent.ExecutionContext.global
val kafkaUtil = new KafkaUtil()
val saveMetricsActor = TestActorRef(new SaveMetricsActor(kafkaUtil))
val metricsActorProbe = TestProbe()
when(configMock.getInt("redis.deviceIndex")).thenReturn(2)
when(configMock.getInt("redis.port")).thenReturn(6379)
// when(configMock.getInt("redis.deviceIndex")).thenReturn(2)
// when(configMock.getInt("redis.port")).thenReturn(6379)
when(configMock.getString("postgres.table.device_profile.name")).thenReturn("device_profile")
when(configMock.getString("postgres.table.geo_location_city.name")).thenReturn("geo_location_city")
when(configMock.getString("postgres.table.geo_location_city_ipv4.name")).thenReturn("geo_location_city_ipv4")
when(configMock.getString("postgres.table.device_profile.name")).thenReturn("device_profile")
when(configMock.getBoolean("device.api.enable.debug.log")).thenReturn(true)
private val deviceProfileServiceActorRef = TestActorRef(new DeviceProfileService(configMock, redisUtil) {
private val deviceProfileServiceActorRef = TestActorRef(new DeviceProfileService(configMock, postgresUtil) {

})
val geoLocationCityIpv4TableName = config.getString("postgres.table.geo_location_city_ipv4.name")
val geoLocationCityTableName = config.getString("postgres.table.geo_location_city.name")
private var redisServer:RedisServer = _;
// private var redisServer:RedisServer = _;


override def beforeAll() {
super.beforeAll()
redisServer = new RedisServer(6379);
redisServer.start();
val jedis = redisUtil.getConnection(redisIndex);
jedis.hmset("device-001", Map("user_declared_state" -> "Karnataka", "user_declared_district" -> "Tumkur").asJava);
jedis.close();
EmbeddedPostgresql.start()
EmbeddedPostgresql.createTables()
EmbeddedPostgresql.execute(
s"""insert into device_profile("device_id", "user_declared_district",
"user_declared_state") values ('device-001', 'Tumkur', 'Karnataka');""")
// redisServer = new RedisServer(6379);
// redisServer.start();
// val jedis = redisUtil.getConnection(redisIndex);
// jedis.hmset("device-001", Map("user_declared_state" -> "Karnataka", "user_declared_district" -> "Tumkur").asJava);
// jedis.close();
}

override def afterAll() {
super.afterAll()
redisServer.stop();
// redisServer.stop();
EmbeddedPostgresql.close()
deviceProfileServiceActorRef.restart(new Exception() {})
deviceProfileServiceActorRef.stop();
}
Expand Down Expand Up @@ -122,11 +127,11 @@ class TestDeviceProfileService extends FlatSpec with Matchers with BeforeAndAfte
intercept[Exception] {
deviceProfileServiceActorRef.receive(DeviceProfileRequest("device-001", "xyz"))
}
intercept[JedisConnectionException] {
redisServer.stop();
deviceProfileServiceActorRef.receive(DeviceProfileRequest("device-001", "106.51.74.185"))
redisServer.start();
}
// intercept[JedisConnectionException] {
// redisServer.stop();
// deviceProfileServiceActorRef.receive(DeviceProfileRequest("device-001", "106.51.74.185"))
// redisServer.start();
// }
}

}
Loading

0 comments on commit a9e5827

Please sign in to comment.