Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Scala client for certified events #2045

Merged
merged 54 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
fd89b6f
Scala client for certified events
saileshbaidya Jul 28, 2023
0bbac30
Fixing bug, where I was extracting region and pvi environment details…
saileshbaidya Jul 28, 2023
9d85993
This state represents code that works in notebook after porting secti…
saileshbaidya Aug 4, 2023
9227747
Token expiry check, removing unused imports
saileshbaidya Aug 8, 2023
bf4c301
Creating JWT Token Parser. Removing dependencies that are no longer n…
saileshbaidya Aug 9, 2023
0fb16e6
Restoring resthelpers.scala to prior state. Adding exception handling…
saileshbaidya Aug 9, 2023
b1e6ba3
1) Restricting access level to class properties, and functions. 2) Cl…
saileshbaidya Aug 10, 2023
bb093cc
Refactoring to support single responsibility as much as possible and …
saileshbaidya Aug 12, 2023
8a1539b
Checking an empty http response content, before parsing
saileshbaidya Aug 14, 2023
070ccd2
Fixing the early http client termination. At this point we are succes…
saileshbaidya Aug 23, 2023
3df2f45
Fixing typos
saileshbaidya Aug 24, 2023
7fad214
Fixing test failure.
saileshbaidya Aug 24, 2023
f37a89f
At this point addressing just pr comments.
saileshbaidya Sep 6, 2023
a58a659
Addressing PR comments
saileshbaidya Sep 7, 2023
1a707a8
Removing token used for test and created a dummy token creator.
saileshbaidya Sep 9, 2023
db38753
Adding abstract classes to represent certified event payload, adding …
saileshbaidya Sep 20, 2023
2619dbb
Turning code immutable as much as possible and removing few tests and…
saileshbaidya Sep 22, 2023
b169537
Merge branch 'master' into saibai/ScalaClientCE
saileshbaidya Sep 22, 2023
3098851
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
2e65c87
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
b1558ed
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
b7a78c5
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
400dbe8
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
fd3a62a
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 24, 2023
01a6301
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 24, 2023
f3a0ba0
porting some api realted to web calls to WebUtils.scala, making few c…
saileshbaidya Sep 25, 2023
a007885
Syncing to remote repository
saileshbaidya Sep 25, 2023
0f33d72
Fixing build error that was partly introduced from missing to remove …
saileshbaidya Sep 25, 2023
d7a15c8
refactoring to change few object into traits, and addressing few PR c…
saileshbaidya Sep 25, 2023
71abb8c
changing the case of certified event activity name
saileshbaidya Sep 25, 2023
14cf664
Removing some class and associated test. Turning some variable lazy.
saileshbaidya Sep 25, 2023
bd35087
Changing some parameter to Option and then removing unused imports
saileshbaidya Sep 28, 2023
98771b7
1) Removing token extraction via fabric token service.
saileshbaidya Oct 2, 2023
5e6ec48
some cleanup
mhamilton723 Oct 3, 2023
5b66b58
Removing token caching.
saileshbaidya Oct 3, 2023
825c796
neaten PR
mhamilton723 Oct 5, 2023
b23941a
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 5, 2023
94d41ca
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 5, 2023
2c90498
add futures
mhamilton723 Oct 5, 2023
839ec90
Merge branch 'saibai/ScalaClientCE' of https://github.com/saileshbaid…
saileshbaidya Oct 5, 2023
a9de057
Adding Fabric environment check and using it to decide emitting certi…
saileshbaidya Oct 5, 2023
c987723
Adding platform check before emitting CE. Turning calls to log CE asy…
saileshbaidya Oct 7, 2023
8e38b6c
Modifying logic to determine if platform is Fabric only if it is Syna…
saileshbaidya Oct 8, 2023
17f7c15
Merge branch 'master' into saibai/ScalaClientCE
saileshbaidya Oct 8, 2023
3f0fe99
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/com…
mhamilton723 Oct 9, 2023
7b6afc2
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/com…
mhamilton723 Oct 9, 2023
6a760a9
Apply suggestions from code review
mhamilton723 Oct 9, 2023
2a257a6
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
75571ab
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
3093346
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
629dfa7
Update .gitignore
mhamilton723 Oct 9, 2023
a755d64
Update tools/docgen/docgen/manifest.yaml
mhamilton723 Oct 9, 2023
8811a52
Update environment.yml
mhamilton723 Oct 9, 2023
5ef7c59
Update environment.yml
mhamilton723 Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

