Skip to content

Commit

Permalink
Fix spelling (#5516)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbampton authored Oct 18, 2024
1 parent a4508ba commit 368dd80
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.openwhisk.core.ack
import org.apache.openwhisk.common.{TransactionId, UserEvents}
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
import org.apache.openwhisk.core.connector.{AcknowledgementMessage, EventMessage, MessageProducer}
import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, WhiskActivation}

import scala.concurrent.Future
Expand All @@ -27,23 +27,23 @@ import scala.concurrent.Future
* are either completion messages for an activation to indicate a resource slot is free, or result-forwarding
* messages for continuations (e.g., sequences and conductor actions).
*
* The activation result is always provided because some acknowledegment messages may not carry the result of
* The activation result is always provided because some acknowledgement messages may not carry the result of
* the activation and this is needed for sending user events.
*
* @param tid the transaction id for the activation
* @param activationResult is the activation result
* @param blockingInvoke is true iff the activation was a blocking request
* @param controllerInstance the originating controller/loadbalancer id
* @param userId is the UUID for the namespace owning the activation
* @param acknowledegment the acknowledgement message to send
* @param acknowledgement the acknowledgement message to send
*/
trait ActiveAck {
def apply(tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID,
acknowledegment: AcknowledegmentMessage): Future[Any]
acknowledgement: AcknowledgementMessage): Future[Any]
}

trait EventSender {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.openwhisk.core.ack

import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, MessageProducer}
import org.apache.openwhisk.core.connector.{AcknowledgementMessage, MessageProducer}
import org.apache.openwhisk.core.entity.{ControllerInstanceId, UUID, WhiskActivation}
import spray.json.DefaultJsonProtocol._

