// Reconstructed post-patch excerpt of SynapseMLLogging.scala (the source was a
// whitespace-mangled unified diff; diff markers removed, fixes folded in).

package com.microsoft.azure.synapse.ml.logging

import com.microsoft.azure.synapse.ml.build.BuildInfo
import com.microsoft.azure.synapse.ml.logging.common.SASScrubber
import com.microsoft.azure.synapse.ml.logging.fabric.CertifiedEventClient.logToCertifiedEvents
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.collection.JavaConverters._
import scala.collection.mutable
// NOTE(review): the global ExecutionContext is discouraged in library code; a
// dedicated telemetry pool would keep event logging from competing with user
// work — TODO confirm with maintainers.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// ... RequiredLogFields and the surrounding trait members are unchanged by the
// patch and not fully visible here; only the modified members follow. They sit
// inside `trait SynapseMLLogging extends Logging`.

  /** Builds the payload for `methodName` and logs it, optionally also emitting
    * a Fabric certified event.
    *
    * @param methodName        name of the API being recorded
    * @param numCols           column count of the input, if applicable
    * @param executionSeconds  wall-clock duration of the call, if measured
    * @param logCertifiedEvent when true, also fire a certified telemetry event
    */
  protected def logBase(methodName: String,
                        numCols: Option[Int],
                        executionSeconds: Option[Double],
                        logCertifiedEvent: Boolean = false
                       ): Unit = {
    logBase(getPayload(
      methodName,
      numCols,
      executionSeconds,
      None), logCertifiedEvent)
  }

  /** Logs `info` locally and, when requested, fires the certified event
    * asynchronously so telemetry never blocks the caller.
    */
  protected def logBase(info: Map[String, String], logCertifiedEvent: Boolean): Unit = {
    if (logCertifiedEvent) {
      Future {
        logToCertifiedEvents(
          info("libraryName"),
          info("method"),
          info -- Seq("libraryName", "method")
        )
        // FIX: original used `.failed.map { case e: Exception => ... }`, which
        // (a) produces an ignored NoSuchElementException when the future
        // succeeds and (b) throws MatchError for non-Exception Throwables.
        // `recover` touches only the failure path and ignores success cleanly.
      }.recover {
        case e: Exception => logErrorBase("certifiedEventLogging", e)
      }
    }
    logInfo(info.toJson.compactPrint)
  }

  // ... logErrorBase unchanged by the patch ...

  def logClass(): Unit = {
    logBase("constructor", None, None, true)
  }

  def logFit[T](f: => T, columns: Int, logCertifiedEvent: Boolean = true): T = {
    logVerb("fit", f, columns, logCertifiedEvent)
  }

  def logTransform[T](f: => T, columns: Int, logCertifiedEvent: Boolean = true): T = {
    logVerb("transform", f, columns, logCertifiedEvent)
  }

  /** Runs `f`, timing it, and logs the outcome after it completes.
    *
    * @param verb              API verb being logged ("fit", "transform", ...)
    * @param f                 computation to run (by-name, evaluated once)
    * @param columns           column count; negative means "unknown"
    * @param logCertifiedEvent whether to also emit a certified event
    */
  def logVerb[T](verb: String, f: => T, columns: Int = -1, logCertifiedEvent: Boolean = false): T = {
    val startTime = System.nanoTime()
    try {
      // FIX: the patched version logged *before* evaluating `f`, so
      // executionSeconds was always ~0 and a success record was emitted even
      // when `f` threw. Evaluate first, then log the true duration.
      val ret = f
      // FIX: the patch logged Some(-1) for "unknown"; map the sentinel back to
      // None to preserve the old payload semantics.
      val numCols = if (columns >= 0) Some(columns) else None
      logBase(verb, numCols, Some((System.nanoTime() - startTime) / 1e9), logCertifiedEvent)
      ret
    } catch {
      case e: Exception =>
        logErrorBase(verb, e)
        // presumably the original rethrows here (cut off in the diff) — TODO confirm
        throw e
    }
  }
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.common

import org.apache.spark.sql.SparkSession

/** Best-effort detection of the hosting platform (Synapse/Fabric, Databricks,
  * Binder) from environment variables and Spark configuration.
  */
object PlatformDetails {
  val PlatformSynapseInternal = "synapse_internal"
  val PlatformSynapse = "synapse"
  val PlatformBinder = "binder"
  val PlatformDatabricks = "databricks"
  val PlatformUnknown = "unknown"
  val SynapseProjectName = "Microsoft.ProjectArcadia"

