Skip to content

Commit

Permalink
Adjust prewarm container dynamically
Browse files Browse the repository at this point in the history
  • Loading branch information
ningyougang committed Apr 10, 2020
1 parent ecf92b0 commit 98e06dd
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 52 deletions.
8 changes: 6 additions & 2 deletions ansible/files/runtimes.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@
},
"stemCells": [
{
"count": 2,
"memory": "256 MB"
"initialCount": 2,
"maxCount": 4,
"memory": "256 MB",
"ttl": "2 minutes",
"threshold": 1,
"increment": 1
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import spray.json.DefaultJsonProtocol._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.entity.Attachments._
import org.apache.openwhisk.core.entity.Attachments.Attached._
import fastparse._, NoWhitespace._
import fastparse._
import NoWhitespace._

import scala.concurrent.duration.Duration

/**
* Reads manifest of supported runtimes from configuration file and stores
Expand Down Expand Up @@ -135,11 +138,24 @@ protected[core] object ExecManifest {
/**
* A stemcell configuration read from the manifest for a container image to be initialized by the container pool.
*
* @param count the number of stemcell containers to create
* @param initialCount the initial number of stemcell containers to create
* @param maxCount the max number of stemcell containers to create
* @param memory the max memory this stemcell will allocate
* @param ttl time to live of the prewarmed container
* @param threshold the number of cold starts within the previous minute at which additional prewarmed containers are created
* @param increment the number by which prewarmed containers are increased when cold starts >= threshold and there are not enough prewarmed containers
*/
protected[entity] case class StemCell(count: Int, memory: ByteSize) {
require(count > 0, "count must be positive")
protected[entity] case class StemCell(initialCount: Int,
                                      maxCount: Int,
                                      memory: ByteSize,
                                      ttl: Duration,
                                      threshold: Int,
                                      increment: Int) {
  // Invariants are checked at construction time; a failed `require` throws IllegalArgumentException.
  require(initialCount > 0, "initialCount must be positive")
  require(maxCount >= initialCount, "maxCount must be greater than or equal to initialCount")
  require(ttl.toSeconds > 0, "ttl must be positive")
  require(threshold > 0, "threshold must be positive")
  // The message names maxCount because that is the upper bound actually enforced by the condition.
  require(increment > 0 && increment <= maxCount, "increment must be positive and less than or equal to maxCount")
}

/**
Expand Down Expand Up @@ -344,9 +360,19 @@ protected[core] object ExecManifest {

protected[entity] implicit val imageNameSerdes: RootJsonFormat[ImageName] = jsonFormat4(ImageName.apply)

/**
 * JSON format for the stemcell `ttl`.
 *
 * `write` emits the duration's `toString` as a JSON string. `read` accepts only a JSON string and
 * parses it with `Duration(String)`; both a non-string value and an unparsable string are reported
 * through `deserializationError` instead of leaking the parser's own exception.
 */
protected[entity] implicit val ttlSerdes: RootJsonFormat[Duration] = new RootJsonFormat[Duration] {
  override def write(duration: Duration): JsValue = JsString(duration.toString)

  override def read(value: JsValue): Duration = value match {
    case JsString(s) =>
      // Duration(String) throws on malformed input (e.g. unknown unit or non-numeric length);
      // convert that into a deserialization error that carries the offending text and the cause.
      try Duration(s)
      catch {
        case scala.util.control.NonFatal(e) =>
          deserializationError(s"'$s' is not a valid duration, expected a format such as '2 minutes'", e)
      }
    case _ =>
      deserializationError("time unit not supported. Only milliseconds, seconds, minutes, hours, days are supported")
  }
}

protected[entity] implicit val stemCellSerdes: RootJsonFormat[StemCell] = {
import org.apache.openwhisk.core.entity.size.serdes
jsonFormat2(StemCell.apply)
jsonFormat6(StemCell.apply)
}

protected[entity] implicit val runtimeManifestSerdes: RootJsonFormat[RuntimeManifest] = jsonFormat8(RuntimeManifest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

package org.apache.openwhisk.core.containerpool

import java.text.SimpleDateFormat
import java.time.{Duration => JDuration, Instant}
import java.util.Calendar

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.apache.openwhisk.common.MetricEmitter
import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration._
Expand All @@ -36,6 +40,9 @@ case class WorkerData(data: ContainerData, state: WorkerState)

case object EmitMetrics

case object DeleteUnusedPrewarmedContainer
case object SupplementPrewarmedContainer

/**
* A pool managing containers to run actions on.
*
Expand All @@ -59,11 +66,10 @@ case object EmitMetrics
class ContainerPool(childFactory: ActorRefFactory => ActorRef,
feed: ActorRef,
prewarmConfig: List[PrewarmingConfig] = List.empty,
poolConfig: ContainerPoolConfig)
poolConfig: ContainerPoolConfig)(implicit val logging: Logging)
extends Actor {
import ContainerPool.memoryConsumptionOf

implicit val logging = new AkkaLogging(context.system.log)
implicit val ec = context.dispatcher

var freePool = immutable.Map.empty[ActorRef, ContainerData]
Expand All @@ -80,8 +86,59 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
//periodically emit metrics (don't need to do this for each message!)
context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)

// Key is `kind,memory,time` (time has minute resolution); value is the number of cold starts in that minute
var coldStartCount = immutable.Map.empty[String, Int]

backfillPrewarms(true)

// check every minute and delete prewarmed containers that have been unused for longer than their ttl
context.system.scheduler.schedule(10.seconds, 1.minute, self, DeleteUnusedPrewarmedContainer)

// check every minute for cold starts and automatically create additional prewarmed containers if activations >= threshold
context.system.scheduler.schedule(1.minute, 1.minute, self, SupplementPrewarmedContainer)

/**
 * For every prewarm configuration, sends RemovePreWarmedContainer to each prewarmed container of
 * that kind and memory whose `lastUsed` timestamp is older than the configured ttl.
 */
def deleteUnusedPrewarmedContainer(): Unit = {
  for (cfg <- prewarmConfig) {
    val kind = cfg.exec.kind
    val memory = cfg.memoryLimit
    val ttl = JDuration.ofSeconds(cfg.ttl.toSeconds)
    prewarmedPool.foreach {
      case (proxy, data @ PreWarmedData(_, `kind`, `memory`, _)) =>
        val idle = JDuration.between(data.lastUsed, Instant.now)
        // Only removal is requested here; no replacement container is created
        if (idle.compareTo(ttl) > 0) proxy ! RemovePreWarmedContainer
      case _ => // different kind or memory: not governed by this configuration
    }
  }
}

/**
 * For every prewarm configuration, looks up the cold-start counter stored under the previous
 * minute's key and, when one exists, requests additional prewarmed containers for that kind and
 * memory through `prewarmContainerIfpossible`. The counter is dropped afterwards.
 *
 * At least one container is always requested when a counter exists: below the threshold exactly
 * one, at or above it `value / increment` but never fewer than one (integer division could
 * otherwise yield zero when the increment exceeds the counted cold starts).
 */
def supplementPrewarmedContainer(): Unit = {
  prewarmConfig.foreach { config =>
    val kind = config.exec.kind
    val memory = config.memoryLimit
    // Build the key for the minute before now, in the same format countColdStart writes
    val calendar = Calendar.getInstance()
    calendar.add(Calendar.MINUTE, -1)
    val lastDate = new SimpleDateFormat("yyyy-MM-dd-HH:mm").format(calendar.getTime)
    val key = s"${kind},${memory.toMB},${lastDate}"
    coldStartCount.get(key).foreach { value =>
      val wanted =
        if (value >= config.threshold) math.max(1, value / config.increment)
        else 1 // at least create 1 prewarmed container
      prewarmContainerIfpossible(kind, memory, wanted)
    }
    // The entry has been consumed (or never existed); removing a missing key is a no-op
    coldStartCount = coldStartCount - key
  }
}

def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name
val actionName = r.action.name.name
Expand Down Expand Up @@ -131,7 +188,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
takePrewarmContainer(r.action)
.map(container => (container, "prewarmed"))
.orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
.orElse {
val container = Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
countColdStart(r.action.exec.kind, r.action.limits.memory.megabytes.MB)
container
}
} else None)
.orElse(
// Remove a container and create a new one for the given job
Expand All @@ -145,7 +206,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
.map(_ =>
takePrewarmContainer(r.action)
.map(container => (container, "recreatedPrewarm"))
.getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated")))
.getOrElse {
val container = (createContainer(r.action.limits.memory.megabytes.MB), "recreated")
countColdStart(r.action.exec.kind, r.action.limits.memory.megabytes.MB)
container
}))

} else None

Expand Down Expand Up @@ -271,6 +336,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
//backfill prewarms on every ContainerRemoved, just in case
backfillPrewarms(false) //in case a prewarm is removed due to health failure or crash

// prewarmed container got removed
case PreWarmedContainerRemoved =>
prewarmedPool.get(sender()).foreach { _ =>
logging.info(this, "prewarmed container is deleted due to unused for long time")
prewarmedPool = prewarmedPool - sender()
}

// This message is received for one of these reasons:
// 1. Container errored while resuming a warm container, could not process the job, and sent the job back
// 2. The container aged, is destroying itself, and was assigned a job which it had to send back
Expand All @@ -281,6 +353,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
busyPool = busyPool - sender()
case EmitMetrics =>
emitMetrics()

case DeleteUnusedPrewarmedContainer =>
deleteUnusedPrewarmedContainer()

case SupplementPrewarmedContainer =>
supplementPrewarmedContainer()
}

