Hippo Transport Library enhances spark-commons with easy stream management & handling
(ASCII-art hippo logo)
Build and install into the local Maven repository:

mvn clean compile install

or add the dependency to your pom.xml:
<dependency>
<groupId>org.grapheco</groupId>
<artifactId>hippo-rpc</artifactId>
<version>0.1.2</version>
</dependency>
HippoServer
enhances TransportServer with a stream manager (open, streaming fetch, close):
val server = HippoServer.create("test", new HippoRpcHandler() {
...
}, 1224)
HippoClient
enhances TransportClient with stream requests and result boxing (as Stream[T]):
val client = HippoClient.create("test", "localhost", 1224)
For more examples, see https://github.com/bluejoe2008/hippo-rpc/blob/master/src/test/scala/hippo/HippoRpcTest.scala
ask[T](message: Any, consumeResponse: (ByteBuffer) => T): Future[T]
asks for a response; sends message as the request and uses consumeResponse to parse the response message
askWithBuffer[T](message: Any, extra: ByteBuf*): Future[T]
asks for a response; sends message and extra as the request
getInputStream(request: Any, waitStreamTimeout: Duration): InputStream
gets an InputStream, e.g. the stream of a remote file
getChunkedStream[T](request: Any, waitStreamTimeout: Duration): Stream[T]
gets results as a Stream, e.g. the results of a SQL execution
getChunkedInputStream(request: Any, waitStreamTimeout: Duration): InputStream
gets an InputStream chunk by chunk, e.g. the stream of a remote file
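The consumeResponse callback receives the raw response as a ByteBuffer and turns it into a typed value. A minimal, self-contained sketch of such a (ByteBuffer) => T parser (no hippo-rpc required; the 4-byte-length-plus-UTF-8 layout is invented here for illustration, not the library's wire format):

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object ConsumeResponseSketch {
  // A consumeResponse-style function: (ByteBuffer) => T, with T = String.
  // The layout (4-byte length prefix + UTF-8 payload) is a made-up example.
  val parseString: ByteBuffer => String = { buf =>
    val len = buf.getInt()         // read the 4-byte length prefix
    val bytes = new Array[Byte](len)
    buf.get(bytes)                 // read the payload
    new String(bytes, StandardCharsets.UTF_8)
  }

  // Simulate a response buffer as a server might have encoded it.
  def encode(s: String): ByteBuffer = {
    val payload = s.getBytes(StandardCharsets.UTF_8)
    val buf = ByteBuffer.allocate(4 + payload.length)
    buf.putInt(payload.length).put(payload)
    buf.flip()                     // switch from writing to reading
    buf
  }

  def main(args: Array[String]): Unit = {
    println(parseString(encode("hello hippo")))   // prints: hello hippo
  }
}
```

Such a function would be passed as the consumeResponse argument of ask[T], letting the client decode the response off the raw buffer without an intermediate copy.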
spark-rpc
, or kraps-rpc
, uses a messaging mechanism to improve the performance of RPC calls.
HippoRpcEnvFactory
is provided to enable both RPC calls (via kraps-rpc) and stream handling (via hippo-rpc).
HippoRpcEnv
enhances NettyRpcEnv with stream handling functions in addition to RPC messaging.
Usage of HippoRpcEnv is like that of NettyRpcEnv:
rpcEnv = HippoRpcEnvFactory.create(config)
val endpoint: RpcEndpoint = new FileRpcEndpoint(rpcEnv)
rpcEnv.setupEndpoint("endpoint-name", endpoint)
rpcEnv.setRpcHandler(new MyRpcHandler())
...
...
val endPointRef = rpcEnv.setupEndpointRef(RpcAddress(...), "...")
To provide a customized HippoRpcHandler
, implement the following methods:
openCompleteStream
: provides a complete stream for a given request
openChunkedStream
: provides a chunkable stream for a given request
receiveWithBuffer
: defines how to respond to a received request buffer
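A sketch of how these three callbacks might fit together. The trait below is a simplified stand-in for hippo-rpc's HippoRpcHandler (the real signatures, context arguments, and return types may differ), so the example runs without the library:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Simplified stand-in for HippoRpcHandler; real hippo-rpc signatures differ.
trait SimpleStreamHandler {
  def openCompleteStream(request: Any): InputStream          // whole stream at once
  def openChunkedStream(request: Any): Iterator[Array[Byte]] // stream chunk by chunk
  def receiveWithBuffer(request: ByteBuffer): ByteBuffer     // plain request/response
}

// A handler serving in-memory "files" keyed by name.
class EchoFileHandler(files: Map[String, Array[Byte]]) extends SimpleStreamHandler {
  def openCompleteStream(request: Any): InputStream =
    new ByteArrayInputStream(files(request.toString))

  def openChunkedStream(request: Any): Iterator[Array[Byte]] =
    files(request.toString).grouped(4)   // 4-byte chunks, for illustration only

  def receiveWithBuffer(request: ByteBuffer): ByteBuffer = {
    val bytes = new Array[Byte](request.remaining())
    request.get(bytes)
    val reply = "echo: " + new String(bytes, StandardCharsets.UTF_8)
    ByteBuffer.wrap(reply.getBytes(StandardCharsets.UTF_8))
  }
}
```

The division of labor mirrors the list above: openCompleteStream for one-shot transfers, openChunkedStream when the payload should be fetched piecewise, and receiveWithBuffer for ordinary request/response messages.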
For more examples, see https://github.com/bluejoe2008/hippo-rpc/blob/master/src/test/scala/hippo/HippoRpcEnvFactoryTest.scala
ask[T](message: Any): Future[T]
asks for a response; sends message as the request
ask[T](message: Any, timeout: RpcTimeout): Future[T]
asks for a response, with a max timeout limit
ask[T](message: Any, timeout: Duration): Future[T]
asks for a response, with a max timeout limit
send(message: Any): Unit
sends a one-way message
ask[T](message: Any, consumeResponse: (ByteBuffer) => T): Future[T]
asks for a response; sends message as the request and uses consumeResponse to parse the response message
askWithBuffer[T](message: Any, extra: ByteBuf*): Future[T]
asks for a response; sends message and extra as the request
getInputStream(request: Any, waitStreamTimeout: Duration): InputStream
gets an InputStream, e.g. the stream of a remote file
getChunkedStream[T](request: Any, waitStreamTimeout: Duration): Stream[T]
gets results as a Stream, e.g. the results of a SQL execution
getChunkedInputStream(request: Any, waitStreamTimeout: Duration): InputStream
gets an InputStream chunk by chunk, e.g. the stream of a remote file
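Although getChunkedInputStream transfers the bytes chunk by chunk, the caller still sees an ordinary java.io.InputStream, so consuming it is the usual buffered read loop. This self-contained sketch uses a local ByteArrayInputStream as a stand-in for the remote stream:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}

object ChunkedReadSketch {
  // Drain an InputStream with a fixed-size buffer, as one would with
  // the stream returned by getChunkedInputStream.
  def readAll(in: InputStream, bufSize: Int = 8): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](bufSize)
    var n = in.read(buf)
    while (n != -1) {         // -1 signals end of stream
      out.write(buf, 0, n)
      n = in.read(buf)
    }
    out.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val data = "chunk by chunk transfer".getBytes("UTF-8")
    // Stand-in for the stream of a remote file.
    val remote: InputStream = new ByteArrayInputStream(data)
    println(new String(readAll(remote), "UTF-8"))   // prints: chunk by chunk transfer
  }
}
```

Because the stream arrives piecewise, the client never needs to hold the whole remote file in memory at once.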
spark-commons
: Spark common library, https://github.com/apache/spark
kraps-rpc
: an RPC framework leveraging the Spark RPC module, https://github.com/neoremind/kraps-rpc
hippo-rpc is licensed under the BSD 2-Clause "Simplified" License.