Skip to content

Commit

Permalink
add mapping function support for rsocket metadata; known use case for…
Browse files Browse the repository at this point in the history
… routed/brokered rsocket messages
  • Loading branch information
kbahr committed Sep 10, 2019
1 parent 67a77c8 commit 364913c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
7 changes: 6 additions & 1 deletion RSocket.Rpc.Core/RSocketService.Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace RSocket.RPC
{
partial class RSocketService
{
public struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java
public struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java
{
public const UInt16 VERSION = 1;

Expand Down Expand Up @@ -35,6 +35,11 @@ public RemoteProcedureCallMetadata(ReadOnlySequence<byte> metadata)
Metadata = reader.Sequence.Slice(reader.Position, reader.Remaining);
}

public static RemoteProcedureCallMetadata create(ReadOnlySequence<byte> metadata)
{
return new RemoteProcedureCallMetadata(metadata);
}

public static implicit operator ReadOnlySequence<byte>(RemoteProcedureCallMetadata _)
{
var memory = new Memory<byte>(new byte[_.Length]); //FUTURE PERFORMANCE: Someday, maybe use a buffer pool instead of allocating. These are presumed small, but the string scan adds some overhead.
Expand Down
14 changes: 7 additions & 7 deletions RSocket.Rpc.Core/RSocketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ public abstract partial class RSocketService

public RSocketService(RSocket socket) { Socket = socket; }


protected Task __RequestFireAndForget<TMessage>(TMessage message, Func<TMessage, byte[]> messagemapper,
protected Task __RequestFireAndForget<TMessage>(TMessage message, Func<TMessage, byte[]> messagemapper,
ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default)
=> __RequestFireAndForget(new ReadOnlySequence<byte>(messagemapper(message)), metadata, tracing, service: service, method: method);

Expand Down Expand Up @@ -88,23 +87,24 @@ private protected IAsyncEnumerable<T> __RequestChannel<TMessage, T>(IAsyncEnumer

static System.Collections.Concurrent.ConcurrentDictionary<string, IRSocketService> Services = new System.Collections.Concurrent.ConcurrentDictionary<string, IRSocketService>();

static public void Register(RSocket socket, IRSocketService service)
static public void Register(RSocket socket, IRSocketService service, Func<ReadOnlySequence<byte>, RSocketService.RemoteProcedureCallMetadata> metadataMapper = null)
{
Services[service.ServiceName] = service;

//TODO Need to ensure that this really only happens once per Socket.
metadataMapper = metadataMapper ?? RemoteProcedureCallMetadata.create;
//TODO Need to ensure that this really only happens once per Socket.

socket.Respond(message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data),
socket.Respond(message => (RPC: metadataMapper(message.Metadata), message.Data),
request => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata),
result => (Data: result, Metadata: default));

//TODO This looks data/metadata backwards?
socket.Stream(message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data),
socket.Stream(message => (RPC: metadataMapper(message.Metadata), message.Data),
request => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata),
result => (Data: result, Metadata: default));

socket.Channel((request, messages) => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata, messages.ToAsyncEnumerable()),
message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data),
message => (RPC: metadataMapper(message.Metadata), message.Data),
incoming => incoming.Data,
result => (Data: result, Metadata: default));
}
Expand Down

0 comments on commit 364913c

Please sign in to comment.