Skip to content

Commit

Permalink
Merge pull request #745 from riversdark0/feature/StreamingHubOnConnected
Browse files Browse the repository at this point in the history
Add StreamingHub OnConnected support
  • Loading branch information
mayuki authored Mar 21, 2024
2 parents b1d7bc6 + ebc6e42 commit 015e5a0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
14 changes: 14 additions & 0 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ protected virtual ValueTask OnConnecting()
return CompletedTask;
}

/// <summary>
/// Called after connect (headers and marker have been sent).
/// Allow the server send message to the client or broadcast to group.
/// </summary>
protected virtual ValueTask OnConnected()
{
return CompletedTask;
}

/// <summary>
/// Called after disconnect.
/// </summary>
Expand Down Expand Up @@ -158,6 +167,11 @@ async Task HandleMessageAsync()
// NOTE: To prevent buffering by AWS ALB or reverse-proxy.
await writer.WriteAsync(MarkerResponseBytes);

// Call OnConnected after sending the headers and marker.
// The server can send messages or broadcast to client after OnConnected.
// eg: Send the current game state to the client.
await OnConnected();

var handlers = StreamingHubHandlerRepository.GetHandlers(Context.MethodHandler);

// Main loop of StreamingHub.
Expand Down
26 changes: 24 additions & 2 deletions tests/MagicOnion.Server.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace MagicOnion.Server.Tests;

public interface IMessageReceiver
{
void VoidOnConnected(int x, string y, double z);
//Task ZeroArgument();
//Task OneArgument(int x);
//Task MoreArgument(int x, string y, double z);
Expand Down Expand Up @@ -60,6 +61,11 @@ protected override async ValueTask OnConnecting()
group = await Group.AddAsync("global");
}

protected override async ValueTask OnConnected()
{
BroadcastToSelf(group).VoidOnConnected(123, "foo", 12.3f);
}

protected override async ValueTask OnDisconnected()
{
if (group != null) await group.RemoveAsync(Context);
Expand Down Expand Up @@ -134,6 +140,15 @@ public BasicStreamingHubTest(ITestOutputHelper logger, ServerFixture<TestHub> se
this.channel = server.DefaultChannel;
}

[Fact]
public async Task OnConnected()
{
client = await StreamingHubClient.ConnectAsync<ITestHub, IMessageReceiver>(channel, this);
var x = await voidOnConnectedTask.Task;
x.Should().Be((123, "foo", 12.3f));
await client.DisposeAsync();
}

[Fact]
public async Task ZeroArgument()
{
Expand Down Expand Up @@ -327,6 +342,13 @@ public async Task RetrunOneArgument3()
// one3Task.TrySetResult(x);
//}

TaskCompletionSource<(int, string, double)> voidOnConnectedTask = new TaskCompletionSource<(int, string, double)>();
void IMessageReceiver.VoidOnConnected(int x, string y, double z)
{
voidOnConnectedTask.TrySetResult((x, y, z));
}


TaskCompletionSource<(int, string, double)> voidmoreTask = new TaskCompletionSource<(int, string, double)>();
void IMessageReceiver.VoidMoreArgument(int x, string y, double z)
{
Expand Down Expand Up @@ -399,7 +421,7 @@ public async Task StatusCodeAsync()
[StreamingHubTestFilter]
public async Task FilterCheckAsync()
{

}
}

Expand All @@ -415,7 +437,7 @@ public override async ValueTask Invoke(StreamingHubContext context, Func<Streami
context.Items["HubFilter1_BF"] = "AfterOK";
}
}

public class MoreCheckHubTest : IEmptyReceiver, IDisposable, IClassFixture<ServerFixture<MoreCheckHub>>
{
ITestOutputHelper logger;
Expand Down

0 comments on commit 015e5a0

Please sign in to comment.