Skip to content

Commit

Permalink
Merge branch 'kveinbahs/v3.0.0' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Krisjanis Veinbahs committed Jan 7, 2022
2 parents e25a148 + 994226e commit cc7d755
Show file tree
Hide file tree
Showing 31 changed files with 1,267 additions and 925 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ object Charsets {
def fromBase64(charset: Charset = defaultCharset): Option[String] =
Try(Base64.getDecoder.decode(value).asCharsetString(charset)).toOption

def asBase64Bytes: Array[Byte] =
Base64.getDecoder.decode(value)

def asCharsetBytes(charset: Charset = defaultCharset): Array[Byte] =
value.getBytes(charset)
}
Expand Down
38 changes: 29 additions & 9 deletions backend/web/app/diptestbed/web/actors/HardwareControlActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@ package diptestbed.web.actors

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import diptestbed.domain.{HardwareId, SerialConfig, SoftwareId, HardwareControlMessage => Control}
import play.api.http.websocket.{CloseCodes, CloseMessage, WebSocketCloseException}
import play.api.http.websocket.{BinaryMessage, CloseCodes, CloseMessage, Message, TextMessage}
import play.api.mvc.WebSocket.MessageFlowTransformer
import io.circe.parser.decode
import io.circe.syntax.EncoderOps
import diptestbed.protocol.Codecs.{hardwareControlMessageDecoder, hardwareControlMessageEncoder}
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.stream.scaladsl.Flow
import cats.effect.IO
import diptestbed.web.actors.HardwareControlActor.hardwareSerialMonitorTopic
import cats.effect.unsafe.IORuntime
import diptestbed.domain.HardwareControlMessage._
import akka.util.Timeout
import akka.util.{ByteString, Timeout}
import cats.data.EitherT
import cats.implicits.toTraverseOps
import com.typesafe.scalalogging.LazyLogging
import diptestbed.domain.Charsets._
import diptestbed.domain.HardwareSerialMonitorMessage.{SerialMessageToAgent, SerialMessageToClient}
import diptestbed.web.actors.HardwareControlState.{ConfiguringMonitor, Initial, Uploading}
import diptestbed.web.actors.QueryActor.Promise
import play.api.libs.streams.AkkaStreams

import scala.concurrent.duration.{DurationInt, FiniteDuration}

