Skip to content

Commit 364913c

Browse files
committedSep 10, 2019
add mapping function support for rsocket metadata; known use case for routed/brokered rsocket messages
1 parent 67a77c8 commit 364913c

File tree

2 files changed

+13
-8
lines changed

2 files changed

+13
-8
lines changed
 

‎RSocket.Rpc.Core/RSocketService.Metadata.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace RSocket.RPC
66
{
77
partial class RSocketService
88
{
9-
public struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java
9+
public struct RemoteProcedureCallMetadata //SPEC: https://github.com/rsocket/rsocket-rpc-java/blob/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/frames/Metadata.java
1010
{
1111
public const UInt16 VERSION = 1;
1212

@@ -35,6 +35,11 @@ public RemoteProcedureCallMetadata(ReadOnlySequence<byte> metadata)
3535
Metadata = reader.Sequence.Slice(reader.Position, reader.Remaining);
3636
}
3737

38+
public static RemoteProcedureCallMetadata create(ReadOnlySequence<byte> metadata)
39+
{
40+
return new RemoteProcedureCallMetadata(metadata);
41+
}
42+
3843
public static implicit operator ReadOnlySequence<byte>(RemoteProcedureCallMetadata _)
3944
{
4045
var memory = new Memory<byte>(new byte[_.Length]); //FUTURE PERFORMANCE: Someday, maybe use a buffer pool instead of allocating. These are presumed small, but the string scan adds some overhead.

‎RSocket.Rpc.Core/RSocketService.cs

+7-7
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ public abstract partial class RSocketService
1919

2020
public RSocketService(RSocket socket) { Socket = socket; }
2121

22-
23-
protected Task __RequestFireAndForget<TMessage>(TMessage message, Func<TMessage, byte[]> messagemapper,
22+
protected Task __RequestFireAndForget<TMessage>(TMessage message, Func<TMessage, byte[]> messagemapper,
2423
ReadOnlySequence<byte> metadata = default, ReadOnlySequence<byte> tracing = default, string service = default, [CallerMemberName]string method = default)
2524
=> __RequestFireAndForget(new ReadOnlySequence<byte>(messagemapper(message)), metadata, tracing, service: service, method: method);
2625

@@ -88,23 +87,24 @@ private protected IAsyncEnumerable<T> __RequestChannel<TMessage, T>(IAsyncEnumer
8887

8988
static System.Collections.Concurrent.ConcurrentDictionary<string, IRSocketService> Services = new System.Collections.Concurrent.ConcurrentDictionary<string, IRSocketService>();
9089

91-
static public void Register(RSocket socket, IRSocketService service)
90+
static public void Register(RSocket socket, IRSocketService service, Func<ReadOnlySequence<byte>, RSocketService.RemoteProcedureCallMetadata> metadataMapper = null)
9291
{
9392
Services[service.ServiceName] = service;
9493

95-
//TODO Need to ensure that this really only happens once per Socket.
94+
metadataMapper = metadataMapper ?? RemoteProcedureCallMetadata.create;
95+
//TODO Need to ensure that this really only happens once per Socket.
9696

97-
socket.Respond(message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data),
97+
socket.Respond(message => (RPC: metadataMapper(message.Metadata), message.Data),
9898
request => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata),
9999
result => (Data: result, Metadata: default));
100100

101101
//TODO This looks data/metadata backwards?
102-
socket.Stream(message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data),
102+
socket.Stream(message => (RPC: metadataMapper(message.Metadata), message.Data),
103103
request => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata),
104104
result => (Data: result, Metadata: default));
105105

106106
socket.Channel((request, messages) => Dispatch(request.Data, request.RPC.Service, request.RPC.Method, request.RPC.Tracing, request.RPC.Metadata, messages.ToAsyncEnumerable()),
107-
message => (RPC: new RSocketService.RemoteProcedureCallMetadata(message.Metadata), message.Data),
107+
message => (RPC: metadataMapper(message.Metadata), message.Data),
108108
incoming => incoming.Data,
109109
result => (Data: result, Metadata: default));
110110
}

0 commit comments

Comments
 (0)