diff --git a/acdc-ws/app/Filters.scala b/acdc-ws/app/Filters.scala index 96e418f..947e5d6 100644 --- a/acdc-ws/app/Filters.scala +++ b/acdc-ws/app/Filters.scala @@ -1,9 +1,9 @@ -import utils.MetricFilter - import javax.inject.Inject import play.api.http.DefaultHttpFilters +import com.salesforce.mce.kineticpulse.MetricFilter + class Filters @Inject() ( metricsFilter: MetricFilter ) extends DefaultHttpFilters(metricsFilter) diff --git a/acdc-ws/app/Module.scala b/acdc-ws/app/Module.scala index 63b33bd..9441b16 100644 --- a/acdc-ws/app/Module.scala +++ b/acdc-ws/app/Module.scala @@ -7,7 +7,8 @@ import com.google.inject.AbstractModule -import services.{Metric, PrometheusMetric} +import com.salesforce.mce.kineticpulse.{Metric, PrometheusMetric} + import tasks.{AuthSettingReloadTask, DataCountTask, DataInstExpirationTask} import utils.{Authorization, AuthorizationSettings} diff --git a/acdc-ws/app/controllers/MetricController.scala b/acdc-ws/app/controllers/MetricController.scala deleted file mode 100644 index 42cb411..0000000 --- a/acdc-ws/app/controllers/MetricController.scala +++ /dev/null @@ -1,25 +0,0 @@ -package controllers - -import javax.inject._ - -import scala.concurrent.ExecutionContext - -import play.api.mvc._ - -import services.Metric - -@Singleton -class MetricController @Inject() ( - cc: ControllerComponents, - metric: Metric -)(implicit - ec: ExecutionContext -) extends AbstractController(cc) { - - def collect: Action[AnyContent] = Action.async { r: Request[AnyContent] => - val metricResults = metric.collect.map(output => Ok(output)) - metricResults.onComplete(_ => metric.clear()) - metricResults - } - -} diff --git a/acdc-ws/app/services/Metric.scala b/acdc-ws/app/services/Metric.scala deleted file mode 100644 index 82ff76c..0000000 --- a/acdc-ws/app/services/Metric.scala +++ /dev/null @@ -1,112 +0,0 @@ -package services - -import java.io.StringWriter -import javax.inject.{Inject, Singleton} - -import scala.concurrent.{ExecutionContext, Future} - -import com.typesafe.config.ConfigFactory -import play.api.Configuration -import play.api.mvc.RequestHeader -import io.prometheus.client._ -import io.prometheus.client.exporter.common.TextFormat -import io.prometheus.client.hotspot.DefaultExports - -trait Metric { - private lazy val config = new Configuration(ConfigFactory.load()) - - private val enableMetrics = - config.getOptional[Boolean]("acdc.metrics.enable").getOrElse(false) - - private val bypassPaths = config.getOptional[Seq[String]]("acdc.metrics.bypass.paths") match { - case Some(paths) => paths.toSet - case _ => Set[String]() - } - - private val staticPathMarkers = Seq("instance", "dataset", "lineage", "__metrics", "__status") - - def parseRequest(request: RequestHeader): Option[(String, String)] = { - if (checkPathIsDisabled(request.path) || !checkMetricIsEnabled) - None - else { - val (staticPath, args) = request.path - .split("/") - .filter(_.nonEmpty) - .foldLeft((List[String](), List[String]())) { - case ((Nil, l2), token) => (List(token), l2) - case ((h :: t, l2), token) => - if (staticPathMarkers.contains(h)) (h :: t, token :: l2) - else (token :: h :: t, l2) - } - Some((staticPath.reverse.mkString("-"), args.reverse.mkString("-"))) - } - } - - def checkMetricIsEnabled: Boolean = { enableMetrics } - - def checkPathIsDisabled(path: String): Boolean = { bypassPaths.contains(path) } - - def incrementStatusCount(status: String): Unit - - def startApiTimer(labels: String*): () => Unit - - def collect: Future[String] - - def clear(): Unit - - def countDB(table: String, cnt: Double): Unit - -} - -@Singleton -class PrometheusMetric @Inject() (implicit ec: ExecutionContext) extends Metric { - - DefaultExports.initialize() - - private val httpStatusCount: Counter = Counter.build - .name("http_requests_total") - .help("Total HTTP Requests Count") - .labelNames("status") - .register - - private val apiLatency: Summary = Summary - .build() - .name("apiLatencySummary") - .labelNames("path", "method") - .help("Profile API response time summary") - .quantile(0.5d, 0.001d) - .quantile(0.95d, 0.001d) - .quantile(0.99d, 0.001d) - .register - - private val dbCountGauge = Gauge - .build() - .name("acdc_db_record_count") - .labelNames("table") - .help("acdc DB table records count") - .register - - override def countDB(table: String, cnt: Double): Unit = { - dbCountGauge.labels(table).set(cnt) - } - - override def incrementStatusCount(status: String): Unit = { - httpStatusCount.labels(status).inc() - } - - override def startApiTimer(labels: String*): () => Unit = { - val timer = apiLatency.labels(labels: _*).startTimer() - val callback = () => timer.close - callback - } - - // Get metrics from the local prometheus collector default registry - override def collect: Future[String] = Future { - val writer = new StringWriter() - TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples()) - writer.toString - } - - override def clear(): Unit = {} - -} diff --git a/acdc-ws/app/tasks/DataCountTask.scala b/acdc-ws/app/tasks/DataCountTask.scala index 6b08808..e58003e 100644 --- a/acdc-ws/app/tasks/DataCountTask.scala +++ b/acdc-ws/app/tasks/DataCountTask.scala @@ -14,14 +14,13 @@ import com.salesforce.mce.acdc.db.{ DatasetLineageQuery, DatasetQuery } -import services.{DatabaseService, Metric} -import utils.DbConfig +import services.DatabaseService +import utils.{AcdcMetric, DbConfig} class DataCountTask @Inject() ( actorSystem: ActorSystem, dbService: DatabaseService, - dbConfig: DbConfig, - metric: Metric + dbConfig: DbConfig )(implicit ec: ExecutionContext ) extends Logging { @@ -39,9 +38,9 @@ class DataCountTask @Inject() ( logger.debug(s"Counted $datasetInstanceCount records from dataset_instance ...") logger.debug(s"Counted $datasetLineageCount records from dataset_lineage ...") - metric.countDB("dataset", datasetCount.toDouble) - metric.countDB("dataset-instance", datasetInstanceCount.toDouble) - metric.countDB("dataset-lineage", datasetLineageCount.toDouble) + AcdcMetric.countDB("dataset", datasetCount.toDouble) + AcdcMetric.countDB("dataset-instance", datasetInstanceCount.toDouble) + AcdcMetric.countDB("dataset-lineage", datasetLineageCount.toDouble) refresh() } diff --git a/acdc-ws/app/utils/AcdcMetric.scala b/acdc-ws/app/utils/AcdcMetric.scala new file mode 100644 index 0000000..198ac96 --- /dev/null +++ b/acdc-ws/app/utils/AcdcMetric.scala @@ -0,0 +1,17 @@ +package utils + +import io.prometheus.client.Gauge + +object AcdcMetric { + val dbCountGauge: Gauge = Gauge + .build() + .name("acdc_db_record_count") + .labelNames("table") + .help("acdc DB table records count") + .register + + def countDB(table: String, cnt: Double): Unit = { + dbCountGauge.labels(table).set(cnt) + } + +} diff --git a/acdc-ws/app/utils/MetricFilter.scala b/acdc-ws/app/utils/MetricFilter.scala deleted file mode 100644 index f9496b5..0000000 --- a/acdc-ws/app/utils/MetricFilter.scala +++ /dev/null @@ -1,49 +0,0 @@ -package utils - -import javax.inject.Inject - -import scala.concurrent.{ExecutionContext, Future} -import akka.stream.Materializer - -import play.api.mvc.{Filter, RequestHeader, Result} - -import services.Metric - -class MetricFilter @Inject() ( - metric: Metric -)(implicit - val mat: Materializer, - ec: ExecutionContext -) extends Filter { - - private val patt = raw"(api-v1-[a-z]+).*".r // keep only first word after api-v1 - def apply( - nextFilter: RequestHeader => Future[Result] - )(requestHeader: RequestHeader): Future[Result] = { - metric.parseRequest(requestHeader) match { - case Some((staticPath, _)) => - // simplify path to control prometheus summary metric label cardinality - val simplePath = staticPath match { - case patt(prefix) => prefix - case _ => "" - } - val stopTimerCallback = metric.startApiTimer(simplePath, requestHeader.method) - - nextFilter(requestHeader) - .transform( - result => { - metric.incrementStatusCount(result.header.status.toString) - stopTimerCallback() - result - }, - exception => { - metric.incrementStatusCount("500") - stopTimerCallback() - exception - } - ) - case _ => - nextFilter(requestHeader) - } - } -} diff --git a/acdc-ws/conf/application.conf b/acdc-ws/conf/application.conf index 58e0881..b939c08 100644 --- a/acdc-ws/conf/application.conf +++ b/acdc-ws/conf/application.conf @@ -39,6 +39,26 @@ acdc.db { count-task-frequency-minute = ${?COUNT_TASK_FREQUENCY_MINUTE} } +com.salesforce.mce.kineticpulse.metrics { + + enabled = true + + # some http request paths are not interesting to capture metrics such as http duration + # Disable metric collection for these request paths + bypass.paths = ["/__status", "/", "/__metrics"] + + # routes segments to track in api_duration_seconds_summary, matched values go to the metric label "path" + # example: ["eci", "model/predict", "model/accuracy"] + routes-to-track = ["instance", "instances", "dataset", "datasets", "lineage"] + + # delimiter is used within metric label "path" to replace the "/" in the request.path + delimiter = "-" + + # label value to be used when a path is not bypass, but did not match routes-to-track, + unmatched-path = "" + +} + ## Akka # https://www.playframework.com/documentation/latest/ScalaAkka#Configuration # https://www.playframework.com/documentation/latest/JavaAkka#Configuration @@ -369,11 +389,3 @@ play.assets { path = "/public" urlPrefix = "/assets" } - -# some http request paths are not interesting to capture metrics such as http duration -acdc.metrics { - enable = true - - # Disable metric collection for these request paths - bypass.paths = ["/__status", "/", "/__metrics"] -} diff --git a/acdc-ws/conf/routes b/acdc-ws/conf/routes index 73115d8..c742357 100644 --- a/acdc-ws/conf/routes +++ b/acdc-ws/conf/routes @@ -1,4 +1,4 @@ GET /__status controllers.StatusController.status() -GET /__metrics controllers.MetricController.collect() +GET /__metrics com.salesforce.mce.kineticpulse.MetricController.collect() -> /api/v1 routers.ApiRouter diff --git a/build.sbt b/build.sbt index 0af49a9..75deaa8 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,7 @@ val slickVersion = "3.4.1" val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.+" % Test -val prometheusClient = "io.prometheus" % "simpleclient" % "0.16.0" -val prometheusCommon = "io.prometheus" % "simpleclient_common" % "0.16.0" -val prometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % "0.16.0" +val kineticpulse = "com.salesforce.mce" %% "kineticpulse-metric" % "0.2.+" lazy val commonSettings = Seq( scalacOptions ++= Seq("-deprecation", "-feature", "-Xlint"), // , "-Xfatal-warnings"), @@ -48,9 +46,7 @@ lazy val ws = (project in file("acdc-ws")). buildInfoPackage := "com.salesforce.mce.acdc.ws", libraryDependencies ++= Seq( guice, - prometheusClient, - prometheusCommon, - prometheusHotSpot + kineticpulse ), dependencyOverrides ++= Seq( // fix https://nvd.nist.gov/vuln/detail/CVE-2022-42003 diff --git a/version.sbt b/version.sbt index debd081..dc28510 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ - ThisBuild / version := "0.8.12" + ThisBuild / version := "0.8.13"