Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Scala client for certified events #2045

Merged
merged 54 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 53 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
fd89b6f
Scala client for certified events
saileshbaidya Jul 28, 2023
0bbac30
Fixing bug, where I was extracting region and pvi environment details…
saileshbaidya Jul 28, 2023
9d85993
This state represents code that works in notebook after porting secti…
saileshbaidya Aug 4, 2023
9227747
Token expiry check, removing unused imports
saileshbaidya Aug 8, 2023
bf4c301
Creating JWT Token Parser. Removing dependencies that are no longer n…
saileshbaidya Aug 9, 2023
0fb16e6
Restoring resthelpers.scala to prior state. Adding exception handling…
saileshbaidya Aug 9, 2023
b1e6ba3
1) Restricting access level to class properties, and functions. 2) Cl…
saileshbaidya Aug 10, 2023
bb093cc
Refactoring to support single responsibility as much as possible and …
saileshbaidya Aug 12, 2023
8a1539b
Checking an empty http response content, before parsing
saileshbaidya Aug 14, 2023
070ccd2
Fixing the early http client termination. At this point we are succes…
saileshbaidya Aug 23, 2023
3df2f45
Fixing typos
saileshbaidya Aug 24, 2023
7fad214
Fixing test failure.
saileshbaidya Aug 24, 2023
f37a89f
At this point addressing just pr comments.
saileshbaidya Sep 6, 2023
a58a659
Addressing PR comments
saileshbaidya Sep 7, 2023
1a707a8
Removing token used for test and created a dummy token creator.
saileshbaidya Sep 9, 2023
db38753
Adding abstract classes to represent certified event payload, adding …
saileshbaidya Sep 20, 2023
2619dbb
Turning code immutable as much as possible and removing few tests and…
saileshbaidya Sep 22, 2023
b169537
Merge branch 'master' into saibai/ScalaClientCE
saileshbaidya Sep 22, 2023
3098851
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
2e65c87
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
b1558ed
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
b7a78c5
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
400dbe8
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
fd3a62a
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 24, 2023
01a6301
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 24, 2023
f3a0ba0
porting some api realted to web calls to WebUtils.scala, making few c…
saileshbaidya Sep 25, 2023
a007885
Syncing to remote repository
saileshbaidya Sep 25, 2023
0f33d72
Fixing build error that was partly introduced from missing to remove …
saileshbaidya Sep 25, 2023
d7a15c8
refactoring to change few object into traits, and addressing few PR c…
saileshbaidya Sep 25, 2023
71abb8c
changing the case of certified event activity name
saileshbaidya Sep 25, 2023
14cf664
Removing some class and associated test. Turning some variable lazy.
saileshbaidya Sep 25, 2023
bd35087
Changing some parameter to Option and then removing unused imports
saileshbaidya Sep 28, 2023
98771b7
1) Removing token extraction via fabric token service.
saileshbaidya Oct 2, 2023
5e6ec48
some cleanup
mhamilton723 Oct 3, 2023
5b66b58
Removing token caching.
saileshbaidya Oct 3, 2023
825c796
neaten PR
mhamilton723 Oct 5, 2023
b23941a
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 5, 2023
94d41ca
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 5, 2023
2c90498
add futures
mhamilton723 Oct 5, 2023
839ec90
Merge branch 'saibai/ScalaClientCE' of https://github.com/saileshbaid…
saileshbaidya Oct 5, 2023
a9de057
Adding Fabric environment check and using it to decide emitting certi…
saileshbaidya Oct 5, 2023
c987723
Adding platform check before emitting CE. Turning calls to log CE asy…
saileshbaidya Oct 7, 2023
8e38b6c
Modifying logic to determine if platform is Fabric only if it is Syna…
saileshbaidya Oct 8, 2023
17f7c15
Merge branch 'master' into saibai/ScalaClientCE
saileshbaidya Oct 8, 2023
3f0fe99
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/com…
mhamilton723 Oct 9, 2023
7b6afc2
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/com…
mhamilton723 Oct 9, 2023
6a760a9
Apply suggestions from code review
mhamilton723 Oct 9, 2023
2a257a6
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
75571ab
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
3093346
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
629dfa7
Update .gitignore
mhamilton723 Oct 9, 2023
a755d64
Update tools/docgen/docgen/manifest.yaml
mhamilton723 Oct 9, 2023
8811a52
Update environment.yml
mhamilton723 Oct 9, 2023
5ef7c59
Update environment.yml
mhamilton723 Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ package com.microsoft.azure.synapse.ml.logging

