-
Notifications
You must be signed in to change notification settings - Fork 80
/
Copy pathServer.hs
160 lines (140 loc) · 4.72 KB
/
Server.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
-------------------------------------------------------------------
-- |
-- Module : Network.MessagePackRpc.Server
-- Copyright : (c) Hideyuki Tanaka, 2010
-- License : BSD3
--
-- Maintainer: [email protected]
-- Stability : experimental
-- Portability: portable
--
-- This module is the server library for MessagePack-RPC.
-- The specification of MessagePack-RPC is at <http://redmine.msgpack.org/projects/msgpack/wiki/RPCProtocolSpec>.
--
-- A simple example:
--
-- >import Network.MessagePackRpc.Server
-- >
-- >add :: Int -> Int -> IO Int
-- >add x y = return $ x + y
-- >
-- >main =
-- > serve (TCP 1234) [("add", fun add)]
--
--------------------------------------------------------------------
module Network.MessagePackRpc.Server (
-- * RPC method types
RpcMethod,
RpcMethodType(..),
Endpoint(..),
-- * Create RPC method
fun,
-- * Start RPC server
serve,
) where
import Control.Applicative
import Control.Concurrent
import Control.DeepSeq
import Control.Exception as E
import Control.Monad
import Control.Monad.Trans
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Attoparsec as CA
import qualified Data.Attoparsec as A
import Data.Maybe
import Data.MessagePack
import Network
import System.IO
import System.ZMQ
import Prelude hiding (catch)
-- | An RPC method: receives the request's argument list as MessagePack
-- 'Object's and produces the result 'Object' in 'IO'.
type RpcMethod = [Object] -> IO Object
-- | Haskell values that can be exposed as an 'RpcMethod'.
--
-- The instances in this module cover @IO o@ actions (no remaining
-- arguments) and functions @o -> r@ (one more argument, then @r@).
class RpcMethodType f where
  -- | Convert the value into an 'RpcMethod' that consumes a list of
  -- 'Object' arguments.
  toRpcMethod :: f -> RpcMethod
-- | Base case: an 'IO' action is a method that takes no (further) arguments.
--
-- The action's result is converted with 'toObject'. If the caller supplied
-- more arguments than the function accepts, the method fails with an
-- explicit error instead of an opaque pattern-match failure.
instance OBJECT o => RpcMethodType (IO o) where
  toRpcMethod m args =
    case args of
      [] -> toObject <$> m
      _  -> fail "too many arguments"