  /** Returns one of the Platform* constants above.
    *
    * Detection order: AZURE_SERVICE env var (Synapse/Fabric), the /dbfs mount
    * (Databricks), BINDER_LAUNCH_HOST (Binder), otherwise unknown.
    */
  def currentPlatform(): String = {
    sys.env.get("AZURE_SERVICE") match {
      case Some(SynapseProjectName) =>
        val spark = SparkSession.builder.getOrCreate()
        // FIX: conf.get(key) throws NoSuchElementException when the key is
        // absent; use the defaulted overload so detection never crashes.
        val clusterType = spark.conf.get("spark.cluster.type", "")
        if (clusterType == "synapse") PlatformSynapse else PlatformSynapseInternal
      case _ if new java.io.File("/dbfs").exists() => PlatformDatabricks
      case _ if sys.env.contains("BINDER_LAUNCH_HOST") => PlatformBinder
      case _ => PlatformUnknown
    }
  }

  def runningOnSynapseInternal(): Boolean = currentPlatform() == PlatformSynapseInternal

  def runningOnSynapse(): Boolean = currentPlatform() == PlatformSynapse

  // "Fabric" is the internal Synapse platform; parentheses added for
  // consistency with the side-effecting predicates above.
  def runningOnFabric(): Boolean = runningOnSynapseInternal()
}
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.fabric

import org.apache.spark.sql.SparkSession
import spray.json.DefaultJsonProtocol.{StringJsonFormat, _}
import spray.json._

import java.time.Instant
import java.util.UUID
import scala.reflect.runtime.currentMirror
import scala.reflect.runtime.universe._

import com.microsoft.azure.synapse.ml.logging.common.PlatformDetails.runningOnFabric

/** Emits SynapseML usage telemetry ("certified events") to the Fabric ML
  * workload endpoint. Endpoint discovery and token acquisition happen lazily,
  * and events are only sent when actually running on Fabric.
  */
object CertifiedEventClient extends RESTUtils {

  // Power BI global-service base URL per cloud/environment. NOTE: entries are
  // normalized (trailing "/" handled) before paths are appended — see
  // getCertifiedEventUri.
  private val PbiGlobalServiceEndpoints = Map(
    "public" -> "https://api.powerbi.com/",
    "fairfax" -> "https://api.powerbigov.us",
    "mooncake" -> "https://api.powerbi.cn",
    "blackforest" -> "https://app.powerbi.de",
    "msit" -> "https://api.powerbi.com/",
    "prod" -> "https://api.powerbi.com/",
    "int3" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
    "dxt" -> "https://powerbistagingapi.analysis.windows.net/",
    "edog" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
    "dev" -> "https://onebox-redirect.analysis.windows-int.net/",
    "console" -> "http://localhost:5001/",
    "daily" -> "https://dailyapi.powerbi.com/")

  // Resolved once per JVM; requires Fabric-side Spark/Hadoop config to exist.
  private lazy val CertifiedEventUri = getCertifiedEventUri

  /** Fetches a Power BI access token via the Trident TokenLibrary.
    *
    * The TokenLibrary exists only on Fabric clusters and is not a compile-time
    * dependency, so it is resolved reflectively: locate the `getAccessToken`
    * overload taking a single String and invoke it with "pbi".
    */
  private def getAccessToken: String = {
    val objectName = "com.microsoft.azure.trident.tokenlibrary.TokenLibrary"
    val mirror = currentMirror
    val module = mirror.staticModule(objectName)
    val obj = mirror.reflectModule(module).instance
    val objType = mirror.reflect(obj).symbol.toType
    val methodName = "getAccessToken"
    val methodSymbols = objType.decl(TermName(methodName)).asTerm.alternatives
    val argType = typeOf[String]
    // Pick the overload whose single parameter list is exactly (String).
    val selectedMethodSymbol = methodSymbols.find { m =>
      m.asMethod.paramLists match {
        case List(List(param)) => param.typeSignature =:= argType
        case _ => false
      }
    }.getOrElse(throw new NoSuchMethodException(s"Method $methodName with argument type $argType not found"))
    val methodMirror = mirror.reflect(obj).reflectMethod(selectedMethodSymbol.asMethod)
    methodMirror("pbi").asInstanceOf[String]
  }

  /** Standard headers for the telemetry endpoints; RequestId/moniker are fresh
    * UUIDs on every call.
    */
  private def getHeaders: Map[String, String] = {
    Map(
      "Authorization" -> s"Bearer $getAccessToken",
      "RequestId" -> UUID.randomUUID().toString,
      "Content-Type" -> "application/json",
      "x-ms-workload-resource-moniker" -> UUID.randomUUID().toString
    )
  }

