Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed minor issues #30

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 52 additions & 63 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,95 +1,84 @@
play-sockjs
===========

Play2 plugin for SockJS (Not ready yet for using it)
* So far, you can add only one sockjs
Play2 plugin for SockJS (In Progress)
* You can use it but still requires some work

## How to use

1. Clone the repo to your local machine
1- Clone the repo to your local machine

```
git clone https://github.com/ashihaby/play-sockjs.git
git clone git@github.com:dosht/play-sockjs.git
```
2. Compile and publish the plugin to local play repo
2- Compile and publish the plugin to local play repo

```
cd play-sockjs/project-code
play publish-local
cd play-sockjs/project-code
play publish-local
```
3. Add it to your play project dependencies
3- Add it to your play project dependencies

```scala
val sockjs = "play-sockjs" % "play-sockjs_2.10" % "1.0-SNAPSHOT"
val appDependencies = Seq(
sockjs
)
val appDependencies = Seq(
"play-sockjs" % "play-sockjs_2.10" % "1.0-SNAPSHOT"
)
```
4. Include the pluing in conf/play.plugins
4- Include the pluing in conf/play.plugins

```
10000:com.cloud9ers.play2.sockjs.SockJsPlugin
10000:com.cloud9ers.play2.sockjs.SockJsPlugin
```
5. Write your controller and inherit from SockJsTrait
5- Write your controller and inherit from SockJsTrait

```scala
package controllers

import com.cloud9ers.play2.sockjs.SockJs

import play.api.libs.concurrent.Promise
import play.api.libs.iteratee.{Concurrent, Iteratee}
import play.api.libs.json.JsValue
import play.api.mvc.{Controller, RequestHeader}

object SockJsService extends Controller with SockJs {
def handler(rh: RequestHeader) = {
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val iteratee = Iteratee.foreach[JsValue] { msg =>
// msg: is the message comming from the client
// channel: is where you send messages to the client
channel push msg // just send back the message
}
Promise.pure(iteratee, enumerator)
}

def sockJsAction = SockJs.async(handler)

def websocket[String] = SockJs.websocket(handler)
}
package controllers

import com.cloud9ers.play2.sockjs.SockJs

import play.api.libs.concurrent.Promise
import play.api.libs.iteratee.{Concurrent, Iteratee}
import play.api.libs.json.JsValue
import play.api.mvc.{Controller, RequestHeader}

object SockJsService extends Controller with SockJs {
val SockJsHandler(echoAction, echoWebsocket) = SockJs async { rh =>
val (downEnumerator, downChannel) = Concurrent.broadcast[JsValue]
val upIteratee = Iteratee.foreach[JsValue] (msg => downChannel push msg)
Promise.pure(upIteratee, downEnumerator)
}
}
```
6. Add object Global in the default package in Global.scala
* This is required so far to avoid play routing because sockjs requires complex routing scheme
* We hope to find a better approach soon
6- Add routing. Unfortunatly it requires a complex routing scheme :(

```scala
import com.cloud9ers.play2.sockjs.SockJsGlobalSettings
object Global extends SockJsGlobalSettings {
def sockJsAction = controllers.SockJsService.sockJsHandler
def sockJsWebsocket = controllers.SockJsService.websocket
}
GET /echo/:svr/:ses/websocket controllers.SockJsService.echoWebsocket(svr, ses)
GET /echo controllers.SockJsService.echoAction(route="")
GET /echo/$route<.*> controllers.SockJsService.echoAction(route)
OPTIONS /echo controllers.SockJsService.echoAction(route="")
OPTIONS /echo/$route<.*> controllers.SockJsService.echoAction(route)
POST /echo/$route<.*> controllers.SockJsService.echoAction(route)
```
7. Finnaly, you need to add the base Url to the configuration in application.con
7- Finnaly, you need to add the base Url to the configuration in application.con
```
sockjs.prefix=echo
```

* Full configurations:

```
sockjs.prefix=echo
sockjs.responseLimit=1000
sockjs.jsessionid=false
sockjs.heartbeetDelay=1000
sockjs.diconnectDelay=5000
sockjs.websocketEnabled=false
sockjs {
akka {
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
log-dead-letters = 10
log-dead-letters-during-shutdown = on
}
}
sockjs.prefix=echo
sockjs.responseLimit=1000
sockjs.jsessionid=false
sockjs.heartbeetDelay=1000
sockjs.diconnectDelay=5000
sockjs.websocketEnabled=false
sockjs {
akka {
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
log-dead-letters = 10
log-dead-letters-during-shutdown = on
}
}
```
54 changes: 0 additions & 54 deletions project-code/app/com/cloud9ers/play2/sockjs/Global.scala

This file was deleted.

143 changes: 95 additions & 48 deletions project-code/app/com/cloud9ers/play2/sockjs/Session.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.cloud9ers.play2.sockjs

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, DurationLong}
import scala.concurrent.duration.{ DurationInt, DurationLong }

