From 364913ce745d044d9e3ba815a0a88c73fd7f1eec Mon Sep 17 00:00:00 2001 From: Kyle Bahr Date: Tue, 10 Sep 2019 14:49:18 -0700 Subject: [PATCH] add mapping function support for rsocket metadata; known use case for routed/brokered rsocket messages --- RSocket.Rpc.Core/RSocketService.Metadata.cs | 7 ++++++- RSocket.Rpc.Core/RSocketService.cs | 14 +++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/RSocket.Rpc.Core/RSocketService.Metadata.cs b/RSocket.Rpc.Core/RSocketService.Metadata.cs index 7d56b84..7331eca 100644 --- a/RSocket.Rpc.Core/RSocketService.Metadata.cs +++ b/RSocket.Rpc.Core/RSocketService.Metadata.cs @@ -6,7 +6,7 @@ namespace RSocket.RPC { partial class RSocketService { - public struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java + public struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java { public const UInt16 VERSION = 1; @@ -35,6 +35,11 @@ public RemoteProcedureCallMetadata(ReadOnlySequence metadata) Metadata = reader.Sequence.Slice(reader.Position, reader.Remaining); } + public static RemoteProcedureCallMetadata create(ReadOnlySequence metadata) + { + return new RemoteProcedureCallMetadata(metadata); + } + public static implicit operator ReadOnlySequence(RemoteProcedureCallMetadata _) { var memory = new Memory(new byte[_.Length]); //FUTURE PERFORMANCE: Someday, maybe use a buffer pool instead of allocating. These are presumed small, but the string scan adds some overhead. diff --git a/RSocket.Rpc.Core/RSocketService.cs b/RSocket.Rpc.Core/RSocketService.cs index ea171b7..b853164 100644 --- a/RSocket.Rpc.Core/RSocketService.cs +++ b/RSocket.Rpc.Core/RSocketService.cs @@ -19,8 +19,7 @@ public abstract partial class RSocketService public RSocketService(RSocket socket) { Socket = socket; } - - protected Task __RequestFireAndForget(TMessage message, Func messagemapper, + protected Task __RequestFireAndForget(TMessage message, Func messagemapper, ReadOnlySequence metadata = default, ReadOnlySequence tracing = default, string service = default, [CallerMemberName]string method = default) => __RequestFireAndForget(new ReadOnlySequence(messagemapper(message)), metadata, tracing, service: service, method: method); @@ -88,23 +87,24 @@ private protected IAsyncEnumerable __RequestChannel(IAsyncEnumer static System.Collections.Concurrent.ConcurrentDictionary Services = new System.Collections.Concurrent.ConcurrentDictionary(); - static public void Register(RSocket socket, IRSocketService service) + static public void Register(RSocket socket, IRSocketService service, Func, RSocketService.RemoteProcedureCallMetadata> metadataMapper = null) { Services[service.ServiceName] = service; - //TODO Need to ensure that this really only happens once per Socket. + metadataMapper = metadataMapper ?? RemoteProcedureCallMetadata.create; + //TODO Need to ensure that this really only happens once per Socket. - socket.Respond(message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data), + socket.Respond(message => (RPC: metadataMapper(message.Metadata), message.Data), request => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata), result => (Data: result, Metadata: default)); //TODO This looks data/metadata backwards? - socket.Stream(message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data), + socket.Stream(message => (RPC: metadataMapper(message.Metadata), message.Data), request => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata), result => (Data: result, Metadata: default)); socket.Channel((request, messages) => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata, messages.ToAsyncEnumerable()), - message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data), + message => (RPC: metadataMapper(message.Metadata), message.Data), incoming => incoming.Data, result => (Data: result, Metadata: default)); }