/** Resend next item in the buffer, or trigger next item in the feed, if no items in the buffer. */
Expand Down Expand Up @@ -313,12 +391,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory)
val containerCount = currentCount + startingCount
if (containerCount < config.count) {
if (containerCount < config.initialCount) {
logging.info(
this,
s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${config.count - containerCount} pre-warms to desired count: ${config.count} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${config.initialCount - containerCount} pre-warms to desired initialCount: ${config.initialCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(containerCount until config.count).foreach { _ =>
(containerCount until config.initialCount).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit)
}
}
Expand All @@ -340,6 +418,47 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
newContainer ! Start(exec, memoryLimit)
}

/**
 * Starts up to `count` additional prewarm containers for each prewarm configuration matching the
 * given kind and memory, without letting started plus starting containers exceed that
 * configuration's maxCount.
 */
def prewarmContainerIfpossible(kind: String, memoryLimit: ByteSize, count: Int): Unit = {
  for (cfg <- prewarmConfig if kind == cfg.exec.kind && memoryLimit == cfg.memoryLimit) {
    // containers that have finished starting
    val started = prewarmedPool.count {
      case (_, PreWarmedData(_, `kind`, `memoryLimit`, _)) => true
      case _ => false
    }
    // containers that were requested but have not finished starting
    val starting = prewarmStartingPool.count(entry => entry._2._1 == kind && entry._2._2 == memoryLimit)
    val headroom = cfg.maxCount - (started + starting)
    if (headroom > 0) {
      (1 to math.min(count, headroom)).foreach(_ => prewarmContainer(cfg.exec, cfg.memoryLimit))
    }
  }
}

/**
 * Records one cold start under the current minute's key for each prewarm configuration that
 * matches the given kind and memory, then logs every counter currently held.
 */
def countColdStart(kind: String, memoryLimit: ByteSize): Unit = {
  for (cfg <- prewarmConfig if kind == cfg.exec.kind && memoryLimit == cfg.memoryLimit) {
    val minute = new SimpleDateFormat("yyyy-MM-dd-HH:mm").format(Calendar.getInstance().getTime)
    val key = s"${kind},${memoryLimit.toMB},${minute}"
    // start from 0 when this is the first cold start seen for the key
    coldStartCount = coldStartCount.updated(key, coldStartCount.getOrElse(key, 0) + 1)
    coldStartCount.foreach {
      case (k, v) => logging.info(this, s"===statistics the cold start, k: ${k}, v: ${v}")
    }
  }
}

/**
* Takes a prewarm container out of the prewarmed pool
* iff a container with a matching kind and memory is found.
Expand Down Expand Up @@ -502,9 +621,15 @@ object ContainerPool {
def props(factory: ActorRefFactory => ActorRef,
poolConfig: ContainerPoolConfig,
feed: ActorRef,
prewarmConfig: List[PrewarmingConfig] = List.empty) =
prewarmConfig: List[PrewarmingConfig] = List.empty)(implicit logging: Logging) =
Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig))
}

/** Contains settings needed to perform container prewarming. */
case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit: ByteSize)
/**
 * Settings for one prewarmed container flavour (a kind plus a memory size).
 *
 * @param initialCount number of prewarm containers the pool backfills up to
 * @param maxCount upper bound on started plus starting prewarm containers when supplementing
 * @param exec the exec handed to new prewarm containers; its kind is used to match containers
 * @param memoryLimit memory handed to new prewarm containers; also used to match containers
 * @param ttl a prewarmed container whose lastUsed is older than this is asked to remove itself
 * @param threshold cold-start count of the previous minute that is compared against when supplementing
 * @param increment divisor applied to the cold-start count once the threshold is reached
 */
