Skip to content

Commit

Permalink
Eventstream channels (#2101)
Browse files Browse the repository at this point in the history
* subscribe to eventstream using typed channel

* assert publish

* rename
  • Loading branch information
rogeralsing authored Feb 28, 2024
1 parent c721e27 commit 1ea5d4f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,31 @@ public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dis

return sub;
}

/// <summary>
/// Subscribe to messages and yields the result onto a Channel
/// </summary>
/// <param name="channel">a Channel which receives the event</param>
/// <param name="dispatcher">Optional: the dispatcher, will use <see cref="Dispatchers.SynchronousDispatcher" /> by default</param>
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe<TMsg>(Channel<TMsg> channel, IDispatcher? dispatcher = null) where TMsg:T
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
async x =>
{
if (x is TMsg tc)
{
await channel.Writer.WriteAsync(tc).ConfigureAwait(false);
}
}
);

_subscriptions.TryAdd(sub.Id, sub);

return sub;
}

/// <summary>
/// Subscribe to messages with an asynchronous handler
Expand Down
18 changes: 18 additions & 0 deletions tests/Proto.Actor.Tests/EventStreamTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using Proto.Mailbox;
using Xunit;
Expand Down Expand Up @@ -80,4 +81,21 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes_Async()

eventStream.Publish("hello");
}

[Fact]
public async Task EventStream_CanSubscribeUsingChannel()
{
var system = new ActorSystem();
await using var _ = system;
var eventStream = system.EventStream;

var channel = Channel.CreateUnbounded<string>();
eventStream.Subscribe(channel);
eventStream.Publish(123);
eventStream.Publish(false);
eventStream.Publish("hello");

var res = await channel.Reader.ReadAsync();
Assert.Equal("hello",res);
}
}

0 comments on commit 1ea5d4f

Please sign in to comment.