Skip to content

Commit

Permalink
Use max-connection-pool as queue size (#5453)
Browse files Browse the repository at this point in the history
  • Loading branch information
style95 authored Nov 3, 2023
1 parent a4f4b2d commit ae0c4e0
Showing 1 changed file with 13 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,26 @@ package org.apache.openwhisk.core.containerpool
import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.MediaTypes
import akka.http.scaladsl.model.MessageEntity
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.StreamTcpException
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
import spray.json._
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES
import org.apache.openwhisk.common.MetricEmitter
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
import org.apache.openwhisk.core.entity.ActivationResponse._
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
import org.apache.openwhisk.common.{Logging, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerHttpError, _}
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
import org.apache.openwhisk.http.PoolingRestClient
import pureconfig.loadConfigOrThrow
import spray.json._

import java.time.Instant
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

/**
* This HTTP client is used only in the invoker to communicate with the action container.
Expand Down Expand Up @@ -193,6 +182,7 @@ protected class AkkaContainerClient(
}

object AkkaContainerClient {
private val queueSize = loadConfigOrThrow[Int]("akka.http.host-connection-pool.max-connections")

/** A helper method to post one single request to a connection. Used for container tests. */
def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
Expand Down Expand Up @@ -226,7 +216,7 @@ object AkkaContainerClient {
tid: TransactionId,
as: ActorSystem,
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
val connection = new AkkaContainerClient(host, port, timeout, 1)
val connection = new AkkaContainerClient(host, port, timeout, queueSize)
val futureResults = contents.map { executeRequest(connection, endPoint, _) }
val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
connection.close()
Expand Down

0 comments on commit ae0c4e0

Please sign in to comment.