Skip to content

Commit

Permalink
Add some hardcoded things in configs
Browse files Browse the repository at this point in the history
  • Loading branch information
Krisjanis Veinbahs committed Mar 15, 2022
1 parent 6f34235 commit c059796
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package diptestbed.domain

import java.util.UUID
import scala.concurrent.duration.{DurationInt, FiniteDuration}

case class DIPTestbedConfig(
testConfig: TestConfig,
Expand All @@ -9,7 +10,12 @@ case class DIPTestbedConfig(
basePath: String,
adminUsername: Option[String],
adminPassword: Option[String],
adminEnabled: Boolean
adminEnabled: Boolean,
heartbeatingTime: FiniteDuration,
heartbeatingTimeout: FiniteDuration,
heartbeatsInCycle: Int,
maxStreamTime: FiniteDuration,
cameraInitTimeout: FiniteDuration
) {
def makeTitle(contentTitle: String) = s"${contentTitle} · ${title}"

Expand Down Expand Up @@ -40,9 +46,14 @@ object DIPTestbedConfig {
clusterized: Option[Boolean],
title: String,
basePath: String,
adminUsername: Option[String],
adminPassword: Option[String],
adminEnabled: Option[Boolean]
adminUsername: Option[String] = None,
adminPassword: Option[String] = None,
adminEnabled: Option[Boolean] = None,
heartbeatingTime: Option[FiniteDuration] = None,
heartbeatingTimeout: Option[FiniteDuration] = None,
heartbeatsInCycle: Option[Int] = None,
maxStreamTime: Option[FiniteDuration] = None,
cameraInitTimeout: Option[FiniteDuration] = None
): DIPTestbedConfig =
DIPTestbedConfig(
testConfig,
Expand All @@ -51,5 +62,11 @@ object DIPTestbedConfig {
basePath.stripSuffix("/"),
adminUsername,
adminPassword,
adminEnabled.getOrElse(false))
adminEnabled.getOrElse(false),
heartbeatingTime.getOrElse(10.seconds),
heartbeatingTimeout.getOrElse(8.seconds),
heartbeatsInCycle.getOrElse(3),
maxStreamTime.getOrElse(15.minutes),
cameraInitTimeout.getOrElse(3.seconds)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ object HardwareCameraHeartbeatProjection {
val stop = send(previousState.self, StopBroadcasting())
val expectTimeout = previousState.listenerHeartbeatConfig.waitTimeout
val scheduleTimeout = previousState.listenerHeartbeatConfig.nextTimeout
val heartbeatsInCycle = previousState.listenerHeartbeatConfig.heartbeatsInCycle
event match {
case Started() => Some(start)
case CameraListenerHeartbeatStarted() => Some(expectHeartbeats(request, finish, expectTimeout))
case CameraListenerHeartbeatStarted() => Some(expectHeartbeats(request, finish, expectTimeout, heartbeatsInCycle))
case CameraListenerHeartbeatFinished() => Some(
if (previousState.listenerHeartbeatsReceived == 0) stop
else implicitly[Temporal[F]].sleep(scheduleTimeout) >> start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import diptestbed.domain.HardwareCameraMessage.{CameraListenersHeartbeatPong, Ca
import scala.concurrent.duration.{DurationInt, FiniteDuration}

case class HardwareCameraListenerState[F[_], T](
auth: Option[User],
self: T,
pubSubMediator: T,
hardwareId: HardwareId,
enqueue: Array[Byte] => F[Unit],
fail: Exception => F[Unit],
complete: F[Unit],
maxLifetime: Option[FiniteDuration],
initCheckTimeout: FiniteDuration = 3.seconds,
initCheckTimeout: FiniteDuration,
initialized: Boolean = false,
ending: Boolean = false,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ object HardwareControlHeartbeatProjection {
val stop = send(previousState.agent, SerialMonitorRequestStop())
val expectTimeout = previousState.listenerHeartbeatConfig.waitTimeout
val scheduleTimeout = previousState.listenerHeartbeatConfig.nextTimeout
val heartbeatsInCycle = previousState.listenerHeartbeatConfig.heartbeatsInCycle
event match {
case MonitorConfigurationFinished(_, None) => Some(start)
case ListenerHeartbeatStarted() => Some(expectHeartbeats(request, finish, expectTimeout))
case ListenerHeartbeatStarted() => Some(expectHeartbeats(request, finish, expectTimeout, heartbeatsInCycle))
case ListenerHeartbeatFinished() => Some(
if (previousState.listenerHeartbeatsReceived == 0) stop
else implicitly[Temporal[F]].sleep(scheduleTimeout) >> start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ case class HardwareListenerHeartbeatConfig(
nextTimeout: FiniteDuration,
// Time to wait for heartbeats
waitTimeout: FiniteDuration,
// How many heartbeat requests are issued in a single cycle
heartbeatsInCycle: Int
)

object HardwareListenerHeartbeatConfig {
def default(): HardwareListenerHeartbeatConfig = HardwareListenerHeartbeatConfig(10.seconds, 8.seconds)
def default(): HardwareListenerHeartbeatConfig = HardwareListenerHeartbeatConfig(10.seconds, 8.seconds, 3)
def fromConfig(appConfig: DIPTestbedConfig): HardwareListenerHeartbeatConfig =
HardwareListenerHeartbeatConfig(
appConfig.heartbeatingTimeout,
appConfig.heartbeatingTime,
appConfig.heartbeatsInCycle)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ object HeartbeatProjection {
def expectHeartbeats[F[_]: Temporal, A](
requestHeartbeats: F[Unit],
finishHeartbeats: F[Unit],
expectTimeout: FiniteDuration
expectTimeout: FiniteDuration,
heartbeatsInCycle: Int
): F[Unit] = {
def requestAndWait(time: FiniteDuration) =
requestHeartbeats >> implicitly[Temporal[F]].sleep(time)

requestAndWait(expectTimeout / 3) >>
requestAndWait(expectTimeout / 3) >>
requestAndWait(expectTimeout / 3) >>
finishHeartbeats
def partialRequestAndWait: F[Unit] = requestAndWait(expectTimeout / heartbeatsInCycle)

List.fill(heartbeatsInCycle)(partialRequestAndWait).sequence.void >> finishHeartbeats
}
}
11 changes: 9 additions & 2 deletions backend/web/app/diptestbed/web/actors/HardwareCameraActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import io.circe.syntax._
import diptestbed.protocol.Codecs._

class HardwareCameraActor(
val appConfig: DIPTestbedConfig,
val pubSubMediator: ActorRef,
val camera: ActorRef,
val hardwareIds: List[HardwareId],
Expand All @@ -28,7 +29,12 @@ class HardwareCameraActor(
val startMessage: Option[HardwareCameraMessage] = Some(StartLifecycle())
val endMessage: Option[HardwareCameraMessage] = Some(EndLifecycle())
var state: HardwareCameraState[ActorRef] =
HardwareCameraState.initial(self, camera, pubSubMediator, hardwareIds, HardwareListenerHeartbeatConfig.default())
HardwareCameraState.initial(
self,
camera,
pubSubMediator,
hardwareIds,
HardwareListenerHeartbeatConfig.fromConfig(appConfig))

override def receiveMessage: PartialFunction[Any, (Some[ActorRef], HardwareCameraMessage)] = {
case message: HardwareCameraMessage => (Some(sender()), message)
Expand All @@ -48,12 +54,13 @@ class HardwareCameraActor(

object HardwareCameraActor {
def props(
appConfig: DIPTestbedConfig,
pubSubMediator: ActorRef,
out: ActorRef,
hardwareIds: List[HardwareId],
)(implicit
iort: IORuntime,
): Props = Props(new HardwareCameraActor(pubSubMediator, out, hardwareIds))
): Props = Props(new HardwareCameraActor(appConfig, pubSubMediator, out, hardwareIds))

val cameraControlTransformer: MessageFlowTransformer[HardwareCameraMessage, HardwareCameraMessage] =
websocketFlowTransformer({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import scala.concurrent.duration.DurationInt
import scala.util.Try

class HardwareCameraListenerActor(
val appConfig: DIPTestbedConfig,
val pubSubMediator: ActorRef,
val hardwareId: HardwareId,
val queue: SourceQueueWithComplete[Array[Byte]]
Expand All @@ -32,13 +33,15 @@ class HardwareCameraListenerActor(
val endMessage: Option[HardwareCameraListenerMessage] = Some(EndLifecycle())
var state: HardwareCameraListenerState[IO, ActorRef] =
HardwareCameraListenerState(
None,
self,
pubSubMediator,
hardwareId,
bytes => IO.fromFuture(IO(queue.offer(bytes))).void,
exception => IO(queue.fail(exception)) >> IO(self ! PoisonPill),
IO(queue.complete()),
Some(15.minutes))
Some(appConfig.maxStreamTime),
appConfig.cameraInitTimeout)

override def receiveMessage: PartialFunction[Any, (Some[ActorRef], HardwareCameraListenerMessage)] = {
case message: HardwareCameraListenerMessage =>
Expand All @@ -59,14 +62,16 @@ class HardwareCameraListenerActor(

object HardwareCameraListenerActor {
def props(
appConfig: DIPTestbedConfig,
pubSubMediator: ActorRef,
hardwareId: HardwareId,
queue: SourceQueueWithComplete[Array[Byte]]
)(implicit
iort: IORuntime,
): Props = Props(new HardwareCameraListenerActor(pubSubMediator, hardwareId, queue))
): Props = Props(new HardwareCameraListenerActor(appConfig, pubSubMediator, hardwareId, queue))

def spawnCameraSource(
appConfig: DIPTestbedConfig,
pubSubMediator: ActorRef,
hardwareId: HardwareId)(implicit
mat: Materializer,
Expand All @@ -84,7 +89,7 @@ object HardwareCameraListenerActor {
// Spawn camera listener actor which forwards to the queue
(queue, source) = prematerialized
_ <- EitherT(IO {
Try(actorSystem.actorOf(HardwareCameraListenerActor.props(pubSubMediator, hardwareId, queue))).toEither
Try(actorSystem.actorOf(HardwareCameraListenerActor.props(appConfig, pubSubMediator, hardwareId, queue))).toEither
}).leftMap(error => s"Failed to connect to camera, reason: ${error}")

_ = source.recover(e => {
Expand Down
10 changes: 8 additions & 2 deletions backend/web/app/diptestbed/web/actors/HardwareControlActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import diptestbed.web.actors.ActorHelper._
import diptestbed.web.actors.QueryActor.Promise

class HardwareControlActor(
val appConfig: DIPTestbedConfig,
val pubSubMediator: ActorRef,
val agent: ActorRef,
val hardwareId: HardwareId,
Expand All @@ -33,7 +34,11 @@ class HardwareControlActor(
val startMessage: Option[HardwareControlMessage] = Some(StartLifecycle())
val endMessage: Option[HardwareControlMessage] = Some(EndLifecycle())
var state: HardwareControlState[ActorRef] =
HardwareControlState.initial(self, agent, hardwareId, HardwareListenerHeartbeatConfig.default())
HardwareControlState.initial(
self,
agent,
hardwareId,
HardwareListenerHeartbeatConfig.fromConfig(appConfig))

override def receiveMessage: PartialFunction[Any, (Some[ActorRef], HardwareControlMessage)] = {
case message: HardwareControlMessage =>
Expand All @@ -51,12 +56,13 @@ class HardwareControlActor(

object HardwareControlActor {
def props(
appConfig: DIPTestbedConfig,
pubSubMediator: ActorRef,
out: ActorRef,
hardwareId: HardwareId,
)(implicit
iort: IORuntime,
): Props = Props(new HardwareControlActor(pubSubMediator, out, hardwareId))
): Props = Props(new HardwareControlActor(appConfig, pubSubMediator, out, hardwareId))

val controlTransformer: MessageFlowTransformer[HardwareControlMessage, HardwareControlMessage] =
websocketFlowTransformer(
Expand Down
7 changes: 7 additions & 0 deletions backend/web/app/diptestbed/web/config/DIPTestbedConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import diptestbed.web.config.TestConfig._
import com.typesafe.config.Config
import play.api.{ConfigLoader, Configuration}

import scala.concurrent.duration.FiniteDuration

object DIPTestbedConfig {
implicit val dipTestbedConfigLoader: ConfigLoader[DomainDIPTestbedConfig] = (rootConfig: Config, path: String) => {
val config: Configuration = Configuration(rootConfig.getConfig(path))
Expand All @@ -17,6 +19,11 @@ object DIPTestbedConfig {
config.get[Option[String]]("adminUsername"),
config.get[Option[String]]("adminPassword"),
config.get[Option[Boolean]]("adminEnabled"),
config.get[Option[FiniteDuration]]("heartbeatingTime"),
config.get[Option[FiniteDuration]]("heartbeatingTimeout"),
config.get[Option[Int]]("heartbeatsInCycle"),
config.get[Option[FiniteDuration]]("maxStreamTime"),
config.get[Option[FiniteDuration]]("cameraInitTimeout"),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import akka.util.Timeout
import cats.data.EitherT
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import cats.syntax._
import cats.implicits._
import diptestbed.database.driver.DatabaseOutcome.DatabaseException
import diptestbed.database.services.{HardwareService, UserService}
import diptestbed.domain.{DIPTestbedConfig, Hardware, HardwareCameraMessage, HardwareControlMessage, HardwareId, HardwareSerialMonitorMessage, SerialConfig, SoftwareId}
import diptestbed.domain.{DIPTestbedConfig, HardwareCameraMessage, HardwareControlMessage, HardwareId, HardwareSerialMonitorMessage, SerialConfig, SoftwareId}
import diptestbed.protocol._
import diptestbed.protocol.Codecs._
import diptestbed.protocol.WebResult._
Expand All @@ -24,7 +22,6 @@ import diptestbed.web.ioControls.PipelineOps._
import diptestbed.web.ioControls._
import play.api.mvc.WebSocket.MessageFlowTransformer
import play.api.mvc._

import scala.concurrent.duration.DurationInt

class ApiHardwareController(
Expand Down Expand Up @@ -84,7 +81,7 @@ class ApiHardwareController(
controlTransformer
WebSocket.accept[HardwareControlMessage, HardwareControlMessage](_ => {
BetterActorFlow.actorRef(
subscriber => HardwareControlActor.props(pubSubMediator, subscriber, hardwareId),
subscriber => HardwareControlActor.props(appConfig, pubSubMediator, subscriber, hardwareId),
maybeName = hardwareId.actorId().text().some,
)
})
Expand Down Expand Up @@ -123,7 +120,7 @@ class ApiHardwareController(
cameraControlTransformer
WebSocket.accept[HardwareCameraMessage, HardwareCameraMessage](_ => {
BetterActorFlow.actorRef(subscriber =>
HardwareCameraActor.props(pubSubMediator, subscriber, hardwareIds),
HardwareCameraActor.props(appConfig, pubSubMediator, subscriber, hardwareIds),
)
})
}
Expand All @@ -146,7 +143,7 @@ class ApiHardwareController(
result <-
request.headers.get("Range") match {
case None => EitherT.fromEither[IO](Right[Result, Result](withStreamHeaders(Ok(""))))
case Some(_) => HardwareCameraListenerActor.spawnCameraSource(pubSubMediator, hardwareId).bimap[Result, Result](
case Some(_) => HardwareCameraListenerActor.spawnCameraSource(appConfig, pubSubMediator, hardwareId).bimap[Result, Result](
errorMessage => Failure(errorMessage).withHttpStatus(BAD_REQUEST),
source =>
withStreamHeaders(Ok.chunked(source)),
Expand Down
5 changes: 5 additions & 0 deletions backend/web/conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ diptestbed = {
adminUsername = "admin"
adminPassword = "admin"
adminEnabled = true
heartbeatingTime = "8 seconds"
heartbeatingTimeout = "10 seconds"
heartbeatsInCycle = 5
maxStreamTime = "15 minutes"
cameraInitTimeout = "3 seconds"
}

# Play
Expand Down

0 comments on commit c059796

Please sign in to comment.