import akka.actor.{Actor, ActorRef, Cancellable, PoisonPill, actorRef2Scala}
import akka.actor.{ Actor, ActorRef, Cancellable, PoisonPill, actorRef2Scala }
import akka.event.Logging
import play.api.libs.iteratee.{Concurrent, Enumerator, Input, Iteratee}
import play.api.libs.json.{JsArray, JsValue}
import play.api.mvc.{AnyContent, Request, RequestHeader}
import play.api.libs.iteratee.{ Concurrent, Enumerator, Input, Iteratee }
import play.api.libs.json.{ JsArray, JsValue }
import play.api.mvc.{ AnyContent, Request, RequestHeader }

/**
* Session class to queue messages over multiple connection like xhr and xhr_send
Expand All @@ -18,6 +18,8 @@ class Session(handler: RequestHeader ⇒ Future[(Iteratee[JsValue, _], Enumerato
var transportListener: Option[ActorRef] = None
var heartBeatTask: Option[Cancellable] = None
var timer: Option[Cancellable] = None
var openWriten = false
var closeMessage = Session.Close(3000, "Go away!")
//TODO: Max Queue Size

implicit val executionContext = context.system.dispatcher
Expand All @@ -35,69 +37,113 @@ class Session(handler: RequestHeader ⇒ Future[(Iteratee[JsValue, _], Enumerato

def receive = connecting orElse timeout

def timeout: Receive = {
case Session.Timeout ⇒ doClose()
}

def connecting: Receive = {
case Session.Register ⇒
sender ! Session.OpenMessage
context become (open orElse timeout)
startHeartBeat()
logger.debug(s"state: CONNECTING, sender: $sender, message: ${Session.Register}")
(register andThen sendOpenMessage andThen resetListener andThen becomeOpen)(sender)

case c: Session.Close ⇒
logger.debug(s"state: CONNECTING, sender: $sender, message: $c")
becomeClosed.apply()
}

def timeout: Receive = {
case Session.Timeout ⇒ doClose()
}

def open: Receive = {
case Session.Register ⇒ register(sender)
case Session.Send(msgs) ⇒ handleMessages(msgs)
case Session.Write(msg) ⇒ write(msg)
case h @ Session.HeartBeat ⇒ for (tl ← transportListener) tl ! h
case c: Session.Close ⇒ close(c)
case Session.Register ⇒
logger.debug(s"state: OPEN, sender: $sender, message: ${Session.Register}")
register(sender)
if (!pendingWrites.isEmpty) (writePendingMessages andThen resetListener)(sender)

case s @ Session.Send(msgs) ⇒
logger.debug(s"state: OPEN, sender: $sender, message: $s")
handleMessages(msgs)

case w @ Session.Write(msg) ⇒
logger.debug(s"state: OPEN, sender: $sender, message: $w")
enqueue(msg)
transportListener map (writePendingMessages andThen resetListener)

case Session.HeartBeat ⇒
logger.debug(s"state: OPEN, sender: $sender, message: ${Session.HeartBeat}")
transportListener map (sendHeartBeatMessage andThen resetListener)

case c: Session.Close ⇒
logger.debug(s"state: OPEN, sender: $sender, message: $c")
this.closeMessage = c
transportListener map (sendCloseMessage andThen resetListener andThen becomeClosed) getOrElse becomeClosed
}

def closed: Receive = {
case Session.Register ⇒ sender ! Session.Close(3000, "Go away!")
case Session.Register if !openWriten ⇒
logger.debug(s"state: OPEN, sender: $sender, message: ${Session.Register}, openWriten: $openWriten")
(register andThen sendOpenMessage andThen resetListener)(sender)

case Session.Register ⇒
logger.debug(s"state: OPEN, sender: $sender, message: ${Session.Register}, openWriten: $openWriten")
(register andThen sendCloseMessage andThen resetListener)(sender)
}

def register(transport: ActorRef) {
if (transportListener.isEmpty || transportListener.get == sender) {
setTimer()
transportListener = Some(sender)
if (!pendingWrites.isEmpty) writePendingMessages(sender)
val register = (tl: ActorRef) ⇒ {
if (transportListener.isEmpty || transportListener.get == tl) {
transportListener = Some(tl)
} else {
sender ! Session.Close(2010, "Another connection still open")
tl ! Session.Close(2010, "Another connection still open")
logger.debug(s"Refuse transport, Another connection still open")
}
}
tl
}: ActorRef

def handleMessages(msgs: JsValue) {
msgs match {
case msg: JsArray ⇒ msg.value.foreach(m ⇒ upChannel push m)
case msg: JsValue ⇒ upChannel push msg
}
}
val sendOpenMessage = (tl: ActorRef) ⇒ {
tl ! Session.OpenMessage
openWriten = true
tl
}: ActorRef

def write(msg: JsValue) {
pendingWrites += msg
for (tl ← transportListener) writePendingMessages(tl)
}
val resetListener = (tl: ActorRef) ⇒ {
//TODO: should you notify the tl?
transportListener = None
setTimer()
}: Unit

def writePendingMessages(tl: ActorRef) {
val ms = pendingWrites.dequeueAll(_ ⇒ true).toList
tl ! Session.Message("a" + JsonCodec.encodeJson(JsArray(ms)))
resetListener()
logger.debug(s"writePendingMessages: tl: $tl, pendingWrites: pendingWrites")
}
val becomeOpen = (_: Unit) ⇒ {
context become (open orElse timeout)
startHeartBeat()
}: Unit

def close(closeMsg: Session.Close) {
logger.debug(s"Session is closing, code: ${closeMsg.code}, reason: ${closeMsg.reason}")
val becomeClosed = (_: Unit) ⇒ {
context become (closed orElse timeout)
upChannel push Input.EOF
for (tl ← transportListener) tl ! closeMsg
upChannel.eofAndEnd()
}: Unit

val writePendingMessages = (tl: ActorRef) ⇒ { //TODO: unify writes
logger.debug(s"writePendingMessages: tl: $tl, pendingWrites: $pendingWrites")
tl ! Session.Message("a" + JsonCodec.encodeJson(JsArray(pendingWrites.dequeueAll(_ ⇒ true).toList)))
tl
}: ActorRef

val sendCloseMessage = (tl: ActorRef) ⇒ {
tl ! closeMessage; tl
}: ActorRef

val sendHeartBeatMessage = (tl: ActorRef) ⇒ {
tl ! Session.HeartBeat
tl
}: ActorRef

def enqueue(msg: JsValue) {
//TODO: check the queue size
logger.debug(s"enqueue msg: $msg, pendingWrites: $pendingWrites")
pendingWrites += msg
}

def resetListener() {
transportListener = None
setTimer()
def handleMessages(msgs: JsValue) {
msgs match {
case msg: JsArray ⇒ msg.value.foreach(m ⇒ upChannel push m)
case msg: JsValue ⇒ upChannel push msg
}
}

def setTimer() {
Expand All @@ -123,6 +169,7 @@ class Session(handler: RequestHeader ⇒ Future[(Iteratee[JsValue, _], Enumerato
object Session {
case class Send(msg: JsValue) // JSClient send
case object Register // register the transport actor and holds for the next message to write to the JSClient
case object Unregister
case object OpenMessage
case class Message(msg: String)
case class Close(code: Int, reason: String)
Expand Down
Loading