From ae0c4e0c37a77de4c23ca95dee763b165ce20d18 Mon Sep 17 00:00:00 2001 From: Dominic Kim Date: Sat, 4 Nov 2023 02:07:53 +0900 Subject: [PATCH] Use max-connection-pool as queue size (#5453) --- .../containerpool/AkkaContainerClient.scala | 36 +++++++------------ 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala index 03dcf78330f..e4958d2d239 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala @@ -20,37 +20,26 @@ package org.apache.openwhisk.core.containerpool import akka.actor.ActorSystem import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model.HttpMethods -import akka.http.scaladsl.model.HttpRequest -import akka.http.scaladsl.model.HttpResponse -import akka.http.scaladsl.model.MediaTypes -import akka.http.scaladsl.model.MessageEntity -import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Accept import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.StreamTcpException -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{Sink, Source} import akka.util.ByteString -import scala.concurrent.Await -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.TimeoutException -import scala.concurrent.duration._ -import scala.util.Try -import scala.util.control.NonFatal -import spray.json._ -import org.apache.openwhisk.common.Logging import org.apache.openwhisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES -import org.apache.openwhisk.common.MetricEmitter -import org.apache.openwhisk.common.TransactionId -import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError -import org.apache.openwhisk.core.entity.ActivationResponse._ -import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize} +import org.apache.openwhisk.common.{Logging, MetricEmitter, TransactionId} +import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerHttpError, _} import org.apache.openwhisk.core.entity.size.SizeLong +import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize} import org.apache.openwhisk.http.PoolingRestClient +import pureconfig.loadConfigOrThrow +import spray.json._ import java.time.Instant +import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException} +import scala.concurrent.duration._ +import scala.util.Try +import scala.util.control.NonFatal /** * This HTTP client is used only in the invoker to communicate with the action container. @@ -193,6 +182,7 @@ protected class AkkaContainerClient( } object AkkaContainerClient { + private val queueSize = loadConfigOrThrow[Int]("akka.http.host-connection-pool.max-connections") /** A helper method to post one single request to a connection. Used for container tests. */ def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)( @@ -226,7 +216,7 @@ object AkkaContainerClient { tid: TransactionId, as: ActorSystem, ec: ExecutionContext): Seq[(Int, Option[JsObject])] = { - val connection = new AkkaContainerClient(host, port, timeout, 1) + val connection = new AkkaContainerClient(host, port, timeout, queueSize) val futureResults = contents.map { executeRequest(connection, endPoint, _) } val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures connection.close()