Skip to content

Commit

Permalink
More examples & monitor heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
kshaa committed Oct 31, 2021
1 parent 272e720 commit f4f39e4
Show file tree
Hide file tree
Showing 20 changed files with 1,485 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ object HardwareControlMessage {
case class UploadSoftwareRequest(softwareId: SoftwareId) extends HardwareControlMessage
case class UploadSoftwareResult(error: Option[String]) extends HardwareControlMessage
case class SerialMonitorRequest(serialConfig: Option[SerialConfig]) extends HardwareControlMessage
case class SerialMonitorRequestStop() extends HardwareControlMessage
case class SerialMonitorResult(error: Option[String]) extends HardwareControlMessage
case class SerialMonitorMessageToAgent(message: SerialMessageToAgent) extends HardwareControlMessage
case class SerialMonitorMessageToClient(message: SerialMessageToClient) extends HardwareControlMessage
case class SerialMonitorListenersHeartbeatStart() extends HardwareControlMessage
case class SerialMonitorListenersHeartbeatPing() extends HardwareControlMessage
case class SerialMonitorListenersHeartbeatPong() extends HardwareControlMessage
case class SerialMonitorListenersHeartbeatFinish() extends HardwareControlMessage
case class Ping() extends HardwareControlMessage

val uploadUnavailableMessage: UploadSoftwareResult =
Expand Down
34 changes: 30 additions & 4 deletions backend/protocol/src/main/scala/diptestbed/protocol/Codecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,22 @@ object Codecs {
private implicit val uploadSoftwareRequestCodec: Codec[UploadSoftwareRequest] = deriveCodec[UploadSoftwareRequest]
private implicit val uploadSoftwareResultCodec: Codec[UploadSoftwareResult] = deriveCodec[UploadSoftwareResult]

private implicit val SerialMonitorRequestCodec: Codec[SerialMonitorRequest] = deriveCodec[SerialMonitorRequest]
private implicit val SerialMonitorResultCodec: Codec[SerialMonitorResult] = deriveCodec[SerialMonitorResult]
private implicit val SerialMonitorMessageToAgentCodec: Codec[SerialMonitorMessageToAgent] =
private implicit val serialMonitorRequestCodec: Codec[SerialMonitorRequest] = deriveCodec[SerialMonitorRequest]
private implicit val serialMonitorRequestStopCodec: Codec[SerialMonitorRequestStop] =
deriveCodec[SerialMonitorRequestStop]
private implicit val serialMonitorResultCodec: Codec[SerialMonitorResult] = deriveCodec[SerialMonitorResult]
private implicit val serialMonitorMessageToAgentCodec: Codec[SerialMonitorMessageToAgent] =
deriveCodec[SerialMonitorMessageToAgent]
private implicit val SerialMonitorMessageToClientCodec: Codec[SerialMonitorMessageToClient] =
private implicit val serialMonitorMessageToClientCodec: Codec[SerialMonitorMessageToClient] =
deriveCodec[SerialMonitorMessageToClient]
private implicit val serialMonitorListenersHeartbeatStartCodec: Codec[SerialMonitorListenersHeartbeatStart] =
deriveCodec[SerialMonitorListenersHeartbeatStart]
private implicit val serialMonitorListenersHeartbeatPingCodec: Codec[SerialMonitorListenersHeartbeatPing] =
deriveCodec[SerialMonitorListenersHeartbeatPing]
private implicit val serialMonitorListenersHeartbeatPongCodec: Codec[SerialMonitorListenersHeartbeatPong] =
deriveCodec[SerialMonitorListenersHeartbeatPong]
private implicit val serialMonitorListenersHeartbeatFinishCodec: Codec[SerialMonitorListenersHeartbeatFinish] =
deriveCodec[SerialMonitorListenersHeartbeatFinish]

private implicit val pingCodec: Codec[Ping] = deriveCodec[Ping]

Expand All @@ -47,9 +57,16 @@ object Codecs {
case c: UploadSoftwareResult => NamedMessage("uploadSoftwareResult", c.asJson).asJson

case c: SerialMonitorRequest => NamedMessage("serialMonitorRequest", c.asJson).asJson
case c: SerialMonitorRequestStop => NamedMessage("serialMonitorRequestStop", c.asJson).asJson
case c: SerialMonitorResult => NamedMessage("serialMonitorResult", c.asJson).asJson
case c: SerialMonitorMessageToAgent => NamedMessage("serialMonitorMessageToAgent", c.asJson).asJson
case c: SerialMonitorMessageToClient => NamedMessage("serialMonitorMessageToClient", c.asJson).asJson
case c: SerialMonitorListenersHeartbeatStart =>
NamedMessage("serialMonitorListenersHeartbeatStart", c.asJson).asJson
case c: SerialMonitorListenersHeartbeatPing => NamedMessage("serialMonitorListenersHeartbeatPing", c.asJson).asJson
case c: SerialMonitorListenersHeartbeatPong => NamedMessage("SerialMonitorListenersHeartbeatPong", c.asJson).asJson
case c: SerialMonitorListenersHeartbeatFinish =>
NamedMessage("SerialMonitorListenersHeartbeatFinish", c.asJson).asJson

case c: Ping => NamedMessage("ping", c.asJson).asJson
}
Expand All @@ -61,10 +78,19 @@ object Codecs {
case "uploadSoftwareResult" => Decoder[UploadSoftwareResult].widen[HardwareControlMessage].some

case "serialMonitorRequest" => Decoder[SerialMonitorRequest].widen[HardwareControlMessage].some
case "serialMonitorRequestStop" => Decoder[SerialMonitorRequestStop].widen[HardwareControlMessage].some
case "serialMonitorResult" => Decoder[SerialMonitorResult].widen[HardwareControlMessage].some
case "serialMonitorMessageToAgent" => Decoder[SerialMonitorMessageToAgent].widen[HardwareControlMessage].some
case "serialMonitorMessageToClient" =>
Decoder[SerialMonitorMessageToClient].widen[HardwareControlMessage].some
case "serialMonitorListenersHeartbeatStart" =>
Decoder[SerialMonitorListenersHeartbeatStart].widen[HardwareControlMessage].some
case "serialMonitorListenersHeartbeatPing" =>
Decoder[SerialMonitorListenersHeartbeatPing].widen[HardwareControlMessage].some
case "serialMonitorListenersHeartbeatPong" =>
Decoder[SerialMonitorListenersHeartbeatPong].widen[HardwareControlMessage].some
case "serialMonitorListenersHeartbeatFinish" =>
Decoder[SerialMonitorListenersHeartbeatFinish].widen[HardwareControlMessage].some

case "ping" => Decoder[Ping].widen[HardwareControlMessage].some
case _ => None
Expand Down
59 changes: 47 additions & 12 deletions backend/web/app/diptestbed/web/actors/HardwareControlActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import diptestbed.domain.HardwareControlMessage._
import akka.util.Timeout
import cats.data.EitherT
import cats.implicits.toTraverseOps
import scala.annotation.unused
import com.typesafe.scalalogging.LazyLogging
import diptestbed.web.actors.HardwareControlState.{ConfiguringMonitor, Initial, Uploading}
import diptestbed.web.actors.QueryActor.Promise
import scala.concurrent.duration.{DurationInt, FiniteDuration}

sealed trait HardwareControlState {}
object HardwareControlState {
Expand Down Expand Up @@ -77,7 +77,6 @@ class HardwareControlActor(
hardwareId: HardwareId,
)(implicit
iort: IORuntime,
@unused timeout: Timeout,
) extends Actor
with LazyLogging
with HardwareControlStateActions {
Expand All @@ -86,6 +85,19 @@ class HardwareControlActor(
val serialMonitorTopic: String = hardwareSerialMonitorTopic(hardwareId)
var state: HardwareControlState = Initial

var listenerHeartbeatsReceived: Int = 0
val listenerHeartbeatInterval: FiniteDuration = 5.seconds
val listenerHeartbeatWaitTime: FiniteDuration = 2.seconds // Should be less than the interval

def scheduleHeartbeatTest(): IO[Unit] = {
val laterTest = IO.sleep(listenerHeartbeatInterval) >>
IO(self ! SerialMonitorListenersHeartbeatStart())

laterTest.start.void
}

scheduleHeartbeatTest().unsafeRunAsync(_ => ())

def sendToAgent(message: Control): IO[Unit] =
IO(out ! message.asJson.noSpaces)
def sendToRequester(requester: ActorRef, message: Control): IO[Unit] =
Expand All @@ -108,13 +120,18 @@ class HardwareControlActor(

def controlHandler(inquirer: Option[ActorRef], message: Control): IO[Unit] =
message match {
case m: UploadSoftwareRequest => handleUploadSoftwareRequest(inquirer, m)
case m: UploadSoftwareResult => handleUploadSoftwareResult(m)
case m: SerialMonitorRequest => handleSerialMonitorRequest(inquirer, m)
case m: SerialMonitorResult => handleSerialMonitorResult(m)
case m: SerialMonitorMessageToClient => handleSerialMonitorMessageToClient(m)
case m: SerialMonitorMessageToAgent => handleSerialMonitorMessageToAgent(m)
case _: Ping => IO(())
case m: UploadSoftwareRequest => handleUploadSoftwareRequest(inquirer, m)
case m: UploadSoftwareResult => handleUploadSoftwareResult(m)
case m: SerialMonitorRequest => handleSerialMonitorRequest(inquirer, m)
case m: SerialMonitorRequestStop => handleSerialMonitorRequestStop(m)
case m: SerialMonitorResult => handleSerialMonitorResult(m)
case m: SerialMonitorMessageToClient => handleSerialMonitorMessageToClient(m)
case m: SerialMonitorMessageToAgent => handleSerialMonitorMessageToAgent(m)
case _: SerialMonitorListenersHeartbeatStart => handleSerialMonitorListenersHeartbeatStart()
case _: SerialMonitorListenersHeartbeatPing => IO(())
case _: SerialMonitorListenersHeartbeatPong => handleSerialMonitorListenersHeartbeatPong()
case _: SerialMonitorListenersHeartbeatFinish => handleSerialMonitorListenersHeartbeatFinish()
case _: Ping => IO(())
}

def handleUploadSoftwareRequest(inquirer: Option[ActorRef], message: UploadSoftwareRequest): IO[Unit] =
Expand All @@ -135,6 +152,9 @@ class HardwareControlActor(
_ <- IO.whenA(isUploading)(IO(setInitialState()))
} yield ()

def handleSerialMonitorRequestStop(message: SerialMonitorRequestStop): IO[Unit] =
sendToAgent(message)

def handleSerialMonitorRequest(inquirer: Option[ActorRef], message: SerialMonitorRequest): IO[Unit] =
for {
// If available, start monitor, maybe respond eventually
Expand All @@ -159,6 +179,22 @@ class HardwareControlActor(
def handleSerialMonitorMessageToAgent(message: SerialMonitorMessageToAgent): IO[Unit] =
sendToAgent(message)

def handleSerialMonitorListenersHeartbeatStart(): IO[Unit] = {
val startAndLaterFinish = IO { listenerHeartbeatsReceived = 0 } >>
IO(pubSubMediator ! Publish(serialMonitorTopic, SerialMonitorListenersHeartbeatPing())) >>
IO.sleep(listenerHeartbeatWaitTime) >>
IO(self ! SerialMonitorListenersHeartbeatFinish())

startAndLaterFinish.start.void
}

def handleSerialMonitorListenersHeartbeatPong(): IO[Unit] =
IO { listenerHeartbeatsReceived += 1 }

def handleSerialMonitorListenersHeartbeatFinish(): IO[Unit] =
(if (listenerHeartbeatsReceived == 0) sendToAgent(SerialMonitorRequestStop())
else sendToAgent(SerialMonitorRequest(None))) >>
scheduleHeartbeatTest()
}

object HardwareControlActor extends ActorHelper {
Expand All @@ -171,7 +207,6 @@ object HardwareControlActor extends ActorHelper {
hardwareId: HardwareId,
)(implicit
iort: IORuntime,
timeout: Timeout,
): Props = Props(new HardwareControlActor(pubSubMediator, out, hardwareId))

implicit val transformer: MessageFlowTransformer[Control, String] =
Expand All @@ -198,9 +233,9 @@ object HardwareControlActor extends ActorHelper {
})
} yield monitorResult

def sendSerialMessageToAgent(
def sendToHardwareActor(
hardwareId: HardwareId,
serialMessage: SerialMonitorMessageToAgent,
serialMessage: Control,
)(implicit actorSystem: ActorSystem, t: Timeout): EitherT[IO, String, Unit] =
for {
hardwareRef <- resolveActorRef(userActorPath(hardwareActor(hardwareId)))(actorSystem, implicitly)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ class HardwareSerialMonitorListenerActor(
case serialMessage: SerialMonitorMessageToClient =>
val message: HardwareControlMessage = serialMessage
sendToListener(message).unsafeRunAsync(_ => ())
case _: SerialMonitorListenersHeartbeatPing =>
sendToHardwareActor(hardwareId, SerialMonitorListenersHeartbeatPong()).value.unsafeRunAsync(_ => ())
case text: TextMessage =>
decode[HardwareControlMessage](text.data).toOption.foreach {
case serialMessage: SerialMonitorMessageToAgent =>
sendSerialMessageToAgent(hardwareId, serialMessage).value.unsafeRunAsync(_ => ())
sendToHardwareActor(hardwareId, serialMessage).value.unsafeRunAsync(_ => ())
case _ => ()
}
case _: SubscribeAck =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class HardwareController(

def controlHardware(hardwareId: HardwareId): WebSocket = {
WebSocket.accept[HardwareControlMessage, String](_ => {
implicit val timeout: Timeout = 60.seconds
BetterActorFlow.actorRef(
subscriber => HardwareControlActor.props(pubSubMediator, subscriber, hardwareId),
maybeName = hardwareActor(hardwareId).some,
Expand Down
2 changes: 1 addition & 1 deletion client/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from engine import Engine
from agent_util import AgentConfig

LOGGER = log.timed_named_logger("client")
LOGGER = log.timed_named_logger("agent")
PI = TypeVar('PI')
PO = TypeVar('PO')

Expand Down
5 changes: 4 additions & 1 deletion client/agent_anvyl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
CommonIncomingMessage, \
UploadMessage, \
SerialMonitorRequest, \
SerialMonitorMessageToAgent
SerialMonitorMessageToAgent, \
SerialMonitorRequestStop
from sh import root_relative_path, outcome_sh
from agent_util import AgentConfig
import log
Expand Down Expand Up @@ -69,6 +70,8 @@ def process(
return self.process_upload_message_sh(message, self.firmware_upload)
elif isinstance(message, SerialMonitorRequest):
return self.process_serial_monitor(self.config.device_path, message)
elif isinstance(message, SerialMonitorRequestStop):
return self.process_serial_monitor_stop()
elif isinstance(message, SerialMonitorMessageToAgent):
return self.process_serial_monitor_to_agent(message)
else:
Expand Down
7 changes: 5 additions & 2 deletions client/agent_nrf52.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
CommonIncomingMessage, \
UploadMessage, \
SerialMonitorRequest, \
SerialMonitorMessageToAgent
SerialMonitorMessageToAgent, \
SerialMonitorRequestStop
from sh import root_relative_path, outcome_sh
from agent_util import AgentConfig
import log
Expand Down Expand Up @@ -68,7 +69,9 @@ def process(
if isinstance(message, UploadMessage):
return self.process_upload_message_sh(message, self.firmware_upload)
elif isinstance(message, SerialMonitorRequest):
return self.process_serial_monitor(self.config.device_path, message)
return self.process_serial_monitor(self.config.device, message)
elif isinstance(message, SerialMonitorRequestStop):
return self.process_serial_monitor_stop()
elif isinstance(message, SerialMonitorMessageToAgent):
return self.process_serial_monitor_to_agent(message)
else:
Expand Down
2 changes: 1 addition & 1 deletion client/cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)
HEARTBEAT_SECONDS_OPTION = click.option(
'--heartbeat-seconds', '-r', "heartbeat_seconds", show_envvar=True,
type=int, envvar="DIP_HEARTBEAT_SECONDS", required=True, default=30,
type=int, envvar="DIP_HEARTBEAT_SECONDS", required=True, default=25,
help='Regular interval in which to ping the server'
)
JSON_OUTPUT_OPTION = click.option(
Expand Down
24 changes: 21 additions & 3 deletions client/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ async def keep_monitoring(self):
(serial, serialConfig) = self.serial
received_bytes = serial.read(serialConfig.receive_size)
if received_bytes is not None and len(received_bytes) > 0:
await self.monitor_to_server(received_bytes)
await asyncio.sleep(2)
# Send to server immediately, don't wait for request to finish
loop = asyncio.get_event_loop()
loop.create_task(self.monitor_to_server(received_bytes))
# Python is trash, sleep for a bit to allow other
# async coroutines to execute
await asyncio.sleep(serialConfig.timeout)

def start_monitor(self, serial: Serial, serial_config: SerialConfig):
"""Start serial monitoring"""
Expand All @@ -106,6 +110,7 @@ def stop_monitor(self):
if self.serial is not None:
(serial, _) = self.serial
serial.close()
self.serial = None

# Socket lifecycle methods
def on_start(self, socket: WebSocket):
Expand Down Expand Up @@ -177,11 +182,20 @@ def process_serial_monitor(
# Stop old monitor
self.stop_monitor()

# Connect to serial device w/ the given configurations
# Define a strictly non-empty configuration
if message.config is None:
serial_config = SerialConfig.empty()
else:
serial_config = message.config

# Avoid changes to monitoring if configuration doesn't change
if self.serial is not None:
(_, old_serial_config) = self.serial
if old_serial_config == serial_config:
LOGGER.debug("Skipping monitor request with unchanged config")
return Ok(SerialMonitorResult(None))

# Connect to serial device w/ the given configurations
serial_result = monitor_serial(device, serial_config)
if isinstance(serial_result, Err):
outcome_message = f"Failed setting up monitor: {pformat(serial_result.value, indent=4)}"
Expand All @@ -193,6 +207,10 @@ def process_serial_monitor(

return Ok(SerialMonitorResult(None))

def process_serial_monitor_stop(self):
"""Stop serial monitoring"""
self.stop_monitor()

def process_serial_monitor_to_agent(
self,
message: SerialMonitorMessageToAgent
Expand Down
12 changes: 11 additions & 1 deletion client/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ def __eq__(self, other) -> bool:
return self.error == other.error


@dataclass(frozen=True, eq=False)
class SerialMonitorRequestStop:
"""Message to request serial monitor stopping"""
pass


@dataclass(frozen=True, eq=False)
class SerialMonitorRequest:
"""Message to request serial monitor for a given microcontroller"""
Expand Down Expand Up @@ -112,7 +118,11 @@ class PingMessage(Generic[T]):
"""Message for sending heartbeats to server"""


CommonIncomingMessage = Union[UploadMessage, SerialMonitorRequest, SerialMonitorMessageToAgent]
CommonIncomingMessage = Union[
UploadMessage,
SerialMonitorRequest,
SerialMonitorRequestStop,
SerialMonitorMessageToAgent]
CommonOutgoingMessage = Union[UploadResultMessage, PingMessage, SerialMonitorResult, SerialMonitorMessageToClient]

MonitorListenerIncomingMessage = Union[MonitorUnavailable, SerialMonitorMessageToClient]
Expand Down
Loading

0 comments on commit f4f39e4

Please sign in to comment.