Expand All @@ -30,7 +30,7 @@ class HealthActionAck(producer: MessageProducer)(implicit logging: Logging, ec:
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID,
acknowledegment: AcknowledegmentMessage): Future[Any] = {
acknowledgement: AcknowledgementMessage): Future[Any] = {
implicit val transid: TransactionId = tid

logging.debug(this, s"health action was successfully invoked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.openwhisk.core.ack
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
import org.apache.openwhisk.core.connector.{AcknowledgementMessage, EventMessage, MessageProducer}
import org.apache.openwhisk.core.entity._
import pureconfig._
import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -38,19 +38,19 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID,
acknowledegment: AcknowledegmentMessage): Future[Any] = {
acknowledgement: AcknowledgementMessage): Future[Any] = {
implicit val transid: TransactionId = tid

def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
def send(msg: AcknowledgementMessage, recovery: Boolean = false) = {
producer.send(topic = topicPrefix + "completed" + controllerInstance.asString, msg).andThen {
case Success(_) =>
val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType
logging.info(this, s"posted $info of activation ${acknowledegment.activationId}")
logging.info(this, s"posted $info of activation ${acknowledgement.activationId}")
}
}

// UserMetrics are sent, when the slot is free again. This ensures, that all metrics are sent.
if (acknowledegment.isSlotFree.nonEmpty) {
if (acknowledgement.isSlotFree.nonEmpty) {
eventSender.foreach { s =>
EventMessage.from(activationResult, instance.source, userId) match {
case Success(msg) => s.send(msg)
Expand All @@ -62,10 +62,10 @@ class MessagingActiveAck(producer: MessageProducer, instance: InstanceId, eventS
// An acknowledgement containing the result is only needed for blocking invokes in order to further the
// continuation. A result message for a non-blocking activation is not actually registered in the load balancer
// and the container proxy should not send such an acknowlegement unless it's a blocking request. Here the code
// is defensive and will shrink all non-blocking acknowledegments.
send(if (blockingInvoke) acknowledegment else acknowledegment.shrink).recoverWith {
// is defensive and will shrink all non-blocking acknowledgements.
send(if (blockingInvoke) acknowledgement else acknowledgement.shrink).recoverWith {
case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
send(acknowledegment.shrink, recovery = true)
send(acknowledgement.shrink, recovery = true)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ case class ActivationMessage(override val transid: TransactionId,
* Message that is sent from the invoker to the controller after action is completed or after slot is free again for
* new actions.
*/
abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Message {
abstract class AcknowledgementMessage(private val tid: TransactionId) extends Message {
override val transid: TransactionId = tid

override def serialize: String = AcknowledegmentMessage.serdes.write(this).compactPrint
override def serialize: String = AcknowledgementMessage.serdes.write(this).compactPrint

/** Pithy descriptor for logging. */
def messageType: String
Expand All @@ -106,7 +106,7 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
/**
* Converts the message to a more compact form if it cannot cross the message bus as is or some of its details are not necessary.
*/
def shrink: AcknowledegmentMessage
def shrink: AcknowledgementMessage
}

/**
Expand All @@ -122,7 +122,7 @@ case class CombinedCompletionAndResultMessage private (override val transid: Tra
response: Either[ActivationId, WhiskActivation],
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
extends AcknowledgementMessage(transid) {
override def messageType = "combined"

override def result = Some(response)
Expand All @@ -148,7 +148,7 @@ case class CompletionMessage private (override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
extends AcknowledgementMessage(transid) {
override def messageType = "completion"

override def result = None
Expand All @@ -171,7 +171,7 @@ case class CompletionMessage private (override val transid: TransactionId,
* Right when this message is created.
*/
case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
extends AcknowledgementMessage(transid) {
override def messageType = "result"

override def result = Some(response)
Expand Down Expand Up @@ -209,7 +209,7 @@ object CombinedCompletionAndResultMessage extends DefaultJsonProtocol {
instance: InstanceId): CombinedCompletionAndResultMessage =
new CombinedCompletionAndResultMessage(transid, Right(activation), Some(activation.response.isWhiskError), instance)

implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
implicit private val eitherSerdes = AcknowledgementMessage.eitherResponse
implicit val serdes = jsonFormat4(
CombinedCompletionAndResultMessage
.apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: Option[Boolean], _: InstanceId))
Expand Down Expand Up @@ -239,12 +239,12 @@ object ResultMessage extends DefaultJsonProtocol {
def apply(transid: TransactionId, activation: WhiskActivation): ResultMessage =
new ResultMessage(transid, Right(activation))

implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
implicit private val eitherSerdes = AcknowledgementMessage.eitherResponse
implicit val serdes = jsonFormat2(ResultMessage.apply(_: TransactionId, _: Either[ActivationId, WhiskActivation]))
}

object AcknowledegmentMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[AcknowledegmentMessage] = Try(serdes.read(msg.parseJson))
object AcknowledgementMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[AcknowledgementMessage] = Try(serdes.read(msg.parseJson))

protected[connector] val eitherResponse = new JsonFormat[Either[ActivationId, WhiskActivation]] {
def write(either: Either[ActivationId, WhiskActivation]) = either.fold(_.toJson, _.toJson)
Expand All @@ -259,13 +259,13 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
}
}

implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
override def write(m: AcknowledegmentMessage): JsValue = m.toJson
implicit val serdes = new RootJsonFormat[AcknowledgementMessage] {
override def write(m: AcknowledgementMessage): JsValue = m.toJson

// The field invoker is only part of CombinedCompletionAndResultMessage and CompletionMessage.
// If this field is part of the JSON, we try to deserialize into one of these two types,
// and otherwise to a ResultMessage. If all conversions fail, an error will be thrown that needs to be handled.
override def read(json: JsValue): AcknowledegmentMessage = {
override def read(json: JsValue): AcknowledgementMessage = {
val JsObject(fields) = json
val completion = fields.contains("instance")
val result = fields.contains("response")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ abstract class WhiskEntity protected[entity] (en: EntityName, val entityType: St

/**
* A JSON view of the entity, that should match the result returned in a list operation.
* This should be synchronized with the views computed in the databse.
* This should be synchronized with the views computed in the database.
* Strictly used in view testing to enforce alignment.
*/
def summaryAsJson: JsObject = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,19 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
/** 4. Get the ack message and parse it */
protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
AcknowledegmentMessage.parse(raw) match {
case Success(acknowledegment) =>
acknowledegment.isSlotFree.foreach { instance =>
AcknowledgementMessage.parse(raw) match {
case Success(acknowledgement) =>
acknowledgement.isSlotFree.foreach { instance =>
processCompletion(
acknowledegment.activationId,
acknowledegment.transid,
acknowledgement.activationId,
acknowledgement.transid,
forced = false,
isSystemError = acknowledegment.isSystemError.getOrElse(false),
isSystemError = acknowledgement.isSystemError.getOrElse(false),
instance)
}

acknowledegment.result.foreach { response =>
processResult(acknowledegment.activationId, acknowledegment.transid, response)
acknowledgement.result.foreach { response =>
processResult(acknowledgement.activationId, acknowledgement.transid, response)
}

activationFeed ! MessageFeed.Processed
Expand All @@ -242,7 +242,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,

case _ =>
activationFeed ! MessageFeed.Processed
logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw")
logging.error(this, s"Unexpected Acknowledgement message received by loadbalancer: $raw")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,18 +295,18 @@ class FPCPoolBalancer(config: WhiskConfig,
/** 4. Get the active-ack message and parse it */
protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
AcknowledegmentMessage.parse(raw) match {
case Success(acknowledegment) =>
acknowledegment.isSlotFree.foreach { invoker =>
AcknowledgementMessage.parse(raw) match {
case Success(acknowledgement) =>
acknowledgement.isSlotFree.foreach { invoker =>
processCompletion(
acknowledegment.activationId,
acknowledegment.transid,
acknowledgement.activationId,
acknowledgement.transid,
forced = false,
isSystemError = acknowledegment.isSystemError.getOrElse(false))
isSystemError = acknowledgement.isSystemError.getOrElse(false))
}

acknowledegment.result.foreach { response =>
processResult(acknowledegment.activationId, acknowledegment.transid, response)
acknowledgement.result.foreach { response =>
processResult(acknowledgement.activationId, acknowledgement.transid, response)
}

activationFeed ! MessageFeed.Processed
Expand All @@ -316,7 +316,7 @@ class FPCPoolBalancer(config: WhiskConfig,

case _ =>
activationFeed ! MessageFeed.Processed
logging.warn(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw")
logging.warn(this, s"Unexpected Acknowledgement message received by loadbalancer: $raw")
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/performance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Next, the action will be invoked 100 times blocking and one after each other. Be
`PAUSE_BETWEEN_INVOKES` milliseconds. The last step is to delete the action.

Once one language is finished, the next kind will be taken. They are not running in parallel. There are never more than
1 activations in the system, as we only want to meassure latency of warm activations.
1 activations in the system, as we only want to measure latency of warm activations.
As all actions are invoked blocking and only one action is in the system, it doesn't matter how many controllers
and invokers are deployed. If several controllers or invokers are deployed, all controllers send the activation
always to the same invoker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ColdBlockingInvokeSimulation extends Simulation {
// Execute all actions for the given amount of time.
.during(seconds) {
// Cycle through the actions of this user, to not invoke the same action directly one after each other.
// Otherwise there is the possiblity, that it is warm.
// Otherwise there is the possibility, that it is warm.
repeat(actionsPerUser, "i") {
exec(openWhisk("Invoke action").authenticate(uuid, key).action(actionName).invoke())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.scalatest.junit.JUnitRunner
import spray.json._
import org.apache.openwhisk.common.{TransactionId, WhiskInstants}
import org.apache.openwhisk.core.connector.{
AcknowledegmentMessage,
AcknowledgementMessage,
CombinedCompletionAndResultMessage,
CompletionMessage,
ResultMessage
Expand Down Expand Up @@ -60,15 +60,15 @@ class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInsta
m.isSlotFree shouldBe empty
m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.left.get.toJson).compactPrint
m.serialize shouldBe m.toJson.compactPrint
AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
AcknowledgementMessage.parse(m.serialize) shouldBe Success(m)
}

it should "serialize and deserialize a Result message with Right result" in {
val m = ResultMessage(TransactionId.testing, activation)
m.response shouldBe 'right
m.isSlotFree shouldBe empty
m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.right.get.toJson).compactPrint
AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
AcknowledgementMessage.parse(m.serialize) shouldBe Success(m)
}

it should "serialize and deserialize a Completion message" in {
Expand All @@ -79,7 +79,7 @@ class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInsta
InvokerInstanceId(0, userMemory = defaultUserMemory))
m.isSlotFree should not be empty
m.serialize shouldBe m.toJson.compactPrint
AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
AcknowledgementMessage.parse(m.serialize) shouldBe Success(m)
}

it should "serialize and deserialize a CombinedCompletionAndResultMessage" in {
Expand All @@ -92,7 +92,7 @@ class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInsta
c.isSlotFree should not be empty
c.isSystemError shouldBe Some(false)
c.serialize shouldBe c.toJson.compactPrint
AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
AcknowledgementMessage.parse(c.serialize) shouldBe Success(c)
}

withClue("system error true and right") {
Expand All @@ -106,7 +106,7 @@ class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInsta
c.isSlotFree should not be empty
c.isSystemError shouldBe Some(true)
c.serialize shouldBe c.toJson.compactPrint
AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
AcknowledgementMessage.parse(c.serialize) shouldBe Success(c)
}

withClue("system error false and left") {
Expand All @@ -118,7 +118,7 @@ class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInsta
c.isSlotFree should not be empty
c.isSystemError shouldBe Some(false)
c.serialize shouldBe c.toJson.compactPrint
AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
AcknowledgementMessage.parse(c.serialize) shouldBe Success(c)
}

withClue("system error true and left") {
Expand All @@ -132,7 +132,7 @@ class AcknowledgementMessageTests extends FlatSpec with Matchers with WhiskInsta
c.isSlotFree should not be empty
c.isSystemError shouldBe Some(true)
c.serialize shouldBe c.toJson.compactPrint
AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
AcknowledgementMessage.parse(c.serialize) shouldBe Success(c)
}
}
}
Loading

0 comments on commit 368dd80

Please sign in to comment.