diff --git a/backend/domain/src/main/scala/iotfrisbee/domain/HardwareControlMessage.scala b/backend/domain/src/main/scala/iotfrisbee/domain/HardwareControlMessage.scala index 3e8ddc3..eafe0d6 100644 --- a/backend/domain/src/main/scala/iotfrisbee/domain/HardwareControlMessage.scala +++ b/backend/domain/src/main/scala/iotfrisbee/domain/HardwareControlMessage.scala @@ -4,4 +4,5 @@ sealed trait HardwareControlMessage {} object HardwareControlMessage { case class UploadSoftwareRequest(softwareId: SoftwareId) extends HardwareControlMessage case class UploadSoftwareResult(error: Option[String]) extends HardwareControlMessage + case class Ping() extends HardwareControlMessage } diff --git a/backend/protocol/src/main/scala/iotfrisbee/protocol/Codecs.scala b/backend/protocol/src/main/scala/iotfrisbee/protocol/Codecs.scala index e4139fc..2ca736a 100644 --- a/backend/protocol/src/main/scala/iotfrisbee/protocol/Codecs.scala +++ b/backend/protocol/src/main/scala/iotfrisbee/protocol/Codecs.scala @@ -27,19 +27,24 @@ object Codecs { private implicit val uploadSoftwareRequestCodec: Codec[UploadSoftwareRequest] = deriveCodec[UploadSoftwareRequest] private implicit val uploadSoftwareResultCodec: Codec[UploadSoftwareResult] = deriveCodec[UploadSoftwareResult] + private implicit val pingCodec: Codec[Ping] = deriveCodec[Ping] implicit val hardwareControlMessageEncoder: Encoder[HardwareControlMessage] = Encoder.instance { case c: UploadSoftwareRequest => NamedMessage("uploadSoftwareRequest", c.asJson).asJson - case c: UploadSoftwareResult => NamedMessage("uploadSoftwareResult", c.asJson).asJson + case c: UploadSoftwareResult => NamedMessage("uploadSoftwareResult", c.asJson).asJson + case c: Ping => NamedMessage("ping", c.asJson).asJson } implicit val hardwareControlMessageDecoder: Decoder[HardwareControlMessage] = - Decoder[NamedMessage].emap { m => { - val codec: Option[Decoder[HardwareControlMessage]] = m.command match { - case "uploadSoftwareRequest" => Decoder[UploadSoftwareRequest].widen[HardwareControlMessage].some - case "uploadSoftwareResult" => Decoder[UploadSoftwareResult].widen[HardwareControlMessage].some - case _ => None + Decoder[NamedMessage].emap { m => + { + val codec: Option[Decoder[HardwareControlMessage]] = m.command match { + case "uploadSoftwareRequest" => Decoder[UploadSoftwareRequest].widen[HardwareControlMessage].some + case "uploadSoftwareResult" => Decoder[UploadSoftwareResult].widen[HardwareControlMessage].some + case "ping" => Decoder[Ping].widen[HardwareControlMessage].some + case _ => None + } + codec.toRight("Unknown command").flatMap(_.decodeJson(m.payload).leftMap(_.message)) } - codec.toRight("Unknown command").flatMap(_.decodeJson(m.payload).leftMap(_.message)) - }} + } implicit def webResultSuccessCodec[A: Encoder: Decoder]: Codec[Success[A]] = Codec.forProduct1[Success[A], A]("success")(Success(_))(_.value) diff --git a/backend/web/app/iotfrisbee/web/actors/HardwareControlActor.scala b/backend/web/app/iotfrisbee/web/actors/HardwareControlActor.scala index ac47e49..63ccbd1 100644 --- a/backend/web/app/iotfrisbee/web/actors/HardwareControlActor.scala +++ b/backend/web/app/iotfrisbee/web/actors/HardwareControlActor.scala @@ -11,9 +11,10 @@ import akka.cluster.pubsub.DistributedPubSubMediator.Publish import cats.effect.IO import iotfrisbee.web.actors.HardwareControlActor.hardwareEventTopic import cats.effect.unsafe.IORuntime -import iotfrisbee.domain.HardwareControlMessage.{UploadSoftwareRequest, UploadSoftwareResult} +import iotfrisbee.domain.HardwareControlMessage.{Ping, UploadSoftwareRequest, UploadSoftwareResult} import akka.util.Timeout import cats.implicits.catsSyntaxOptionId + import scala.annotation.unused import com.typesafe.scalalogging.LazyLogging import iotfrisbee.web.actors.HardwareControlState.{Initial, Uploading} @@ -28,11 +29,12 @@ object HardwareControlState { class HardwareControlActor( pubSubMediator: ActorRef, out: ActorRef, - hardwareId: HardwareId + hardwareId: HardwareId, )(implicit iort: IORuntime, - @unused timeout: Timeout -) extends Actor with LazyLogging { + @unused timeout: Timeout, +) extends Actor + with LazyLogging { logger.info(s"Actor for hardware #${hardwareId} spawned") val eventTopic: String = hardwareEventTopic(hardwareId) @@ -40,7 +42,11 @@ class HardwareControlActor( def receive: Receive = { case controlMessage: Control => { - logger.info(s"Receiving anonymous control message: ${controlMessage}") + controlMessage match { + case Ping() => () + case interestingMessage => + logger.info(s"Receiving anonymous, non-ping control message: ${interestingMessage}") + } receiveControl(None, controlMessage) } case Promise(inquirer, controlMessage: Control) => { @@ -54,24 +60,14 @@ class HardwareControlActor( def receiveControl(inquirer: Option[ActorRef], message: Control): Unit = { val publish = IO(pubSubMediator ! Publish(eventTopic, message)) val process = message match { - case m: UploadSoftwareResult => handleUploadSoftwareResult(m) + case m: UploadSoftwareResult => handleUploadSoftwareResult(m) case m: UploadSoftwareRequest => handleUploadSoftwareRequest(inquirer, m) + case _: Ping => IO(()) } (publish *> process).unsafeRunSync() } - def messageForStateAllowed(message: Control, state: HardwareControlState): Boolean = message match { - case _: UploadSoftwareRequest => state match { - case Initial => true - case _ => false - } - case _: UploadSoftwareResult => state match { - case _: Uploading => true - case _ => false - } - } - def sendToAgent(message: Control): IO[Unit] = IO(out ! message.asJson.noSpaces) def sendToRequester(requester: ActorRef, message: Control): IO[Unit] = { logger.info(s"Sending upload result to requester ${requester.path.toStringWithoutAddress}") @@ -88,7 +84,9 @@ class HardwareControlActor( case Uploading(None) => IO.pure(false) } // Store the current state of "uploading" - _ <- if (isUploading) IO { state = Uploading(inquirer) } else IO(()) + _ <- + if (isUploading) IO { state = Uploading(inquirer) } + else IO(()) } yield () def handleUploadSoftwareResult(message: UploadSoftwareResult): IO[Unit] = @@ -96,11 +94,13 @@ class HardwareControlActor( // Send message to agent or respond with failure immediately isUploaded <- state match { case Uploading(Some(requester)) => sendToRequester(requester, message) *> IO.pure(true) - case Uploading(None) => IO.pure(true) - case _ => IO.pure(false) + case Uploading(None) => IO.pure(true) + case _ => IO.pure(false) } // Store the current state of "uploading" - _ <- if (isUploaded) IO { state = Initial } else IO(()) + _ <- + if (isUploaded) IO { state = Initial } + else IO(()) } yield () } @@ -111,17 +111,16 @@ object HardwareControlActor { def props( pubSubMediator: ActorRef, out: ActorRef, - hardwareId: HardwareId + hardwareId: HardwareId, )(implicit iort: IORuntime, - timeout: Timeout + timeout: Timeout, ): Props = Props(new HardwareControlActor(pubSubMediator, out, hardwareId)) implicit val transformer: MessageFlowTransformer[Control, String] = MessageFlowTransformer.stringMessageFlowTransformer.map(x => { decode[Control](x).toTry.getOrElse { - throw WebSocketCloseException( - CloseMessage(Some(CloseCodes.Unacceptable), "Failed to parse message")) + throw WebSocketCloseException(CloseMessage(Some(CloseCodes.Unacceptable), "Failed to parse message")) } }) } diff --git a/client/agent.py b/client/agent.py index 5d1564b..090aba4 100644 --- a/client/agent.py +++ b/client/agent.py @@ -42,18 +42,22 @@ async def agent( return 0 # Start communication/logic loop - LOGGER.info("Connected to control server, listening for commands") + LOGGER.info( + "Connected to control server, listening for commands, running start hook") + engine.on_start(websocket) while True: incoming_result = await websocket.rx() if isinstance(incoming_result, Err) \ and isinstance(incoming_result.value, ConnectionClosedError): LOGGER.error("Control server connection closed") + engine.on_end() return 1 if isinstance(incoming_result, Err) \ and isinstance(incoming_result.value, CodecParseException): LOGGER.error("Unknown command received, ignoring") elif isinstance(incoming_result, Err): LOGGER.error("Failed to receive message: %s", pformat(incoming_result.value, indent=4)) + engine.on_end() await websocket.disconnect() return 1 else: @@ -69,6 +73,7 @@ async def agent( LOGGER.error( "Failed to process message: %s", pformat(outgoing_result.value, indent=4)) + engine.on_end() await websocket.disconnect() return 1 else: @@ -79,6 +84,7 @@ async def agent( "Failed to transmit message (%s): %s", pformat(outgoing_message, indent=4), pformat(transmit_error, indent=4)) + engine.on_end() await websocket.disconnect() return 1 else: diff --git a/client/agent_anvyl.py b/client/agent_anvyl.py index 3e8d21d..4208c3a 100644 --- a/client/agent_anvyl.py +++ b/client/agent_anvyl.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """Anvyl FPGA client functionality.""" +import asyncio from typing import Sequence, Tuple, Any from result import Result, Err from engine import Engine, EngineConfig @@ -8,6 +9,7 @@ from sh import root_relative_path, outcome_sh from agent_util import AgentConfig import log +from ws import WebSocket LOGGER = log.timed_named_logger("anvyl") FIRMWARE_UPLOAD_PATH = 'static/digilent_anvyl/upload.sh' diff --git a/client/agent_util.py b/client/agent_util.py index 4666d28..4c91cb7 100644 --- a/client/agent_util.py +++ b/client/agent_util.py @@ -11,3 +11,4 @@ class AgentConfig: """Common i.e. microcontroller-non-specific client configuration options""" hardware_id: UUID backend: BackendConfig + heartbeat_seconds: int diff --git a/client/cli.py b/client/cli.py index 6ed46ce..0d9bef0 100755 --- a/client/cli.py +++ b/client/cli.py @@ -3,6 +3,7 @@ import os import sys +import shutil from dataclasses import asdict from uuid import UUID import click @@ -39,6 +40,7 @@ def cli_client(): @cli_util.HARDWARE_ID_OPTION @cli_util.CONTROL_SERVER_OPTION @cli_util.STATIC_SERVER_OPTION +@cli_util.HEARTBEAT_SECONDS_OPTION @click.option( '--device', '-d', "device_str", type=str, envvar="DIP_NRF52_DEVICE", @@ -53,17 +55,21 @@ def cli_client(): help='Baudrate for NRF52 microcontroller serial port communications. ' 'E.g. baud rate of 115200' ) -def cli_agent_nrf52_upload( +def agent_nrf52_upload( hardware_id_str: str, control_server_str: str, static_server_str: str, + heartbeat_seconds: int, device_str: str, baudrate: int ): - """Agent for managing NRF52 microcontroller uploads""" + """[Linux] NRF52 microcontroller upload agent""" # Agent config constructor agent_config_result = cli_util.agent_config( - hardware_id_str, control_server_str, static_server_str) + hardware_id_str, + control_server_str, + static_server_str, + heartbeat_seconds) if isinstance(agent_config_result, Err): print_error(f"Failed to construct agent config: {agent_config_result.value}") sys.exit(1) @@ -79,6 +85,20 @@ def cli_agent_nrf52_upload( print_error("Baudrate must be a positive number") sys.exit(1) + # Validate dependencies + if shutil.which("adafruit-nrfutil") is None: + print_error("'adafruit-nrfutil' must be installed") + sys.exit(1) + if shutil.which("bash") is None: + print_error("'bash' must be installed") + sys.exit(1) + if shutil.which("grep") is None: + print_error("'grep' must be installed") + sys.exit(1) + if shutil.which("tee") is None: + print_error("'tee' must be installed") + sys.exit(1) + # Construct engine engine_config = EngineNRF52Config(agent_config, device_str, baudrate) agent_entrypoints.supervise_agent_nrf52(agent_config, engine_config) @@ -88,25 +108,28 @@ def cli_agent_nrf52_upload( @cli_util.HARDWARE_ID_OPTION @cli_util.CONTROL_SERVER_OPTION @cli_util.STATIC_SERVER_OPTION +@cli_util.HEARTBEAT_SECONDS_OPTION @click.option('--device', '-d', "device_str", show_envvar=True, type=str, envvar="DIP_ANVYL_DEVICE", required=True, help='Device user name (e.g. Anvyl).') @click.option('--scanchainindex', '-s', "scan_chain_index", show_envvar=True, type=int, envvar="DIP_ANVYL_SCAN_CHAIN_IDEX", required=True, help='Scan chain index of target JTAG device (e.g. 0)') -def cli_agent_anvyl_upload( +def agent_anvyl_upload( hardware_id_str: str, control_server_str: str, static_server_str: str, + heartbeat_seconds: int, device_str: str, scan_chain_index: int ): - """Agent for managing Anvyl FPGA uploads""" + """[Linux] Anvyl FPGA upload agent""" # Agent config constructor agent_config_result = cli_util.agent_config( hardware_id_str, control_server_str, - static_server_str) + static_server_str, + heartbeat_seconds) if isinstance(agent_config_result, Err): print_error(f"Failed to construct agent config: {agent_config_result.value}") sys.exit(1) @@ -117,6 +140,14 @@ def cli_agent_anvyl_upload( print_error("Scan chain index must be a non-negative number") sys.exit(1) + # Validate dependencies + if shutil.which("djtgcfg") is None: + print_error("'djtgcfg' must be installed") + sys.exit(1) + if shutil.which("bash") is None: + print_error("'bash' must be installed") + sys.exit(1) + # Construct engine engine_config = EngineAnvylConfig(agent_config, device_str, scan_chain_index) agent_entrypoints.supervise_agent_anvyl(agent_config, engine_config) diff --git a/client/cli_util.py b/client/cli_util.py index 3ca2368..c0f504a 100755 --- a/client/cli_util.py +++ b/client/cli_util.py @@ -32,7 +32,12 @@ SOFTWARE_ID_OPTION = click.option( '--software-id', '-i', "software_id_str", show_envvar=True, type=str, envvar="DIP_SOFTWARE_ID", required=True, - help='Software id (e.g. \'16db6c30-3328-11ec-ae41-ff1d66202dcc\'' + help='Software id (e.g. \'16db6c30-3328-11ec-ae41-ff1d66202dcc\')' +) +HEARTBEAT_SECONDS_OPTION = click.option( + '--heartbeat-seconds', '-r', "heartbeat_seconds", show_envvar=True, + type=int, envvar="DIP_HEARTBEAT_SECONDS", required=True, default=30, + help='Regular interval in which to ping the server' ) JSON_OUTPUT_OPTION = click.option( "--json-output", '-j', "json_output", show_envvar=True, default=False, @@ -75,7 +80,12 @@ def backend_config(control_server_str: Optional[str], static_server_str: Optiona return Ok(BackendConfig(control_server, static_server)) -def agent_config(hardware_id_str: str, control_server_str: str, static_server_str: str): +def agent_config( + hardware_id_str: str, + control_server_str: str, + static_server_str: str, + heartbeat_seconds: int +): """Construct common agent config from CLI params""" # Hardware id validation try: @@ -83,6 +93,10 @@ def agent_config(hardware_id_str: str, control_server_str: str, static_server_st except Exception as e: return Err(f"Invalid hardware id: ${e}") + # Heartbeat validation + if heartbeat_seconds <= 0: + return Err("Heartbeat seconds must be positive number") + # Backend config constructor be_config_result = backend_config(control_server_str, static_server_str) if isinstance(be_config_result, Err): @@ -90,4 +104,4 @@ def agent_config(hardware_id_str: str, control_server_str: str, static_server_st be_config = be_config_result.value # Agent configuration - return Ok(AgentConfig(hardware_id, be_config)) + return Ok(AgentConfig(hardware_id, be_config, heartbeat_seconds)) diff --git a/client/engine.py b/client/engine.py index c0d75ed..a8e9dcb 100644 --- a/client/engine.py +++ b/client/engine.py @@ -1,13 +1,20 @@ #!/usr/bin/env python """Engine which reacts to server commands and supervises microcontroller""" -from typing import TypeVar, Generic, Callable, Tuple +from typing import TypeVar, Generic, Callable, Tuple, Any +import asyncio import os from result import Result, Err, Ok from pprint import pformat import log from agent_util import AgentConfig -from protocol import UploadMessage, UploadResultMessage +from protocol import \ + PingMessage, \ + UploadMessage, \ + UploadResultMessage, \ + CommonIncomingMessage, \ + CommonOutgoingMessage +from ws import WebSocket LOGGER = log.timed_named_logger("engine") PI = TypeVar('PI') @@ -25,10 +32,41 @@ def __init__(self, agent: AgentConfig): class Engine(Generic[PI, PO]): """Implementation of generic microcontroller agent engine""" config: EngineConfig + ping_enabled: bool = True + engine_on: bool = True + active_ping_task: Any def __init__(self, config): self.config = config + async def keep_pinging( + self, + socket: WebSocket[CommonIncomingMessage, CommonOutgoingMessage] + ): + """Keeps pinging server while engine is on""" + while self.engine_on: + await socket.tx(PingMessage()) + await asyncio.sleep(self.config.agent.heartbeat_seconds) + + def start_ping(self, socket: WebSocket): + """Start sending regular heartbeat to server""" + loop = asyncio.get_event_loop() + self.active_ping_task = loop.create_task(self.keep_pinging(socket)) + + def stop_ping(self): + """Stop sending regular heartbeat to server""" + self.active_ping_task.cancel() + + def on_start(self, socket: WebSocket): + """Engine start hook""" + self.engine_on = True + self.start_ping(socket) + + def on_end(self): + """Engine end hook""" + self.engine_on = False + self.stop_ping() + # W0613: ignore unused message, because this class is abstract # R0201: ignore no-self-use, because I want this method here regardless # pylint: disable=W0613,R0201 diff --git a/client/protocol.py b/client/protocol.py index 67e86dc..dea567e 100644 --- a/client/protocol.py +++ b/client/protocol.py @@ -57,5 +57,10 @@ class FailureMessage(Generic[T]): value: T +@dataclass(frozen=True, eq=False) +class PingMessage(Generic[T]): + """Message for sending heartbeats to server""" + + CommonIncomingMessage = Union[UploadMessage] -CommonOutgoingMessage = Union[UploadResultMessage] +CommonOutgoingMessage = Union[UploadResultMessage, PingMessage] diff --git a/client/s11n.py b/client/s11n.py index 041d015..ccb9d61 100644 --- a/client/s11n.py +++ b/client/s11n.py @@ -203,6 +203,40 @@ def upload_result_message_decode(value: str) -> Result[protocol.UploadResultMess Codec(UPLOAD_RESULT_MESSAGE_DECODER, UPLOAD_RESULT_MESSAGE_ENCODER) +# protocol.PingMessage +def ping_message_encode(_: protocol.PingMessage) -> str: + """Serialize PingMessage to JSON""" + message = { + "command": "ping", + "payload": {} + } + return json.dumps(message, separators=NO_WHITESPACE_SEPERATORS) + + +def ping_message_decode(value: str) -> Result[protocol.PingMessage, CodecParseException]: + """Un-serialize PingMessage from JSON""" + json_result = json_decode(value) + if isinstance(json_result, Err): + return Err(json_result.value) + command_result = named_message_extract("ping", json_result.value) + if isinstance(command_result, Err): + return Err(command_result.value) + result = command_result.value + + if isinstance(result, dict): + return Ok(protocol.PingMessage()) + else: + return Err(CodecParseException("PingMessage must be an object")) + + +PING_MESSAGE_ENCODER: Encoder[protocol.PingMessage] = \ + Encoder(ping_message_encode) +PING_MESSAGE_DECODER: Decoder[protocol.PingMessage] = \ + Decoder(ping_message_decode) +PING_MESSAGE_CODEC: Codec[protocol.UploadResultMessage] = \ + Codec(UPLOAD_RESULT_MESSAGE_DECODER, UPLOAD_RESULT_MESSAGE_ENCODER) + + # protocol.CreateUserMessage def create_user_message_encode(value: protocol.CreateUserMessage) -> str: """Serialize CreateUserMessage to JSON""" @@ -313,10 +347,12 @@ def decode(value: str) -> Result[protocol.FailureMessage[FAILURE_GENERIC], Codec # protocol.CommonOutgoingMessage COMMON_OUTGOING_MESSAGE_ENCODER = union_encoder({ - protocol.UploadResultMessage: UPLOAD_RESULT_MESSAGE_ENCODER + protocol.UploadResultMessage: UPLOAD_RESULT_MESSAGE_ENCODER, + protocol.PingMessage: PING_MESSAGE_ENCODER }) COMMON_OUTGOING_MESSAGE_DECODER = union_decoder({ - protocol.UploadResultMessage: UPLOAD_RESULT_MESSAGE_DECODER + protocol.UploadResultMessage: UPLOAD_RESULT_MESSAGE_DECODER, + protocol.PingMessage: PING_MESSAGE_DECODER }) COMMON_OUTGOING_MESSAGE_CODEC = Codec( COMMON_OUTGOING_MESSAGE_DECODER,