Skip to content

Commit

Permalink
Add auth to websocket connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Krisjanis Veinbahs committed Mar 17, 2022
1 parent c059796 commit 1942ba8
Show file tree
Hide file tree
Showing 54 changed files with 825 additions and 222 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package diptestbed.domain

import cats.Monad
import cats.implicits._
import diptestbed.domain.HardwareCameraEvent._
import diptestbed.domain.HardwareCameraMessage._

object HardwareCameraAuthProjection {
def project[F[_]: Monad, A](
auth: (String, String) => F[Either[String, User]],
send: (A, HardwareCameraMessage) => F[Unit]
)(state: HardwareCameraState[A], event: HardwareCameraEvent[A]): Option[F[Unit]] = {
event match {
case CheckingAuth(username, password) =>
Some(auth(username, password).flatMap(_.bimap(
error => send(state.self, AuthFailure(error)),
user => send(state.self, AuthSuccess(user))
).merge))
case _ => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ package diptestbed.domain

sealed trait HardwareCameraError
object HardwareCameraError {
case class NoReaction() extends HardwareCameraError
case class RequestInquirerObligatory() extends HardwareCameraError
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ object HardwareCameraEvent {
// Lifecycle
case class Started[A]() extends HardwareCameraEvent[A]
case class Ended[A]() extends HardwareCameraEvent[A]
case class CheckingAuth[A](username: String, password: String) extends HardwareCameraEvent[A]
case class AuthSucceeded[A](user: User) extends HardwareCameraEvent[A]
case class AuthFailed[A](reason: String) extends HardwareCameraEvent[A]

// Camera content
case class ChunkReceived[A](chunk: Array[Byte]) extends HardwareCameraEvent[A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ object HardwareCameraEventEngine {
def onMessage[A, F[_]: Temporal: Parallel](
lastState: => HardwareCameraState[A],
die: F[Unit],
auth: (String, String) => F[Either[String, User]],
send: (A, Any) => F[Unit],
publish: (PubSubTopic, Any) => F[Unit],
subscriptionMessage: (PubSubTopic, A) => Any,
Expand All @@ -20,6 +21,7 @@ object HardwareCameraEventEngine {
List(
HardwareCameraMailProjection.project(die, send, publish, subscriptionMessage),
HardwareCameraHeartbeatProjection.project(send, publish),
HardwareCameraAuthProjection.project(auth, send),
),
),
HardwareCameraEventStateProjection.project,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ object HardwareCameraEventStateProjection {
case _ => previousState
}

case AuthSucceeded(user) => previousState.copy(auth = Some(user))
case AuthFailed(_) => previousState.copy(auth = None)

case Subscription(_) => previousState.copy(broadcasting = true)
case BroadcastStopped() => previousState.copy(broadcasting = false, initialChunks = List.empty)

case CameraListenerHeartbeatStarted() => previousState.copy(listenerHeartbeatsReceived = 0)
case CameraListenerHeartbeatReceived() =>
previousState.copy(listenerHeartbeatsReceived = previousState.listenerHeartbeatsReceived + 1)

case _: CameraPinged[A] | _: CameraListenerHeartbeatFinished[A] | _: CameraDropped[A] | _: Started[A] | _: Ended[A] =>
case _: CheckingAuth[A] | _: CameraPinged[A] | _: CameraListenerHeartbeatFinished[A] | _: CameraDropped[A] | _: Started[A] | _: Ended[A] =>
previousState
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.Temporal
import cats.implicits._
import diptestbed.domain.HardwareCameraEvent._
import diptestbed.domain.HardwareCameraListenerMessage.{ListenerCameraChunk, ListenerCameraDropped}
import diptestbed.domain.HardwareCameraMessage.{CameraSubscription, StopBroadcasting}
import diptestbed.domain.HardwareCameraMessage._

object HardwareCameraMailProjection {
def project[F[_]: Temporal, A](
Expand All @@ -14,8 +14,13 @@ object HardwareCameraMailProjection {
subscriptionMessage: (PubSubTopic, A) => Any
)(state: HardwareCameraState[A], event: HardwareCameraEvent[A]): Option[F[Unit]] = {
event match {
case _: CameraPinged[A] | _: CameraListenerHeartbeatEvent[A] =>
case _: CheckingAuth[A] | _: CameraPinged[A] | _: CameraListenerHeartbeatEvent[A] =>
None
case AuthSucceeded(_) => Some(
send(state.camera, AuthResult(None)) >>
send(state.self, StartLifecycle()))
case AuthFailed(reason) => Some(send(state.camera, AuthResult(Some(reason))))

case BroadcastStopped() => Some(send(state.camera, StopBroadcasting()))
case Started() =>
Some(state.hardwareIds.map(hardwareId =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package diptestbed.domain
sealed abstract class HardwareCameraMessage
sealed abstract class HardwareCameraMessageExternal extends HardwareCameraMessage
object HardwareCameraMessage {
case class AuthRequest(username: String, password: String) extends HardwareCameraMessageExternal
case class AuthSuccess(user: User) extends HardwareCameraMessage
case class AuthFailure(reason: String) extends HardwareCameraMessage
case class AuthResult(error: Option[String]) extends HardwareCameraMessageExternal

case class StartLifecycle() extends HardwareCameraMessage
case class EndLifecycle() extends HardwareCameraMessage

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,27 @@ object HardwareCameraMessageHandler {
def handle[A](
inquirer: => Option[A],
)(@unused state: HardwareCameraState[A], message: HardwareCameraMessage): HardwareCameraResult[A] =
message match {
case _: Ping => Right(NonEmptyList.of(CameraPinged()))
case _: StartLifecycle => Right(NonEmptyList.of(Started()))
case _: EndLifecycle => Right(NonEmptyList.of(Ended()))
case CameraSubscription() =>
NonEmptyList.fromList(inquirer.map(Subscription(_)).toList).toRight(RequestInquirerObligatory())
case StopBroadcasting() => Right(NonEmptyList.of(BroadcastStopped()))
case CameraUnavailable(reason) => Right(NonEmptyList.of(CameraDropped(reason)))
case CameraChunk(chunk) => Right(NonEmptyList.of(ChunkReceived(chunk)))
case CameraListenersHeartbeatStart() => Right(NonEmptyList.of(CameraListenerHeartbeatStarted()))
case CameraListenersHeartbeatPong() => Right(NonEmptyList.of(CameraListenerHeartbeatReceived()))
case CameraListenersHeartbeatFinish() => Right(NonEmptyList.of(CameraListenerHeartbeatFinished()))
state.auth match {
case None =>
message match {
case m: AuthRequest => Right(NonEmptyList.of(CheckingAuth(m.username, m.password)))
case m: AuthSuccess => Right(NonEmptyList.of(AuthSucceeded(m.user)))
case m: AuthFailure => Right(NonEmptyList.of(AuthFailed(m.reason)))
case _ => Left(NoReaction())
}
case Some(_) =>
message match {
case _: Ping => Right(NonEmptyList.of(CameraPinged()))
case _: StartLifecycle => Right(NonEmptyList.of(Started()))
case _: EndLifecycle => Right(NonEmptyList.of(Ended()))
case CameraSubscription() =>
NonEmptyList.fromList(inquirer.map(Subscription(_)).toList).toRight(RequestInquirerObligatory())
case StopBroadcasting() => Right(NonEmptyList.of(BroadcastStopped()))
case CameraUnavailable(reason) => Right(NonEmptyList.of(CameraDropped(reason)))
case CameraChunk(chunk) => Right(NonEmptyList.of(ChunkReceived(chunk)))
case CameraListenersHeartbeatStart() => Right(NonEmptyList.of(CameraListenerHeartbeatStarted()))
case CameraListenersHeartbeatPong() => Right(NonEmptyList.of(CameraListenerHeartbeatReceived()))
case CameraListenersHeartbeatFinish() => Right(NonEmptyList.of(CameraListenerHeartbeatFinished()))
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package diptestbed.domain
case class HardwareCameraState[A](
self: A,
camera: A,
auth: Option[User],
pubSubMediator: A,
hardwareIds: List[HardwareId],
listenerHeartbeatsReceived: Int,
Expand All @@ -27,6 +28,7 @@ object HardwareCameraState {
HardwareCameraState(
self,
camera,
None,
pubSubMediator,
hardwareIds,
0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package diptestbed.domain

import cats.Monad
import cats.implicits._
import diptestbed.domain.HardwareControlEvent._
import diptestbed.domain.HardwareControlMessageExternalBinary._
import diptestbed.domain.HardwareControlMessageExternalNonBinary._
import diptestbed.domain.HardwareControlMessageInternal._
import diptestbed.domain.HardwareSerialMonitorMessageBinary._

object HardwareControlAuthProjection {
def project[F[_]: Monad, A](
auth: (String, String) => F[Either[String, User]],
send: (A, HardwareControlMessage) => F[Unit]
)(state: HardwareControlState[A], event: HardwareControlEvent[A]): Option[F[Unit]] = {
event match {
case CheckingAuth(username, password) =>
Some(auth(username, password).flatMap(_.bimap(
error => send(state.self, AuthFailure(error)),
user => send(state.self, AuthSuccess(user))
).merge))
case _ => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package diptestbed.domain

sealed abstract class HardwareControlEvent[A]
object HardwareControlEvent {
// Auth
case class CheckingAuth[A](username: String, password: String) extends HardwareControlEvent[A]
case class AuthSucceeded[A](user: User) extends HardwareControlEvent[A]
case class AuthFailed[A](reason: String) extends HardwareControlEvent[A]

// Lifecycle
case class Started[A]() extends HardwareControlEvent[A]
case class Ended[A]() extends HardwareControlEvent[A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object HardwareControlEventEngine {

def onMessage[A, F[_]: Temporal: Parallel](
lastState: => HardwareControlState[A],
auth: (String, String) => F[Either[String, User]],
send: (A, HardwareControlMessage) => F[Unit],
publish: (PubSubTopic, HardwareControlMessage) => F[Unit],
inquirer: => Option[A],
Expand All @@ -44,6 +45,7 @@ object HardwareControlEventEngine {
List(
HardwareControlMailProjection.project(send, publish),
HardwareControlHeartbeatProjection.project(send, publish),
HardwareControlAuthProjection.project(auth, send),
),
),
HardwareControlEventStateProjection.project,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ object HardwareControlEventStateProjection {
case UploadStarted(inquirer, _) => previousState.copy(agentState = Uploading(inquirer))
case UploadFinished(_, _) => previousState.copy(agentState = Initial())

case AuthSucceeded(user) => previousState.copy(auth = Some(user))
case AuthFailed(_) => previousState.copy(auth = None)

case MonitorConfigurationStarted(inquirer, _) => previousState.copy(agentState = ConfiguringMonitor(inquirer))
case MonitorConfigurationFinished(_, _) => previousState.copy(agentState = Initial())

case ListenerHeartbeatStarted() => previousState.copy(listenerHeartbeatsReceived = 0)
case ListenerHeartbeatReceived() =>
previousState.copy(listenerHeartbeatsReceived = previousState.listenerHeartbeatsReceived + 1)

case _: MonitorDropped[A] | _: MonitorDropExpected[A] | _: MonitorMessageToClient[A] | _: MonitorMessageToAgent[A] |
case _: CheckingAuth[A] | _: MonitorDropped[A] | _: MonitorDropExpected[A] | _: MonitorMessageToClient[A] | _: MonitorMessageToAgent[A] |
_: MonitorMessageToClient[A] | _: MonitorMessageToAgent[A] | _: ListenerHeartbeatFinished[A] | _: Started[A] |
_: Ended[A] =>
previousState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ object HardwareControlMailProjection {
publish: (PubSubTopic, HardwareControlMessage) => F[Unit],
)(state: HardwareControlState[A], event: HardwareControlEvent[A]): Option[F[Unit]] = {
event match {
case Started() | Ended() | ListenerHeartbeatFinished() | ListenerHeartbeatReceived() |
case CheckingAuth(_, _) | Started() | Ended() | ListenerHeartbeatFinished() | ListenerHeartbeatReceived() |
ListenerHeartbeatStarted() =>
None

case AuthSucceeded(_) => Some(
send(state.agent, AuthResult(None)) >>
send(state.self, StartLifecycle()))
case AuthFailed(reason) => Some(send(state.agent, AuthResult(Some(reason))))

case UploadStarted(_, softwareId) => Some(send(state.agent, UploadSoftwareRequest(softwareId)))
case UploadFinished(oldInquirer, error) => Some(oldInquirer.traverse(send(_, UploadSoftwareResult(error))).void)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ sealed trait HardwareControlMessageNonBinary extends HardwareControlMessage
*/
sealed trait HardwareControlMessageInternal extends HardwareControlMessageNonBinary
object HardwareControlMessageInternal {
// Goes to agent
case class AuthResult(error: Option[String]) extends HardwareControlMessageInternal

// Goes back into actor
case class AuthSuccess(user: User) extends HardwareControlMessageInternal
case class AuthFailure(reason: String) extends HardwareControlMessageInternal

case class StartLifecycle() extends HardwareControlMessageInternal
case class EndLifecycle() extends HardwareControlMessageInternal

Expand All @@ -34,6 +41,7 @@ sealed trait HardwareControlMessageExternalNonBinary
extends HardwareControlMessageExternal
with HardwareControlMessageNonBinary
object HardwareControlMessageExternalNonBinary {
case class AuthRequest(username: String, password: String) extends HardwareControlMessageExternalNonBinary
case class UploadSoftwareRequest(softwareId: SoftwareId) extends HardwareControlMessageExternalNonBinary
case class UploadSoftwareResult(error: Option[String]) extends HardwareControlMessageExternalNonBinary
object UploadSoftwareResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,37 @@ object HardwareControlMessageHandler {

def handle[A](
inquirer: => Option[A],
)(state: HardwareControlState[A], message: HardwareControlMessage): HardwareControlResult[A] =
message match {
case _: StartLifecycle => Right(NonEmptyList.of(Started()))
case _: EndLifecycle => Right(NonEmptyList.of(Ended()))
case m: UploadSoftwareRequest => handleUploadSoftwareRequest(state, inquirer, m)
case m: UploadSoftwareResult => handleUploadSoftwareResult(state, m)
case m: SerialMonitorRequest => handleSerialMonitorRequest(state, inquirer, m)
case _: SerialMonitorRequestStop => handleSerialMonitorRequestStop()
case m: SerialMonitorResult => handleSerialMonitorResult(state, m)
case m: SerialMonitorMessageToClient => handleSerialMonitorMessageToClient(m)
case m: SerialMonitorMessageToAgent => handleSerialMonitorMessageToAgent(m)
case _: SerialMonitorListenersHeartbeatStart => handleSerialMonitorListenersHeartbeatStart()
case _: SerialMonitorListenersHeartbeatPing => Left(NoReaction)
case _: SerialMonitorListenersHeartbeatPong => handleSerialMonitorListenersHeartbeatPong()
case _: SerialMonitorListenersHeartbeatFinish => handleSerialMonitorListenersHeartbeatFinish()
case m: SerialMonitorUnavailable => handleMonitorUnavailable(m)
case _: Ping => Left(NoReaction)
)(state: HardwareControlState[A], message: HardwareControlMessage): HardwareControlResult[A] = {
state.auth match {
case None =>
message match {
case m: AuthRequest => Right(NonEmptyList.of(CheckingAuth(m.username, m.password)))
case m: AuthSuccess => Right(NonEmptyList.of(AuthSucceeded(m.user)))
case m: AuthFailure => Right(NonEmptyList.of(AuthFailed(m.reason)))
case _ => Left(NoReaction)
}
case Some(_) =>
message match {
case _: StartLifecycle => Right(NonEmptyList.of(Started()))
case _: EndLifecycle => Right(NonEmptyList.of(Ended()))
case m: UploadSoftwareRequest => handleUploadSoftwareRequest(state, inquirer, m)
case m: UploadSoftwareResult => handleUploadSoftwareResult(state, m)
case m: SerialMonitorRequest => handleSerialMonitorRequest(state, inquirer, m)
case _: SerialMonitorRequestStop => handleSerialMonitorRequestStop()
case m: SerialMonitorResult => handleSerialMonitorResult(state, m)
case m: SerialMonitorMessageToClient => handleSerialMonitorMessageToClient(m)
case m: SerialMonitorMessageToAgent => handleSerialMonitorMessageToAgent(m)
case _: SerialMonitorListenersHeartbeatStart => handleSerialMonitorListenersHeartbeatStart()
case _: SerialMonitorListenersHeartbeatPing => Left(NoReaction)
case _: SerialMonitorListenersHeartbeatPong => handleSerialMonitorListenersHeartbeatPong()
case _: SerialMonitorListenersHeartbeatFinish => handleSerialMonitorListenersHeartbeatFinish()
case m: SerialMonitorUnavailable => handleMonitorUnavailable(m)
case _: Ping => Left(NoReaction)
case _: AuthRequest | _: AuthSuccess | _ :AuthFailure => Left(NoReaction)

}
}
}

def handleUploadSoftwareRequest[A](
state: HardwareControlState[A],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package diptestbed.domain
import diptestbed.domain.HardwareControlAgentState._

case class HardwareControlState[A](
auth: Option[User],
self: A,
agent: A,
hardwareId: HardwareId,
Expand All @@ -19,6 +20,7 @@ object HardwareControlState {
listenerHeartbeatConfig: HardwareListenerHeartbeatConfig,
): HardwareControlState[A] =
HardwareControlState(
None,
self,
agent,
hardwareId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ object HardwareSerialMonitorMessageBinary {

sealed trait HardwareSerialMonitorMessageNonBinary extends HardwareSerialMonitorMessage
object HardwareSerialMonitorMessageNonBinary {
case class AuthResult(error: Option[String]) extends HardwareSerialMonitorMessageNonBinary
case class MonitorUnavailable(reason: String) extends HardwareSerialMonitorMessageNonBinary
case class ConnectionClosed(reason: String) extends HardwareSerialMonitorMessageNonBinary
}
Loading

0 comments on commit 1942ba8

Please sign in to comment.