From 16d556bf8800e6659f6ed0049f7af992ecb2d33d Mon Sep 17 00:00:00 2001
From: Dmitry Ivankov
Date: Thu, 17 Aug 2023 08:27:44 +0200
Subject: [PATCH] Cleanup redundant maybeCacheToken

`maybeCacheToken` has a misleading name: it is actually just an optional
initial token that is used only until it expires

https://github.com/cognitedata/cognite-sdk-scala/blob/2c1e74e67b3eba42ca564f53af08021ba146c642/src/main/scala/com/cognite/sdk/scala/common/OAuth2.scala#L173
https://github.com/cognitedata/cognite-sdk-scala/blob/2c1e74e67b3eba42ca564f53af08021ba146c642/src/main/scala/com/cognite/sdk/scala/common/OAuth2.scala#L200

Once it expires, or if it was never provided, the normal path takes over:
a fresh token is fetched, persisted via `CachedResource`, and
invalidated/refreshed as needed in `commonGetAuth`

https://github.com/cognitedata/cognite-sdk-scala/blob/2c1e74e67b3eba42ca564f53af08021ba146c642/src/main/scala/com/cognite/sdk/scala/common/OAuth2.scala#L133

The intent of `maybeCacheToken` is to allow a single seed token to be shared
by multiple auth instances. Here, however, the initial token was fetched
outside the normal path and then went through exactly the same
expire-and-refresh logic anyway, so removing the manually seeded token only
saves some eager requests and an extra expiration check on every refresh
(the current code checks the old initial token's expiration on every single
call, even long after it has expired). A usage sketch illustrating the
resulting flow follows after the diff.

To address separately in the Scala SDK later:
- maybe unwrap `F[Option[initial token]]` back to `Option[initial token]`
- rename `maybeCacheToken` to something like `maybeInitialToken`
- add support for an initial token in the SDK, to avoid having to hijack the
  refresh path in consumer code each time
---
 .../scala/cognite/spark/v1/CdfSparkAuth.scala | 24 ++++--------------------
 .../cognite/spark/v1/DefaultSource.scala      |  8 +-------
 2 files changed, 5 insertions(+), 27 deletions(-)

diff --git a/src/main/scala/cognite/spark/v1/CdfSparkAuth.scala b/src/main/scala/cognite/spark/v1/CdfSparkAuth.scala
index c72dee110..6d3855163 100644
--- a/src/main/scala/cognite/spark/v1/CdfSparkAuth.scala
+++ b/src/main/scala/cognite/spark/v1/CdfSparkAuth.scala
@@ -16,40 +16,24 @@ object CdfSparkAuth {
       IO(AuthProvider(auth))
   }
 
-  import CdpConnector.ioRuntime
-
-  final case class OAuth2ClientCredentials(credentials: OAuth2.ClientCredentials)(
-      implicit sttpBackend: SttpBackend[IO, Any])
-      extends CdfSparkAuth {
-    private val cacheToken = Some(
-      IO.pure(credentials.getAuth[IO]().attempt.map(_.toOption).unsafeRunSync()))
+  final case class OAuth2ClientCredentials(credentials: OAuth2.ClientCredentials) extends CdfSparkAuth {
 
     override def provider(
         implicit clock: Clock[IO],
         sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
-      OAuth2.ClientCredentialsProvider[IO](credentials, maybeCacheToken = cacheToken)
+      OAuth2.ClientCredentialsProvider[IO](credentials)
   }
 
-  final case class OAuth2Sessions(session: OAuth2.Session)(implicit sttpBackend: SttpBackend[IO, Any])
-      extends CdfSparkAuth {
+  final case class OAuth2Sessions(session: OAuth2.Session) extends CdfSparkAuth {
 
     private val refreshSecondsBeforeExpiration = 300L
 
-    private val cacheToken = Some(
-      IO.pure(
-        session
-          .getAuth[IO](refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration)
-          .attempt
-          .map(_.toOption)
-          .unsafeRunSync()))
-
     override def provider(
         implicit clock: Clock[IO],
         sttpBackend: SttpBackend[IO, Any]): IO[AuthProvider[IO]] =
       OAuth2.SessionProvider[IO](
         session,
-        refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration,
-        maybeCacheToken = cacheToken
+        refreshSecondsBeforeExpiration = refreshSecondsBeforeExpiration
       )
   }
 }
diff --git a/src/main/scala/cognite/spark/v1/DefaultSource.scala b/src/main/scala/cognite/spark/v1/DefaultSource.scala
index 5e4fc133e..8c771faa3 100644
--- a/src/main/scala/cognite/spark/v1/DefaultSource.scala
+++ b/src/main/scala/cognite/spark/v1/DefaultSource.scala
@@ -20,7 +20,6 @@ import io.circe.parser.parse
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
-import sttp.client3.SttpBackend
 import sttp.model.Uri
 
 class DefaultSource
@@ -350,8 +349,7 @@ object DefaultSource {
           s"`$onConflictName` not a valid subtrees option. Valid options are: $validOptions"))
     }
 
-  private[v1] def parseAuth(parameters: Map[String, String])(
-      implicit backend: SttpBackend[IO, Any]): Option[CdfSparkAuth] = {
+  private[v1] def parseAuth(parameters: Map[String, String]): Option[CdfSparkAuth] = {
     val authTicket = parameters.get("authTicket").map(ticket => TicketAuth(ticket))
     val bearerToken = parameters.get("bearerToken").map(bearerToken => BearerTokenAuth(bearerToken))
     val scopes: List[String] = parameters.get("scopes") match {
@@ -404,10 +402,6 @@ object DefaultSource {
     val clientTag = parameters.get("clientTag")
     val applicationName = parameters.get("applicationName")
 
-    //This backend is used only for auth, so we should not retry as much as maxRetries config
-    implicit val authBackend: SttpBackend[IO, Any] =
-      CdpConnector.retryingSttpBackend(5, maxRetryDelaySeconds)
-
     val auth = parseAuth(parameters) match {
       case Some(x) => x
       case None =>
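
The promised usage sketch: a minimal, hedged example of how a consumer obtains auth after this change. The object name `AuthSketch`, the credential values, and taking the sttp backend as an implicit parameter are assumptions made for illustration; the `ClientCredentials` field names follow `OAuth2.ClientCredentials` in cognite-sdk-scala and may differ between SDK versions. The point is that no token is seeded eagerly any more: the first `getAuth` call fetches one, and `CachedResource` handles invalidation and refresh afterwards.

```scala
import cats.effect.{Clock, IO}
import cognite.spark.v1.CdfSparkAuth
import com.cognite.sdk.scala.common.OAuth2
import sttp.client3.SttpBackend
import sttp.model.Uri

object AuthSketch {
  // Hypothetical credentials, for illustration only; field names follow
  // OAuth2.ClientCredentials in cognite-sdk-scala and may differ between versions.
  private val credentials = OAuth2.ClientCredentials(
    tokenUri = Uri.unsafeParse("https://login.example.com/oauth2/v2.0/token"),
    clientId = "client-id",
    clientSecret = "client-secret",
    scopes = List("https://api.cognitedata.com/.default"),
    cdfProjectName = "my-project"
  )

  // The backend comes from the connector (CdpConnector.retryingSttpBackend in this repo);
  // here it is simply taken as an implicit parameter together with Clock[IO].
  def example(implicit backend: SttpBackend[IO, Any], clock: Clock[IO]): IO[Unit] =
    for {
      provider <- CdfSparkAuth.OAuth2ClientCredentials(credentials).provider
      _ <- provider.getAuth // first call fetches a fresh token (no eagerly seeded token)
      _ <- provider.getAuth // later calls reuse it via CachedResource until a refresh is due
    } yield ()
}
```

This also shows why dropping the `implicit sttpBackend` constructor parameter is safe: the backend is only needed when `provider` is called, not when the case class is constructed.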