This repo is a POC showing how to expose Spring services via the RSocket protocol.
This is a very early draft; I’ve cut a lot of corners to get something that at least runs.
It’s meant to attract interest and feedback on the programming model.
If you are wondering why we need another remote protocol when gRPC and REST already exist, please take a moment to read the RSocket motivations to understand how it is different.
RSocket is not a traditional RPC protocol where a method with several parameters maps directly to a remote endpoint.
Instead, it embraces the reactive, asynchronous nature of message passing; methods are therefore always expected to receive one parameter and return zero or one results.
The protocol defines four exchange modes:
- Fire and Forget
- Request Response
- Request Stream
- Channel (bidirectional channels)
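As a rough summary of the four modes, the sketch below records how many messages flow in each direction. The enum and its field names are illustrative only; they are not part of this project or of rsocket-java.

```java
// Illustrative summary of the four RSocket exchange modes.
// The enum and its fields are hypothetical names chosen for this sketch.
enum ExchangeMode {
    FIRE_AND_FORGET(false, false, false),  // one request, no response
    REQUEST_RESPONSE(false, true, false),  // one request, one response
    REQUEST_STREAM(false, true, true),     // one request, a stream of responses
    CHANNEL(true, true, true);             // a stream of requests, a stream of responses

    final boolean streamsRequests;   // can the requester send more than one message?
    final boolean expectsResponse;   // does the requester get anything back?
    final boolean streamsResponses;  // can the responder send more than one message?

    ExchangeMode(boolean streamsRequests, boolean expectsResponse, boolean streamsResponses) {
        this.streamsRequests = streamsRequests;
        this.expectsResponse = expectsResponse;
        this.streamsResponses = streamsResponses;
    }
}
```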
Following Spring conventions, this project aims to let developers annotate their classes with a set of annotations that expose methods to be invoked over RSocket. Each annotation accepts the following attributes:
- path: The path where the service method is exposed
- mimeType: The mimeType used for encoding the payload (currently only application/java-serialized-object and application/json are supported)
Reactive sockets are about message passing, so any method annotated with one of the exchange modes explained below must follow these conventions:
- At least one argument is needed
- In case of a single argument, it is mapped to the Payload from RSocket
- In case of multiple arguments, exactly one must be annotated with @Payload
- @RequestManyMapping and @RequestStreamMapping methods must return a Flux
- @RequestStreamMapping methods must receive a Flux as the payload argument (resolved as explained above)
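The argument rules above can be sketched with plain reflection. This is a minimal illustration, not the project's actual resolver: the @Payload annotation is redeclared locally as a stand-in, PayloadResolver and SampleService are hypothetical names, and the sketch assumes that a method with several arguments and no @Payload is rejected.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

// Local stand-in for the project's @Payload annotation (assumption).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface Payload {}

class PayloadResolver {

    // Returns the index of the parameter that carries the RSocket payload.
    static int payloadIndex(Method method) {
        Parameter[] params = method.getParameters();
        if (params.length == 0) {
            throw new IllegalArgumentException("At least one argument is needed: " + method.getName());
        }
        if (params.length == 1) {
            return 0; // a single argument is always the payload
        }
        int found = -1;
        for (int i = 0; i < params.length; i++) {
            if (params[i].isAnnotationPresent(Payload.class)) {
                if (found >= 0) {
                    throw new IllegalArgumentException("Only one argument may be annotated with @Payload");
                }
                found = i;
            }
        }
        if (found < 0) {
            throw new IllegalArgumentException("With multiple arguments, one must be annotated with @Payload");
        }
        return found;
    }

    // Lookup by method name, so callers avoid checked reflection exceptions.
    static int payloadIndex(Class<?> type, String methodName) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return payloadIndex(m);
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }
}

// Hypothetical service used only to exercise the resolver.
class SampleService {
    void single(String body) {}
    void multiple(String header, @Payload String body) {}
    void none() {}
    void ambiguous(String a, String b) {}
}
```

Calling PayloadResolver.payloadIndex(SampleService.class, "multiple") picks the second parameter, while "none" and "ambiguous" are rejected with an IllegalArgumentException.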
One-way methods map to fire-and-forget in RSocket, so any result from your service is ignored and not sent over the wire:
@OneWayMapping(path="/receive", mimeType="application/json")
public void receive(Foo foo){
}
Request/response is very similar to a traditional RPC scenario:
@RequestOneMapping(path="/convert", mimeType="application/json")
public Bar convert(Foo foo){
}
RequestMany, or Request/Stream, is where reactive streams start to show their value in this protocol.
The server can open a channel and push data to the client as it arrives. The client controls backpressure toward the server using Reactor's backpressure support.
In the example below, a service emits Temperature data to the client over a live connection:
@RequestManyMapping(path="/temperature", mimeType="application/json")
public Flux<Temperature> temperature(Location location){
    return temperatureService.stream(location);
}
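To make the backpressure idea concrete, here is a minimal sketch in which the producer emits only as many items as the consumer has requested. It uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than Reactor, integers stand in for Temperature readings, and RangePublisher and CollectingSubscriber are hypothetical names. It is deliberately simplified: single-threaded, and it does not implement the full Reactive Streams rules (for example, it does not reject non-positive requests).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Emits the integers 1..count, but never more than the subscriber has requested.
class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean done;

            @Override
            public void request(long n) {
                long emitted = 0;
                // Emit at most n items per request: this is the backpressure contract.
                while (!done && emitted < n && next <= count) {
                    subscriber.onNext(next++);
                    emitted++;
                }
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Collects items and lets the caller decide when to ask for more.
class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed;
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }

    void request(long n) { subscription.request(n); }
}
```

Nothing is emitted until the subscriber calls request; asking for two items delivers exactly two, and the stream completes only once the remaining items have been requested.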
In channel mode, client and server keep a full-duplex communication.
An example could be a client sending a stream of Strings that the server hashes and returns to the client:
@RequestStreamMapping(path="/hash", mimeType="application/json")
public Flux<Integer> hash(Flux<String> flux){
    return flux.map(s -> s.hashCode());
}
On the server side, add the @EnableReactiveSockets annotation to a Spring configuration class:
@SpringBootApplication
@EnableReactiveSockets
public class MyReactiveSocketsApplication {
}
Any class found by Boot’s classpath scanning that contains any of the annotations above will be registered as a remote endpoint.
As in many Boot applications, defaults apply when nothing is configured: a default TCPTransport is used, bound to localhost on port 5000.
To override the default properties, set them in an application.properties file:
reactive.socket.port=9000
reactive.socket.host=10.10.10.1
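The fallback behaviour described above can be illustrated with plain java.util.Properties. In a real Boot application Spring binds these properties for you; the SocketSettings class below is a hypothetical stand-in that only shows how the documented defaults (localhost, 5000) apply when a key is absent.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the two documented settings.
class SocketSettings {
    final String host;
    final int port;

    SocketSettings(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Reads the settings, falling back to the documented defaults.
    static SocketSettings from(Properties props) {
        String host = props.getProperty("reactive.socket.host", "localhost");
        int port = Integer.parseInt(props.getProperty("reactive.socket.port", "5000"));
        return new SocketSettings(host, port);
    }

    // Parses the text of a properties file, e.g. the content of application.properties.
    static SocketSettings parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return from(props);
    }
}
```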
To use a different transport, provide a ServerTransport bean in your application; refer to rsocket-java for the available implementations.
To use the client, pass an interface of the service annotated with the same annotations:
public interface MyService {
    @RequestStreamMapping(path = "/hash", mimeType = "application/json")
    public Flux<Integer> hash(Flux<String> flux);
}
ReactiveSocketClient client = new ReactiveSocketClient("localhost", 5000);
// Flux.just is a static factory on the Flux class
Flux<String> flux = Flux.just("A","B");
MyService service = client.create(MyService.class);
service.hash(flux).subscribe(System.out::println);
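One common way to turn an annotated interface into a working client is a JDK dynamic proxy, the same general technique used by declarative clients such as Feign. The sketch below shows the idea only and may not match how this project's client is implemented: the @Mapping annotation, EchoService, and ProxyClient are hypothetical, and a BiFunction stands in for the network transport.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

// Hypothetical stand-in for the project's mapping annotations.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Mapping {
    String path();
    String mimeType() default "application/json";
}

// Hypothetical service interface, annotated like the server side would be.
interface EchoService {
    @Mapping(path = "/echo")
    String echo(String message);
}

class ProxyClient {
    // Stand-in for the network: receives (path, payload) and returns the response.
    private final BiFunction<String, Object, Object> transport;

    ProxyClient(BiFunction<String, Object, Object> transport) {
        this.transport = transport;
    }

    // Builds an implementation of the interface that forwards each call to the transport.
    <T> T create(Class<T> serviceInterface) {
        Object proxy = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] { serviceInterface },
                (instance, method, args) -> {
                    Mapping mapping = method.getAnnotation(Mapping.class);
                    if (mapping == null) {
                        throw new UnsupportedOperationException("Not a remote method: " + method.getName());
                    }
                    // By convention the single argument is the payload.
                    return transport.apply(mapping.path(), args[0]);
                });
        return serviceInterface.cast(proxy);
    }
}
```

With a fake transport such as (path, payload) -> path + ":" + payload, calling create(EchoService.class).echo("hi") routes the call through the annotation's path and returns "/echo:hi".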
- Provide a functional model to both server and client, not only annotation style
- Create a Starter
- @EnableReactiveSocketClient to allow injection of a client in the application context, as well as scanning for services, as is done in Feign
- Tests, Tests, Tests
- Greatly reduce the boilerplate code; revisit serialization options
- Explore resume operations and backpressure