diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs index c67a35cec5..e808b50a6c 100644 --- a/src/Proto.Actor/ActorSystem.cs +++ b/src/Proto.Actor/ActorSystem.cs @@ -13,6 +13,7 @@ using Proto.Diagnostics; using Proto.Extensions; using Proto.Future; +using Proto.Mailbox; using Proto.Metrics; using Proto.Utils; @@ -49,8 +50,8 @@ public ActorSystem(ActorSystemConfig config) DeadLetter = dl.Configure(); ProcessRegistry.TryAdd("$deadletter", DeadLetter); Guardians = new Guardians(this); - EventStream = new EventStream(this); Metrics = new ProtoMetrics(config.MetricsEnabled); + EventStream = new EventStream(this); var eventStream = new EventStreamProcess(this).Configure(); ProcessRegistry.TryAdd("$eventstream", eventStream); Extensions = new ActorSystemExtensions(this); diff --git a/src/Proto.Actor/EventStream/DeadLetter.cs b/src/Proto.Actor/EventStream/DeadLetter.cs index 8072063449..fa40dc7c56 100644 --- a/src/Proto.Actor/EventStream/DeadLetter.cs +++ b/src/Proto.Actor/EventStream/DeadLetter.cs @@ -16,8 +16,8 @@ namespace Proto; /// /// A wrapper for a message that could not be delivered to the original recipient. Such message is wrapped in -/// a by the and forwarded -/// to the +/// a by the and forwarded +/// to the /// [PublicAPI] public class DeadLetterEvent diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index d4b8553445..c17f53c56b 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Concurrent; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using JetBrains.Annotations; @@ -24,10 +25,19 @@ namespace Proto; [PublicAPI] public class EventStream : EventStream { +#pragma warning disable CS0618 // Type or member is obsolete private readonly ILogger _logger = Log.CreateLogger(); + private readonly PID _pid; + private readonly ActorSystem _system; +#pragma warning restore CS0618 // Type or member is obsolete internal EventStream(ActorSystem system) { + _system = system; + var props = Props.FromProducer(() => new EventStreamPublisherActor()); + _pid = system.Root.SpawnNamedSystem( props,"$eventstream-actor"); + + if (!_logger.IsEnabled(LogLevel.Information)) { return; @@ -60,6 +70,59 @@ internal EventStream(ActorSystem system) } ); } + + public override void Publish(object msg) + { + if (_pid == null) + { + SpinWait.SpinUntil(() => _pid != null); + } + + foreach (var sub in Subscriptions.Values) + { + void Action() + { + try + { + sub.Action(msg); + } + catch (Exception ex) + { + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); + } + } + + var runner = new EventStreamRunner(Action); + _system.Root.Send(_pid, runner); + } + } +} + + +public record EventStreamRunner(Action Run); +public class EventStreamPublisherActor : IActor +{ +#pragma warning disable CS0618 // Type or member is obsolete + private readonly ILogger _logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete + public Task ReceiveAsync(IContext context) + { + if (context.Message is EventStreamRunner runner) + { + try + { + runner.Run(); + } + catch(Exception ex) + { + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); + } + } + + return Task.CompletedTask; + } } /// @@ -69,9 +132,11 @@ internal EventStream(ActorSystem system) [PublicAPI] public class EventStream { +#pragma warning disable CS0618 // Type or member is obsolete private readonly ILogger _logger = Log.CreateLogger>(); +#pragma warning restore CS0618 // Type or member is obsolete - private readonly ConcurrentDictionary> _subscriptions = new(); + public ConcurrentDictionary> Subscriptions { get; } = new(); internal EventStream() { @@ -87,7 +152,6 @@ public EventStreamSubscription Subscribe(Action action, IDispatcher? dispa { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, x => { action(x); @@ -96,7 +160,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatcher? dispa } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -111,11 +175,10 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, async x => { await channel.Writer.WriteAsync(x).ConfigureAwait(false); } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -128,8 +191,8 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis /// A new subscription that can be used to unsubscribe public EventStreamSubscription Subscribe(Func action, IDispatcher? dispatcher = null) { - var sub = new EventStreamSubscription(this, dispatcher ?? Dispatchers.SynchronousDispatcher, action); - _subscriptions.TryAdd(sub.Id, sub); + var sub = new EventStreamSubscription(this, action); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -145,7 +208,6 @@ public EventStreamSubscription Subscribe(Action action, IDispatch { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, msg => { if (msg is TMsg typed) @@ -157,7 +219,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatch } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -177,7 +239,6 @@ public EventStreamSubscription Subscribe( { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, msg => { if (msg is TMsg typed && predicate(typed)) @@ -189,7 +250,7 @@ public EventStreamSubscription Subscribe( } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -204,7 +265,6 @@ public EventStreamSubscription Subscribe(ISenderContext context, params { var sub = new EventStreamSubscription( this, - Dispatchers.SynchronousDispatcher, msg => { if (msg is TMsg) @@ -219,7 +279,7 @@ public EventStreamSubscription Subscribe(ISenderContext context, params } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -235,11 +295,10 @@ public EventStreamSubscription Subscribe(Func action, IDisp { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, msg => msg is TMsg typed ? action(typed) : Task.CompletedTask ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -248,34 +307,29 @@ public EventStreamSubscription Subscribe(Func action, IDisp /// Publish a message to the event stream /// /// A message to publish - public void Publish(T msg) + public virtual void Publish(T msg) { - foreach (var sub in _subscriptions.Values) + foreach (var sub in Subscriptions.Values) { - sub.Dispatcher.Schedule( - () => - { - try - { - sub.Action(msg); - } - catch (Exception ex) - { - ex.CheckFailFast(); - _logger.LogError(0, ex, "Exception has occurred when publishing a message"); - } - return Task.CompletedTask; - } - ); + try + { + sub.Action(msg); + } + catch (Exception ex) + { + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); + } } } + /// /// Remove a subscription by id /// /// Subscription id - public void Unsubscribe(Guid id) => _subscriptions.TryRemove(id, out _); + public void Unsubscribe(Guid id) => Subscriptions.TryRemove(id, out _); /// /// Remove a subscription @@ -294,16 +348,14 @@ public class EventStreamSubscription { private readonly EventStream _eventStream; - public EventStreamSubscription(EventStream eventStream, IDispatcher dispatcher, Func action) + public EventStreamSubscription(EventStream eventStream, Func action) { Id = Guid.NewGuid(); _eventStream = eventStream; - Dispatcher = dispatcher; Action = action; } public Guid Id { get; } - public IDispatcher Dispatcher { get; } public Func Action { get; } public void Unsubscribe() => _eventStream.Unsubscribe(Id); diff --git a/tests/Proto.Actor.Tests/EventStreamTests.cs b/tests/Proto.Actor.Tests/EventStreamTests.cs index 6daa2913fe..1970a4948d 100644 --- a/tests/Proto.Actor.Tests/EventStreamTests.cs +++ b/tests/Proto.Actor.Tests/EventStreamTests.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Proto.Mailbox; using Xunit; @@ -17,6 +18,9 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes() eventStream.Subscribe(theString => received = theString); eventStream.Publish("hello"); + + await Task.Delay(1000); + Assert.Equal("hello", received); } @@ -32,6 +36,9 @@ public async Task EventStream_CanSubscribeToAllEventTypes() eventStream.Publish("hello"); eventStream.Publish(1); eventStream.Publish(true); + + await Task.Delay(1000); + Assert.Equal(3, receivedEvents.Count); } @@ -46,6 +53,9 @@ public async Task EventStream_CanUnsubscribeFromEvents() eventStream.Publish("first message"); subscription.Unsubscribe(); eventStream.Publish("second message"); + + await Task.Delay(1000); + Assert.Single(receivedEvents); } @@ -59,6 +69,9 @@ public async Task EventStream_OnlyReceiveSubscribedToEventTypes() var eventsReceived = new List(); eventStream.Subscribe(@event => eventsReceived.Add(@event)); eventStream.Publish("not an int"); + + await Task.Delay(1000); + Assert.Empty(eventsReceived); } @@ -79,5 +92,6 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes_Async() ); eventStream.Publish("hello"); + await Task.Delay(1000); } } \ No newline at end of file