Skip to content

Commit

Permalink
init: azure rbac aks service discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
girdharshubham committed Aug 12, 2024
1 parent b0b2e1b commit 2b97b60
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 3 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ docs/src/main/paradox/attachments
.metals
metals.sbt
**/project/project


# Bloop
.bloop
60 changes: 60 additions & 0 deletions akka-discovery-azure-api/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
######################################################
# Akka Service Discovery AWS Config #
######################################################

akka.discovery {
# Set the following in your application.conf if you want to use this discovery mechanism:
# method = aws-api-ec2-tag-based
azure-rbac-aks-api {
class = akka.discovery.kubernetes.AzureRbacAksServiceDiscovery

authority-host = "https://login.microsoftonline.com/"
authority-host = ${?AZURE_AUTHORITY_HOST}

// can't be empty
client-id = ${AZURE_CLIENT_ID}

federated-token-file = "/var/run/secrets/azure/tokens/azure-identity-token"
federated-token-file = ${?AZURE_FEDERATED_TOKEN_FILE}

// can't be empty
tenant-id = ${AZURE_TENANT_ID}

# AKS uses a pair of first-party Microsoft Entra applications
# The AKS Microsoft Entra server application ID that the server side uses is 6dae42f8-4368-4678-94ff-3960e28e3630
entra-server-id = "6dae42f8-4368-4678-94ff-3960e28e3630"
entra-server-id = ${?AZURE_SERVER_ID}

# API server, cert and token information. Currently these are present on K8s versions: 1.6, 1.7, 1.8, and perhaps more
api-ca-path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
api-token-path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
api-service-host-env-name = "KUBERNETES_SERVICE_HOST"
api-service-port-env-name = "KUBERNETES_SERVICE_PORT"

# Namespace discovery path
#
# If this path doesn't exist, the namespace will default to "default".
pod-namespace-path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

# Namespace to query for pods.
#
# Set this value to a specific string to override discovering the namespace using pod-namespace-path.
pod-namespace = "default"
pod-namespace = ${?KUBERNETES_NAMESPACE}

# Domain of the k8s cluster
pod-domain = "cluster.local"

# Selector value to query pod API with.
# `%s` will be replaced with the configured effective name, which defaults to the actor system name
pod-label-selector = "app=%s"

# Enables the usage of the raw IP instead of the composed value for the resolved target host
# Note that when using names, the deprecated DNS form <a>-<b>-<c>-<d>.<ns>.pod.<zone> is used
# and that may not work on newer Kubernetes versions.
use-raw-ip = true

# When set, validate the container is not in 'waiting' state
container-name = ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (C) 2017-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.discovery.azureapi

import akka.actor.ActorSystem
import akka.discovery.{ Lookup, ServiceDiscovery }
import akka.event.Logging
import akka.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken }
import akka.http.scaladsl.model.{ HttpRequest, Uri }
import akka.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext }
import akka.pki.kubernetes.PemManagersProvider
import com.azure.core.credential.{ AccessToken, TokenRequestContext }
import com.azure.identity.DefaultAzureCredentialBuilder

import java.security.{ KeyStore, SecureRandom }
import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager }
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.FutureConverters._
import scala.util.Try

object AzureRbacAKSServiceDiscovery {
private def azureDefaultCredential =
new DefaultAzureCredentialBuilder().build()

private val accessTokenRequestContext = new TokenRequestContext()
}

