Skip to content

Commit

Permalink
Merge pull request #169 from onerain88/fix-ws-concurrence
Browse files Browse the repository at this point in the history
Fix ws concurrence
  • Loading branch information
onerain88 authored Sep 2, 2021
2 parents be20a73 + 82bb8e4 commit 530c914
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 40 deletions.
86 changes: 67 additions & 19 deletions Realtime/Realtime.Test/Conversation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,26 @@ public async Task MuteMembers() {
Assert.True(conversation.MutedMemberIds.Contains(lean.Id));
f1 = true;
if (f1 && f2) {
tcs.SetResult(null);
tcs.TrySetResult(null);
}
};
lean.OnMuted = (conv, initBy) => {
WriteLine($"{lean.Id} is muted by {initBy}");
f2 = true;
if (f1 && f2) {
tcs.SetResult(null);
tcs.TrySetResult(null);
}
};
await conversation.MuteMembers(new string[] { "lean" });
Assert.True(conversation.MutedMemberIds.Contains("lean"));
try {
await conversation.MuteMembers(new string[] { "lean" });
Assert.True(conversation.MutedMemberIds.Contains("lean"));
} catch (LCException e) {
if (e.Code == 4325) {
tcs.TrySetResult(null);
} else {
throw e;
}
}
await tcs.Task;
}

Expand All @@ -92,8 +100,17 @@ public async Task UnmuteMembers() {
tcs.SetResult(null);
}
};
await conversation.UnmuteMembers(new string[] { "lean" });
Assert.False(conversation.MutedMemberIds.Contains("lean"));
try {
await conversation.UnmuteMembers(new string[] { "lean" });
Assert.False(conversation.MutedMemberIds.Contains("lean"));
} catch (LCException e) {
if (e.Code == 4325) {
tcs.TrySetResult(null);
} else {
throw e;
}
}

await tcs.Task;
}

Expand All @@ -113,12 +130,21 @@ public async Task BlockMembers() {
WriteLine($"{lean.Id} is blocked by {initBy}");
f2 = true;
if (f1 && f2) {
tcs.SetResult(null);
tcs.TrySetResult(null);
}
};
await conversation.BlockMembers(new string[] { "lean" });
LCIMPageResult result = await conversation.QueryBlockedMembers();
Assert.True(result.Results.Contains("lean"));
try {
await conversation.BlockMembers(new string[] { "lean" });
LCIMPageResult result = await conversation.QueryBlockedMembers();
Assert.True(result.Results.Contains("lean"));
} catch (LCException e) {
if (e.Code == 4544) {
tcs.TrySetResult(null);
} else {
throw e;
}
}

await tcs.Task;
}

Expand All @@ -141,9 +167,18 @@ public async Task UnblockMembers() {
tcs.SetResult(null);
}
};
await conversation.UnblockMembers(new string[] { "lean" });
LCIMPageResult result = await conversation.QueryBlockedMembers();
Assert.False(result.Results.Contains("lean"));
try {
await conversation.UnblockMembers(new string[] { "lean" });
LCIMPageResult result = await conversation.QueryBlockedMembers();
Assert.False(result.Results.Contains("lean"));
} catch (LCException e) {
if (e.Code == 4544) {
tcs.TrySetResult(null);
} else {
throw e;
}
}

await tcs.Task;
}

Expand All @@ -153,11 +188,19 @@ public async Task UpdateRole() {
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
c1.OnMemberInfoUpdated = (conv, member, role, initBy) => {
WriteLine($"{member} is {role} by {initBy}");
tcs.SetResult(null);
tcs.TrySetResult(null);
};
await conversation.UpdateMemberRole("cloud", LCIMConversationMemberInfo.Manager);
LCIMConversationMemberInfo memberInfo = await conversation.GetMemberInfo("cloud");
Assert.True(memberInfo.IsManager);
try {
await conversation.UpdateMemberRole("cloud", LCIMConversationMemberInfo.Manager);
LCIMConversationMemberInfo memberInfo = await conversation.GetMemberInfo("cloud");
Assert.True(memberInfo.IsManager);
} catch (LCException e) {
if (e.Code == 4325) {
tcs.TrySetResult(null);
} else {
throw e;
}
}
await tcs.Task;
}

