Skip to content

Commit

Permalink
Add ping capability to agent
Browse files Browse the repository at this point in the history
  • Loading branch information
kshaa committed Oct 22, 2021
1 parent 220b636 commit e35b44a
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ sealed trait HardwareControlMessage {}
object HardwareControlMessage {
case class UploadSoftwareRequest(softwareId: SoftwareId) extends HardwareControlMessage
case class UploadSoftwareResult(error: Option[String]) extends HardwareControlMessage
case class Ping() extends HardwareControlMessage
}
21 changes: 13 additions & 8 deletions backend/protocol/src/main/scala/iotfrisbee/protocol/Codecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,24 @@ object Codecs {

private implicit val uploadSoftwareRequestCodec: Codec[UploadSoftwareRequest] = deriveCodec[UploadSoftwareRequest]
private implicit val uploadSoftwareResultCodec: Codec[UploadSoftwareResult] = deriveCodec[UploadSoftwareResult]
private implicit val pingCodec: Codec[Ping] = deriveCodec[Ping]
implicit val hardwareControlMessageEncoder: Encoder[HardwareControlMessage] = Encoder.instance {
case c: UploadSoftwareRequest => NamedMessage("uploadSoftwareRequest", c.asJson).asJson
case c: UploadSoftwareResult => NamedMessage("uploadSoftwareResult", c.asJson).asJson
case c: UploadSoftwareResult => NamedMessage("uploadSoftwareResult", c.asJson).asJson
case c: Ping => NamedMessage("ping", c.asJson).asJson
}
implicit val hardwareControlMessageDecoder: Decoder[HardwareControlMessage] =
Decoder[NamedMessage].emap { m => {
val codec: Option[Decoder[HardwareControlMessage]] = m.command match {
case "uploadSoftwareRequest" => Decoder[UploadSoftwareRequest].widen[HardwareControlMessage].some
case "uploadSoftwareResult" => Decoder[UploadSoftwareResult].widen[HardwareControlMessage].some
case _ => None
Decoder[NamedMessage].emap { m =>
{
val codec: Option[Decoder[HardwareControlMessage]] = m.command match {
case "uploadSoftwareRequest" => Decoder[UploadSoftwareRequest].widen[HardwareControlMessage].some
case "uploadSoftwareResult" => Decoder[UploadSoftwareResult].widen[HardwareControlMessage].some
case "ping" => Decoder[Ping].widen[HardwareControlMessage].some
case _ => None
}
codec.toRight("Unknown command").flatMap(_.decodeJson(m.payload).leftMap(_.message))
}
codec.toRight("Unknown command").flatMap(_.decodeJson(m.payload).leftMap(_.message))
}}
}

implicit def webResultSuccessCodec[A: Encoder: Decoder]: Codec[Success[A]] =
Codec.forProduct1[Success[A], A]("success")(Success(_))(_.value)
Expand Down
49 changes: 24 additions & 25 deletions backend/web/app/iotfrisbee/web/actors/HardwareControlActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import cats.effect.IO
import iotfrisbee.web.actors.HardwareControlActor.hardwareEventTopic
import cats.effect.unsafe.IORuntime
import iotfrisbee.domain.HardwareControlMessage.{UploadSoftwareRequest, UploadSoftwareResult}
import iotfrisbee.domain.HardwareControlMessage.{Ping, UploadSoftwareRequest, UploadSoftwareResult}
import akka.util.Timeout
import cats.implicits.catsSyntaxOptionId

import scala.annotation.unused
import com.typesafe.scalalogging.LazyLogging
import iotfrisbee.web.actors.HardwareControlState.{Initial, Uploading}
Expand All @@ -28,19 +29,24 @@ object HardwareControlState {
class HardwareControlActor(
pubSubMediator: ActorRef,
out: ActorRef,
hardwareId: HardwareId
hardwareId: HardwareId,
)(implicit
iort: IORuntime,
@unused timeout: Timeout
) extends Actor with LazyLogging {
@unused timeout: Timeout,
) extends Actor
with LazyLogging {
logger.info(s"Actor for hardware #${hardwareId} spawned")

val eventTopic: String = hardwareEventTopic(hardwareId)
var state: HardwareControlState = Initial

def receive: Receive = {
case controlMessage: Control => {
logger.info(s"Receiving anonymous control message: ${controlMessage}")
controlMessage match {
case Ping() => ()
case interestingMessage =>
logger.info(s"Receiving anonymous, non-ping control message: ${interestingMessage}")
}
receiveControl(None, controlMessage)
}
case Promise(inquirer, controlMessage: Control) => {
Expand All @@ -54,24 +60,14 @@ class HardwareControlActor(
def receiveControl(inquirer: Option[ActorRef], message: Control): Unit = {
val publish = IO(pubSubMediator ! Publish(eventTopic, message))
val process = message match {
case m: UploadSoftwareResult => handleUploadSoftwareResult(m)
case m: UploadSoftwareResult => handleUploadSoftwareResult(m)
case m: UploadSoftwareRequest => handleUploadSoftwareRequest(inquirer, m)
case _: Ping => IO(())
}

(publish *> process).unsafeRunSync()
}

def messageForStateAllowed(message: Control, state: HardwareControlState): Boolean = message match {
case _: UploadSoftwareRequest => state match {
case Initial => true
case _ => false
}
case _: UploadSoftwareResult => state match {
case _: Uploading => true
case _ => false
}
}

def sendToAgent(message: Control): IO[Unit] = IO(out ! message.asJson.noSpaces)
def sendToRequester(requester: ActorRef, message: Control): IO[Unit] = {
logger.info(s"Sending upload result to requester ${requester.path.toStringWithoutAddress}")
Expand All @@ -88,19 +84,23 @@ class HardwareControlActor(
case Uploading(None) => IO.pure(false)
}
// Store the current state of "uploading"
_ <- if (isUploading) IO { state = Uploading(inquirer) } else IO(())
_ <-
if (isUploading) IO { state = Uploading(inquirer) }
else IO(())
} yield ()

def handleUploadSoftwareResult(message: UploadSoftwareResult): IO[Unit] =
for {
// Send message to agent or respond with failure immediately
isUploaded <- state match {
case Uploading(Some(requester)) => sendToRequester(requester, message) *> IO.pure(true)
case Uploading(None) => IO.pure(true)
case _ => IO.pure(false)
case Uploading(None) => IO.pure(true)
case _ => IO.pure(false)
}
// Store the current state of "uploading"
_ <- if (isUploaded) IO { state = Initial } else IO(())
_ <-
if (isUploaded) IO { state = Initial }
else IO(())
} yield ()
}

Expand All @@ -111,17 +111,16 @@ object HardwareControlActor {
def props(
pubSubMediator: ActorRef,
out: ActorRef,
hardwareId: HardwareId
hardwareId: HardwareId,
)(implicit
iort: IORuntime,
timeout: Timeout
timeout: Timeout,
): Props = Props(new HardwareControlActor(pubSubMediator, out, hardwareId))

implicit val transformer: MessageFlowTransformer[Control, String] =
MessageFlowTransformer.stringMessageFlowTransformer.map(x => {
decode[Control](x).toTry.getOrElse {
throw WebSocketCloseException(
CloseMessage(Some(CloseCodes.Unacceptable), "Failed to parse message"))
throw WebSocketCloseException(CloseMessage(Some(CloseCodes.Unacceptable), "Failed to parse message"))
}
})
}
8 changes: 7 additions & 1 deletion client/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@ async def agent(
return 0

# Start communication/logic loop
LOGGER.info("Connected to control server, listening for commands")
LOGGER.info(
"Connected to control server, listening for commands, running start hook")
engine.on_start(websocket)
while True:
incoming_result = await websocket.rx()
if isinstance(incoming_result, Err) \
and isinstance(incoming_result.value, ConnectionClosedError):
LOGGER.error("Control server connection closed")
engine.on_end()
return 1
if isinstance(incoming_result, Err) \
and isinstance(incoming_result.value, CodecParseException):
LOGGER.error("Unknown command received, ignoring")
elif isinstance(incoming_result, Err):
LOGGER.error("Failed to receive message: %s", pformat(incoming_result.value, indent=4))
engine.on_end()
await websocket.disconnect()
return 1
else:
Expand All @@ -69,6 +73,7 @@ async def agent(
LOGGER.error(
"Failed to process message: %s",
pformat(outgoing_result.value, indent=4))
engine.on_end()
await websocket.disconnect()
return 1
else:
Expand All @@ -79,6 +84,7 @@ async def agent(
"Failed to transmit message (%s): %s",
pformat(outgoing_message, indent=4),
pformat(transmit_error, indent=4))
engine.on_end()
await websocket.disconnect()
return 1
else:
Expand Down
2 changes: 2 additions & 0 deletions client/agent_anvyl.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#!/usr/bin/env python
"""Anvyl FPGA client functionality."""

import asyncio
from typing import Sequence, Tuple, Any
from result import Result, Err
from engine import Engine, EngineConfig
from protocol import CommonOutgoingMessage, CommonIncomingMessage, UploadMessage
from sh import root_relative_path, outcome_sh
from agent_util import AgentConfig
import log
from ws import WebSocket

LOGGER = log.timed_named_logger("anvyl")
FIRMWARE_UPLOAD_PATH = 'static/digilent_anvyl/upload.sh'
Expand Down
1 change: 1 addition & 0 deletions client/agent_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class AgentConfig:
"""Common i.e. microcontroller-non-specific client configuration options"""
hardware_id: UUID
backend: BackendConfig
heartbeat_seconds: int
43 changes: 37 additions & 6 deletions client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import os
import sys
import shutil
from dataclasses import asdict
from uuid import UUID
import click
Expand Down Expand Up @@ -39,6 +40,7 @@ def cli_client():
@cli_util.HARDWARE_ID_OPTION
@cli_util.CONTROL_SERVER_OPTION
@cli_util.STATIC_SERVER_OPTION
@cli_util.HEARTBEAT_SECONDS_OPTION
@click.option(
'--device', '-d', "device_str",
type=str, envvar="DIP_NRF52_DEVICE",
Expand All @@ -53,17 +55,21 @@ def cli_client():
help='Baudrate for NRF52 microcontroller serial port communications. '
'E.g. baud rate of 115200'
)
def cli_agent_nrf52_upload(
def agent_nrf52_upload(
hardware_id_str: str,
control_server_str: str,
static_server_str: str,
heartbeat_seconds: int,
device_str: str,
baudrate: int
):
"""Agent for managing NRF52 microcontroller uploads"""
"""[Linux] NRF52 microcontroller upload agent"""
# Agent config constructor
agent_config_result = cli_util.agent_config(
hardware_id_str, control_server_str, static_server_str)
hardware_id_str,
control_server_str,
static_server_str,
heartbeat_seconds)
if isinstance(agent_config_result, Err):
print_error(f"Failed to construct agent config: {agent_config_result.value}")
sys.exit(1)
Expand All @@ -79,6 +85,20 @@ def cli_agent_nrf52_upload(
print_error("Baudrate must be a positive number")
sys.exit(1)

# Validate dependencies
if shutil.which("adafruit-nrfutil") is None:
print_error("'adafruit-nrfutil' must be installed")
sys.exit(1)
if shutil.which("bash") is None:
print_error("'bash' must be installed")
sys.exit(1)
if shutil.which("grep") is None:
print_error("'grep' must be installed")
sys.exit(1)
if shutil.which("tee") is None:
print_error("'tee' must be installed")
sys.exit(1)

# Construct engine
engine_config = EngineNRF52Config(agent_config, device_str, baudrate)
agent_entrypoints.supervise_agent_nrf52(agent_config, engine_config)
Expand All @@ -88,25 +108,28 @@ def cli_agent_nrf52_upload(
@cli_util.HARDWARE_ID_OPTION
@cli_util.CONTROL_SERVER_OPTION
@cli_util.STATIC_SERVER_OPTION
@cli_util.HEARTBEAT_SECONDS_OPTION
@click.option('--device', '-d', "device_str", show_envvar=True,
type=str, envvar="DIP_ANVYL_DEVICE", required=True,
help='Device user name (e.g. Anvyl).')
@click.option('--scanchainindex', '-s', "scan_chain_index", show_envvar=True,
type=int, envvar="DIP_ANVYL_SCAN_CHAIN_IDEX", required=True,
help='Scan chain index of target JTAG device (e.g. 0)')
def cli_agent_anvyl_upload(
def agent_anvyl_upload(
hardware_id_str: str,
control_server_str: str,
static_server_str: str,
heartbeat_seconds: int,
device_str: str,
scan_chain_index: int
):
"""Agent for managing Anvyl FPGA uploads"""
"""[Linux] Anvyl FPGA upload agent"""
# Agent config constructor
agent_config_result = cli_util.agent_config(
hardware_id_str,
control_server_str,
static_server_str)
static_server_str,
heartbeat_seconds)
if isinstance(agent_config_result, Err):
print_error(f"Failed to construct agent config: {agent_config_result.value}")
sys.exit(1)
Expand All @@ -117,6 +140,14 @@ def cli_agent_anvyl_upload(
print_error("Scan chain index must be a non-negative number")
sys.exit(1)

# Validate dependencies
if shutil.which("djtgcfg") is None:
print_error("'djtgcfg' must be installed")
sys.exit(1)
if shutil.which("bash") is None:
print_error("'bash' must be installed")
sys.exit(1)

# Construct engine
engine_config = EngineAnvylConfig(agent_config, device_str, scan_chain_index)
agent_entrypoints.supervise_agent_anvyl(agent_config, engine_config)
Expand Down
20 changes: 17 additions & 3 deletions client/cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@
SOFTWARE_ID_OPTION = click.option(
'--software-id', '-i', "software_id_str", show_envvar=True,
type=str, envvar="DIP_SOFTWARE_ID", required=True,
help='Software id (e.g. \'16db6c30-3328-11ec-ae41-ff1d66202dcc\''
help='Software id (e.g. \'16db6c30-3328-11ec-ae41-ff1d66202dcc\')'
)
HEARTBEAT_SECONDS_OPTION = click.option(
'--heartbeat-seconds', '-r', "heartbeat_seconds", show_envvar=True,
type=int, envvar="DIP_HEARTBEAT_SECONDS", required=True, default=30,
help='Regular interval in which to ping the server'
)
JSON_OUTPUT_OPTION = click.option(
"--json-output", '-j', "json_output", show_envvar=True, default=False,
Expand Down Expand Up @@ -75,19 +80,28 @@ def backend_config(control_server_str: Optional[str], static_server_str: Optiona
return Ok(BackendConfig(control_server, static_server))


def agent_config(hardware_id_str: str, control_server_str: str, static_server_str: str):
def agent_config(
hardware_id_str: str,
control_server_str: str,
static_server_str: str,
heartbeat_seconds: int
):
"""Construct common agent config from CLI params"""
# Hardware id validation
try:
hardware_id = UUID(hardware_id_str)
except Exception as e:
return Err(f"Invalid hardware id: ${e}")

# Heartbeat validation
if heartbeat_seconds <= 0:
return Err("Heartbeat seconds must be positive number")

# Backend config constructor
be_config_result = backend_config(control_server_str, static_server_str)
if isinstance(be_config_result, Err):
return Err(f"Failed to construct backend config: {be_config_result.value}")
be_config = be_config_result.value

# Agent configuration
return Ok(AgentConfig(hardware_id, be_config))
return Ok(AgentConfig(hardware_id, be_config, heartbeat_seconds))
Loading

0 comments on commit e35b44a

Please sign in to comment.