Access underlying network stream #7
-
We are writing an IoT service. There are various types of devices, and we write an adapter for each device kind. Adapters communicate with the server core using IceRPC. Devices send data/commands to an adapter using various methods (TCP, HTTP, MQTT, ...), and the adapter, in turn, calls IceRPC on the server core. For sending commands to devices we use IceRPC callbacks (the server calls the client): the server calls a method on the adapter, and the adapter sends it to the related device.

We are trying to use the observer pattern so that the server does not know about the device kind, but simply sends the command to the device's unique id, and the adapters consume the command in a pub/sub way and send it to the device. Keep in mind that both the server and the adapters can scale horizontally. We are using Redis pub/sub for this.

At first we wrote code for just one kind of device and everything was good! Now that we are trying to add another device (adapter), we found out that we have to put the server-to-client invocations into Redis (pub/sub), so that the adapter subscribed to that specific device can get the message, parse it and send it to the device. The shining solution would be to get the underlying IceRPC network stream (when the server calls the client), save it into Redis, and have the subscriber read the stream and dispatch the related function call. Can you help us?
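For illustration, a minimal sketch of the pub/sub leg described above, assuming StackExchange.Redis and a command channel named after the device's unique id; the channel format, `deviceId` and `sendToDevice` are placeholders, not actual project code:

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

static class DeviceCommandBus
{
    // Server core side: publish a command on the channel of one specific device.
    public static async Task PublishCommandAsync(string deviceId, byte[] commandBytes)
    {
        ConnectionMultiplexer redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
        await redis.GetSubscriber().PublishAsync(
            RedisChannel.Literal($"device:{deviceId}:commands"), commandBytes);
    }

    // Adapter side: only the adapter that owns this device subscribes to its channel.
    public static async Task SubscribeAsync(string deviceId, Func<byte[], Task> sendToDevice)
    {
        ConnectionMultiplexer redis = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
        await redis.GetSubscriber().SubscribeAsync(
            RedisChannel.Literal($"device:{deviceId}:commands"),
            (channel, message) =>
            {
                byte[] payload = (byte[]?)message ?? Array.Empty<byte>();
                _ = sendToDevice(payload);
            });
    }
}
```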
-
I am trying to create a new transport: a RedisStream transport. I am reading ColocTransport and I want to replace the .NET Channels with Redis Streams + Consumer Groups. Can you help me write the transport? Is it a good idea for the invoker?
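For context, this is the Redis Streams plus consumer-group pattern referred to above, sketched with StackExchange.Redis; the stream, group and consumer names are made up, and this is not a transport implementation:

```csharp
using System.Threading.Tasks;
using StackExchange.Redis;

static class RedisStreamsSketch
{
    public static async Task RunAsync(byte[] frameBytes)
    {
        IDatabase db = (await ConnectionMultiplexer.ConnectAsync("localhost:6379")).GetDatabase();

        // One-time setup: a consumer group that starts at the beginning of the stream.
        await db.StreamCreateConsumerGroupAsync("device-42", "adapters", StreamPosition.Beginning);

        // Producer: append a frame to the stream.
        await db.StreamAddAsync("device-42", "frame", frameBytes);

        // Consumer: read new entries as consumer "adapter-1" of group "adapters".
        StreamEntry[] entries = await db.StreamReadGroupAsync(
            "device-42", "adapters", "adapter-1", StreamPosition.NewMessages);

        foreach (StreamEntry entry in entries)
        {
            // ... process entry.Values here, then acknowledge the entry.
            await db.StreamAcknowledgeAsync("device-42", "adapters", entry.Id);
        }
    }
}
```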
-
Hi Kamyar, we designed IceRPC to be flexible and modular - this way you can easily extend IceRPC, including in ways we didn't envision. I don't have a clear picture of what you're trying to do; however, I think you're on the wrong track with the multiplexed transport over Redis streams. The multiplexed transport abstraction is closely related to QUIC - it's basically a QUIC abstraction for the icerpc protocol. And we provide two implementations: the Slic adapter over the duplex transport abstraction and an implementation over .NET QUIC. Maybe you could implement this abstraction over Redis streams, but it strikes me as an odd idea. Redis streams are fairly high level, while the transport streams are fairly basic byte streams (with unique IDs, flow control and other nice features provided by QUIC / Slic).
If you want to implement a multiplexed transport abstraction, looking at the Coloc transport is not that useful since Coloc implements the duplex transport abstraction. You want to look at the QUIC transport.
IceRPC does not give you access to the underlying network streams. For example, when you receive an incoming request over QUIC, you can't somehow get hold of the underlying .NET QuicStream. I don't see this as a limitation. The incoming request corresponds to the headers decoded from this QUIC stream + payload as a PipeReader. And this PipeReader corresponds to the bytes carried by this QuicStream, without any copying or extra framing. Here, you could for example dispatch an incoming request by forwarding it to a Redis stream: you just need to write a Dispatcher. Naturally, writing a dispatcher is much easier than writing a new multiplexed transport. Cheers,
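As a rough illustration of that last suggestion (a sketch only, with made-up Redis key and field names, not a vetted design), a dispatcher could buffer the request payload and append it to a Redis stream:

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using IceRpc;
using StackExchange.Redis;

// Sketch: forwards every incoming request's payload bytes to a Redis stream.
public class RedisForwardingDispatcher : IDispatcher
{
    private readonly IDatabase _db;

    public RedisForwardingDispatcher(IDatabase db) => _db = db;

    public async ValueTask<OutgoingResponse> DispatchAsync(
        IncomingRequest request, CancellationToken cancellationToken = default)
    {
        // Buffer the payload; it holds the raw bytes of the encoded arguments.
        using var buffer = new MemoryStream();
        await request.Payload.CopyToAsync(buffer, cancellationToken);

        // Append the frame to a stream keyed by the target service path.
        await _db.StreamAddAsync(request.Path, "payload", buffer.ToArray());

        // Reply with an empty success response.
        return new OutgoingResponse(request);
    }
}
```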
-
Thank you for answering! I guess I got your point. I can implement IDispatcher (for the client side) and IInvoker (for the server side) to use Redis for calling methods. But today another challenge emerged: we have to create a dedicated stream for each device in Redis. The unique id of the device is the first parameter of our IceRPC methods, so we have to extract parameters from the incoming frames, send the whole frame to the Redis stream and then consume it at the dispatcher side. Is there an easy way to extract data from the Ice protocol in IInvoker? I do not want to deserialize the whole frame; I would rather find some way to preserve the unique device id to be used as the Redis stream key.
-
I believe we've identified a viable solution: We can implement the IInvoker interface by introducing a constructor that accepts the device's unique identifier. This approach eliminates the need to repeatedly decode the payload for the unique id. Our invoker will simply publish the payload obtained from the OutgoingRequest to a Redis stream. For enhanced code quality, we can leverage a DynamicProxy (utilizing Castle). In practice, when we invoke a specific function, we create a fresh instance of our invoker using the unique identifier. Subsequently, we pass this invoker to the constructor of the IceRPC proxy, and we invoke the desired method, all of which occurs within the interceptor. On the server side, which is effectively our adapter, we can consume the Redis stream and construct an IncomingRequest using the payload. We then instruct the default IceRPC dispatcher (Router) to call the corresponding class as mapped.
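A minimal sketch of the invoker half of that idea, assuming StackExchange.Redis and one stream per device; the names are illustrative, and building the IncomingResponse to hand back to the calling proxy is deliberately left out:

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using IceRpc;
using StackExchange.Redis;

// Sketch: publishes the outgoing request payload to the stream of one device.
public class RedisStreamInvoker : IInvoker
{
    private readonly IDatabase _db;
    private readonly string _deviceId;

    public RedisStreamInvoker(IDatabase db, string deviceId)
    {
        _db = db;
        _deviceId = deviceId;
    }

    public async Task<IncomingResponse> InvokeAsync(
        OutgoingRequest request, CancellationToken cancellationToken = default)
    {
        // Buffer the encoded arguments produced by the generated proxy.
        using var buffer = new MemoryStream();
        await request.Payload.CopyToAsync(buffer, cancellationToken);

        // One Redis stream per device; the operation name travels with the payload.
        await _db.StreamAddAsync(
            $"device:{_deviceId}",
            new NameValueEntry[]
            {
                new("operation", request.Operation),
                new("payload", buffer.ToArray())
            });

        // Building a real IncomingResponse (for two-way operations) is not shown here.
        throw new NotSupportedException("Response path not implemented in this sketch.");
    }
}
```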
-
We wrote the code. For the IceRPC client side (our backend service), we successfully created an invoker and put the payload into Redis streams. On the IceRPC server side (our adapter), we read the payload from the Redis stream and try to use the IDispatcher for calling the method. We create an object of the IceRPC service class, which has DispatchAsync (since it inherits from the Service class). Calling DispatchAsync with an IncomingRequest object created from the payload read from Redis and a dummy connectionContext does nothing! No error!
-
Hi Kamyar, you are not setting the Operation property on the incoming request, so the dispatcher does not know which operation to call. A second issue with your code is that the request's Path is not set either. I think you can also simplify the creation of the PipeReader with something like the sketch below.
Cheers,
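For reference, a sketch of one way to build such a PipeReader directly over the bytes read from Redis, assuming a recent System.IO.Pipelines version:

```csharp
using System.Buffers;
using System.IO.Pipelines;

static class PayloadFactory
{
    // Wraps the bytes read from the Redis stream without copying them again.
    public static PipeReader Create(byte[] payloadBytes) =>
        PipeReader.Create(new ReadOnlySequence<byte>(payloadBytes));
}
```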
-
Thank you! We successfully dispatched and called the method by adding the Operation property.
-
The Path is typically used by the Router to route the request to the target service; if you are already passing the request to the target service directly, it might not be required in this case.
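For context, a tiny sketch of how the Path is consulted when a Router sits in front of the service; the path and the inline dispatcher here are illustrative:

```csharp
using IceRpc;

// The Router picks the target dispatcher from the request's Path.
var router = new Router();
router.Map("/bike-device", new InlineDispatcher(
    (request, cancellationToken) => new(new OutgoingResponse(request))));

// When a request is handed directly to the target service instead,
// no routing happens and the Path is not consulted.
```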
What is the error? How are you creating this incoming response?
-
Hi,
I got this error:
The stack trace is:
-
This code doesn't look correct: you are trying to use some UTF-8 bytes as the response's payload, but that is not a valid encoding for an IceRPC + Slice response payload. The simplest way to encode the payload for a given response is to use the generated Response classes.
This is assuming the Redis "response" represents the "string" return value of a Slice operation. It is difficult to give better advice here without a clear picture of the whole system; it seems odd to get the decoded response args from Redis and then encode them back.
-
I fixed the encoding and received a byte[] instead of a string from Redis. I created an IncomingResponse:
First, I decoded it via the generated proxy class and no error happened, so it seems the created IncomingResponse and its payload do not have any problem.
Second, I decoded it in this way (the decode method that runs after the incoming response is returned) and I got an error!
What's wrong here?
-
You cannot read a pipe reader twice that way; the second time, the pipe reader is already at the end. It should work if you comment out the first decode.
-
Finally, we found the problem. We were using IBikeDevice (the generated interface) to call methods. We changed it to BikeDeviceProxy (also generated) and the error has been fixed.
Thank you for your responses.