diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs
index c67a35cec5..e808b50a6c 100644
--- a/src/Proto.Actor/ActorSystem.cs
+++ b/src/Proto.Actor/ActorSystem.cs
@@ -13,6 +13,7 @@
using Proto.Diagnostics;
using Proto.Extensions;
using Proto.Future;
+using Proto.Mailbox;
using Proto.Metrics;
using Proto.Utils;
@@ -49,8 +50,8 @@ public ActorSystem(ActorSystemConfig config)
DeadLetter = dl.Configure();
ProcessRegistry.TryAdd("$deadletter", DeadLetter);
Guardians = new Guardians(this);
- EventStream = new EventStream(this);
Metrics = new ProtoMetrics(config.MetricsEnabled);
+ EventStream = new EventStream(this);
var eventStream = new EventStreamProcess(this).Configure();
ProcessRegistry.TryAdd("$eventstream", eventStream);
Extensions = new ActorSystemExtensions(this);
diff --git a/src/Proto.Actor/EventStream/DeadLetter.cs b/src/Proto.Actor/EventStream/DeadLetter.cs
index 8072063449..fa40dc7c56 100644
--- a/src/Proto.Actor/EventStream/DeadLetter.cs
+++ b/src/Proto.Actor/EventStream/DeadLetter.cs
@@ -16,8 +16,8 @@ namespace Proto;
///
/// A wrapper for a message that could not be delivered to the original recipient. Such message is wrapped in
-/// a by the and forwarded
-/// to the
+/// a by the and forwarded
+/// to the
///
[PublicAPI]
public class DeadLetterEvent
diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs
index d4b8553445..c17f53c56b 100644
--- a/src/Proto.Actor/EventStream/EventStream.cs
+++ b/src/Proto.Actor/EventStream/EventStream.cs
@@ -8,6 +8,7 @@
using System;
using System.Collections.Concurrent;
+using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using JetBrains.Annotations;
@@ -24,10 +25,19 @@ namespace Proto;
[PublicAPI]
public class EventStream : EventStream
{
+#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger();
+ private readonly PID _pid;
+ private readonly ActorSystem _system;
+#pragma warning restore CS0618 // Type or member is obsolete
internal EventStream(ActorSystem system)
{
+ _system = system;
+ var props = Props.FromProducer(() => new EventStreamPublisherActor());
+ _pid = system.Root.SpawnNamedSystem( props,"$eventstream-actor");
+
+
if (!_logger.IsEnabled(LogLevel.Information))
{
return;
@@ -60,6 +70,59 @@ internal EventStream(ActorSystem system)
}
);
}
+
+ public override void Publish(object msg)
+ {
+ if (_pid == null)
+ {
+ SpinWait.SpinUntil(() => _pid != null);
+ }
+
+ foreach (var sub in Subscriptions.Values)
+ {
+ void Action()
+ {
+ try
+ {
+ sub.Action(msg);
+ }
+ catch (Exception ex)
+ {
+ ex.CheckFailFast();
+ _logger.LogError(0, ex, "Exception has occurred when publishing a message");
+ }
+ }
+
+ var runner = new EventStreamRunner(Action);
+ _system.Root.Send(_pid, runner);
+ }
+ }
+}
+
+
+public record EventStreamRunner(Action Run);
+public class EventStreamPublisherActor : IActor
+{
+#pragma warning disable CS0618 // Type or member is obsolete
+ private readonly ILogger _logger = Log.CreateLogger();
+#pragma warning restore CS0618 // Type or member is obsolete
+ public Task ReceiveAsync(IContext context)
+ {
+ if (context.Message is EventStreamRunner runner)
+ {
+ try
+ {
+ runner.Run();
+ }
+ catch(Exception ex)
+ {
+ ex.CheckFailFast();
+ _logger.LogError(0, ex, "Exception has occurred when publishing a message");
+ }
+ }
+
+ return Task.CompletedTask;
+ }
}
///
@@ -69,9 +132,11 @@ internal EventStream(ActorSystem system)
[PublicAPI]
public class EventStream
{
+#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger>();
+#pragma warning restore CS0618 // Type or member is obsolete
- private readonly ConcurrentDictionary> _subscriptions = new();
+ public ConcurrentDictionary> Subscriptions { get; } = new();
internal EventStream()
{
@@ -87,7 +152,6 @@ public EventStreamSubscription Subscribe(Action action, IDispatcher? dispa
{
var sub = new EventStreamSubscription(
this,
- dispatcher ?? Dispatchers.SynchronousDispatcher,
x =>
{
action(x);
@@ -96,7 +160,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatcher? dispa
}
);
- _subscriptions.TryAdd(sub.Id, sub);
+ Subscriptions.TryAdd(sub.Id, sub);
return sub;
}
@@ -111,11 +175,10 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis
{
var sub = new EventStreamSubscription(
this,
- dispatcher ?? Dispatchers.SynchronousDispatcher,
async x => { await channel.Writer.WriteAsync(x).ConfigureAwait(false); }
);
- _subscriptions.TryAdd(sub.Id, sub);
+ Subscriptions.TryAdd(sub.Id, sub);
return sub;
}
@@ -128,8 +191,8 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis
/// A new subscription that can be used to unsubscribe
public EventStreamSubscription Subscribe(Func action, IDispatcher? dispatcher = null)
{
- var sub = new EventStreamSubscription(this, dispatcher ?? Dispatchers.SynchronousDispatcher, action);
- _subscriptions.TryAdd(sub.Id, sub);
+ var sub = new EventStreamSubscription(this, action);
+ Subscriptions.TryAdd(sub.Id, sub);
return sub;
}
@@ -145,7 +208,6 @@ public EventStreamSubscription Subscribe(Action action, IDispatch
{
var sub = new EventStreamSubscription(
this,
- dispatcher ?? Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg typed)
@@ -157,7 +219,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatch
}
);
- _subscriptions.TryAdd(sub.Id, sub);
+ Subscriptions.TryAdd(sub.Id, sub);
return sub;
}
@@ -177,7 +239,6 @@ public EventStreamSubscription Subscribe(
{
var sub = new EventStreamSubscription(
this,
- dispatcher ?? Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg typed && predicate(typed))
@@ -189,7 +250,7 @@ public EventStreamSubscription Subscribe(
}
);
- _subscriptions.TryAdd(sub.Id, sub);
+ Subscriptions.TryAdd(sub.Id, sub);
return sub;
}
@@ -204,7 +265,6 @@ public EventStreamSubscription Subscribe(ISenderContext context, params
{
var sub = new EventStreamSubscription(
this,
- Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg)
@@ -219,7 +279,7 @@ public EventStreamSubscription Subscribe(ISenderContext context, params
}
);
- _subscriptions.TryAdd(sub.Id, sub);
+ Subscriptions.TryAdd(sub.Id, sub);
return sub;
}
@@ -235,11 +295,10 @@ public EventStreamSubscription Subscribe(Func action, IDisp
{
var sub = new EventStreamSubscription(
this,
- dispatcher ?? Dispatchers.SynchronousDispatcher,
msg => msg is TMsg typed ? action(typed) : Task.CompletedTask
);
- _subscriptions.TryAdd(sub.Id, sub);
+ Subscriptions.TryAdd(sub.Id, sub);
return sub;
}
@@ -248,34 +307,29 @@ public EventStreamSubscription Subscribe(Func action, IDisp
/// Publish a message to the event stream
///
/// A message to publish
- public void Publish(T msg)
+ public virtual void Publish(T msg)
{
- foreach (var sub in _subscriptions.Values)
+ foreach (var sub in Subscriptions.Values)
{
- sub.Dispatcher.Schedule(
- () =>
- {
- try
- {
- sub.Action(msg);
- }
- catch (Exception ex)
- {
- ex.CheckFailFast();
- _logger.LogError(0, ex, "Exception has occurred when publishing a message");
- }
- return Task.CompletedTask;
- }
- );
+ try
+ {
+ sub.Action(msg);
+ }
+ catch (Exception ex)
+ {
+ ex.CheckFailFast();
+ _logger.LogError(0, ex, "Exception has occurred when publishing a message");
+ }
}
}
+
///
/// Remove a subscription by id
///
/// Subscription id
- public void Unsubscribe(Guid id) => _subscriptions.TryRemove(id, out _);
+ public void Unsubscribe(Guid id) => Subscriptions.TryRemove(id, out _);
///
/// Remove a subscription
@@ -294,16 +348,14 @@ public class EventStreamSubscription
{
private readonly EventStream _eventStream;
- public EventStreamSubscription(EventStream eventStream, IDispatcher dispatcher, Func action)
+ public EventStreamSubscription(EventStream eventStream, Func action)
{
Id = Guid.NewGuid();
_eventStream = eventStream;
- Dispatcher = dispatcher;
Action = action;
}
public Guid Id { get; }
- public IDispatcher Dispatcher { get; }
public Func Action { get; }
public void Unsubscribe() => _eventStream.Unsubscribe(Id);
diff --git a/tests/Proto.Actor.Tests/EventStreamTests.cs b/tests/Proto.Actor.Tests/EventStreamTests.cs
index 6daa2913fe..1970a4948d 100644
--- a/tests/Proto.Actor.Tests/EventStreamTests.cs
+++ b/tests/Proto.Actor.Tests/EventStreamTests.cs
@@ -1,4 +1,5 @@
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using Proto.Mailbox;
using Xunit;
@@ -17,6 +18,9 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes()
eventStream.Subscribe(theString => received = theString);
eventStream.Publish("hello");
+
+ await Task.Delay(1000);
+
Assert.Equal("hello", received);
}
@@ -32,6 +36,9 @@ public async Task EventStream_CanSubscribeToAllEventTypes()
eventStream.Publish("hello");
eventStream.Publish(1);
eventStream.Publish(true);
+
+ await Task.Delay(1000);
+
Assert.Equal(3, receivedEvents.Count);
}
@@ -46,6 +53,9 @@ public async Task EventStream_CanUnsubscribeFromEvents()
eventStream.Publish("first message");
subscription.Unsubscribe();
eventStream.Publish("second message");
+
+ await Task.Delay(1000);
+
Assert.Single(receivedEvents);
}
@@ -59,6 +69,9 @@ public async Task EventStream_OnlyReceiveSubscribedToEventTypes()
var eventsReceived = new List();
eventStream.Subscribe(@event => eventsReceived.Add(@event));
eventStream.Publish("not an int");
+
+ await Task.Delay(1000);
+
Assert.Empty(eventsReceived);
}
@@ -79,5 +92,6 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes_Async()
);
eventStream.Publish("hello");
+ await Task.Delay(1000);
}
}
\ No newline at end of file