added print statements
prashanthShiksha committed Aug 23, 2024
1 parent 802fab8 commit 4feab87
Showing 2 changed files with 48 additions and 3 deletions.
@@ -44,48 +44,72 @@ object DruidQueryProcessingModel extends IBatchModelTemplate[DruidOutput, DruidO


  override def preProcess(data: RDD[DruidOutput], config: Map[String, AnyRef])(implicit sc: SparkContext, fc: FrameworkContext): RDD[DruidOutput] = {
    println(s"----------- started preProcess in Druid Query Processing Model ------------")
    val reportConfig = JSONUtils.deserialize[ReportConfig](JSONUtils.serialize(config.getOrElse("reportConfig", Map()).asInstanceOf[Map[String, AnyRef]]))
    println(s"reportConfig = $reportConfig")
    setStorageConf(getStringProperty(config, "store", "local"), reportConfig.storageKey, reportConfig.storageSecret)
    data
  }

  override def algorithm(data: RDD[DruidOutput], config: Map[String, AnyRef])(implicit sc: SparkContext, fc: FrameworkContext): RDD[DruidOutput] = {
    println("------------ started processing algorithm --------------")
    val streamQuery = config.getOrElse("streamQuery", false).asInstanceOf[Boolean]
    println(s"streamQuery : $streamQuery")
    val exhaustQuery = config.getOrElse("exhaustQuery", false).asInstanceOf[Boolean]
    println(s"exhaustQuery : $exhaustQuery")
    val strConfig = config("reportConfig").asInstanceOf[Map[String, AnyRef]]
    println(s"strConfig : $strConfig")
    val reportConfig = JSONUtils.deserialize[ReportConfig](JSONUtils.serialize(strConfig))
    println(s"reportConfig : $reportConfig")

    fetchDruidData(reportConfig, streamQuery, exhaustQuery)
  }


  override def postProcess(data: RDD[DruidOutput], config: Map[String, AnyRef])(implicit sc: SparkContext, fc: FrameworkContext): RDD[DruidOutput] = {
    println(s"------------ started processing postProcess in Druid Query Processing Model ------------")
    val configMap = config("reportConfig").asInstanceOf[Map[String, AnyRef]]
    println(s"configMap = $configMap")
    val reportConfig = JSONUtils.deserialize[ReportConfig](JSONUtils.serialize(configMap))
    println(s"reportConfig = $reportConfig")

    val dimFields = reportConfig.metrics.flatMap { m =>
      if (m.druidQuery.dimensions.nonEmpty) m.druidQuery.dimensions.get.map(f => f.aliasName.getOrElse(f.fieldName))
      else if (m.druidQuery.sqlDimensions.nonEmpty) m.druidQuery.sqlDimensions.get.map(f => f.fieldName)
      else List()
    }
    println(s"dimFields = $dimFields")
    val labelsLookup = reportConfig.labels ++ Map("date" -> "Date")
    println(s"labelsLookup = $labelsLookup")
    implicit val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // Using foreach as parallel execution might conflict with local file path
    val dataCount = sc.longAccumulator("DruidReportCount")
    println(s"dataCount = $dataCount")
    reportConfig.output.foreach { f =>
      val df = getReportDF(RestUtil, f, data, dataCount).na.fill(0)
      df.show()
      println(dataCount.value)
      if (dataCount.value > 0) {
        val metricFields = f.metrics.distinct
        println(s"metricFields = $metricFields")
        val fieldsList = (dimFields ++ metricFields ++ List("date")).distinct
        println(s"fieldsList = $fieldsList")
        val metricsLabels = metricFields.map(f => labelsLookup.getOrElse(f, f)).distinct
        println(s"metricsLabels = $metricsLabels")
        val columnOrder = (List("Date") ++ dimFields.map(f => labelsLookup.getOrElse(f, f))
          .filter(f => !metricFields.contains(f)) ++ metricsLabels).distinct
        println(s"columnOrder = $columnOrder")
        val dimsLabels = labelsLookup.filter(x => f.dims.contains(x._1)).values.toList
        println(s"dimsLabels = $dimsLabels")
        val filteredDf = df.select(fieldsList.head, fieldsList.tail: _*)
        println(s"---------filteredDf---------")
        filteredDf.show()
        val renamedDf = filteredDf.select(filteredDf.columns.map(c => filteredDf.col(c).as(labelsLookup.getOrElse(c, c))): _*).na.fill("unknown")
        println(s"--------renamedDf-----------")
        renamedDf.show()
        val reportFinalId = if (f.label.nonEmpty && f.label.get.nonEmpty) reportConfig.id + "/" + f.label.get else reportConfig.id
        println(s"reportFinalId = $reportFinalId")
        val filesWithSize = saveReport(renamedDf, config ++ Map("dims" -> dimsLabels, "metricLabels" -> metricsLabels,
          "reportId" -> reportFinalId, "fileParameters" -> f.fileParameters, "format" -> f.`type`), f.zip, Option(columnOrder))
        val totalFileSize = filesWithSize.map(f => f._2).sum
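Not part of the committed diff: a minimal, self-contained sketch of the column-relabelling step used in postProcess above (select the report fields, then alias each column through the labels lookup). It assumes a local SparkSession; the column names and labels are hypothetical.

import org.apache.spark.sql.SparkSession

object RelabelSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("relabel-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical report rows and label lookup.
  val df = Seq(("2024-08-01", "Karnataka", 10)).toDF("date", "state", "total_plays")
  val labelsLookup = Map("date" -> "Date", "state" -> "State", "total_plays" -> "Total Plays")

  // Keep only the configured fields, then rename each column to its display label.
  val fieldsList = List("date", "state", "total_plays")
  val filteredDf = df.select(fieldsList.head, fieldsList.tail: _*)
  val renamedDf = filteredDf.select(
    filteredDf.columns.map(c => filteredDf.col(c).as(labelsLookup.getOrElse(c, c))): _*)

  renamedDf.show()   // columns: Date, State, Total Plays
  spark.stop()
}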
@@ -40,49 +40,63 @@ trait BaseDruidQueryProcessor {

  // Fetches data from Druid and returns it as an RDD
  def fetchDruidData(reportConfig: ReportConfig, streamQuery: Boolean = false, exhaustQuery: Boolean = false, foldByKey: Boolean = true)(implicit sc: SparkContext, fc: FrameworkContext): RDD[DruidOutput] = {
    println(s"--------- started fetchDruidData ----------------")
    val queryDims = reportConfig.metrics.map { f =>
      f.druidQuery.dimensions.getOrElse(List()).map(f => f.aliasName.getOrElse(f.fieldName))
    }.distinct

    println(s"queryDims = $queryDims")
    if (queryDims.length > 1) throw new DruidConfigException("Query dimensions are not matching")

    val interval = reportConfig.dateRange
    println(s"interval = $interval")
    val granularity = interval.granularity
    println(s"granularity = $granularity")
    var reportInterval = if (interval.staticInterval.nonEmpty) {
      interval.staticInterval.get
    } else if (interval.interval.nonEmpty) {
      interval.interval.get
    } else {
      throw new DruidConfigException("Both staticInterval and interval cannot be missing. Either of them should be specified")
    }
    println(s"reportInterval = $reportInterval")
    println(s"exhaustQuery = $exhaustQuery")
    if (exhaustQuery) {
      val config = reportConfig.metrics.head.druidQuery
      println(s"config = $config")
      val queryConfig = JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(config)) ++
        Map("intervalSlider" -> interval.intervalSlider, "intervals" ->
          (if (interval.staticInterval.isEmpty && interval.interval.nonEmpty) getDateRange(interval.interval.get,
            interval.intervalSlider, config.dataSource) else reportInterval), "granularity" -> granularity.get)
      DruidDataFetcher.executeSQLQuery(JSONUtils.deserialize[DruidQueryModel](JSONUtils.serialize(queryConfig)), fc.getAkkaHttpUtil())
    }
    else {
      println(s"----------inside else condition of fetchDruidData function-----------")
      val metrics = reportConfig.metrics.map { f =>

        val queryInterval = if (interval.staticInterval.isEmpty && interval.interval.nonEmpty) {
          val dateRange = interval.interval.get
          getDateRange(dateRange, interval.intervalSlider, f.druidQuery.dataSource)
        } else
          reportInterval
        println(s"queryInterval : $queryInterval")

        val queryConfig = if (granularity.nonEmpty)
          JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(f.druidQuery)) ++ Map("intervalSlider" -> interval.intervalSlider, "intervals" -> queryInterval, "granularity" -> granularity.get)
        else
          JSONUtils.deserialize[Map[String, AnyRef]](JSONUtils.serialize(f.druidQuery)) ++ Map("intervalSlider" -> interval.intervalSlider, "intervals" -> queryInterval)
        println(s"queryConfig = $queryConfig")
        val data = if (streamQuery) {
          println(s"-----inside if condition streamQuery process function-------")
          DruidDataFetcher.getDruidData(JSONUtils.deserialize[DruidQueryModel](JSONUtils.serialize(queryConfig)), true)
        }
        else {
          println(s"-----inside else condition streamQuery process function-------")
          DruidDataFetcher.getDruidData(JSONUtils.deserialize[DruidQueryModel](JSONUtils.serialize(queryConfig)))
        }
        println("Sample data from the RDD:")
        data.take(5).foreach(println)
        val count = data.count()
        println(s"Number of records in the RDD: $count")
        data.map { x =>
          val dataMap = JSONUtils.deserialize[Map[String, AnyRef]](x)
          val key = dataMap.filter(m => (queryDims.flatten ++ List("date")).contains(m._1)).values.map(f => f.toString).toList.sorted(Ordering.String.reverse).mkString(",")
@@ -92,6 +106,7 @@ trait BaseDruidQueryProcessor {

      }
      val finalResult = if (foldByKey) metrics.fold(sc.emptyRDD)(_ union _).foldByKey(Map())(_ ++ _) else metrics.fold(sc.emptyRDD)(_ union _)
      println(s"finalResult = $finalResult")
      finalResult.map { f =>
        DruidOutput(f._2)
      }
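Not part of the committed diff: a minimal sketch of the merge that fetchDruidData performs with foldByKey, where rows keyed by dimension values and date have their metric maps combined into one row per key. It assumes a local SparkContext; the keys and metric names are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

object FoldByKeySketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("foldByKey-sketch"))

  // Hypothetical rows from two separate Druid queries, keyed by "<dimension value>,<date>".
  val query1 = sc.parallelize(Seq(("Karnataka,2024-08-01", Map("total_plays" -> 10))))
  val query2 = sc.parallelize(Seq(("Karnataka,2024-08-01", Map("total_devices" -> 4))))

  // Rows from different queries that share a key are folded into a single metric map.
  val merged = (query1 union query2).foldByKey(Map.empty[String, Int])(_ ++ _)
  merged.collect().foreach(println)
  // (Karnataka,2024-08-01,Map(total_plays -> 10, total_devices -> 4))

  sc.stop()
}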
@@ -101,7 +116,9 @@
  // Converts RDD data into a DataFrame, with an optional location-mapping feature
  def getReportDF(restUtil: HTTPClient, config: OutputConfig, data: RDD[DruidOutput], dataCount: LongAccumulator)(implicit sc: SparkContext, sqlContext: SQLContext): DataFrame = {
    println("-------------- started processing getReportDF Model -------------")
    if (config.locationMapping.getOrElse(false)) {
      println("-------inside getReportDF if condition------")
      DruidQueryUtil.removeInvalidLocations(sqlContext.read.json(data.map(f => {
        dataCount.add(1)
        JSONUtils.serialize(f)
@@ -116,9 +133,13 @@

  // Gets the date range from the query interval
  def getDateRange(interval: QueryInterval, intervalSlider: Integer = 0, dataSource: String): String = {
    println(s"---------fetching getDateRange ------------")
    val offset: Long = if (dataSource.contains("rollup") || dataSource.contains("distinct")) 0 else DateTimeZone.forID("Asia/Kolkata").getOffset(DateTime.now())
    println(s"offset = $offset")
    val startDate = DateTime.parse(interval.startDate).withTimeAtStartOfDay().minusDays(intervalSlider).plus(offset).toString("yyyy-MM-dd'T'HH:mm:ss")
    println(s"startDate = $startDate")
    val endDate = DateTime.parse(interval.endDate).withTimeAtStartOfDay().minusDays(intervalSlider).plus(offset).toString("yyyy-MM-dd'T'HH:mm:ss")
    println(s"endDate = $endDate")
    startDate + "/" + endDate
  }
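
Not part of the committed diff: a minimal, self-contained sketch of the date-range arithmetic in getDateRange above, assuming joda-time on the classpath. The data source name and dates are hypothetical.

import org.joda.time.{DateTime, DateTimeZone}

object DateRangeSketch extends App {
  val dataSource = "summary-events"   // hypothetical, non-rollup data source
  val intervalSlider = 0

  // Non-rollup and non-distinct sources are shifted by the IST offset (in milliseconds).
  val offset: Long =
    if (dataSource.contains("rollup") || dataSource.contains("distinct")) 0
    else DateTimeZone.forID("Asia/Kolkata").getOffset(DateTime.now())

  val startDate = DateTime.parse("2024-08-01").withTimeAtStartOfDay()
    .minusDays(intervalSlider).plus(offset).toString("yyyy-MM-dd'T'HH:mm:ss")
  val endDate = DateTime.parse("2024-08-07").withTimeAtStartOfDay()
    .minusDays(intervalSlider).plus(offset).toString("yyyy-MM-dd'T'HH:mm:ss")

  // Prints something like 2024-08-01T05:30:00/2024-08-07T05:30:00
  println(startDate + "/" + endDate)
}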

