From 98e06dd6965433c3a962e43ef7c80ee7b5d2ff88 Mon Sep 17 00:00:00 2001 From: "ning.yougang" Date: Wed, 1 Apr 2020 16:56:46 +0800 Subject: [PATCH] Adjust prewarm container dynamically --- ansible/files/runtimes.json | 8 +- .../openwhisk/core/entity/ExecManifest.scala | 36 ++++- .../core/containerpool/ContainerPool.scala | 147 ++++++++++++++++-- .../core/containerpool/ContainerProxy.scala | 26 +++- .../core/invoker/InvokerReactive.scala | 9 +- .../test/ContainerPoolTests.scala | 120 +++++++++++++- .../core/entity/test/ExecHelpers.scala | 10 +- .../core/entity/test/ExecManifestTests.scala | 91 ++++++++--- 8 files changed, 395 insertions(+), 52 deletions(-) diff --git a/ansible/files/runtimes.json b/ansible/files/runtimes.json index ba785f45e2a..00cbabe2fbc 100644 --- a/ansible/files/runtimes.json +++ b/ansible/files/runtimes.json @@ -57,8 +57,12 @@ }, "stemCells": [ { - "count": 2, - "memory": "256 MB" + "initialCount": 2, + "maxCount": 4, + "memory": "256 MB", + "ttl": "2 minutes", + "threshold": 1, + "increment": 1 } ] }, diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala index fe90515a3d5..2987ede23f8 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala @@ -26,7 +26,10 @@ import spray.json.DefaultJsonProtocol._ import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.core.entity.Attachments._ import org.apache.openwhisk.core.entity.Attachments.Attached._ -import fastparse._, NoWhitespace._ +import fastparse._ +import NoWhitespace._ + +import scala.concurrent.duration.Duration /** * Reads manifest of supported runtimes from configuration file and stores @@ -135,11 +138,24 @@ protected[core] object ExecManifest { /** * A stemcell configuration read from the manifest for a container image to be initialized by the container pool. * - * @param count the number of stemcell containers to create + * @param initialCount the initial number of stemcell containers to create + * @param maxCount the max number of stemcell containers to create * @param memory the max memory this stemcell will allocate + * @param ttl time to live of the prewarmed container + * @param threshold the executed activation number of cold start in previous one minute + * @param increment increase increment prewarmed number when activation >= threshold if doesn't have enough prewarmed containers */ - protected[entity] case class StemCell(count: Int, memory: ByteSize) { - require(count > 0, "count must be positive") + protected[entity] case class StemCell(initialCount: Int, + maxCount: Int, + memory: ByteSize, + ttl: Duration, + threshold: Int, + increment: Int) { + require(initialCount > 0, "initialCount must be positive") + require(maxCount >= initialCount, "maxCount must be greater than or equal to initialCount") + require(ttl.toSeconds > 0, "ttl must be positive") + require(threshold > 0, "threshold must be positive") + require(increment > 0 && increment <= maxCount, "increment must be positive and less than or equal to count") } /** @@ -344,9 +360,19 @@ protected[core] object ExecManifest { protected[entity] implicit val imageNameSerdes: RootJsonFormat[ImageName] = jsonFormat4(ImageName.apply) + protected[entity] implicit val ttlSerdes: RootJsonFormat[Duration] = new RootJsonFormat[Duration] { + override def write(duration: Duration): JsValue = JsString(duration.toString) + + override def read(value: JsValue): Duration = value match { + case JsString(s) => Duration(s) + case _ => + deserializationError("time unit not supported. Only milliseconds, seconds, minutes, hours, days are supported") + } + } + protected[entity] implicit val stemCellSerdes: RootJsonFormat[StemCell] = { import org.apache.openwhisk.core.entity.size.serdes - jsonFormat2(StemCell.apply) + jsonFormat6(StemCell.apply) } protected[entity] implicit val runtimeManifestSerdes: RootJsonFormat[RuntimeManifest] = jsonFormat8(RuntimeManifest) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 1e267fed422..965c89195a6 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -17,12 +17,16 @@ package org.apache.openwhisk.core.containerpool +import java.text.SimpleDateFormat +import java.time.{Duration => JDuration, Instant} +import java.util.Calendar + import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} -import org.apache.openwhisk.common.MetricEmitter -import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId} +import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId} import org.apache.openwhisk.core.connector.MessageFeed import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ + import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ @@ -36,6 +40,9 @@ case class WorkerData(data: ContainerData, state: WorkerState) case object EmitMetrics +case object DeleteUnusedPrewarmedContainer +case object SupplementPrewarmedContainer + /** * A pool managing containers to run actions on. * @@ -59,11 +66,10 @@ case object EmitMetrics class ContainerPool(childFactory: ActorRefFactory => ActorRef, feed: ActorRef, prewarmConfig: List[PrewarmingConfig] = List.empty, - poolConfig: ContainerPoolConfig) + poolConfig: ContainerPoolConfig)(implicit val logging: Logging) extends Actor { import ContainerPool.memoryConsumptionOf - implicit val logging = new AkkaLogging(context.system.log) implicit val ec = context.dispatcher var freePool = immutable.Map.empty[ActorRef, ContainerData] @@ -80,8 +86,59 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, //periodically emit metrics (don't need to do this for each message!) context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics) + // Key is `kind,memory,time`, value is the number of cold Start in minute + var coldStartCount = immutable.Map.empty[String, Int] + backfillPrewarms(true) + // check periodically every 1 minute, delete prewarmed container if unused for sometime + context.system.scheduler.schedule(10.seconds, 1.minute, self, DeleteUnusedPrewarmedContainer) + + // check periodically for the cold start and create some increment containers automatically if activation >= threshold + context.system.scheduler.schedule(1.minute, 1.minute, self, SupplementPrewarmedContainer) + + def deleteUnusedPrewarmedContainer(): Unit = { + prewarmConfig.foreach { config => + val kind = config.exec.kind + val memory = config.memoryLimit + val ttlSeconds = config.ttl.toSeconds + val containers = prewarmedPool.filter { warmInfo => + warmInfo match { + case (_, PreWarmedData(_, `kind`, `memory`, _)) => true + case _ => false + } + } + for ((container, data) <- containers) { + if (JDuration.between(data.lastUsed, Instant.now).compareTo(JDuration.ofSeconds(ttlSeconds)) > 0) { + // Don't recover a new one under this situation + container ! RemovePreWarmedContainer + } + } + } + } + + def supplementPrewarmedContainer() = { + prewarmConfig.foreach { config => + val kind = config.exec.kind + val memory = config.memoryLimit + val calendar = Calendar.getInstance() + calendar.add(Calendar.MINUTE, -1) + val lastDate = new SimpleDateFormat("yyyy-MM-dd-HH:mm").format(calendar.getTime) + val key = s"${kind},${memory.toMB},${lastDate}" + coldStartCount.get(key) match { + case Some(value) => + if (value >= config.threshold) { + prewarmContainerIfpossible(kind, memory, value / config.increment) + } else { + // at lease create 1 prewarmed container + prewarmContainerIfpossible(kind, memory, 1) + } + case None => + } + coldStartCount = coldStartCount - key + } + } + def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = { val namespaceName = r.msg.user.namespace.name val actionName = r.action.name.name @@ -131,7 +188,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) { takePrewarmContainer(r.action) .map(container => (container, "prewarmed")) - .orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")) + .orElse { + val container = Some(createContainer(r.action.limits.memory.megabytes.MB), "cold") + countColdStart(r.action.exec.kind, r.action.limits.memory.megabytes.MB) + container + } } else None) .orElse( // Remove a container and create a new one for the given job @@ -145,7 +206,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, .map(_ => takePrewarmContainer(r.action) .map(container => (container, "recreatedPrewarm")) - .getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated"))) + .getOrElse { + val container = (createContainer(r.action.limits.memory.megabytes.MB), "recreated") + countColdStart(r.action.exec.kind, r.action.limits.memory.megabytes.MB) + container + })) } else None @@ -271,6 +336,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, //backfill prewarms on every ContainerRemoved, just in case backfillPrewarms(false) //in case a prewarm is removed due to health failure or crash + // prewarmed container got removed + case PreWarmedContainerRemoved => + prewarmedPool.get(sender()).foreach { _ => + logging.info(this, "prewarmed container is deleted due to unused for long time") + prewarmedPool = prewarmedPool - sender() + } + // This message is received for one of these reasons: // 1. Container errored while resuming a warm container, could not process the job, and sent the job back // 2. The container aged, is destroying itself, and was assigned a job which it had to send back @@ -281,6 +353,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, busyPool = busyPool - sender() case EmitMetrics => emitMetrics() + + case DeleteUnusedPrewarmedContainer => + deleteUnusedPrewarmedContainer() + + case SupplementPrewarmedContainer => + supplementPrewarmedContainer() } /** Resend next item in the buffer, or trigger next item in the feed, if no items in the buffer. */ @@ -313,12 +391,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, } val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory) val containerCount = currentCount + startingCount - if (containerCount < config.count) { + if (containerCount < config.initialCount) { logging.info( this, - s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${config.count - containerCount} pre-warms to desired count: ${config.count} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")( + s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${config.initialCount - containerCount} pre-warms to desired initialCount: ${config.initialCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")( TransactionId.invokerWarmup) - (containerCount until config.count).foreach { _ => + (containerCount until config.initialCount).foreach { _ => prewarmContainer(config.exec, config.memoryLimit) } } @@ -340,6 +418,47 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, newContainer ! Start(exec, memoryLimit) } + /** Create a new prewarm container if currentCount doesn't reach maxCount */ + def prewarmContainerIfpossible(kind: String, memoryLimit: ByteSize, count: Int): Unit = { + prewarmConfig + .filter { config => + kind == config.exec.kind && memoryLimit == config.memoryLimit + } + .foreach { config => + val currentCount = prewarmedPool.count { + case (_, PreWarmedData(_, `kind`, `memoryLimit`, _)) => true //done starting + case _ => false //started but not finished starting + } + val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memoryLimit) + val containerCount = currentCount + startingCount + if (containerCount < config.maxCount) { + val createNumber = if (config.maxCount - containerCount > count) count else config.maxCount - containerCount + 1 to createNumber foreach { _ => + prewarmContainer(config.exec, config.memoryLimit) + } + } + } + } + + /** statistics the cold start */ + def countColdStart(kind: String, memoryLimit: ByteSize): Unit = { + prewarmConfig + .filter { config => + kind == config.exec.kind && memoryLimit == config.memoryLimit + } + .foreach { _ => + val time = new SimpleDateFormat("yyyy-MM-dd-HH:mm").format(Calendar.getInstance().getTime) + val key = s"${kind},${memoryLimit.toMB},${time}" + coldStartCount.get(key) match { + case Some(value) => coldStartCount = coldStartCount + (key -> (value + 1)) + case None => coldStartCount = coldStartCount + (key -> 1) + } + for ((k, v) <- coldStartCount) { + logging.info(this, s"===statistics the cold start, k: ${k}, v: ${v}") + } + } + } + /** * Takes a prewarm container out of the prewarmed pool * iff a container with a matching kind and memory is found. @@ -502,9 +621,15 @@ object ContainerPool { def props(factory: ActorRefFactory => ActorRef, poolConfig: ContainerPoolConfig, feed: ActorRef, - prewarmConfig: List[PrewarmingConfig] = List.empty) = + prewarmConfig: List[PrewarmingConfig] = List.empty)(implicit logging: Logging) = Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig)) } /** Contains settings needed to perform container prewarming. */ -case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit: ByteSize) +case class PrewarmingConfig(initialCount: Int, + maxCount: Int, + exec: CodeExec[_], + memoryLimit: ByteSize, + ttl: Duration, + threshold: Int, + increment: Int) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala index 71acec5834c..1d846566d64 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala @@ -146,7 +146,7 @@ case class PreWarmedData(override val container: Container, kind: String, override val memoryLimit: ByteSize, override val activeActivationCount: Int = 0) - extends ContainerStarted(container, Instant.EPOCH, memoryLimit, activeActivationCount) + extends ContainerStarted(container, Instant.now(), memoryLimit, activeActivationCount) with ContainerNotInUse { override val initingState = "prewarmed" override def nextRun(r: Run) = @@ -196,12 +196,14 @@ case class WarmedData(override val container: Container, case class Start(exec: CodeExec[_], memoryLimit: ByteSize) case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None) case object Remove +case object RemovePreWarmedContainer case class HealthPingEnabled(enabled: Boolean) // Events sent by the actor case class NeedWork(data: ContainerData) case object ContainerPaused case object ContainerRemoved // when container is destroyed +case object PreWarmedContainerRemoved case object RescheduleJob // job is sent back to parent and could not be processed because container is being destroyed case class PreWarmCompleted(data: PreWarmedData) case class InitCompleted(data: WarmedData) @@ -375,6 +377,8 @@ class ContainerProxy(factory: (TransactionId, case Event(Remove, data: PreWarmedData) => destroyContainer(data) + case Event(RemovePreWarmedContainer, data: PreWarmedData) => destoryPreWarmedContainer(data) + // prewarm container failed case Event(_: FailureMessage, data: PreWarmedData) => MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM) @@ -636,6 +640,26 @@ class ContainerProxy(factory: (TransactionId, goto(Removing) using newData } + /** + * Destroys the preWarmed container if unused for long time. + * @param newData the ContainerStarted which container will be destroyed + */ + def destoryPreWarmedContainer(newData: ContainerStarted) = { + val container = newData.container + context.parent ! PreWarmedContainerRemoved + + val unpause = stateName match { + case Paused => container.resume()(TransactionId.invokerNanny) + case _ => Future.successful(()) + } + + unpause + .flatMap(_ => container.destroy()(TransactionId.invokerNanny)) + .map(_ => ContainerRemoved) + .pipeTo(self) + goto(Removing) using newData + } + /** * Return any buffered jobs to parent, in case buffer is not empty at removal/error time. */ diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 544ca1ae5d4..0f46532a535 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -156,7 +156,14 @@ class InvokerReactive( ExecManifest.runtimesManifest.stemcells.flatMap { case (mf, cells) => cells.map { cell => - PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), cell.memory) + PrewarmingConfig( + cell.initialCount, + cell.maxCount, + new CodeExecAsString(mf, "", None), + cell.memory, + cell.ttl, + cell.threshold, + cell.increment) } }.toList } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala index 4efbe852168..8f9ae5c4f89 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala @@ -33,7 +33,7 @@ import akka.actor.ActorSystem import akka.testkit.ImplicitSender import akka.testkit.TestKit import akka.testkit.TestProbe -import common.WhiskProperties +import common.{StreamLogging, WhiskProperties} import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.connector.ActivationMessage import org.apache.openwhisk.core.containerpool._ @@ -55,7 +55,8 @@ class ContainerPoolTests with FlatSpecLike with Matchers with BeforeAndAfterAll - with MockFactory { + with MockFactory + with StreamLogging { override def afterAll = TestKit.shutdownActorSystem(system) @@ -67,6 +68,9 @@ class ContainerPoolTests // the values is done properly. val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None) val memoryLimit = 256.MB + val ttl = Duration("10 minutes") + val threshold = 1 + val increment = 1 /** Creates a `Run` message */ def createRunMessage(action: ExecutableWhiskAction, invocationNamespace: EntityName) = { @@ -311,7 +315,11 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, poolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + .props( + factory, + poolConfig(0.MB), + feed.ref, + List(PrewarmingConfig(1, 1, exec, memoryLimit, ttl, threshold, increment)))) containers(0).expectMsg(Start(exec, memoryLimit)) } @@ -322,7 +330,11 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + .props( + factory, + poolConfig(MemoryLimit.STD_MEMORY), + feed.ref, + List(PrewarmingConfig(1, 1, exec, memoryLimit, ttl, threshold, increment)))) containers(0).expectMsg(Start(exec, memoryLimit)) containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) pool ! runMessage @@ -341,7 +353,7 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref, - List(PrewarmingConfig(1, alternativeExec, memoryLimit)))) + List(PrewarmingConfig(1, 1, alternativeExec, memoryLimit, ttl, threshold, increment)))) containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind))) pool ! runMessage @@ -361,7 +373,7 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.STD_MEMORY), feed.ref, - List(PrewarmingConfig(1, exec, alternativeLimit)))) + List(PrewarmingConfig(1, 1, exec, alternativeLimit, ttl, threshold, increment)))) containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit))) pool ! runMessage @@ -747,8 +759,13 @@ class ContainerPoolTests val feed = TestProbe() val pool = - system.actorOf(ContainerPool - .props(factory, poolConfig(MemoryLimit.STD_MEMORY * 5), feed.ref, List(PrewarmingConfig(2, exec, memoryLimit)))) + system.actorOf( + ContainerPool + .props( + factory, + poolConfig(MemoryLimit.STD_MEMORY * 5), + feed.ref, + List(PrewarmingConfig(2, 2, exec, memoryLimit, ttl, threshold, increment)))) containers(0).expectMsg(Start(exec, memoryLimit)) containers(1).expectMsg(Start(exec, memoryLimit)) @@ -761,6 +778,93 @@ class ContainerPoolTests containers(4).expectNoMessage(100.milliseconds) containers(5).expectNoMessage(100.milliseconds) } + + it should "remove the prewarmed container after ttl time if unused" in { + val (containers, factory) = testContainers(2) + val feed = TestProbe() + + val ttl = Duration("2 seconds") + val pool = + system.actorOf( + ContainerPool + .props( + factory, + poolConfig(MemoryLimit.STD_MEMORY * 4), + feed.ref, + List(PrewarmingConfig(2, 2, exec, memoryLimit, ttl, threshold, increment)))) + containers(0).expectMsg(Start(exec, memoryLimit)) + containers(1).expectMsg(Start(exec, memoryLimit)) + containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) + containers(1).send(pool, NeedWork(preWarmedData(exec.kind))) + stream.reset() + + // Make sure prewarmed containers can be deleted from prewarmedPool due to unused + Thread.sleep(3.seconds.toMillis) + + pool ! DeleteUnusedPrewarmedContainer + containers(0).expectMsg(RemovePreWarmedContainer) + containers(1).expectMsg(RemovePreWarmedContainer) + containers(0).send(pool, PreWarmedContainerRemoved) + containers(1).send(pool, PreWarmedContainerRemoved) + + // Make sure prewarmed containers are deleted + Thread.sleep(2.seconds.toMillis) + + stream.toString should include("prewarmed container is deleted") + stream.reset() + } + + it should "supplement prewarmed container when doesn't have enough container to handle activation" in { + val (containers, factory) = testContainers(6) + val feed = TestProbe() + + val ttl = Duration("2 seconds") + val pool = + system.actorOf( + ContainerPool + .props( + factory, + poolConfig(MemoryLimit.STD_MEMORY * 4), + feed.ref, + List(PrewarmingConfig(2, 2, exec, memoryLimit, ttl, threshold, increment)))) + containers(0).expectMsg(Start(exec, memoryLimit)) + containers(1).expectMsg(Start(exec, memoryLimit)) + containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) + containers(1).send(pool, NeedWork(preWarmedData(exec.kind))) + stream.reset() + + // Make sure prewarmed containers can be deleted from prewarmedPool due to unused + Thread.sleep(3.seconds.toMillis) + + pool ! DeleteUnusedPrewarmedContainer + containers(0).expectMsg(RemovePreWarmedContainer) + containers(1).expectMsg(RemovePreWarmedContainer) + containers(0).send(pool, PreWarmedContainerRemoved) + containers(1).send(pool, PreWarmedContainerRemoved) + + val action = ExecutableWhiskAction( + EntityPath("actionSpace"), + EntityName("actionName"), + exec, + limits = ActionLimits(memory = MemoryLimit(memoryLimit))) + val run = createRunMessage(action, invocationNamespace) + + // Make sure prewarmed containers are deleted + Thread.sleep(2.seconds.toMillis) + + pool ! run + pool ! run + containers(2).expectMsg(run) + containers(3).expectMsg(run) + + // Make sure cold start in previous 1 minute can be counted + Thread.sleep(60.seconds.toMillis) + + // supplement 2 prewarmed container + pool ! SupplementPrewarmedContainer + containers(4).expectMsg(Start(exec, memoryLimit)) + containers(5).expectMsg(Start(exec, memoryLimit)) + } } abstract class MockableContainer extends Container { protected[core] val addr: ContainerAddress = ContainerAddress("nohost") diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala index 6907d6ddd72..bb887c62aef 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecHelpers.scala @@ -29,6 +29,8 @@ import org.apache.openwhisk.core.entity.size._ import spray.json._ import spray.json.DefaultJsonProtocol._ +import scala.concurrent.duration.Duration + trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging { self: Suite => @@ -40,6 +42,10 @@ trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging { protected val BLACKBOX = "blackbox" protected val JAVA_DEFAULT = "java:8" + val ttl = Duration("10 minutes") + val threshold = 1 + val increment = 1 + private def attFmt[T: JsonFormat] = Attachments.serdes[T] protected def imageName(name: String) = @@ -52,7 +58,7 @@ trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging { imageName(NODEJS10), default = Some(true), deprecated = Some(false), - stemCells = Some(List(StemCell(2, 256.MB)))), + stemCells = Some(List(StemCell(2, 2, 256.MB, ttl, threshold, increment)))), trim(code), main.map(_.trim)) } @@ -75,7 +81,7 @@ trait ExecHelpers extends Matchers with WskActorSystem with StreamLogging { imageName(NODEJS10), default = Some(true), deprecated = Some(false), - stemCells = Some(List(StemCell(2, 256.MB)))), + stemCells = Some(List(StemCell(2, 2, 256.MB, ttl, threshold, increment)))), binary, main.map(_.trim)) } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala index 605b0465fda..83324ed7d62 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala @@ -28,6 +28,7 @@ import org.apache.openwhisk.core.entity.ExecManifest._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.entity.ByteSize +import scala.concurrent.duration.Duration import scala.util.Success @RunWith(classOf[JUnitRunner]) @@ -35,6 +36,10 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging behavior of "ExecManifest" + val ttl = Duration("2 minutes") + val threshold = 1 + val increment = 1 + private def manifestFactory(runtimes: JsObject) = { JsObject("runtimes" -> runtimes) } @@ -71,7 +76,8 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging val k1 = RuntimeManifest("k1", ImageName("???")) val k2 = RuntimeManifest("k2", ImageName("???"), default = Some(true)) val p1 = RuntimeManifest("p1", ImageName("???")) - val s1 = RuntimeManifest("s1", ImageName("???"), stemCells = Some(List(StemCell(2, 256.MB)))) + val s1 = + RuntimeManifest("s1", ImageName("???"), stemCells = Some(List(StemCell(2, 2, 256.MB, ttl, threshold, increment)))) val mf = manifestFactory(JsObject("ks" -> Set(k1, k2).toJson, "p1" -> Set(p1).toJson, "s1" -> Set(s1).toJson)) val runtimes = ExecManifest.runtimes(mf, RuntimeManifestConfig()).get @@ -100,7 +106,8 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging val k1 = RuntimeManifest("k1", ImageName("???", None, Some("ppp"))) val p1 = RuntimeManifest("p1", ImageName("???", None, Some("ppp"), Some("ttt"))) val q1 = RuntimeManifest("q1", ImageName("???", Some("rrr"), None, Some("ttt"))) - val s1 = RuntimeManifest("s1", ImageName("???"), stemCells = Some(List(StemCell(2, 256.MB)))) + val s1 = + RuntimeManifest("s1", ImageName("???"), stemCells = Some(List(StemCell(2, 2, 256.MB, ttl, threshold, increment)))) val mf = JsObject( @@ -123,7 +130,7 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging runtimes.resolveDefaultRuntime("p1").get.image.resolveImageName() shouldBe "ppp/???:ttt" runtimes.resolveDefaultRuntime("q1").get.image.resolveImageName() shouldBe "rrr/???:ttt" runtimes.resolveDefaultRuntime("s1").get.image.resolveImageName() shouldBe "???" - runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).count shouldBe 2 + runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).initialCount shouldBe 2 runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).memory shouldBe 256.MB } @@ -236,26 +243,45 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging } it should "de/serialize stem cell configuration" in { - val cell = StemCell(3, 128.MB) - val cellAsJson = JsObject("count" -> JsNumber(3), "memory" -> JsString("128 MB")) + val cell = StemCell(3, 3, 128.MB, ttl, threshold, increment) + val cellAsJson = JsObject( + "initialCount" -> JsNumber(3), + "maxCount" -> JsNumber(3), + "memory" -> JsString("128 MB"), + "ttl" -> JsString("2 minutes"), + "threshold" -> JsNumber(1), + "increment" -> JsNumber(1)) stemCellSerdes.write(cell) shouldBe cellAsJson stemCellSerdes.read(cellAsJson) shouldBe cell an[IllegalArgumentException] shouldBe thrownBy { - StemCell(-1, 128.MB) + StemCell(-1, -1, 128.MB, ttl, threshold, increment) } an[IllegalArgumentException] shouldBe thrownBy { - StemCell(0, 128.MB) + StemCell(0, 0, 128.MB, ttl, threshold, increment) } an[IllegalArgumentException] shouldBe thrownBy { - val cellAsJson = JsObject("count" -> JsNumber(0), "memory" -> JsString("128 MB")) + val cellAsJson = JsObject( + "initialCount" -> JsNumber(0), + "maxCount" -> JsNumber(0), + "memory" -> JsString("128 MB"), + "ttl" -> JsString("2 minutes"), + "threshold" -> JsNumber(1), + "increment" -> JsNumber(1)) stemCellSerdes.read(cellAsJson) } the[IllegalArgumentException] thrownBy { - val cellAsJson = JsObject("count" -> JsNumber(1), "memory" -> JsString("128")) + val cellAsJson = + JsObject( + "initialCount" -> JsNumber(1), + "maxCount" -> JsNumber(1), + "memory" -> JsString("128"), + "ttl" -> JsString("2 minutes"), + "threshold" -> JsNumber(1), + "increment" -> JsNumber(1)) stemCellSerdes.read(cellAsJson) } should have message { ByteSize.formatError @@ -273,8 +299,12 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging | "name": "nodejsaction" | }, | "stemCells": [{ - | "count": 1, - | "memory": "128 MB" + | "initialCount": 1, + | "maxCount": 1, + | "memory": "128 MB", + | "ttl": "2 minutes", + | "threshold": 1, + | "increment": 1 | }] | }, { | "kind": "nodejs:8", @@ -283,11 +313,19 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging | "name": "nodejsaction" | }, | "stemCells": [{ - | "count": 1, - | "memory": "128 MB" + | "initialCount": 1, + | "maxCount": 1, + | "memory": "128 MB", + | "ttl": "2 minutes", + | "threshold": 1, + | "increment": 1 | }, { - | "count": 1, - | "memory": "256 MB" + | "initialCount": 1, + | "maxCount": 1, + | "memory": "256 MB", + | "ttl": "2 minutes", + | "threshold": 1, + | "increment": 1 | }] | } | ], @@ -297,8 +335,12 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging | "name": "pythonaction" | }, | "stemCells": [{ - | "count": 2, - | "memory": "256 MB" + | "initialCount": 2, + | "maxCount": 2, + | "memory": "256 MB", + | "ttl": "2 minutes", + | "threshold": 1, + | "increment": 1 | }] | }], | "swiftf": [{ @@ -322,13 +364,18 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging "nodejs:10", ImageName("nodejsaction"), deprecated = Some(true), - stemCells = Some(List(StemCell(1, 128.MB)))) + stemCells = Some(List(StemCell(1, 1, 128.MB, ttl, threshold, increment)))) val js8 = RuntimeManifest( "nodejs:8", ImageName("nodejsaction"), default = Some(true), - stemCells = Some(List(StemCell(1, 128.MB), StemCell(1, 256.MB)))) - val py = RuntimeManifest("python", ImageName("pythonaction"), stemCells = Some(List(StemCell(2, 256.MB)))) + stemCells = Some( + List(StemCell(1, 1, 128.MB, ttl, threshold, increment), StemCell(1, 1, 256.MB, ttl, threshold, increment)))) + val py = + RuntimeManifest( + "python", + ImageName("pythonaction"), + stemCells = Some(List(StemCell(2, 2, 256.MB, ttl, threshold, increment)))) val sw = RuntimeManifest("swift", ImageName("swiftaction"), stemCells = Some(List.empty)) val ph = RuntimeManifest("php", ImageName("phpaction")) val mf = ExecManifest.runtimes(json, RuntimeManifestConfig()).get @@ -345,13 +392,13 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging } def stemCellFactory(m: RuntimeManifest, cells: List[StemCell]) = cells.map { c => - (m.kind, m.image, c.count, c.memory) + (m.kind, m.image, c.initialCount, c.memory) } mf.stemcells.flatMap { case (m, cells) => cells.map { c => - (m.kind, m.image, c.count, c.memory) + (m.kind, m.image, c.initialCount, c.memory) } }.toList should contain theSameElementsAs List( (js6.kind, js6.image, 1, 128.MB),