import spray.json._

class InvalidJwtTokenException(message: String) extends Exception(message)
class JwtTokenExpiryMissingException(message: String) extends Exception(message)
class FabricTokenParser(JWToken: String) {
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
val tokens: Array[String] = JWToken.split("\\.")
private var parsedToken: JsValue = tokenCheckAndDecode(tokens)
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
def getExpiry: Long ={
val exp: Option[Long] = parsedToken.asJsObject.fields.get("exp").collect { case JsNumber(value) => value.toLong }
exp match {
case Some(expValue) =>
expValue
case None =>
throw new JwtTokenExpiryMissingException(s"JWT token does not have expiration set. " +
s"Here is the token = {$JWToken}")
}
}

private def tokenCheckAndDecode(tokens: Array[String]): JsValue ={
if (tokens.length == 3) {
// Getting the JWT payload which is second member of [header].[payload].[signature]
val payload = tokens(1)
// Removing whitespace and url safe characters encoded that might have been added to token
val sanitizedPayload = payload.replace('-', '+').replace('_', '/').replaceAll("\\.", "").replaceAll("\\s", "")
val decodedPayload = java.util.Base64.getDecoder.decode(sanitizedPayload)
val decodedJson = new String(decodedPayload)
decodedJson.parseJson
}
else {
throw new InvalidJwtTokenException(s"Invalid JWT token. Here is the token = {$JWToken}")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

import java.io.IOException
import java.lang.management.ManagementFactory
import java.net.URL
import java.net.InetAddress
import java.util.UUID
import spray.json.DefaultJsonProtocol.StringJsonFormat
import com.microsoft.azure.synapse.ml.logging.Usage.FabricConstants._
import spray.json.{JsArray, JsObject, JsValue, _}
import com.microsoft.azure.synapse.ml.logging.common.WebUtils.usageGet
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import org.apache.hc.client5.http.ClientProtocolException

class FabricTokenServiceClient {
private val resourceMapping = Map(
"https://storage.azure.com" -> "storage",
"storage" -> "storage",
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
"https://analysis.windows.net/powerbi/api" -> "pbi",
"pbi" -> "pbi",
"https://vault.azure.net" -> "keyvault",
"keyvault" -> "keyvault",
"https://kusto.kusto.windows.net" -> "kusto",
"kusto" -> "kusto"
)

private val hostname = InetAddress.getLocalHost.getHostName
private val processDetail = ManagementFactory.getRuntimeMXBean.getName
private val processName = processDetail.substring(processDetail.indexOf('@') + 1)
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved

private val fabricContext = FabricUtils.getFabricContext
private val synapseTokenServiceEndpoint: String = fabricContext(synapseTokenServiceEndpoint)
private val workloadEndpoint = fabricContext(TridentLakehouseTokenServiceEndpoint)
private val sessionToken = fabricContext(TridentSessionToken)
private val clusterIdentifier = fabricContext(SynapseClusterIdentifier)

def getAccessToken(resourceParam: String): String = {
if (!resourceMapping.contains(resourceParam)) {
throw new Exception(s"$resourceParam not supported")
}
val resource = resourceMapping.getOrElse(resourceParam, "")
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
val rid = UUID.randomUUID().toString()
val targetUrl = new URL(workloadEndpoint)
val headers: Map[String, String] = Map(
"x-ms-cluster-identifier" -> clusterIdentifier,
"x-ms-workload-resource-moniker" -> clusterIdentifier,
"Content-Type" -> "application/json;charset=utf-8",
"x-ms-proxy-host" -> s"${targetUrl.getProtocol}://${targetUrl.getHost}",
"x-ms-partner-token" -> sessionToken,
"User-Agent" -> s"Trident Token Library - HostName:$hostname, ProcessName:$processName",
"x-ms-client-request-id" -> rid
)
var url = s"$synapseTokenServiceEndpoint/api/v1/proxy${targetUrl.getPath}/access?resource=$resource"
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
try {
val response: JsValue = usageGet(url, headers)
if (response.asJsObject.fields("status_code").convertTo[String] != 200
|| response.asJsObject.fields("content").convertTo[String].isEmpty) {
throw new Exception("Fetch access token error")
}
response.asJsObject.fields("content").toString().getBytes("UTF-8").toString
} catch {
case e: IOException =>
SynapseMLLogging.logMessage(s"getAccessToken: Failed to fetch cluster details. Problems in executing" +
s" http request or the connection might have been aborted. Exception = $e.")
""
case e: ClientProtocolException =>
SynapseMLLogging.logMessage(s"getAccessToken: Failed to fetch cluster details. " +
s"HTTP protocol error. Exception = $e.")
""
case e: Exception =>
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
SynapseMLLogging.logMessage(s"getAccessToken: Failed to fetch cluster details. Exception = $e.")
""
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

import com.microsoft.azure.synapse.ml.core.env.StreamUtilities
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import spray.json._
import scala.util.matching.Regex
import scala.io.Source

case class TokenServiceConfig(tokenServiceEndpoint: String,
clusterType: String,
clusterName: String,
sessionToken: String)

object TokenServiceConfigProtocol extends DefaultJsonProtocol {
implicit val TokenServiceConfigFormat: RootJsonFormat[TokenServiceConfig] = jsonFormat4(TokenServiceConfig)
}

import TokenServiceConfigProtocol._

object FabricUtils {
//private var TridentContext: Map[String, String] = Map[String, String]()

def getFabricContext: Map[String, String] = {
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
try {
val linesContextFile = StreamUtilities.usingSource(scala.io.Source.fromFile(FabricConstants.ContextFilePath)) {
source => source.getLines().toList
}.get

val tokenServiceConfig = StreamUtilities.usingSource(scala.io.Source.fromFile
(FabricConstants.TokenServiceFilePath)) {
source => cleanJson(source.mkString).parseJson.convertTo[TokenServiceConfig]
}.get

val tridentContext: Map[String, String] = linesContextFile
.filter(line => line.split('=').length == 2)
.map { line =>
val Array(k, v) = line.split('=')
(k.trim, v.trim)
}.toMap
.++(Seq(
(FabricConstants.SynapseTokenServiceEndpoint, tokenServiceConfig.tokenServiceEndpoint),
(FabricConstants.SynapseClusterType, tokenServiceConfig.clusterType),
(FabricConstants.SynapseClusterIdentifier, tokenServiceConfig.clusterName),
(FabricConstants.TridentSessionToken, tokenServiceConfig.sessionToken)
).toMap)

tridentContext
} catch {
case e: NullPointerException =>
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
SynapseMLLogging.logMessage(s"Error reading Fabric context file: Trident context file path is missing. $e")
throw e
}
}

private def cleanJson(s: String): String = {
val pattern: Regex = ",[ \t\r\n]+}".r
val cleanedJson = pattern.replaceAllIn(s, "}")
cleanedJson
}
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

case class FeatureUsagePayload(feature_name: UsageFeatureNames.Value,
activity_name: FeatureActivityName.Value,
attributes: Map[String, String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

import com.microsoft.azure.synapse.ml.logging.common.WebUtils.usageGet
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import com.microsoft.azure.synapse.ml.logging.Usage.FabricConstants.{Capacities, Workloads, WorkloadEndpointAutomatic}
import com.microsoft.azure.synapse.ml.logging.Usage.FabricConstants.{WebApi, WorkloadEndpointMl, WorkspaceID}
import spray.json.DefaultJsonProtocol.StringJsonFormat
import spray.json.{JsValue, JsonParser}

object HostEndpointUtils {
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
def getMlflowSharedHost(pbienv: String): String = {
val pbiGlobalServiceEndpoints = Map(
"public" -> "https://api.powerbi.com/",
"fairfax" -> "https://api.powerbigov.us",
"mooncake" -> "https://api.powerbi.cn",
"blackforest" -> "https://app.powerbi.de",
"msit" -> "https://api.powerbi.com/",
"prod" -> "https://api.powerbi.com/",
"int3" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
"dxt" -> "https://powerbistagingapi.analysis.windows.net/",
"edog" -> "https://biazure-int-edog-redirect.analysis-df.windows.net/",
"dev" -> "https://onebox-redirect.analysis.windows-int.net/",
"console" -> "http://localhost:5001/",
"daily" -> "https://dailyapi.powerbi.com/")

val defaultGlobalServiceEndpoint: String = "https://api.powerbi.com/"
val fetchClusterDetailUri: String = "powerbi/globalservice/v201606/clusterDetails"

val url = pbiGlobalServiceEndpoints.getOrElse(pbienv, defaultGlobalServiceEndpoint) + fetchClusterDetailUri
val headers = Map(
"Authorization" -> s"Bearer ${TokenUtils.getAccessToken}",
"RequestId" -> java.util.UUID.randomUUID().toString
)
var response: JsValue = JsonParser("{}")
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
try {
response = usageGet(url, headers)
} catch {
case e: Exception =>
SynapseMLLogging.logMessage(s"HostEndpointUtils.getMlflowSharedHost: " +
s"Can't get ml flow shared host. Exception = $e. (usage test)")
""
}
response.asJsObject.fields("clusterUrl").convertTo[String]
}
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved

def getMlflowWorkloadHost(pbienv: String, capacityId: String,
workspaceId: String,
sharedHost: String = ""): String = {
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
val clusterUrl = if (sharedHost.isEmpty) {
getMlflowSharedHost(pbienv)
} else {
sharedHost
}
val mwcToken: Option[MwcToken] = TokenUtils.getMwcToken(clusterUrl,
workspaceId, capacityId, TokenUtils.MwcWorkloadTypeMl)
mwcToken match {
case Some(token) =>
token.TargetUriHost
case None =>
""
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
}
}

def getMLWorkloadEndpoint(wlHost: String, capacityId: String, endpoint: String, workspaceID: String): String = {
val mlWorkloadEndpoint = s"$wlHost/$WebApi/$Capacities/$capacityId/$Workloads/" +
s"$WorkloadEndpointMl/$endpoint/$WorkloadEndpointAutomatic/${WorkspaceID}/$workspaceID/"
mlWorkloadEndpoint
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

import scala.reflect.runtime.currentMirror
import scala.reflect.runtime.universe._
import java.time.Instant
import org.apache.spark.SparkContext
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import spray.json.DefaultJsonProtocol.{StringJsonFormat, jsonFormat3}
import java.util.UUID
import com.microsoft.azure.synapse.ml.logging.common.WebUtils._
import spray.json.{DeserializationException, RootJsonFormat}
import spray.json.JsonParser.ParsingException

case class MwcToken (TargetUriHost: String, CapacityObjectId: String, Token: String)
object TokenUtils {
var AADToken: String = ""
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
val MwcWorkloadTypeMl = "ML"

def getAccessToken: String = {
if (isTokenValid(Option(this.AADToken)))
this.AADToken
else {
refreshAccessToken()
this.AADToken
}
}

def getAccessToken(tokenType: String): String = {

val objectName = "com.microsoft.azure.trident.tokenlibrary.TokenLibrary"
val mirror = currentMirror
val module = mirror.staticModule(objectName)
val obj = mirror.reflectModule(module).instance
val objType = mirror.reflect(obj).symbol.toType
val methodName = "getAccessToken"
val methodSymbols = objType.decl(TermName(methodName)).asTerm.alternatives
val argType = typeOf[String]

val selectedMethodSymbol = methodSymbols.find { m =>
m.asMethod.paramLists match {
case List(List(param)) => param.typeSignature =:= argType
case _ => false
}
}.getOrElse(throw new NoSuchMethodException(s"Method $methodName with argument type $argType not found"))

val methodMirror = mirror.reflect(obj).reflectMethod(selectedMethodSymbol.asMethod)
methodMirror(tokenType).asInstanceOf[String]
}

private def isTokenValid(tokenOption: Option[String]): Boolean = {
tokenOption match {
case Some(token) if token.nonEmpty =>
try {
val tokenParser = new FabricTokenParser(token)
val expiryEpoch = tokenParser.getExpiry
val now = Instant.now().getEpochSecond
now < expiryEpoch - 60
} catch {
case e: InvalidJwtTokenException =>
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
SynapseMLLogging.logMessage(s"TokenUtils::checkTokenValid: Token used to trigger telemetry " +
s"endpoint is invalid. Exception = $e")
false
case e: JwtTokenExpiryMissingException =>
SynapseMLLogging.logMessage(s"TokenUtils::checkTokenValid: Token misses expiry. " +
s"Exception = $e")
false
}
case _ =>
false // No value is present or the value is empty
}
}

private def refreshAccessToken(): Unit = {
try {
if (SparkContext.getOrCreate() != null) {
val token = getAccessToken("pbi")
AADToken = token
} else {
val token = new FabricTokenServiceClient().getAccessToken("pbi")
AADToken = token
}
} catch {
case e: Exception =>
SynapseMLLogging.logMessage(s"refreshAccessTok: failed to refresh pbi tok. Exception: {$e}. (usage test)")
}
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
}

def getMwcToken(shared_host: String, WorkspaceId: String, capacity_id: String,
workload_type: String): Option[MwcToken]= {
val url: String = shared_host + "/metadata/v201606/generatemwctokenv2"

val payLoad = s"""{
|"capacityObjectId": "$capacity_id",
|"workspaceObjectId": "$WorkspaceId",
|"workloadType": "$workload_type"
}""".stripMargin

val driverAADToken = getAccessToken

val headers = Map(
"Content-Type" -> "application/json",
"Authorization" -> s"""Bearer $driverAADToken""".stripMargin,
"x-ms-workload-resource-moniker" -> UUID.randomUUID().toString
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
)

try{
val response = usagePost(url, payLoad, headers)
var targetUriHost = response.asJsObject.fields("TargetUriHost").convertTo[String]
targetUriHost = s"https://$targetUriHost"
response.asJsObject.fields.updated("TargetUriHost", targetUriHost)

implicit val mwcTokenFormat: RootJsonFormat[MwcToken] = jsonFormat3(MwcToken)
val mwcToken = response.convertTo[MwcToken]
Some(mwcToken)
}
catch {
case e: NoSuchElementException =>
SynapseMLLogging.logMessage(s"TokenUtils.getMWCToken: Cannot retrieve targetUriHost from MWC Token.")
None
case e: DeserializationException =>
SynapseMLLogging.logMessage(s"TokenUtils.getMWCToken: The structure of response is not of type MwcToken.")
None
case e: ParsingException =>
SynapseMLLogging.logMessage(s"TokenUtils.getMWCToken: The structure of json response is formed correctly.")
None
case e: Exception =>
SynapseMLLogging.logMessage(s"getMWCTok: Failed to fetch MWC token that is required to " +
s"get cluster details: $e.")
None
saileshbaidya marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Loading