class AzureRbacAKSServiceDiscovery(implicit system: ActorSystem) extends ServiceDiscovery {
private val http = Http()

private val settings = Settings(system)

private val log = Logging(system, classOf[AzureRbacAKSServiceDiscovery])

log.debug("Settings {}", settings)

private def fetchToken: Future[AccessToken] =
AzureRbacAKSServiceDiscovery.azureDefaultCredential
.getToken(AzureRbacAKSServiceDiscovery.accessTokenRequestContext.addScopes(settings.entraServerId))
.toFuture
.asScala

import system.dispatcher

private def accessToken(retries: Int = 3): Future[AccessToken] = {
fetchToken.flatMap { token =>
if (token.isExpired && retries > 0) {
accessToken(retries - 1)
} else if (token.isExpired) {
Future.failed(new RuntimeException("Failed to fetch a valid token after multiple attempts"))
} else {
Future.successful(token)
}
}
}

override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = {
val selector = settings.podLabelSelector(lookup.serviceName)

for {
at <- accessToken()
request <- {
log.info(
"Querying for pods with label selector: [{}]. Namespace: [{}]. Port: [{}]",
selector,
settings.podNamespace,
lookup.portName
)

optionToFuture(
// FIXME | remove .get
podRequest(at.getToken, settings.podNamespace.get, selector),
s"Unable to form request; check Kubernetes environment (expecting env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})"
)
}

} yield ???
}

private def optionToFuture[T](option: Option[T], failMsg: String): Future[T] =
option.fold(Future.failed[T](new NoSuchElementException(failMsg)))(Future.successful)

private def podRequest(token: String, namespace: String, labelSelector: String) =
for {
host <- sys.env.get(settings.apiServiceHostEnvName)
portStr <- sys.env.get(settings.apiServicePortEnvName)
port <- Try(portStr.toInt).toOption
} yield {
val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / "pods"
val query = Uri.Query("labelSelector" -> labelSelector)
val uri = Uri.from(scheme = "https", host = host, port = port).withPath(path).withQuery(query)

HttpRequest(uri = uri, headers = List(Authorization(OAuth2BearerToken(token))))
}

/**
* This uses blocking IO, and so should only be used at startup from blocking dispatcher.
*/
private def clientHttpsConnectionContext(): HttpsConnectionContext = {
val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath)
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance("PKCS12")
keyStore.load(null)
factory.init(keyStore, Array.empty)
val km: Array[KeyManager] = factory.getKeyManagers
val tm: Array[TrustManager] =
PemManagersProvider.buildTrustManagers(certificates)
val random: SecureRandom = new SecureRandom
val sslContext = SSLContext.getInstance("TLSv1.2")
sslContext.init(km, tm, random)
ConnectionContext.httpsClient(sslContext)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2017-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.discovery.azureapi

import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import com.typesafe.config.Config

import java.util.Optional
import scala.compat.java8.OptionConverters._

final class Settings(system: ExtendedActorSystem) extends Extension {

/**
* Copied from AkkaManagementSettings, which we don't depend on.
*/
private implicit class HasDefined(val config: Config) {
def hasDefined(key: String): Boolean =
config.hasPath(key) &&
config.getString(key).trim.nonEmpty &&
config.getString(key) != s"<$key>"

def optDefinedValue(key: String): Option[String] =
if (hasDefined(key)) Some(config.getString(key)) else None
}

private val config = system.settings.config.getConfig("akka.discovery.azure-rbac-aks-api")

val authorityHost: String =
config.getString("authority-host")

val clientId: String =
config.getString("client-id")

val federatedTokenPath: String =
config.getString("federated-token-path")

val tenantId: String =
config.getString("tenant-id")

val entraServerId: String =
config.getString("entra-server-id")

val apiCaPath: String =
config.getString("api-ca-path")

val apiTokenPath: String =
config.getString("api-token-path")

val apiServiceHostEnvName: String =
config.getString("api-service-host-env-name")

val apiServicePortEnvName: String =
config.getString("api-service-port-env-name")

val podNamespacePath: String =
config.getString("pod-namespace-path")

/** Scala API */
val podNamespace: Option[String] =
config.optDefinedValue("pod-namespace")

/** Java API */
def getPodNamespace: Optional[String] = podNamespace.asJava

val podDomain: String =
config.getString("pod-domain")

def podLabelSelector(name: String): String =
config.getString("pod-label-selector").format(name)

lazy val rawIp: Boolean = config.getBoolean("use-raw-ip")

val containerName: Option[String] = Some(config.getString("container-name")).filter(_.nonEmpty)

override def toString =
s"Settings($apiCaPath, $apiTokenPath, $apiServiceHostEnvName, $apiServicePortEnvName, " +
s"$podNamespacePath, $podNamespace, $podDomain)"
}

object Settings extends ExtensionId[Settings] with ExtensionIdProvider {
override def get(system: ActorSystem): Settings = super.get(system)

override def get(system: ClassicActorSystemProvider): Settings = super.get(system)

override def lookup: Settings.type = Settings

override def createExtension(system: ExtendedActorSystem): Settings = new Settings(system)
}
14 changes: 13 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import com.typesafe.sbt.packager.docker.{ Cmd, ExecCmd }
import com.typesafe.sbt.packager.docker.{Cmd, ExecCmd}
import sbt.Keys.parallelExecution

ThisBuild / resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Expand All @@ -19,6 +19,7 @@ lazy val `akka-management-root` = project
// in AkkaManagement should also be updated
`akka-discovery-aws-api`,
`akka-discovery-aws-api-async`,
`akka-discovery-azure-api`,
`akka-discovery-kubernetes-api`,
`akka-discovery-marathon-api`,
`akka-management`,
Expand Down Expand Up @@ -71,6 +72,17 @@ lazy val `akka-discovery-kubernetes-api` = project
)
.dependsOn(`akka-management-pki`)

lazy val `akka-discovery-azure-api` = (project in file("akka-discovery-azure-api"))
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(com.geirsson.CiReleasePlugin)
.settings(
name := "akka-discovery-azure-api",
organization := "com.lightbend.akka.discovery",
libraryDependencies := Dependencies.DiscoveryAzureApi,
mimaPreviousArtifactsSet
)
.dependsOn(`akka-management-pki`)

lazy val `akka-discovery-marathon-api` = project
.in(file("discovery-marathon-api"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down
14 changes: 12 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import sbt._
import Keys._
import sbt.*

object Dependencies {

Expand All @@ -17,6 +16,7 @@ object Dependencies {
val ScalaTestVersion = "3.2.19"
val ScalaTestPlusJUnitVersion = ScalaTestVersion + ".0"

val AzureIdentityVersion = "1.13.2"
val AwsSdkVersion = "1.12.765"
val JacksonVersion = "2.15.4"
val JacksonDatabindVersion = JacksonVersion
Expand All @@ -37,6 +37,16 @@ object Dependencies {
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test
)

val DiscoveryAzureApi = Seq(
"com.azure" % "azure-identity" % AzureIdentityVersion,
"com.typesafe.akka" %% "akka-actor" % AkkaVersion,
"com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test
)

val DiscoveryMarathonApi = Seq(
"com.typesafe.akka" %% "akka-actor" % AkkaVersion,
"com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
Expand Down

0 comments on commit 2b97b60

Please sign in to comment.