import com.microsoft.azure.synapse.ml.build.BuildInfo
import com.microsoft.azure.synapse.ml.logging.common.SASScrubber
import com.microsoft.azure.synapse.ml.logging.fabric.CertifiedEventClient.logToCertifiedEvents
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import spray.json.DefaultJsonProtocol._
import spray.json._

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class RequiredLogFields(uid: String,
className: String,
Expand Down Expand Up @@ -110,16 +113,29 @@ trait SynapseMLLogging extends Logging {

protected def logBase(methodName: String,
numCols: Option[Int],
executionSeconds: Option[Double]
executionSeconds: Option[Double],
logCertifiedEvent: Boolean = false
): Unit = {
logBase(getPayload(
methodName,
numCols,
executionSeconds,
None))
None), logCertifiedEvent)
}

protected def logBase(info: Map[String, String]): Unit = {
protected def logBase(info: Map[String, String], logCertifiedEvent: Boolean): Unit = {
if (logCertifiedEvent) {
Future {
logToCertifiedEvents(
info("libraryName"),
info("method"),
info -- Seq("libraryName", "method")
)
}.failed.map {
case e: Exception => logErrorBase("certifiedEventLogging", e)
}
}

logInfo(info.toJson.compactPrint)
}

Expand All @@ -130,23 +146,23 @@ trait SynapseMLLogging extends Logging {
}

def logClass(): Unit = {
logBase("constructor", None, None)
logBase("constructor", None, None, true)
}

def logFit[T](f: => T, columns: Int): T = {
logVerb("fit", f, Some(columns))
def logFit[T](f: => T, columns: Int, logCertifiedEvent: Boolean = true): T = {
logVerb("fit", f, columns, logCertifiedEvent)
}

def logTransform[T](f: => T, columns: Int): T = {
logVerb("transform", f, Some(columns))
def logTransform[T](f: => T, columns: Int, logCertifiedEvent: Boolean = true): T = {
logVerb("transform", f, columns, logCertifiedEvent)
}

def logVerb[T](verb: String, f: => T, columns: Option[Int] = None): T = {
def logVerb[T](verb: String, f: => T, columns: Int = -1, logCertifiedEvent: Boolean = false): T = {
val startTime = System.nanoTime()
try {
val ret = f
logBase(verb, columns, Some((System.nanoTime() - startTime) / 1e9))
ret
// Begin emitting certified event.
logBase(verb, Some(columns), Some((System.nanoTime() - startTime) / 1e9), logCertifiedEvent)
f
} catch {
case e: Exception =>
logErrorBase(verb, e)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.common

import org.apache.spark.sql.SparkSession

object PlatformDetails {
val PlatformSynapseInternal = "synapse_internal"
val PlatformSynapse = "synapse"
val PlatformBinder = "binder"
val PlatformDatabricks = "databricks"
val PlatformUnknown = "unknown"
val SynapseProjectName = "Microsoft.ProjectArcadia"

def currentPlatform(): String = {
val azureService = sys.env.get("AZURE_SERVICE")
azureService match {
case Some(serviceName) if serviceName == SynapseProjectName =>
val spark = SparkSession.builder.getOrCreate()
val clusterType = spark.conf.get("spark.cluster.type")
if (clusterType == "synapse") PlatformSynapse else PlatformSynapseInternal
case _ if new java.io.File("/dbfs").exists() => PlatformDatabricks
case _ if sys.env.get("BINDER_LAUNCH_HOST").isDefined => PlatformBinder
case _ => PlatformUnknown
}
}

def runningOnSynapseInternal(): Boolean = currentPlatform() == PlatformSynapseInternal

def runningOnSynapse(): Boolean = currentPlatform() == PlatformSynapse

def runningOnFabric(): Boolean = runningOnSynapseInternal
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.fabric

import org.apache.spark.sql.SparkSession
import spray.json.DefaultJsonProtocol.{StringJsonFormat, _}
import spray.json._

import java.time.Instant
import java.util.UUID
import scala.reflect.runtime.currentMirror
import scala.reflect.runtime.universe._

import com.microsoft.azure.synapse.ml.logging.common.PlatformDetails.runningOnFabric

object CertifiedEventClient extends RESTUtils {

private val PbiGlobalServiceEndpoints = Map(
"public" -> "https://api.powerbi.com/",
"fairfax" -> "https://api.powerbigov.us",
"mooncake" -> "https://api.powerbi.cn",
"blackforest" -> "https://app.powerbi.de",
"msit" -> "https://api.powerbi.com/",
"prod" -> "https://api.powerbi.com/",
"int3" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
"dxt" -> "https://powerbistagingapi.analysis.windows.net/",
"edog" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
"dev" -> "https://onebox-redirect.analysis.windows-int.net/",
"console" -> "http://localhost:5001/",
"daily" -> "https://dailyapi.powerbi.com/")


private lazy val CertifiedEventUri = getCertifiedEventUri

private def getAccessToken: String = {
val objectName = "com.microsoft.azure.trident.tokenlibrary.TokenLibrary"
val mirror = currentMirror
val module = mirror.staticModule(objectName)
val obj = mirror.reflectModule(module).instance
val objType = mirror.reflect(obj).symbol.toType
val methodName = "getAccessToken"
val methodSymbols = objType.decl(TermName(methodName)).asTerm.alternatives
val argType = typeOf[String]
val selectedMethodSymbol = methodSymbols.find { m =>
m.asMethod.paramLists match {
case List(List(param)) => param.typeSignature =:= argType
case _ => false
}
}.getOrElse(throw new NoSuchMethodException(s"Method $methodName with argument type $argType not found"))
val methodMirror = mirror.reflect(obj).reflectMethod(selectedMethodSymbol.asMethod)
methodMirror("pbi").asInstanceOf[String]
}

private def getHeaders: Map[String, String] = {
Map(
"Authorization" -> s"Bearer $getAccessToken",
"RequestId" -> UUID.randomUUID().toString,
"Content-Type" -> "application/json",
"x-ms-workload-resource-moniker" -> UUID.randomUUID().toString
)
}

private def getCertifiedEventUri: String = {
val sc = SparkSession.builder().getOrCreate().sparkContext
val workspaceId = sc.hadoopConfiguration.get("trident.artifact.workspace.id")
val capacityId = sc.hadoopConfiguration.get("trident.capacity.id")
val pbiEnv = sc.getConf.get("spark.trident.pbienv").toLowerCase()

val clusterDetailUrl = s"${PbiGlobalServiceEndpoints(pbiEnv)}powerbi/globalservice/v201606/clusterDetails"
val headers = getHeaders

val clusterUrl = usageGet(clusterDetailUrl, headers)
.asJsObject.fields("clusterUrl").convertTo[String]
val tokenUrl: String = s"$clusterUrl/metadata/v201606/generatemwctokenv2"

val payload =
s"""{
|"capacityObjectId": "$capacityId",
|"workspaceObjectId": "$workspaceId",
|"workloadType": "ML"
|}""".stripMargin


val host = usagePost(tokenUrl, payload, headers)
.asJsObject.fields("TargetUriHost").convertTo[String]

s"https://$host/webapi/Capacities/$capacityId/workloads/ML/MLAdmin/Automatic/workspaceid/$workspaceId/telemetry"
}


private[ml] def logToCertifiedEvents(featureName: String,
activityName: String,
attributes: Map[String, String]): Unit = {

if (runningOnFabric) {
val payload =
s"""{
|"timestamp":${Instant.now().getEpochSecond},
|"feature_name":"$featureName",
|"activity_name":"$activityName",
|"attributes":${attributes.toJson.compactPrint}
|}""".stripMargin

usagePost(CertifiedEventUri, payload, getHeaders)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.fabric

import com.microsoft.azure.synapse.ml.io.http.RESTHelpers
import org.apache.commons.io.IOUtils
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
import spray.json.{JsObject, JsValue, _}

trait RESTUtils {
def usagePost(url: String, body: String, headers: Map[String, String]): JsValue = {
val request = new HttpPost(url)

for ((k, v) <- headers)
request.addHeader(k, v)

request.setEntity(new StringEntity(body))

val response = RESTHelpers.safeSend(request, close = false)
val parsedResponse = parseResponse(response)
response.close()
parsedResponse
}

def usageGet(url: String, headers: Map[String, String]): JsValue = {
val request = new HttpGet(url)
for ((k, v) <- headers)
request.addHeader(k, v)

val response = RESTHelpers.safeSend(request, close = false)
val result = parseResponse(response)
response.close()
result
}

private def parseResponse(response: CloseableHttpResponse): JsValue = {
val content: String = IOUtils.toString(response.getEntity.getContent, "utf-8")
if (content.nonEmpty) {
content.parseJson
} else {
JsObject()
}
}

}
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ dependencies:
- pypandoc
- markdownify
- traitlets
mhamilton723 marked this conversation as resolved.
Show resolved Hide resolved

mhamilton723 marked this conversation as resolved.
Show resolved Hide resolved