diff --git a/.gitignore b/.gitignore index b00e6280..9e28b2b4 100755 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,7 @@ docs/src/main/paradox/attachments .metals metals.sbt **/project/project + + +# Bloop +.bloop \ No newline at end of file diff --git a/akka-discovery-azure-api/src/main/resources/reference.conf b/akka-discovery-azure-api/src/main/resources/reference.conf new file mode 100644 index 00000000..6f139723 --- /dev/null +++ b/akka-discovery-azure-api/src/main/resources/reference.conf @@ -0,0 +1,60 @@ +###################################################### +# Akka Service Discovery Azure Config # +###################################################### + +akka.discovery { + # Set the following in your application.conf if you want to use this discovery mechanism: + # method = azure-rbac-aks-api + azure-rbac-aks-api { + class = akka.discovery.azureapi.AzureRbacAksServiceDiscovery + + authority-host = "https://login.microsoftonline.com/" + authority-host = ${?AZURE_AUTHORITY_HOST} + + # can't be empty + client-id = ${AZURE_CLIENT_ID} + + federated-token-file = "/var/run/secrets/azure/tokens/azure-identity-token" + federated-token-file = ${?AZURE_FEDERATED_TOKEN_FILE} + + # can't be empty + tenant-id = ${AZURE_TENANT_ID} + + # AKS uses a pair of first-party Microsoft Entra applications + # The AKS Microsoft Entra server application ID(scope) that the server side uses is 6dae42f8-4368-4678-94ff-3960e28e3630/.default + entra-server-id = "6dae42f8-4368-4678-94ff-3960e28e3630/.default" + entra-server-id = ${?AZURE_SERVER_ID} + + # API server, cert and token information. Currently these are present on K8s versions: 1.6, 1.7, 1.8, and perhaps more + api-ca-path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + api-token-path = "/var/run/secrets/kubernetes.io/serviceaccount/token" + api-service-host-env-name = "KUBERNETES_SERVICE_HOST" + api-service-port-env-name = "KUBERNETES_SERVICE_PORT" + + # Namespace discovery path + # + # If this path doesn't exist, the namespace will default to "default". + pod-namespace-path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" + + # Namespace to query for pods. + # + # Set this value to a specific string to override discovering the namespace using pod-namespace-path. + pod-namespace = "" + pod-namespace = ${?KUBERNETES_NAMESPACE} + + # Domain of the k8s cluster + pod-domain = "cluster.local" + + # Selector value to query pod API with. + # `%s` will be replaced with the configured effective name, which defaults to the actor system name + pod-label-selector = "app=%s" + + # Enables the usage of the raw IP instead of the composed value for the resolved target host + # Note that when using names, the deprecated DNS form ---..pod. is used + # and that may not work on newer Kubernetes versions. + use-raw-ip = true + + # When set, validate the container is not in 'waiting' state + container-name = "" + } +} diff --git a/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/AzureRbacAksServiceDiscovery.scala b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/AzureRbacAksServiceDiscovery.scala new file mode 100644 index 00000000..ac5408ad --- /dev/null +++ b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/AzureRbacAksServiceDiscovery.scala @@ -0,0 +1,265 @@ +/* + * Copyright (C) 2017-2024 Lightbend Inc. + */ + +package akka.discovery.azureapi + +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import akka.discovery.azureapi.AzureRbacAksServiceDiscovery._ +import akka.discovery.azureapi.JsonFormat._ +import akka.discovery.{ Lookup, ServiceDiscovery } +import akka.dispatch.Dispatchers.DefaultBlockingDispatcherId +import akka.event.Logging +import akka.http.scaladsl.model.headers.{ Authorization, OAuth2BearerToken } +import akka.http.scaladsl.model.{ HttpRequest, StatusCodes, Uri } +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext } +import akka.pki.kubernetes.PemManagersProvider +import com.azure.core.credential.{ AccessToken, TokenRequestContext } +import com.azure.identity.{ DefaultAzureCredential, DefaultAzureCredentialBuilder } + +import java.net.InetAddress +import java.nio.file.{ Files, Paths } +import java.security.{ KeyStore, SecureRandom } +import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager } +import scala.collection.immutable +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Future } +import scala.jdk.FutureConverters._ +import scala.util.Try +import scala.util.control.{ NoStackTrace, NonFatal } + +object AzureRbacAksServiceDiscovery { + private def azureDefaultCredential: DefaultAzureCredential = + new DefaultAzureCredentialBuilder().build() + + private val accessTokenRequestContext: TokenRequestContext = + new TokenRequestContext() + + /** + * INTERNAL API + * + * Finds relevant targets given a pod list. Note that this doesn't filter by name as it is the job of the selector + * to do that. + */ + @InternalApi + private[azureapi] def targets( + podList: PodList, + portName: Option[String], + podNamespace: String, + podDomain: String, + rawIp: Boolean, + containerName: Option[String]): immutable.Seq[ResolvedTarget] = + for { + item <- podList.items + if item.metadata.flatMap(_.deletionTimestamp).isEmpty + itemSpec <- item.spec.toSeq + itemStatus <- item.status.toSeq + if itemStatus.phase.contains("Running") + if containerName.forall(name => + itemStatus.containerStatuses match { + case Some(statuses) => statuses.filter(_.name == name).exists(!_.state.contains("waiting")) + case None => false + }) + ip <- itemStatus.podIP.toSeq + // Maybe port is an Option of a port, and will be None if no portName was requested + maybePort <- portName match { + case None => + List(None) + case Some(name) => + for { + container <- itemSpec.containers + ports <- container.ports.toSeq + port <- ports + if port.name.contains(name) + } yield Some(port.containerPort) + } + } yield { + val hostOrIp = if (rawIp) ip else s"${ip.replace('.', '-')}.$podNamespace.pod.$podDomain" + ResolvedTarget( + host = hostOrIp, + port = maybePort, + address = Some(InetAddress.getByName(ip)) + ) + } + + final class KubernetesApiException(msg: String) extends RuntimeException(msg) with NoStackTrace + + final case class KubernetesSetup(namespace: String, ctx: HttpsConnectionContext) +} + +class AzureRbacAksServiceDiscovery(implicit system: ExtendedActorSystem) extends ServiceDiscovery { + private val http = Http() + + private val settings = Settings(system) + + private val log = Logging(system, classOf[AzureRbacAksServiceDiscovery]) + + log.debug("Settings {}", settings) + + private def fetchToken: Future[AccessToken] = + azureDefaultCredential.getToken(accessTokenRequestContext.addScopes(settings.entraServerId)).toFuture.asScala + + private val kubernetesSetup = { + implicit val blockingDispatcher: ExecutionContext = system.dispatchers.lookup(DefaultBlockingDispatcherId) + for { + namespace: String <- Future { + settings.podNamespace + .orElse(readConfigVarFromFilesystem(settings.podNamespacePath, "pod-namespace")) + .getOrElse("default") + } + httpsContext <- Future(clientHttpsConnectionContext()) + } yield { + KubernetesSetup(namespace, httpsContext) + } + } + + import system.dispatcher + + private def accessToken(retries: Int = 3): Future[AccessToken] = { + fetchToken.flatMap { + case token if token.isExpired && retries > 0 => accessToken(retries - 1) + case token if token.isExpired => + Future.failed(new RuntimeException("Failed to fetch a valid token after multiple attempts")) + case token => Future.successful(token) + } + } + + override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = { + val selector = settings.podLabelSelector(lookup.serviceName) + + for { + ks <- kubernetesSetup + + at <- accessToken() + + request <- { + log.info( + "Querying for pods with label selector: [{}]. Namespace: [{}]. Port: [{}]", + selector, + ks.namespace, + lookup.portName + ) + + optionToFuture( + // FIXME | remove .get + podRequest(at.getToken, ks.namespace, selector), + s"Unable to form request; check Kubernetes environment (expecting env vars ${settings.apiServiceHostEnvName}, ${settings.apiServicePortEnvName})" + ) + } + + response <- http.singleRequest(request, ks.ctx) + + entity <- response.entity.toStrict(resolveTimeout) + + podList <- { + + response.status match { + case StatusCodes.OK => + log.debug("Kubernetes API entity: [{}]", entity.data.utf8String) + val unmarshalled = Unmarshal(entity).to[PodList] + unmarshalled.failed.foreach { t => + log.warning( + "Failed to unmarshal Kubernetes API response. Status code: [{}]; Response body: [{}]. Ex: [{}]", + response.status.value, + entity, + t.getMessage) + } + unmarshalled + case StatusCodes.Forbidden => + Unmarshal(entity).to[String].foreach { body => + log.warning( + "Forbidden to communicate with Kubernetes API server; check RBAC settings. Response: [{}]", + body) + } + Future.failed( + new KubernetesApiException("Forbidden when communicating with the Kubernetes API. Check RBAC settings.")) + case other => + Unmarshal(entity).to[String].foreach { body => + log.warning( + "Non-200 when communicating with Kubernetes API server. Status code: [{}]. Response body: [{}]", + other, + body + ) + } + + Future.failed(new KubernetesApiException(s"Non-200 from Kubernetes API server: $other")) + } + + } + + } yield { + val addresses = + targets(podList, lookup.portName, ks.namespace, settings.podDomain, settings.rawIp, settings.containerName) + if (addresses.isEmpty && podList.items.nonEmpty) { + if (log.isInfoEnabled) { + val containerPortNames = podList.items.flatMap(_.spec).flatMap(_.containers).flatMap(_.ports).flatten.toSet + log.info( + "No targets found from pod list. Is the correct port name configured? Current configuration: [{}]. Ports on pods: [{}]", + lookup.portName, + containerPortNames + ) + } + } + Resolved( + serviceName = lookup.serviceName, + addresses = addresses + ) + } + } + + private def optionToFuture[T](option: Option[T], failMsg: String): Future[T] = + option.fold(Future.failed[T](new NoSuchElementException(failMsg)))(Future.successful) + + private def podRequest(token: String, namespace: String, labelSelector: String) = + for { + host <- sys.env.get(settings.apiServiceHostEnvName) + portStr <- sys.env.get(settings.apiServicePortEnvName) + port <- Try(portStr.toInt).toOption + } yield { + val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / "pods" + val query = Uri.Query("labelSelector" -> labelSelector) + val uri = Uri.from(scheme = "https", host = host, port = port).withPath(path).withQuery(query) + + HttpRequest(uri = uri, headers = List(Authorization(OAuth2BearerToken(token)))) + } + + /** + * This uses blocking IO, and so should only be used at startup from blocking dispatcher. + */ + private def clientHttpsConnectionContext(): HttpsConnectionContext = { + val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath) + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(null) + factory.init(keyStore, Array.empty) + val km: Array[KeyManager] = factory.getKeyManagers + val tm: Array[TrustManager] = + PemManagersProvider.buildTrustManagers(certificates) + val random: SecureRandom = new SecureRandom + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(km, tm, random) + ConnectionContext.httpsClient(sslContext) + } + + /** + * This uses blocking IO, and so should only be used to read configuration at startup from blocking dispatcher. + */ + private def readConfigVarFromFilesystem(path: String, name: String): Option[String] = { + val file = Paths.get(path) + if (Files.exists(file)) { + try { + Some(new String(Files.readAllBytes(file), "utf-8")) + } catch { + case NonFatal(e) => + log.error(e, "Error reading {} from {}", name, path) + None + } + } else { + log.warning("Unable to read {} from {} because it doesn't exist.", name, path) + None + } + } +} diff --git a/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/JsonFormat.scala b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/JsonFormat.scala new file mode 100644 index 00000000..44fdd7a5 --- /dev/null +++ b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/JsonFormat.scala @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2017-2024 Lightbend Inc. + */ + +package akka.discovery.azureapi + +import akka.annotation.InternalApi +import akka.discovery.azureapi.PodList._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json._ + +/** + * INTERNAL API + */ +@InternalApi private[azureapi] object JsonFormat extends SprayJsonSupport with DefaultJsonProtocol { + // If adding more formats here, remember to also add in META-INF/native-image reflect config + implicit val containerPortFormat: JsonFormat[ContainerPort] = jsonFormat2(ContainerPort.apply) + implicit val containerFormat: JsonFormat[Container] = jsonFormat2(Container.apply) + implicit val podSpecFormat: JsonFormat[PodSpec] = jsonFormat1(PodSpec.apply) + implicit val containerStatusFormat: JsonFormat[ContainerStatus] = jsonFormat2(ContainerStatus.apply) + implicit val podStatusFormat: JsonFormat[PodStatus] = jsonFormat3(PodStatus.apply) + implicit val metadataFormat: JsonFormat[Metadata] = jsonFormat1(Metadata.apply) + implicit val podFormat: JsonFormat[Pod] = jsonFormat3(Pod.apply) + implicit val podListFormat: RootJsonFormat[PodList] = jsonFormat1(PodList.apply) +} diff --git a/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/PodList.scala b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/PodList.scala new file mode 100644 index 00000000..e2630561 --- /dev/null +++ b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/PodList.scala @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2017-2024 Lightbend Inc. + */ + +package akka.discovery.azureapi + +import akka.annotation.InternalApi + +import scala.collection.immutable + +/** + * INTERNAL API + */ +@InternalApi private[azureapi] object PodList { + final case class Metadata(deletionTimestamp: Option[String]) + + final case class ContainerPort(name: Option[String], containerPort: Int) + + final case class Container(name: String, ports: Option[immutable.Seq[ContainerPort]]) + + final case class PodSpec(containers: immutable.Seq[Container]) + + final case class ContainerStatus(name: String, state: Map[String, Unit]) + + final case class PodStatus( + podIP: Option[String], + containerStatuses: Option[immutable.Seq[ContainerStatus]], + phase: Option[String]) + + final case class Pod(spec: Option[PodSpec], status: Option[PodStatus], metadata: Option[Metadata]) +} + +/** + * INTERNAL API + */ +@InternalApi private[azureapi] final case class PodList(items: immutable.Seq[PodList.Pod]) diff --git a/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/Settings.scala b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/Settings.scala new file mode 100644 index 00000000..aea01408 --- /dev/null +++ b/akka-discovery-azure-api/src/main/scala/akka/discovery/azureapi/Settings.scala @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2017-2024 Lightbend Inc. + */ + +package akka.discovery.azureapi + +import akka.actor.{ + ActorSystem, + ClassicActorSystemProvider, + ExtendedActorSystem, + Extension, + ExtensionId, + ExtensionIdProvider +} +import com.typesafe.config.Config + +import java.util.Optional +import scala.compat.java8.OptionConverters._ + +final class Settings(system: ExtendedActorSystem) extends Extension { + + /** + * Copied from AkkaManagementSettings, which we don't depend on. + */ + private implicit class HasDefined(val config: Config) { + def hasDefined(key: String): Boolean = + config.hasPath(key) && + config.getString(key).trim.nonEmpty && + config.getString(key) != s"<$key>" + + def optDefinedValue(key: String): Option[String] = + if (hasDefined(key)) Some(config.getString(key)) else None + } + + private val config = system.settings.config.getConfig("akka.discovery.azure-rbac-aks-api") + + val authorityHost: String = + config.getString("authority-host") + + val clientId: String = + config.getString("client-id") + + val federatedTokenPath: String = + config.getString("federated-token-file") + + val tenantId: String = + config.getString("tenant-id") + + val entraServerId: String = + config.getString("entra-server-id") + + val apiCaPath: String = + config.getString("api-ca-path") + + val apiTokenPath: String = + config.getString("api-token-path") + + val apiServiceHostEnvName: String = + config.getString("api-service-host-env-name") + + val apiServicePortEnvName: String = + config.getString("api-service-port-env-name") + + val podNamespacePath: String = + config.getString("pod-namespace-path") + + /** Scala API */ + val podNamespace: Option[String] = + config.optDefinedValue("pod-namespace") + + /** Java API */ + def getPodNamespace: Optional[String] = podNamespace.asJava + + val podDomain: String = + config.getString("pod-domain") + + def podLabelSelector(name: String): String = + config.getString("pod-label-selector").format(name) + + lazy val rawIp: Boolean = config.getBoolean("use-raw-ip") + + val containerName: Option[String] = Some(config.getString("container-name")).filter(_.nonEmpty) + + override def toString = + s"Settings($apiCaPath, $apiTokenPath, $apiServiceHostEnvName, $apiServicePortEnvName, " + + s"$podNamespacePath, $podNamespace, $podDomain)" +} + +object Settings extends ExtensionId[Settings] with ExtensionIdProvider { + override def get(system: ActorSystem): Settings = super.get(system) + + override def get(system: ClassicActorSystemProvider): Settings = super.get(system) + + override def lookup: Settings.type = Settings + + override def createExtension(system: ExtendedActorSystem): Settings = new Settings(system) +} diff --git a/build.sbt b/build.sbt index acf4fd6c..c139a919 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,4 @@ -import com.typesafe.sbt.packager.docker.{ Cmd, ExecCmd } +import com.typesafe.sbt.packager.docker.{Cmd, ExecCmd} import sbt.Keys.parallelExecution ThisBuild / resolvers += "Akka library repository".at("https://repo.akka.io/maven") @@ -19,6 +19,7 @@ lazy val `akka-management-root` = project // in AkkaManagement should also be updated `akka-discovery-aws-api`, `akka-discovery-aws-api-async`, + `akka-discovery-azure-api`, `akka-discovery-kubernetes-api`, `akka-discovery-marathon-api`, `akka-management`, @@ -71,6 +72,17 @@ lazy val `akka-discovery-kubernetes-api` = project ) .dependsOn(`akka-management-pki`) +lazy val `akka-discovery-azure-api` = (project in file("akka-discovery-azure-api")) + .enablePlugins(AutomateHeaderPlugin) + .disablePlugins(com.geirsson.CiReleasePlugin) + .settings( + name := "akka-discovery-azure-api", + organization := "com.lightbend.akka.discovery", + libraryDependencies := Dependencies.DiscoveryAzureApi, + mimaPreviousArtifactsSet + ) + .dependsOn(`akka-management-pki`) + lazy val `akka-discovery-marathon-api` = project .in(file("discovery-marathon-api")) .enablePlugins(AutomateHeaderPlugin) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 81ca9764..ab83e4fb 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,5 +1,4 @@ -import sbt._ -import Keys._ +import sbt.* object Dependencies { @@ -17,6 +16,7 @@ object Dependencies { val ScalaTestVersion = "3.2.19" val ScalaTestPlusJUnitVersion = ScalaTestVersion + ".0" + val AzureIdentityVersion = "1.13.2" val AwsSdkVersion = "1.12.765" val JacksonVersion = "2.15.4" val JacksonDatabindVersion = JacksonVersion @@ -37,6 +37,16 @@ object Dependencies { "org.scalatest" %% "scalatest" % ScalaTestVersion % Test ) + val DiscoveryAzureApi = Seq( + "com.azure" % "azure-identity" % AzureIdentityVersion, + "com.typesafe.akka" %% "akka-actor" % AkkaVersion, + "com.typesafe.akka" %% "akka-discovery" % AkkaVersion, + "com.typesafe.akka" %% "akka-stream" % AkkaVersion, + "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, + "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, + "org.scalatest" %% "scalatest" % ScalaTestVersion % Test + ) + val DiscoveryMarathonApi = Seq( "com.typesafe.akka" %% "akka-actor" % AkkaVersion, "com.typesafe.akka" %% "akka-discovery" % AkkaVersion,