chore: Scala client for certified events #2045

Merged on Oct 10, 2023 (54 commits)
Changes from 4 commits
Commits
fd89b6f
Scala client for certified events
saileshbaidya Jul 28, 2023
0bbac30
Fixing bug, where I was extracting region and pvi environment details…
saileshbaidya Jul 28, 2023
9d85993
This state represents code that works in notebook after porting secti…
saileshbaidya Aug 4, 2023
9227747
Token expiry check, removing unused imports
saileshbaidya Aug 8, 2023
bf4c301
Creating JWT Token Parser. Removing dependencies that are no longer n…
saileshbaidya Aug 9, 2023
0fb16e6
Restoring resthelpers.scala to prior state. Adding exception handling…
saileshbaidya Aug 9, 2023
b1e6ba3
1) Restricting access level to class properties, and functions. 2) Cl…
saileshbaidya Aug 10, 2023
bb093cc
Refactoring to support single responsibility as much as possible and …
saileshbaidya Aug 12, 2023
8a1539b
Checking an empty http response content, before parsing
saileshbaidya Aug 14, 2023
070ccd2
Fixing the early http client termination. At this point we are succes…
saileshbaidya Aug 23, 2023
3df2f45
Fixing typos
saileshbaidya Aug 24, 2023
7fad214
Fixing test failure.
saileshbaidya Aug 24, 2023
f37a89f
At this point addressing just pr comments.
saileshbaidya Sep 6, 2023
a58a659
Addressing PR comments
saileshbaidya Sep 7, 2023
1a707a8
Removing token used for test and created a dummy token creator.
saileshbaidya Sep 9, 2023
db38753
Adding abstract classes to represent certified event payload, adding …
saileshbaidya Sep 20, 2023
2619dbb
Turning code immutable as much as possible and removing few tests and…
saileshbaidya Sep 22, 2023
b169537
Merge branch 'master' into saibai/ScalaClientCE
saileshbaidya Sep 22, 2023
3098851
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
2e65c87
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
b1558ed
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
b7a78c5
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
400dbe8
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 23, 2023
fd3a62a
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 24, 2023
01a6301
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/Usa…
saileshbaidya Sep 24, 2023
f3a0ba0
porting some api related to web calls to WebUtils.scala, making few c…
saileshbaidya Sep 25, 2023
a007885
Syncing to remote repository
saileshbaidya Sep 25, 2023
0f33d72
Fixing build error that was partly introduced from missing to remove …
saileshbaidya Sep 25, 2023
d7a15c8
refactoring to change few object into traits, and addressing few PR c…
saileshbaidya Sep 25, 2023
71abb8c
changing the case of certified event activity name
saileshbaidya Sep 25, 2023
14cf664
Removing some class and associated test. Turning some variable lazy.
saileshbaidya Sep 25, 2023
bd35087
Changing some parameter to Option and then removing unused imports
saileshbaidya Sep 28, 2023
98771b7
1) Removing token extraction via fabric token service.
saileshbaidya Oct 2, 2023
5e6ec48
some cleanup
mhamilton723 Oct 3, 2023
5b66b58
Removing token caching.
saileshbaidya Oct 3, 2023
825c796
neaten PR
mhamilton723 Oct 5, 2023
b23941a
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 5, 2023
94d41ca
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 5, 2023
2c90498
add futures
mhamilton723 Oct 5, 2023
839ec90
Merge branch 'saibai/ScalaClientCE' of https://github.com/saileshbaid…
saileshbaidya Oct 5, 2023
a9de057
Adding Fabric environment check and using it to decide emitting certi…
saileshbaidya Oct 5, 2023
c987723
Adding platform check before emitting CE. Turning calls to log CE asy…
saileshbaidya Oct 7, 2023
8e38b6c
Modifying logic to determine if platform is Fabric only if it is Syna…
saileshbaidya Oct 8, 2023
17f7c15
Merge branch 'master' into saibai/ScalaClientCE
saileshbaidya Oct 8, 2023
3f0fe99
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/com…
mhamilton723 Oct 9, 2023
7b6afc2
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/com…
mhamilton723 Oct 9, 2023
6a760a9
Apply suggestions from code review
mhamilton723 Oct 9, 2023
2a257a6
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
75571ab
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
3093346
Update core/src/main/scala/com/microsoft/azure/synapse/ml/logging/fab…
mhamilton723 Oct 9, 2023
629dfa7
Update .gitignore
mhamilton723 Oct 9, 2023
a755d64
Update tools/docgen/docgen/manifest.yaml
mhamilton723 Oct 9, 2023
8811a52
Update environment.yml
mhamilton723 Oct 9, 2023
5ef7c59
Update environment.yml
mhamilton723 Oct 9, 2023
5 changes: 4 additions & 1 deletion build.sbt
@@ -36,7 +36,10 @@ val extraDependencies = Seq(
// Although breeze 1.2 is already provided by Spark, this is needed for Azure Synapse Spark 3.2 pools.
// Otherwise a NoSuchMethodError will be thrown by interpretability code. This problem only happens
// to Azure Synapse Spark 3.2 pools.
"org.scalanlp" %% "breeze" % "1.2"
"org.scalanlp" %% "breeze" % "1.2",
"com.typesafe.play" %% "play" % "2.8.8",
"com.pauldijou" %% "jwt-core" % "3.0.0",
"org.json" % "json" % "20210307"
).map(d => d excludeAll (excludes: _*))
val dependencies = coreDependencies ++ extraDependencies

@@ -14,6 +14,8 @@ import scala.annotation.tailrec
import scala.concurrent.blocking
import scala.util.Try

import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging

object RESTHelpers {
lazy val RequestTimeout = 60000

@@ -53,7 +55,6 @@ object RESTHelpers {
backoffs: List[Int] = List(100, 500, 1000), //scalastyle:ignore magic.number
expectedCodes: Set[Int] = Set(),
close: Boolean = true): CloseableHttpResponse = {

retry(backoffs, { () =>
val response = Client.execute(request)
try {
@@ -78,6 +79,10 @@ object RESTHelpers {
}
} catch {
case e: Exception =>
println(s"RESTHelpers::safeSend: getting error response parsing." +
s" Exception = $e")
SynapseMLLogging.logMessage(s"RESTHelpers::safeSend: getting error response parsing." +
s" Exception = $e")
response.close()
throw e
} finally {
@@ -89,7 +94,18 @@ object RESTHelpers {
}

def parseResult(result: CloseableHttpResponse): String = {
IOUtils.toString(result.getEntity.getContent, "utf-8")
var res: String = ""
try {
res = IOUtils.toString(result.getEntity.getContent, "utf-8")
}
catch{
case e: Exception =>
println(s"RESTHelpers::parseResult: getting exception parsing response." +
s" Exception = $e")
SynapseMLLogging.logMessage(s"RESTHelpers::parseResult: getting exception parsing response." +
s" Exception = $e")
}
res
}

def sendAndParseJson(request: HttpRequestBase, expectedCodes: Set[Int]=Set()): JsValue = {
@@ -98,5 +114,4 @@ object RESTHelpers {
response.close()
output
}

}
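
Review note: the `var res` plus try/catch shape of `parseResult` can also be written with `scala.util.Try`. What follows is a minimal, stdlib-only sketch under stated assumptions, not the merged code. `ParseResultSketch`, `readBodyOrEmpty`, and `log` are hypothetical names, and `log` only stands in for `SynapseMLLogging.logMessage`.

```scala
import java.io.InputStream
import scala.io.Source
import scala.util.{Failure, Success, Try}

object ParseResultSketch {
  // Hypothetical stand-in for SynapseMLLogging.logMessage.
  def log(msg: String): Unit = System.err.println(msg)

  // Reads the whole stream as UTF-8. Any non-fatal failure, including a
  // failure to open the stream, is logged and mapped to an empty string.
  def readBodyOrEmpty(open: () => InputStream): String =
    Try {
      val stream = open()
      try Source.fromInputStream(stream, "UTF-8").mkString
      finally stream.close()
    } match {
      case Success(body) => body
      case Failure(e) =>
        log(s"readBodyOrEmpty: exception while reading response. Exception = $e")
        ""
    }
}
```

Taking the stream as a thunk means a failure while obtaining it (for example a missing response entity) is handled by the same path as a read failure.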
@@ -0,0 +1,71 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

import java.util.UUID
import java.net.URL
import java.net.InetAddress
import java.lang.management.ManagementFactory
import com.microsoft.azure.synapse.ml.logging.common.WebUtils._
import spray.json.DefaultJsonProtocol.StringJsonFormat
import com.microsoft.azure.synapse.ml.logging.Usage.FabricConstants._
import spray.json.{JsArray, JsObject, JsValue, _}
import com.microsoft.azure.synapse.ml.logging.common.WebUtils.{usageGet}
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging

class FabricTokenServiceClient {
val resourceMapping = Map(
"https://storage.azure.com" -> "storage",
"storage" -> "storage",
"https://analysis.windows.net/powerbi/api" -> "pbi",
"pbi" -> "pbi",
"https://vault.azure.net" -> "keyvault",
"keyvault" -> "keyvault",
"https://kusto.kusto.windows.net" -> "kusto",
"kusto" -> "kusto"
)

val hostname = InetAddress.getLocalHost.getHostName
val processDetail = ManagementFactory.getRuntimeMXBean().getName()
val processName = processDetail.substring(processDetail.indexOf('@') + 1)

val fabricContext = FabricUtils.getFabricContext()
val synapseTokenserviceEndpoint = fabricContext(SynapseTokenServiceEndpoint)
val workloadEndpoint = fabricContext(TridentLakehouseTokenServiceEndpoint)
val sessionToken = fabricContext(TridentSessionToken)
val clusterIdentifier = fabricContext(SynapseClusterIdentifier)

def getAccessToken(resourceParam: String): String = {
if (!resourceMapping.contains(resourceParam)) {
throw new Exception(s"$resourceParam not supported")
}
val resource = resourceMapping.getOrElse(resourceParam, "")
val rid = UUID.randomUUID().toString()
//to do workloadEndpoint
val targetUrl = new URL(workloadEndpoint)
val headers = Map(
"x-ms-cluster-identifier" -> clusterIdentifier,
"x-ms-workload-resource-moniker" -> clusterIdentifier,
"Content-Type" -> "application/json;charset=utf-8",
"x-ms-proxy-host" -> s"${targetUrl.getProtocol}://${targetUrl.getHost}",
"x-ms-partner-token" -> sessionToken,
"User-Agent" -> s"Trident Token Library - HostName:$hostname, ProcessName:$processName",
"x-ms-client-request-id" -> rid
)
val url = s"$synapseTokenserviceEndpoint/api/v1/proxy${targetUrl.getPath}/access?resource=$resource"
try {
val response = usageGet(url, headers).asJsObject
// Compare as strings: the original compared a String with the Int 200, which is never equal.
if (response.fields("status_code").convertTo[String] != "200"
|| response.fields("content").convertTo[String].isEmpty) {
throw new Exception("Fetch access token error")
}
// Return the token text itself. Calling toString on the UTF-8 byte array yields an object reference, not the token.
response.fields("content").convertTo[String]
} catch {
case e: Exception =>
SynapseMLLogging.logMessage(s"getAccessToken: Failed to fetch access token. Exception = $e. (usage test)")
throw e
}
}
}
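
Review note: `getAccessToken` checks `resourceMapping.contains` and then calls `getOrElse` with an empty-string default that can never be used. A single lookup covers both steps. The sketch below is illustrative only; `ResourceAliasSketch` and `resolve` are hypothetical names, and `IllegalArgumentException` is swapped in here for the plain `Exception` the PR throws.

```scala
object ResourceAliasSketch {
  // Same alias table as in FabricTokenServiceClient above.
  val resourceMapping: Map[String, String] = Map(
    "https://storage.azure.com" -> "storage",
    "storage" -> "storage",
    "https://analysis.windows.net/powerbi/api" -> "pbi",
    "pbi" -> "pbi",
    "https://vault.azure.net" -> "keyvault",
    "keyvault" -> "keyvault",
    "https://kusto.kusto.windows.net" -> "kusto",
    "kusto" -> "kusto"
  )

  // getOrElse takes its default by name, so the throw only runs
  // when the key is missing.
  def resolve(resourceParam: String): String =
    resourceMapping.getOrElse(
      resourceParam,
      throw new IllegalArgumentException(s"$resourceParam not supported"))
}
```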
@@ -0,0 +1,64 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage
import com.microsoft.azure.synapse.ml.logging.Usage.FabricConstants._
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import spray.json._
import spray.json.DefaultJsonProtocol._
import scala.util.matching.Regex
import scala.io.Source

case class TokenServiceConfig(tokenServiceEndpoint: String,
clusterType: String,
clusterName: String,
sessionToken: String)

object TokenServiceConfigProtocol extends DefaultJsonProtocol {
implicit val TokenServiceConfigFormat: RootJsonFormat[TokenServiceConfig] = jsonFormat4(TokenServiceConfig)
}

import TokenServiceConfigProtocol._

object FabricUtils {
var TridentContext = Map[String, String]()

def getFabricContext(): Map[String, String] = {
if (TridentContext.nonEmpty) {
TridentContext
} else {
try {
val lines = scala.io.Source.fromFile(FabricConstants.ContextFilePath).getLines().toList
for (line <- lines) {
if (line.split('=').length == 2) {
val Array(k, v) = line.split('=')
TridentContext += (k.trim -> v.trim)
}
}

var fileContent: String = Source.fromFile(FabricConstants.TokenServiceFilePath).mkString
fileContent = cleanJson(fileContent)
val tokenServiceConfigJson = fileContent.parseJson

// Extract the values from the JSON using Spray JSON's automatic JSON-to-case-class conversion
val tokenServiceConfig = tokenServiceConfigJson.convertTo[TokenServiceConfig]
// Populate the TridentContext map
TridentContext += (FabricConstants.SynapseTokenServiceEndpoint -> tokenServiceConfig.tokenServiceEndpoint)
TridentContext += (FabricConstants.SynapseClusterType -> tokenServiceConfig.clusterType)
TridentContext += (FabricConstants.SynapseClusterIdentifier -> tokenServiceConfig.clusterName)
TridentContext += (FabricConstants.TridentSessionToken -> tokenServiceConfig.sessionToken)
} catch {
case e: Exception =>
SynapseMLLogging.logMessage(s"Error reading Fabric context file: $e")
throw e
}
}
TridentContext
}

def cleanJson(s: String): String = {
val pattern: Regex = ",[ \t\r\n]+}".r
val cleanedJson = pattern.replaceAllIn(s, "}")
cleanedJson
}
}
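
Review note: the two parsing steps in `FabricUtils` (the `key=value` context lines and the trailing-comma cleanup) can be exercised without touching the file system. This is a hedged sketch with the same parsing rules as the code above; `FabricContextSketch` and `parseContextLines` are hypothetical names, and the result is an immutable map rather than the `var TridentContext` used in the PR.

```scala
object FabricContextSketch {
  // Keeps only lines that split into exactly one key and one value,
  // matching the `length == 2` check in getFabricContext.
  def parseContextLines(lines: Seq[String]): Map[String, String] =
    lines
      .map(_.split('='))
      .collect { case Array(k, v) => k.trim -> v.trim }
      .toMap

  // Same pattern as cleanJson: a comma followed by at least one
  // whitespace character and a closing brace.
  private val TrailingComma = ",[ \t\r\n]+}".r

  def cleanJson(s: String): String = TrailingComma.replaceAllIn(s, "}")
}
```

Two properties of these rules are worth knowing: a line whose value itself contains `=` is skipped, and a trailing comma that sits directly against the closing brace with no whitespace in between is left untouched.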
@@ -0,0 +1,10 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

//import scala.collection.mutable.Map

case class FeatureUsagePayload(feature_name: UsageFeatureNames.Value,
activity_name: FeatureActivityName.Value,
attributes: Map[String, String] = Map.empty[String, String] )
@@ -0,0 +1,156 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.azure.synapse.ml.logging.Usage

import scala.reflect.runtime.currentMirror
import scala.reflect.runtime.universe._
import java.time.Instant
import org.apache.spark.SparkContext
import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import spray.json.DefaultJsonProtocol.{StringJsonFormat, jsonFormat3}

import java.util.UUID
import com.microsoft.azure.synapse.ml.logging.common.WebUtils._

import java.util.Date
import pdi.jwt._
import org.json.JSONObject
import spray.json.RootJsonFormat

import scala.util.{Failure, Success, Try}

case class MwcToken (TargetUriHost: String, CapacityObjectId: String, Token: String)
object TokenUtils {
var AADToken: String = ""
val MwcWorkloadTypeMl = "ML"

def getAccessToken(): String = {
if (checkTokenValid(this.AADToken))
this.AADToken
else {
refreshAccessToken()
this.AADToken
}
}

def getAccessToken(tokenType: String): String = {

val objectName = "com.microsoft.azure.trident.tokenlibrary.TokenLibrary"
val mirror = currentMirror
val module = mirror.staticModule(objectName)
val obj = mirror.reflectModule(module).instance
val objType = mirror.reflect(obj).symbol.toType
val methodName = "getAccessToken"
val methodSymbols = objType.decl(TermName(methodName)).asTerm.alternatives
val argType = typeOf[String]

val selectedMethodSymbol = methodSymbols.find { m =>
m.asMethod.paramLists match {
case List(List(param)) => param.typeSignature =:= argType
case _ => false
}
}.getOrElse(throw new NoSuchMethodException(s"Method $methodName with argument type $argType not found"))

val methodMirror = mirror.reflect(obj).reflectMethod(selectedMethodSymbol.asMethod)
methodMirror(tokenType).asInstanceOf[String]
}

private def checkTokenValid(token: String): Boolean = {
try{
val expiryDate: Date = getExpiry(token)
val expiryEpoch = expiryDate.toInstant.getEpochSecond
val now = Instant.now().getEpochSecond
now < expiryEpoch - 60
}
catch
{
case e: Exception =>
// Do not write the token itself to the log.
SynapseMLLogging.logMessage("TokenUtils::checkTokenValid: Token parsing went wrong (usage test). " +
s"Exception = $e")
false
}
/*if (token == null || token.isEmpty()) {
false
}
try {
val parsedToken = token.parseJson.asJsObject()
val expTime = parsedToken.fields("exp").convertTo[Int]
val now = Instant.now().getEpochSecond()
now < expTime - 60
} catch {
case e: Exception => {
SynapseMLLogging.logMessage(s"TokenUtils::checkTokValid: Token {$token} parsing went wrong (usage test).")
false
}
}*/
}

private def refreshAccessToken(): Unit = {
try {
if (SparkContext.getOrCreate() != null) {
val token = getAccessToken("pbi")
AADToken = token
} else {
val token = new FabricTokenServiceClient().getAccessToken("pbi")
AADToken = token
}
} catch {
case e: Exception =>
SynapseMLLogging.logMessage(s"refreshAccessTok: failed to refresh pbi tok. Exception: {$e}. (usage test)")
}
}

def getMWCToken(shared_host: String, WorkspaceId: String, capacity_id: String,
workload_type: String): MwcToken = {
val url: String = shared_host + "/metadata/v201606/generatemwctokenv2"

val payLoad = s"""{
|"capacityObjectId": "$capacity_id",
|"workspaceObjectId": "$WorkspaceId",
|"workloadType": "$workload_type"
}""".stripMargin

val driverAADToken = getAccessToken()

val headers = Map(
"Content-Type" -> "application/json",
"Authorization" -> s"Bearer $driverAADToken",
"x-ms-workload-resource-moniker" -> UUID.randomUUID().toString
)

try{
val response = usagePost(url, payLoad, headers)
/*if (response.asJsObject.fields("status_code").convertTo[String] != 200
|| response.asJsObject.fields("content").convertTo[String].isEmpty) {
throw new Exception("Fetch access token error")
}*/
implicit val mwcTokenFormat: RootJsonFormat[MwcToken] = jsonFormat3(MwcToken)
val mwcToken = response.convertTo[MwcToken]
// Prefix the host with the scheme. The original built this value but discarded the updated map.
mwcToken.copy(TargetUriHost = s"https://${mwcToken.TargetUriHost}")
}
catch {
case e: Exception =>
SynapseMLLogging.logMessage(s"getMWCTok: Failed to fetch cluster details: $e. (usage test)")
throw e
}
}

private def getExpiry(accessToken: String): Date = {
val jwtOptions = new JwtOptions(false, false, false, 0)
val jwtTokenDecoded: Try[(String, String, String)] = Jwt.decodeRawAll(accessToken, jwtOptions)
jwtTokenDecoded match {
case Success((_, payload, _)) =>
val jsonPayload: JSONObject = new JSONObject(payload)
val expiry = jsonPayload.get("exp").toString
new Date(expiry.toLong * 1000)
case Failure(t) =>
SynapseMLLogging.logMessage(t.getMessage)
throw t
}
}
}
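
Review note: the expiry check in `checkTokenValid` and `getExpiry` comes down to reading the `exp` claim from the JWT payload and comparing it to the current time with a 60-second margin. The sketch below shows that logic using only the JDK and the Scala standard library. It is an assumption-laden illustration, not the `pdi.jwt` call used in the PR: `JwtExpirySketch` and its methods are hypothetical names, the claim is pulled out with a regex instead of a JSON parser, and no signature is verified (the PR also disables signature checks via `JwtOptions`).

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.util.Try

object JwtExpirySketch {
  // Matches a numeric "exp" claim in the decoded payload JSON.
  private val ExpPattern = "\"exp\"\\s*:\\s*(\\d+)".r

  // Returns the exp claim in epoch seconds, or None if the token is
  // malformed or carries no numeric exp claim.
  def expiryEpochSeconds(token: String): Option[Long] =
    token.split('.') match {
      case Array(_, payload, _*) =>
        Try(new String(Base64.getUrlDecoder.decode(payload), UTF_8)).toOption
          .flatMap(json => ExpPattern.findFirstMatchIn(json))
          .flatMap(m => Try(m.group(1).toLong).toOption)
      case _ => None
    }

  // A token counts as valid only if it expires more than skewSeconds from now.
  def isValid(token: String, nowEpochSeconds: Long, skewSeconds: Long = 60): Boolean =
    expiryEpochSeconds(token).exists(exp => nowEpochSeconds < exp - skewSeconds)
}
```

Passing the current time in as a parameter keeps the check deterministic in tests; a caller would supply `java.time.Instant.now().getEpochSecond`.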