-- | Inductive case: a function consumes the first argument and the rest of
-- the argument list is handed to the method built from its result.
--
-- The argument is converted with @fromObject'@ and forced with '$!' when the
-- application is evaluated. If the caller supplied fewer arguments than the
-- function needs, the method fails with an explicit error instead of an
-- opaque pattern-match failure.
instance (OBJECT o, RpcMethodType r) => RpcMethodType (o -> r) where
  toRpcMethod f args =
    case args of
      (x:xs) -> toRpcMethod (f $! fromObject' x) xs
      []     -> fail "too few arguments"
-- | Convert a MessagePack 'Object' into a Haskell value, calling 'error'
-- with an @argument type error@ message when the conversion fails.
fromObject' :: OBJECT o => Object -> o
fromObject' = either typeError id . tryFromObject
  where
    -- Conversion failures are reported as a pure 'error' carrying the
    -- converter's own message.
    typeError msg = error $ "argument type error: " ++ msg
-- | Wrap a Haskell function (or 'IO' action) as an 'RpcMethod' so that it
-- can be listed in the method table passed to 'serve'.
fun :: RpcMethodType f => f -> RpcMethod
fun f = toRpcMethod f
-- | Where 'serve' listens for requests.
data Endpoint = TCP Int          -- ^ listen on this TCP port
              | ZeroMQ [String]  -- ^ bind a ZeroMQ reply socket to each of these endpoint strings
  deriving Show
-- | Start RPC server with a set of RPC methods.
--
-- This call loops forever.
--
-- * For a 'TCP' endpoint, every accepted connection is served on its own
--   thread. Errors on a connection are reported on 'stderr' and only that
--   connection is closed.
--
-- * For a 'ZeroMQ' endpoint, a single reply socket is bound to all given
--   endpoints and requests are answered one at a time. A request that cannot
--   be parsed is reported on 'stderr' and answered with an error response;
--   it does not terminate the server.
serve :: Endpoint -- ^ listen on this endpoint
      -> [(String, RpcMethod)] -- ^ list of (method name, RPC method)
      -> IO ()
serve (TCP port) methods = withSocketsDo $ do
  sock <- listenOn (PortNumber $ fromIntegral port)
  forever $ do
    (h, host, hostport) <- accept sock
    forkIO $
      -- The handle is always closed, whatever ends the request loop.
      (processRequests h `finally` hClose h) `catches`
        [ Handler $ \e ->
            case e of
              -- NOTE(review): presumably this is how end-of-input on a
              -- closed connection surfaces from the parser; it is ignored
              -- silently. Confirm against the conduit version in use.
              CA.ParseError ["demandInput"] _ _ -> return ()
              _ -> hPutStrLn stderr $ host ++ ":" ++ show hostport ++ ": " ++ show e
        , Handler $ \e ->
            hPutStrLn stderr $ host ++ ":" ++ show hostport ++ ": " ++ show (e :: SomeException)]
  where
    -- Parse and answer requests from the handle until an exception ends the loop.
    processRequests h =
      C.runResourceT $ CB.sourceHandle h C.$$ forever $ processRequest h

    -- Parse one request from the stream, then write and flush its response.
    processRequest h = do
      (rtype, msgid, method, args) <- CA.sinkParser get
      liftIO $ do
        BL.hPutStr h =<< rpcResponse methods rtype msgid method args
        hFlush h

serve (ZeroMQ endpoints) methods =
  withContext 1 $ \ctx ->
    withSocket ctx Rep $ \s -> do
      mapM_ (bind s) endpoints
      forever $ do
        req <- receive s []
        resp <- processRequest req
        send s ((B.concat . BL.toChunks) resp) []
  where
    -- Decode one request message and compute the encoded reply.
    processRequest req =
      case A.parseOnly get req of
        Left err -> do
          -- A malformed request must not take the whole server down, and a
          -- reply socket has to answer before it can receive again. The
          -- message id is unknown here, so nil is sent in its place.
          hPutStrLn stderr $ "zeromq: request parsing failed: " ++ err
          return $ pack (1 :: Int, (), "Parsing failed.", ())
        Right (rtype, msgid, method, args) ->
          rpcResponse methods rtype msgid method args

-- | Execute one decoded request against the method table and encode the
-- response message @(1, msgid, error, result)@.
--
-- Any exception raised while running the request (wrong request type,
-- unknown method, or an exception from the method itself or from fully
-- evaluating its result) is caught and returned as the error field, rendered
-- with 'show'.
rpcResponse :: [(String, RpcMethod)] -- ^ list of (method name, RPC method)
            -> Int                   -- ^ request type; only 0 is accepted
            -> Int                   -- ^ message id, echoed in the response
            -> String                -- ^ name of the method to call
            -> [Object]              -- ^ arguments for the method
            -> IO BL.ByteString
rpcResponse methods rtype msgid method args = do
  outcome <- try invoke
  return $ case outcome of
    Left err  -> pack (1 :: Int, msgid, show (err :: SomeException), ())
    Right ret -> pack (1 :: Int, msgid, (), ret)
  where
    invoke = do
      when (rtype /= 0) $
        fail "request type is not 0"
      r <- callMethod
      -- Fully evaluate the result inside 'try' so that exceptions hidden in
      -- lazy values are reported to the client instead of escaping later.
      r `deepseq` return r

    callMethod =
      case lookup method methods of
        Nothing ->
          fail $ "method '" ++ method ++ "' not found"
        Just m ->
          m args