case class PrewarmingConfig(initialCount: Int,
maxCount: Int,
exec: CodeExec[_],
memoryLimit: ByteSize,
ttl: Duration,
threshold: Int,
increment: Int)
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ case class PreWarmedData(override val container: Container,
kind: String,
override val memoryLimit: ByteSize,
override val activeActivationCount: Int = 0)
extends ContainerStarted(container, Instant.EPOCH, memoryLimit, activeActivationCount)
extends ContainerStarted(container, Instant.now(), memoryLimit, activeActivationCount)
with ContainerNotInUse {
override val initingState = "prewarmed"
override def nextRun(r: Run) =
Expand Down Expand Up @@ -196,12 +196,14 @@ case class WarmedData(override val container: Container,
case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None)
case object Remove
case object RemovePreWarmedContainer
case class HealthPingEnabled(enabled: Boolean)

// Events sent by the actor
case class NeedWork(data: ContainerData)
case object ContainerPaused
case object ContainerRemoved // when container is destroyed
case object PreWarmedContainerRemoved
case object RescheduleJob // job is sent back to parent and could not be processed because container is being destroyed
case class PreWarmCompleted(data: PreWarmedData)
case class InitCompleted(data: WarmedData)
Expand Down Expand Up @@ -375,6 +377,8 @@ class ContainerProxy(factory: (TransactionId,

case Event(Remove, data: PreWarmedData) => destroyContainer(data)

case Event(RemovePreWarmedContainer, data: PreWarmedData) => destoryPreWarmedContainer(data)

// prewarm container failed
case Event(_: FailureMessage, data: PreWarmedData) =>
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM)
Expand Down Expand Up @@ -636,6 +640,26 @@ class ContainerProxy(factory: (TransactionId,
goto(Removing) using newData
}

/**
 * Tears down a prewarmed container that has gone unused for a long time.
 *
 * The parent is notified with PreWarmedContainerRemoved first. The container is then resumed if
 * this proxy is currently Paused, destroyed, and the outcome (ContainerRemoved on success) is
 * piped back to this actor while it moves to the Removing state.
 *
 * @param newData the ContainerStarted whose container will be destroyed
 */
def destoryPreWarmedContainer(newData: ContainerStarted) = {
  context.parent ! PreWarmedContainerRemoved

  val target = newData.container
  // a paused container is resumed before it is destroyed
  val resumed =
    if (stateName == Paused) target.resume()(TransactionId.invokerNanny)
    else Future.successful(())

  val removal = for {
    _ <- resumed
    _ <- target.destroy()(TransactionId.invokerNanny)
  } yield ContainerRemoved
  removal.pipeTo(self)

  goto(Removing) using newData
}

/**
* Return any buffered jobs to parent, in case buffer is not empty at removal/error time.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,14 @@ class InvokerReactive(
ExecManifest.runtimesManifest.stemcells.flatMap {
case (mf, cells) =>
cells.map { cell =>
PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), cell.memory)
PrewarmingConfig(
cell.initialCount,
cell.maxCount,
new CodeExecAsString(mf, "", None),
cell.memory,
cell.ttl,
cell.threshold,
cell.increment)
}
}.toList
}
Expand Down
Loading

0 comments on commit 98e06dd

Please sign in to comment.