Skip to content

Commit

Permalink
Merge pull request #203 from WalletConnect/feat/request-queue
Browse files Browse the repository at this point in the history
feat: handle incoming requests one by one
  • Loading branch information
skibitsky authored Aug 29, 2024
2 parents dd50408 + af11113 commit c7c904c
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 27 deletions.
110 changes: 87 additions & 23 deletions WalletConnectSharp.Core/Controllers/TypedMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using System.Security.Cryptography;
using System.Text;
using Newtonsoft.Json;
using WalletConnectSharp.Common.Events;
using WalletConnectSharp.Common.Model.Errors;
using Newtonsoft.Json;
using WalletConnectSharp.Common.Logging;
using WalletConnectSharp.Common.Utils;
using WalletConnectSharp.Core.Interfaces;
using WalletConnectSharp.Core.Models;
Expand All @@ -15,12 +12,18 @@ namespace WalletConnectSharp.Core.Controllers
{
public class TypedMessageHandler : ITypedMessageHandler
{
private bool _initialized = false;
private Dictionary<string, DecodeOptions> _decodeOptionsMap = new Dictionary<string, DecodeOptions>();
private HashSet<string> _typeSafeCache = new HashSet<string>();
private bool _initialized;
private bool _isRequestQueueProcessing;

private readonly Dictionary<string, DecodeOptions> _decodeOptionsMap = new();

private readonly HashSet<string> _typeSafeCache = [];

private readonly Queue<(string method, MessageEvent messageEvent)> _requestQueue = new();
private readonly Dictionary<string, List<Func<MessageEvent, Task>>> _requestCallbacksMap = new();
private readonly Dictionary<string, List<Action<MessageEvent>>> _responseCallbacksMap = new();

public event EventHandler<DecodedMessageEvent> RawMessage;
private EventHandlerMap<MessageEvent> messageEventHandlerMap = new();

protected bool Disposed;

Expand Down Expand Up @@ -57,24 +60,26 @@ public Task Init()
{
if (!_initialized)
{
this.Core.Relayer.OnMessageReceived += RelayerMessageCallback;
Core.Relayer.OnMessageReceived += RelayMessageCallback;
}

_initialized = true;
return Task.CompletedTask;
}

async void RelayerMessageCallback(object sender, MessageEvent e)
private async void RelayMessageCallback(object sender, MessageEvent e)
{
var topic = e.Topic;
var message = e.Message;


var options = DecodeOptionForTopic(topic);

var payload = await this.Core.Crypto.Decode<JsonRpcPayload>(topic, message, options);
if (payload.IsRequest)
{
messageEventHandlerMap[$"request_{payload.Method}"](this, e);
_requestQueue.Enqueue((payload.Method, e));
await ProcessRequestQueue();
}
else if (payload.IsResponse)
{
Expand All @@ -83,6 +88,44 @@ async void RelayerMessageCallback(object sender, MessageEvent e)
}
}

private async Task ProcessRequestQueue()
{
if (_isRequestQueueProcessing)
{
return;
}

_isRequestQueueProcessing = true;

try
{
while (_requestQueue.Count > 0)
{
var (method, messageEvent) = _requestQueue.Dequeue();
if (!_requestCallbacksMap.TryGetValue(method, out var callbacks))
{
continue;
}

foreach (var callback in callbacks)
{
try
{
await callback(messageEvent);
}
catch (Exception e)
{
WCLogger.LogError(e);
}
}
}
}
finally
{
_isRequestQueueProcessing = false;
}
}

/// <summary>
/// Handle a specific request / response type and call the given callbacks for requests and responses. The
/// response callback is only triggered when it originates from the request of the same type.
Expand All @@ -97,7 +140,7 @@ public async Task<DisposeHandlerToken> HandleMessageType<T, TR>(Func<string, Jso
var method = RpcMethodAttribute.MethodForType<T>();
var rpcHistory = await this.Core.History.JsonRpcHistoryOfType<T, TR>();

async void RequestCallback(object sender, MessageEvent e)
async Task RequestCallback(MessageEvent e)
{
try
{
Expand Down Expand Up @@ -128,7 +171,7 @@ async void RequestCallback(object sender, MessageEvent e)
}
}

async void ResponseCallback(object sender, MessageEvent e)
async void ResponseCallback(MessageEvent e)
{
if (responseCallback == null || Disposed)
{
Expand Down Expand Up @@ -184,15 +227,36 @@ record = await rpcHistory.Get(topic, payload.Id);
var resMethod = record.Request.Method;

// Trigger the true response event, which will trigger ResponseCallback
messageEventHandlerMap[$"response_{resMethod}"](this,
new MessageEvent

if (_responseCallbacksMap.TryGetValue(resMethod, out var callbacks))
{
var callbacksCopy = callbacks.ToList();
foreach (var callback in callbacksCopy)
{
Topic = topic, Message = message
});
callback(new MessageEvent
{
Topic = topic, Message = message
});
}
}
}

if (!_requestCallbacksMap.TryGetValue(method, out var requestCallbacks))
{
requestCallbacks = [];
_requestCallbacksMap.Add(method, requestCallbacks);
}

requestCallbacks.Add(RequestCallback);


if (!_responseCallbacksMap.TryGetValue(method, out var responseCallbacks))
{
responseCallbacks = [];
_responseCallbacksMap.Add(method, responseCallbacks);
}

messageEventHandlerMap[$"request_{method}"] += RequestCallback;
messageEventHandlerMap[$"response_{method}"] += ResponseCallback;
responseCallbacks.Add(ResponseCallback);

// Handle response_raw in this context
// This will allow us to examine response_raw in every typed context registered
Expand All @@ -202,8 +266,8 @@ record = await rpcHistory.Get(topic, payload.Id);
{
this.RawMessage -= InspectResponseRaw;

messageEventHandlerMap[$"request_{method}"] -= RequestCallback;
messageEventHandlerMap[$"response_{method}"] -= ResponseCallback;
_requestCallbacksMap[method].Remove(RequestCallback);
_responseCallbacksMap[method].Remove(ResponseCallback);
});
}

Expand Down Expand Up @@ -429,7 +493,7 @@ protected virtual void Dispose(bool disposing)

if (disposing)
{
this.Core.Relayer.OnMessageReceived -= RelayerMessageCallback;
Core.Relayer.OnMessageReceived -= RelayMessageCallback;
}

Disposed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ protected virtual async void Teardown()

protected virtual Task ResponseCallback(string arg1, JsonRpcResponse<TR> arg2)
{
WCLogger.Log($"Got generic response for type {typeof(TR)}");
var rea = new ResponseEventArgs<TR>(arg2, arg1);
return ResponsePredicate != null && !ResponsePredicate(rea) ? Task.CompletedTask :
_onResponse != null ? _onResponse(rea) : Task.CompletedTask;
Expand Down
4 changes: 2 additions & 2 deletions WalletConnectSharp.Sign/Internals/EngineHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ async Task IEnginePrivate.OnSessionEventRequest(string topic, JsonRpcRequest<Ses
var eventData = @params.Event;
var eventName = eventData.Name;

await PrivateThis.IsValidEmit(topic, eventData, @params.ChainId);

await IsValidSessionTopic(topic);
_customSessionEventsHandlerMap[eventName]?.Invoke(this, @params);

await MessageHandler.SendResult<SessionEvent<EventData<JToken>>, bool>(id, topic, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ protected override void Setup()

private Task WrappedRefOnOnResponse(ResponseEventArgs<TR> e)
{
WCLogger.Log($"Got response for type {typeof(TR)}");
return base.ResponseCallback(e.Topic, e.Response);
}

Expand Down

0 comments on commit c7c904c

Please sign in to comment.