Expand All @@ -176,16 +219,21 @@ public async Task RemoveMember() {

[Test]
[Order(7)]
[Timeout(60000)]
public async Task UpdateInfo() {
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
lean.OnConversationInfoUpdated = (conv, attrs, initBy) => {
WriteLine(attrs);
Assert.AreEqual(conv.Name, "leancloud");
Assert.AreEqual(conv["k1"], "v1");
Assert.AreEqual(conv["k2"], "v2");
Assert.AreEqual(attrs["k1"], "v1");
Assert.AreEqual(attrs["k2"], "v2");
tcs.SetResult(null);
tcs.TrySetResult(null);
};

await Task.Delay(5000);

await conversation.UpdateInfo(new Dictionary<string, object> {
{ "name", "leancloud" },
{ "k1", "v1" },
Expand All @@ -194,7 +242,7 @@ await conversation.UpdateInfo(new Dictionary<string, object> {
Assert.AreEqual(conversation.Name, "leancloud");
Assert.AreEqual(conversation["k1"], "v1");
Assert.AreEqual(conversation["k2"], "v2");
// BUG: 已知

//await tcs.Task;
}
}
Expand Down
35 changes: 28 additions & 7 deletions Realtime/Realtime.Test/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,22 +176,27 @@ public async Task Unread() {

client.OnUnreadMessagesCountUpdated = (convs) => {
foreach (LCIMConversation conv in convs) {
WriteLine($"unread count: {conv.Unread}");
//Assert.AreEqual(conv.Unread, 1);
//Assert.True(conv.LastMessage is LCIMTextMessage);
//LCIMTextMessage textMsg = conv.LastMessage as LCIMTextMessage;
//Assert.AreEqual(textMsg.Text, "hello");
WriteLine($"OnUnreadMessagesCountUpdated unread count: {conv.Unread}");
Assert.AreEqual(conv.Unread, 1);
Assert.True(conv.LastMessage is LCIMTextMessage);
LCIMTextMessage textMsg = conv.LastMessage as LCIMTextMessage;
Assert.AreEqual(textMsg.Text, "hello");
tcs.TrySetResult(null);
}
};
await Task.Delay(5000);
await client.Open();

await tcs.Task;

tcs = new TaskCompletionSource<object>();
client.OnMessage = (conv, msg) => {
WriteLine($"unread count: {conv.Unread}");
WriteLine($"OnMessage unread count: {conv.Unread}");
Assert.AreEqual(conv.Unread, 2);
Assert.True(conv.LastMessage is LCIMTextMessage);
LCIMTextMessage textMsg = conv.LastMessage as LCIMTextMessage;
Assert.AreEqual(textMsg.Text, "world");
tcs.SetResult(true);
tcs.TrySetResult(null);
};
textMessage = new LCIMTextMessage("world");
await conversation.Send(textMessage);
Expand Down Expand Up @@ -276,5 +281,21 @@ public async Task MentionAll() {

await tcs.Task;
}

[Test]
[Order(10)]
public async Task SendMessageConcurrently() {
LCIMClient client = new LCIMClient("client");
await client.Open();
LCIMConversation conv = await client.CreateConversation(new string[] { "xxx" });
LCIMTextMessage msg = new LCIMTextMessage("hello");
for (int i = 0; i < 1000; i++) {
_ = Task.Run(() => {
_ = conv.Send(msg);
});
}
await Task.Delay(5000);
await client.Close();
}
}
}
59 changes: 45 additions & 14 deletions Realtime/Realtime/Internal/WebSocket/LCWebSocketClient.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
using System;
using System.Threading.Tasks;
using System.Net.WebSockets;
using System.Collections.Concurrent;
using System.Text;

namespace LeanCloud.Realtime.Internal.WebSocket {
public class LCWebSocketClient {
class SendTask {
internal TaskCompletionSource<object> Tcs { get; set; }
internal ArraySegment<byte> Bytes { get; set; }
internal WebSocketMessageType MessageType { get; set; }
}

// .net standard 2.0 好像在拼合 Frame 时有 bug,所以将这个值调整大一些
private const int SEND_BUFFER_SIZE = 1024 * 5;
private const int RECV_BUFFER_SIZE = 1024 * 8;
Expand All @@ -20,6 +27,8 @@ public class LCWebSocketClient {

private ClientWebSocket ws;

private ConcurrentQueue<SendTask> sendQueue;

public async Task Connect(string server,
string subProtocol = null) {
LCLogger.Debug($"Connecting WebSocket: {server}");
Expand All @@ -34,6 +43,7 @@ public async Task Connect(string server,
LCLogger.Debug($"Connected WebSocket: {server}");
await connectTask;
// 接收
_ = StartSend();
_ = StartReceive();
} else {
throw new TimeoutException("Connect timeout");
Expand All @@ -59,27 +69,48 @@ public async Task Close() {
}
}

public async Task Send(byte[] data,
public Task Send(byte[] data,
WebSocketMessageType messageType = WebSocketMessageType.Binary) {
ArraySegment<byte> bytes = new ArraySegment<byte>(data);
if (ws.State == WebSocketState.Open) {
try {
await ws.SendAsync(bytes, messageType, true, default);
} catch (Exception e) {
LCLogger.Error(e);
throw e;
}
} else {
string message = $"Error Websocket state: {ws.State}";
LCLogger.Error(message);
throw new Exception(message);
}
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
sendQueue.Enqueue(new SendTask {
Tcs = tcs,
Bytes = new ArraySegment<byte>(data),
MessageType = messageType
});

return tcs.Task;
}

public async Task Send(string text) {
await Send(Encoding.UTF8.GetBytes(text), WebSocketMessageType.Text);
}

private async Task StartSend() {
sendQueue = new ConcurrentQueue<SendTask>();
try {
while (ws.State == WebSocketState.Open) {
if (sendQueue.Count == 0) {
await Task.Delay(10);
}
while (sendQueue.Count > 0) {
if (sendQueue.TryDequeue(out SendTask sendTask)) {
try {
await ws.SendAsync(sendTask.Bytes, sendTask.MessageType, true, default);
sendTask.Tcs.TrySetResult(null);
} catch (Exception e) {
LCLogger.Error(e);
sendTask.Tcs.TrySetException(e);
throw e;
}
}
}
}
} catch (Exception e) {
LCLogger.Error(e);
HandleExceptionClose();
}
}

private async Task StartReceive() {
byte[] recvBuffer = new byte[RECV_BUFFER_SIZE];
byte[] msgBuffer = new byte[MSG_BUFFER_SIZE];
Expand Down

0 comments on commit 530c914

Please sign in to comment.