sealed trait HardwareControlState {}
Expand Down Expand Up @@ -99,7 +104,12 @@ class HardwareControlActor(
scheduleHeartbeatTest().unsafeRunAsync(_ => ())

def sendToAgent(message: Control): IO[Unit] =
IO(out ! message.asJson.noSpaces)
IO(out ! (message match {
case m: Control.SerialMonitorMessageToAgent =>
val dehydrated = m.message.base64Bytes.asBase64Bytes
BinaryMessage(ByteString.fromArray(dehydrated))
case m: Control => TextMessage(m.asJson.noSpaces)
}))
def sendToRequester(requester: ActorRef, message: Control): IO[Unit] =
IO(requester ! message)

Expand Down Expand Up @@ -209,12 +219,22 @@ object HardwareControlActor extends ActorHelper {
iort: IORuntime,
): Props = Props(new HardwareControlActor(pubSubMediator, out, hardwareId))

implicit val transformer: MessageFlowTransformer[Control, String] =
MessageFlowTransformer.stringMessageFlowTransformer.map(x => {
decode[Control](x).toTry.getOrElse {
throw WebSocketCloseException(CloseMessage(Some(CloseCodes.Unacceptable), "Failed to parse message"))
}
})
def transformer(byteHandler: ByteString => Either[Control, Message]): MessageFlowTransformer[Control, Message] =
(flow: Flow[Control, Message, _]) => {
AkkaStreams.bypassWith[Message, Control, Message](Flow[Message].collect {
case TextMessage(text) => decode[Control](text).swap.map(e =>
CloseMessage(Some(CloseCodes.Unacceptable), e.getMessage))
case BinaryMessage(bytes: ByteString) => byteHandler(bytes)
})(flow)
}

val controlTransformer = transformer((bytes: ByteString) =>
Left(SerialMonitorMessageToClient(SerialMessageToClient(
bytes.toArray.asCharsetString(defaultCharset).toBase64()))))

val listenerTransformer = transformer((bytes: ByteString) =>
Left(SerialMonitorMessageToAgent(SerialMessageToAgent(
bytes.toArray.asCharsetString(defaultCharset).toBase64()))))

def requestSerialMonitor(
hardwareId: HardwareId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
package diptestbed.web.actors

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import diptestbed.domain.{
HardwareControlMessage,
HardwareId,
SerialConfig,
HardwareSerialMonitorMessage => MonitorMessage,
}
import diptestbed.domain.{HardwareControlMessage, HardwareId, SerialConfig, HardwareSerialMonitorMessage => MonitorMessage}
import io.circe.syntax.EncoderOps
import diptestbed.protocol.Codecs._
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}
import cats.effect.IO
import diptestbed.web.actors.HardwareControlActor._
import cats.effect.unsafe.IORuntime
import diptestbed.domain.HardwareControlMessage._
import akka.util.Timeout

import akka.util.{ByteString, Timeout}
import scala.annotation.unused
import com.typesafe.scalalogging.LazyLogging
import diptestbed.domain.HardwareSerialMonitorMessage.MonitorUnavailable
import io.circe.Encoder
import diptestbed.domain.Charsets.{ByteOps, StringOps, defaultCharset}
import diptestbed.domain.HardwareSerialMonitorMessage.{MonitorUnavailable, SerialMessageToAgent}
import play.api.http.websocket._

import scala.concurrent.duration.DurationInt
import io.circe.parser.decode

Expand Down Expand Up @@ -54,8 +47,12 @@ class HardwareSerialMonitorListenerActor(
.void
.unsafeRunAsync(_ => ())

def sendToListener[A: Encoder](message: A): IO[Unit] =
IO(out ! TextMessage(message.asJson.noSpaces))
def sendToListener(message: MonitorMessage): IO[Unit] =
IO(out ! (message match {
case m: MonitorMessage.SerialMessageToClient =>
BinaryMessage(ByteString.fromArray(m.base64Bytes.asBase64Bytes))
case m => TextMessage(m.asJson.noSpaces)
}))

def killListener(reason: String): IO[Unit] = {
// Stop websocket listener
Expand All @@ -71,7 +68,7 @@ class HardwareSerialMonitorListenerActor(
(sendToListener(message) >> killListener("Monitor not valid anymore"))
.unsafeRunAsync(_ => ())
case serialMessage: SerialMonitorMessageToClient =>
val message: HardwareControlMessage = serialMessage
val message: MonitorMessage = MonitorMessage.SerialMessageToClient(serialMessage.message.base64Bytes)
sendToListener(message).unsafeRunAsync(_ => ())
case _: SerialMonitorListenersHeartbeatPing =>
sendToHardwareActor(hardwareId, SerialMonitorListenersHeartbeatPong()).value.unsafeRunAsync(_ => ())
Expand All @@ -81,6 +78,11 @@ class HardwareSerialMonitorListenerActor(
sendToHardwareActor(hardwareId, serialMessage).value.unsafeRunAsync(_ => ())
case _ => ()
}
case binary: BinaryMessage =>
val hydrated = binary.data.toArray.asCharsetString(defaultCharset).toBase64()
val message = HardwareControlMessage.SerialMonitorMessageToAgent(
SerialMessageToAgent(hydrated))
sendToHardwareActor(hardwareId, message).value.unsafeRunAsync(_ => ())
case _: SubscribeAck =>
logger.info(s"Serial monitor listener for hardware #${hardwareId} subscribed")
case _ => ()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package diptestbed.web.controllers

import akka.actor.{ActorRef, ActorSystem}

import scala.annotation.unused
import akka.stream.Materializer
import akka.util.Timeout
Expand All @@ -14,11 +13,12 @@ import diptestbed.domain.{HardwareControlMessage, HardwareId, SerialConfig, Soft
import diptestbed.protocol._
import diptestbed.protocol.Codecs._
import diptestbed.protocol.WebResult._
import diptestbed.web.actors.HardwareControlActor.{hardwareActor, transformer}
import diptestbed.web.actors.HardwareControlActor.{controlTransformer, hardwareActor}
import diptestbed.web.actors.{BetterActorFlow, HardwareControlActor, HardwareSerialMonitorListenerActor}
import diptestbed.web.ioControls.PipelineOps._
import diptestbed.web.ioControls._
import play.api.http.websocket.{Message => WebsocketMessage}
import play.api.mvc.WebSocket.MessageFlowTransformer
import play.api.mvc._
import scala.concurrent.duration.DurationInt

Expand Down Expand Up @@ -74,7 +74,8 @@ class HardwareController(
}

def controlHardware(hardwareId: HardwareId): WebSocket = {
WebSocket.accept[HardwareControlMessage, String](_ => {
implicit val transformer: MessageFlowTransformer[HardwareControlMessage, WebsocketMessage] = controlTransformer
WebSocket.accept[HardwareControlMessage, WebsocketMessage](_ => {
BetterActorFlow.actorRef(
subscriber => HardwareControlActor.props(pubSubMediator, subscriber, hardwareId),
maybeName = hardwareActor(hardwareId).some,
Expand Down
8 changes: 4 additions & 4 deletions client/Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,17 @@

### Built client
- Run `./dist/dip_client --help` to print built client CLI usage definition

### File tree
- `dip_client.py` is just an entrypoint for the CLI interface from `cli.py`
- `cli.py` defines a CLI interface for all possible DIP client commands:
- Generic one-off client commands are defined in `backend_util.py`
- Microcontroller agent commands are defined in `agent_entrypoints.py`
- `agent_entrypoints.py` prepare and run a configured agent `agent.py`
- `agent.py` runs using an `AgentConfig`, `Engine`, serialization `Encoder`/`Decoder`
- `agent_util.py` defines a base `AgentConfig`
- `engine.py` defines a base `Engine`
- `agent_*.py` define custom `Engine`s and `AgentConfig`s
- Agents use `ws.py` to exchange WebSocket messages
- Agents use `s11n_*.py` to serialize messages
- Agents use `Engine` for stateful actions & resulting messages
10 changes: 6 additions & 4 deletions client/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
from agent_util import AgentConfig

LOGGER = log.timed_named_logger("agent")
RAW = TypeVar('RAW')
SERIALIZABLE = TypeVar('SERIALIZABLE')
PI = TypeVar('PI')
PO = TypeVar('PO')


async def agent(
config: AgentConfig,
encoder: Encoder[PO],
decoder: Decoder[PI],
engine: Engine[PI, PO]) -> int:
encoder: Encoder[RAW, SERIALIZABLE, PO],
decoder: Decoder[RAW, SERIALIZABLE, PI],
engine: Engine[SERIALIZABLE, PI, PO]) -> int:
"""Supervising client, which connects to a websocket, listens
to commands from server, passes them to an client-specific engine"""

Expand Down Expand Up @@ -62,8 +64,8 @@ async def agent(
await websocket.disconnect()
return 1
else:
LOGGER.debug("Message received: %s", pformat(incoming_result, indent=4))
incoming_message = incoming_result.value
LOGGER.debug("Message received: %s", pformat(incoming_message, indent=4))
LOGGER.debug("Message processing started")
outgoing_result = engine.process(incoming_message)
LOGGER.debug("Message processing stopped")
Expand Down
5 changes: 3 additions & 2 deletions client/agent_anvyl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""Anvyl FPGA client functionality."""

from typing import Sequence, Tuple, Any
from typing import Sequence, Tuple, Any, TypeVar
from result import Result, Err
from engine import Engine, EngineConfig
from protocol import \
Expand All @@ -17,6 +17,7 @@

LOGGER = log.timed_named_logger("anvyl")
FIRMWARE_UPLOAD_PATH = 'static/digilent_anvyl/upload.sh'
SERIALIZABLE = TypeVar('SERIALIZABLE')


def firmware_upload_args(
Expand Down Expand Up @@ -49,7 +50,7 @@ def __init__(self, agent: AgentConfig, device_name: str, device_path: str, scan_
self.scan_chain_index = scan_chain_index


class EngineAnvyl(Engine[CommonIncomingMessage, Any]):
class EngineAnvyl(Engine[SERIALIZABLE, CommonIncomingMessage, Any]):
"""Engine for Anvyl microcontroller"""
config: EngineAnvylConfig

Expand Down
18 changes: 10 additions & 8 deletions client/agent_entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import TypeVar
import sys
from pprint import pformat
import s11n
import s11n_hybrid
from codec import Encoder, Decoder
from engine import Engine
from agent import AgentConfig, agent
Expand All @@ -15,15 +15,17 @@

LOGGER = log.timed_named_logger("entrypoint")

RAW = TypeVar('RAW')
SERIALIZABLE = TypeVar('SERIALIZABLE')
PI = TypeVar('PI')
PO = TypeVar('PO')


def supervise_agent(
config: AgentConfig,
encoder: Encoder[PO],
decoder: Decoder[PI],
engine: Engine[PI, PO]):
encoder: Encoder[RAW, SERIALIZABLE, PO],
decoder: Decoder[RAW, SERIALIZABLE, PI],
engine: Engine[SERIALIZABLE, PI, PO]):
"""Run any given, configured async client"""
LOGGER.debug("Configuring client: %s", pformat(config, indent=4))
async_agent = agent(config, encoder, decoder, engine)
Expand All @@ -36,14 +38,14 @@ def supervise_agent(
def supervise_agent_nrf52(agent_config: AgentConfig, engine_config: EngineNRF52Config):
"""Initiate NRF52 microcontroller client"""
engine = EngineNRF52(engine_config)
encoder = s11n.COMMON_OUTGOING_MESSAGE_ENCODER
decoder = s11n.COMMON_INCOMING_MESSAGE_DECODER
encoder = s11n_hybrid.COMMON_OUTGOING_MESSAGE_ENCODER
decoder = s11n_hybrid.COMMON_INCOMING_MESSAGE_DECODER
supervise_agent(agent_config, encoder, decoder, engine)


def supervise_agent_anvyl(agent_config: AgentConfig, engine_config: EngineAnvylConfig):
"""Initiate Anvyl FPGA client"""
engine = EngineAnvyl(engine_config)
encoder = s11n.COMMON_OUTGOING_MESSAGE_ENCODER
decoder = s11n.COMMON_INCOMING_MESSAGE_DECODER
encoder = s11n_hybrid.COMMON_OUTGOING_MESSAGE_ENCODER
decoder = s11n_hybrid.COMMON_INCOMING_MESSAGE_DECODER
supervise_agent(agent_config, encoder, decoder, engine)
5 changes: 3 additions & 2 deletions client/agent_nrf52.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""NRF52 micro-controller client functionality."""

from typing import Sequence, Tuple, Any
from typing import Sequence, Tuple, Any, TypeVar
from result import Result, Err
from engine import Engine, EngineConfig
from protocol import \
Expand All @@ -17,6 +17,7 @@

LOGGER = log.timed_named_logger("nrf52")
FIRMWARE_UPLOAD_PATH = 'static/adafruit_nrf52/upload.sh'
SERIALIZABLE = TypeVar('SERIALIZABLE')


def firmware_upload_args(
Expand Down Expand Up @@ -47,7 +48,7 @@ def __init__(self, agent: AgentConfig, device: str, baudrate: int):
self.baudrate = baudrate


class EngineNRF52(Engine[CommonIncomingMessage, Any]):
class EngineNRF52(Engine[SERIALIZABLE, CommonIncomingMessage, Any]):
"""Engine for NRF52 microcontroller"""
config: EngineNRF52Config

Expand Down
4 changes: 2 additions & 2 deletions client/backend_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ class Hardware:
"""Class containing hardware information"""
id: UUID
name: str
owner_uuid: str
owner_id: UUID


@dataclass(frozen=True, eq=False)
class Software:
"""Class containing software information"""
id: UUID
name: str
owner_uuid: str
owner_id: UUID
Loading

0 comments on commit cc7d755

Please sign in to comment.