  /** Discovers the telemetry URI: global service -> cluster URL -> MWC token
    * service -> capacity/workspace-scoped telemetry endpoint.
    */
  private def getCertifiedEventUri: String = {
    val sc = SparkSession.builder().getOrCreate().sparkContext
    val workspaceId = sc.hadoopConfiguration.get("trident.artifact.workspace.id")
    val capacityId = sc.hadoopConfiguration.get("trident.capacity.id")
    val pbiEnv = sc.getConf.get("spark.trident.pbienv").toLowerCase()

    // FIX: several endpoint entries lack a trailing "/", which previously
    // produced malformed URLs such as "https://api.powerbigov.uspowerbi/...".
    val base = PbiGlobalServiceEndpoints(pbiEnv)
    val root = if (base.endsWith("/")) base else base + "/"
    val clusterDetailUrl = s"${root}powerbi/globalservice/v201606/clusterDetails"
    val headers = getHeaders

    val clusterUrl = usageGet(clusterDetailUrl, headers)
      .asJsObject.fields("clusterUrl").convertTo[String]
    val tokenUrl: String = s"$clusterUrl/metadata/v201606/generatemwctokenv2"

    // FIX: build JSON structurally (proper quoting/escaping) instead of by
    // string interpolation.
    val payload = JsObject(
      "capacityObjectId" -> JsString(capacityId),
      "workspaceObjectId" -> JsString(workspaceId),
      "workloadType" -> JsString("ML")
    ).compactPrint

    val host = usagePost(tokenUrl, payload, headers)
      .asJsObject.fields("TargetUriHost").convertTo[String]

    s"https://$host/webapi/Capacities/$capacityId/workloads/ML/MLAdmin/Automatic/workspaceid/$workspaceId/telemetry"
  }

  /** Sends one certified event; no-op when not running on Fabric.
    *
    * @param featureName  library emitting the event (the "libraryName" field)
    * @param activityName API verb being recorded
    * @param attributes   extra key/value payload fields
    */
  private[ml] def logToCertifiedEvents(featureName: String,
                                       activityName: String,
                                       attributes: Map[String, String]): Unit = {
    if (runningOnFabric) {
      // FIX: assemble JSON with spray-json so featureName/activityName are
      // escaped correctly rather than interpolated raw.
      val payload = JsObject(
        "timestamp" -> JsNumber(Instant.now().getEpochSecond),
        "feature_name" -> JsString(featureName),
        "activity_name" -> JsString(activityName),
        "attributes" -> attributes.toJson
      ).compactPrint

      usagePost(CertifiedEventUri, payload, getHeaders)
    }
  }
}
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.fabric

import com.microsoft.azure.synapse.ml.io.http.RESTHelpers
import org.apache.commons.io.IOUtils
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
import spray.json.{JsObject, JsValue, _}

/** Minimal JSON-over-HTTP helpers used by the Fabric telemetry client. */
trait RESTUtils {

  /** POSTs `body` to `url` with `headers`; returns the parsed JSON response
    * (empty JsObject when the response body is empty).
    */
  def usagePost(url: String, body: String, headers: Map[String, String]): JsValue = {
    val request = new HttpPost(url)
    for ((k, v) <- headers)
      request.addHeader(k, v)
    // FIX: StringEntity defaults to ISO-8859-1; the body is JSON, so encode
    // explicitly as UTF-8.
    request.setEntity(new StringEntity(body, "UTF-8"))

    val response = RESTHelpers.safeSend(request, close = false)
    // FIX: close in finally so the connection is not leaked if parsing throws.
    try parseResponse(response) finally response.close()
  }

  /** GETs `url` with `headers`; returns the parsed JSON response. */
  def usageGet(url: String, headers: Map[String, String]): JsValue = {
    val request = new HttpGet(url)
    for ((k, v) <- headers)
      request.addHeader(k, v)

    val response = RESTHelpers.safeSend(request, close = false)
    // FIX: close in finally (see usagePost).
    try parseResponse(response) finally response.close()
  }

  /** Reads the entity as UTF-8 text and parses it; empty body -> JsObject(). */
  private def parseResponse(response: CloseableHttpResponse): JsValue = {
    val content: String = IOUtils.toString(response.getEntity.getContent, "utf-8")
    if (content.nonEmpty) {
      content.parseJson
    } else {
      JsObject()
    }
  }
}

// Patch tail (environment.yml): the original diff also removed one trailing
// blank line after "- traitlets"; whitespace-only, no dependency changes.