From 8385f6f56c237170c84f248edd211558d9687ff2 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 10 Dec 2023 19:54:55 +0100 Subject: [PATCH 1/6] introduce actor to publish events in eventstream --- src/Proto.Actor/EventStream/DeadLetter.cs | 4 +- src/Proto.Actor/EventStream/EventStream.cs | 88 +++++++++++++++++++--- 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/src/Proto.Actor/EventStream/DeadLetter.cs b/src/Proto.Actor/EventStream/DeadLetter.cs index 8072063449..fa40dc7c56 100644 --- a/src/Proto.Actor/EventStream/DeadLetter.cs +++ b/src/Proto.Actor/EventStream/DeadLetter.cs @@ -16,8 +16,8 @@ namespace Proto; /// /// A wrapper for a message that could not be delivered to the original recipient. Such message is wrapped in -/// a by the and forwarded -/// to the +/// a by the and forwarded +/// to the /// [PublicAPI] public class DeadLetterEvent diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index d4b8553445..68a7147639 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -24,10 +24,19 @@ namespace Proto; [PublicAPI] public class EventStream : EventStream { +#pragma warning disable CS0618 // Type or member is obsolete private readonly ILogger _logger = Log.CreateLogger(); + private readonly PID _pid; + private readonly ActorSystem _system; +#pragma warning restore CS0618 // Type or member is obsolete internal EventStream(ActorSystem system) { + _system = system; + var props = Props.FromProducer(() => new EventStreamPublisherActor()); + _pid = system.Root.SpawnNamedSystem( props,"$eventstream-actor"); + + if (!_logger.IsEnabled(LogLevel.Information)) { return; @@ -60,6 +69,61 @@ internal EventStream(ActorSystem system) } ); } + + public override void Publish(object msg) + { + foreach (var sub in Subscriptions.Values) + { + var action = () => + { + sub.Dispatcher.Schedule( + () => + { + try + { + sub.Action(msg); + } + catch (Exception ex) + { + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); + } + + return Task.CompletedTask; + } + ); + }; + + var runner = new EventStreamRunner(action); + _system.Root.Send(_pid, runner); + } + } +} + + +public record EventStreamRunner(Action Run); +public class EventStreamPublisherActor : IActor +{ +#pragma warning disable CS0618 // Type or member is obsolete + private readonly ILogger _logger = Log.CreateLogger(); +#pragma warning restore CS0618 // Type or member is obsolete + public Task ReceiveAsync(IContext context) + { + if (context.Message is EventStreamRunner runner) + { + try + { + runner.Run(); + } + catch(Exception ex) + { + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); + } + } + + return Task.CompletedTask; + } } /// @@ -69,9 +133,11 @@ internal EventStream(ActorSystem system) [PublicAPI] public class EventStream { +#pragma warning disable CS0618 // Type or member is obsolete private readonly ILogger _logger = Log.CreateLogger>(); +#pragma warning restore CS0618 // Type or member is obsolete - private readonly ConcurrentDictionary> _subscriptions = new(); + public ConcurrentDictionary> Subscriptions { get; } = new(); internal EventStream() { @@ -96,7 +162,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatcher? dispa } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -115,7 +181,7 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis async x => { await channel.Writer.WriteAsync(x).ConfigureAwait(false); } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -129,7 +195,7 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis public EventStreamSubscription Subscribe(Func action, IDispatcher? dispatcher = null) { var sub = new EventStreamSubscription(this, dispatcher ?? Dispatchers.SynchronousDispatcher, action); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -157,7 +223,7 @@ public EventStreamSubscription Subscribe(Action action, IDispatch } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -189,7 +255,7 @@ public EventStreamSubscription Subscribe( } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -219,7 +285,7 @@ public EventStreamSubscription Subscribe(ISenderContext context, params } ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -239,7 +305,7 @@ public EventStreamSubscription Subscribe(Func action, IDisp msg => msg is TMsg typed ? action(typed) : Task.CompletedTask ); - _subscriptions.TryAdd(sub.Id, sub); + Subscriptions.TryAdd(sub.Id, sub); return sub; } @@ -248,9 +314,9 @@ public EventStreamSubscription Subscribe(Func action, IDisp /// Publish a message to the event stream /// /// A message to publish - public void Publish(T msg) + public virtual void Publish(T msg) { - foreach (var sub in _subscriptions.Values) + foreach (var sub in Subscriptions.Values) { sub.Dispatcher.Schedule( () => @@ -275,7 +341,7 @@ public void Publish(T msg) /// Remove a subscription by id /// /// Subscription id - public void Unsubscribe(Guid id) => _subscriptions.TryRemove(id, out _); + public void Unsubscribe(Guid id) => Subscriptions.TryRemove(id, out _); /// /// Remove a subscription From 8f26fd49755c1915df70cb58fbbf4c06e6f0c89d Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 10 Dec 2023 20:04:10 +0100 Subject: [PATCH 2/6] publish wait --- src/Proto.Actor/EventStream/EventStream.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index 68a7147639..1109cda58e 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Concurrent; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using JetBrains.Annotations; @@ -72,6 +73,11 @@ internal EventStream(ActorSystem system) public override void Publish(object msg) { + if (_pid == null) + { + SpinWait.SpinUntil(() => _pid != null); + } + foreach (var sub in Subscriptions.Values) { var action = () => From 713de3d9deda6c7837bf158f3cc0b66b6316a090 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 10 Dec 2023 20:07:25 +0100 Subject: [PATCH 3/6] . --- src/Proto.Actor/ActorSystem.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs index c67a35cec5..617d3f6dc5 100644 --- a/src/Proto.Actor/ActorSystem.cs +++ b/src/Proto.Actor/ActorSystem.cs @@ -13,6 +13,7 @@ using Proto.Diagnostics; using Proto.Extensions; using Proto.Future; +using Proto.Mailbox; using Proto.Metrics; using Proto.Utils; @@ -49,8 +50,9 @@ public ActorSystem(ActorSystemConfig config) DeadLetter = dl.Configure(); ProcessRegistry.TryAdd("$deadletter", DeadLetter); Guardians = new Guardians(this); - EventStream = new EventStream(this); Metrics = new ProtoMetrics(config.MetricsEnabled); + EventStream = new EventStream(this); + ActorDispatcher = new ActorDispatcher(this); var eventStream = new EventStreamProcess(this).Configure(); ProcessRegistry.TryAdd("$eventstream", eventStream); Extensions = new ActorSystemExtensions(this); @@ -88,6 +90,11 @@ public ActorSystem(ActorSystemConfig config) /// Manages all processes in the actor system (actors, futures, event stream, etc.). /// public ProcessRegistry ProcessRegistry { get; } + + /// + /// Dispatcher used to schedule messages via an actor. + /// + public IDispatcher ActorDispatcher { get; } /// /// Root context of the actor system. Use it to spawn actors or send messages from outside of an actor context. From fc5119fa00c2be49d1f74d2de79be7775939bd08 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 10 Dec 2023 20:12:49 +0100 Subject: [PATCH 4/6] . --- src/Proto.Actor/ActorSystem.cs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/Proto.Actor/ActorSystem.cs b/src/Proto.Actor/ActorSystem.cs index 617d3f6dc5..e808b50a6c 100644 --- a/src/Proto.Actor/ActorSystem.cs +++ b/src/Proto.Actor/ActorSystem.cs @@ -52,7 +52,6 @@ public ActorSystem(ActorSystemConfig config) Guardians = new Guardians(this); Metrics = new ProtoMetrics(config.MetricsEnabled); EventStream = new EventStream(this); - ActorDispatcher = new ActorDispatcher(this); var eventStream = new EventStreamProcess(this).Configure(); ProcessRegistry.TryAdd("$eventstream", eventStream); Extensions = new ActorSystemExtensions(this); @@ -90,11 +89,6 @@ public ActorSystem(ActorSystemConfig config) /// Manages all processes in the actor system (actors, futures, event stream, etc.). /// public ProcessRegistry ProcessRegistry { get; } - - /// - /// Dispatcher used to schedule messages via an actor. - /// - public IDispatcher ActorDispatcher { get; } /// /// Root context of the actor system. Use it to spawn actors or send messages from outside of an actor context. From e96602c5c5ec0b83f445462cac3d0a777ba2c6c4 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 10 Dec 2023 20:17:50 +0100 Subject: [PATCH 5/6] . --- tests/Proto.Actor.Tests/EventStreamTests.cs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/Proto.Actor.Tests/EventStreamTests.cs b/tests/Proto.Actor.Tests/EventStreamTests.cs index 6daa2913fe..1970a4948d 100644 --- a/tests/Proto.Actor.Tests/EventStreamTests.cs +++ b/tests/Proto.Actor.Tests/EventStreamTests.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Proto.Mailbox; using Xunit; @@ -17,6 +18,9 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes() eventStream.Subscribe(theString => received = theString); eventStream.Publish("hello"); + + await Task.Delay(1000); + Assert.Equal("hello", received); } @@ -32,6 +36,9 @@ public async Task EventStream_CanSubscribeToAllEventTypes() eventStream.Publish("hello"); eventStream.Publish(1); eventStream.Publish(true); + + await Task.Delay(1000); + Assert.Equal(3, receivedEvents.Count); } @@ -46,6 +53,9 @@ public async Task EventStream_CanUnsubscribeFromEvents() eventStream.Publish("first message"); subscription.Unsubscribe(); eventStream.Publish("second message"); + + await Task.Delay(1000); + Assert.Single(receivedEvents); } @@ -59,6 +69,9 @@ public async Task EventStream_OnlyReceiveSubscribedToEventTypes() var eventsReceived = new List(); eventStream.Subscribe(@event => eventsReceived.Add(@event)); eventStream.Publish("not an int"); + + await Task.Delay(1000); + Assert.Empty(eventsReceived); } @@ -79,5 +92,6 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes_Async() ); eventStream.Publish("hello"); + await Task.Delay(1000); } } \ No newline at end of file From 530810ff693730dda7abb2e67ec4c168089ebda3 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 10 Dec 2023 20:25:19 +0100 Subject: [PATCH 6/6] remove dispatchers from eventstream --- src/Proto.Actor/EventStream/EventStream.cs | 68 ++++++++-------------- 1 file changed, 24 insertions(+), 44 deletions(-) diff --git a/src/Proto.Actor/EventStream/EventStream.cs b/src/Proto.Actor/EventStream/EventStream.cs index 1109cda58e..c17f53c56b 100644 --- a/src/Proto.Actor/EventStream/EventStream.cs +++ b/src/Proto.Actor/EventStream/EventStream.cs @@ -80,27 +80,20 @@ public override void Publish(object msg) foreach (var sub in Subscriptions.Values) { - var action = () => + void Action() { - sub.Dispatcher.Schedule( - () => - { - try - { - sub.Action(msg); - } - catch (Exception ex) - { - ex.CheckFailFast(); - _logger.LogError(0, ex, "Exception has occurred when publishing a message"); - } - - return Task.CompletedTask; - } - ); - }; + try + { + sub.Action(msg); + } + catch (Exception ex) + { + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); + } + } - var runner = new EventStreamRunner(action); + var runner = new EventStreamRunner(Action); _system.Root.Send(_pid, runner); } } @@ -159,7 +152,6 @@ public EventStreamSubscription Subscribe(Action action, IDispatcher? dispa { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, x => { action(x); @@ -183,7 +175,6 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, async x => { await channel.Writer.WriteAsync(x).ConfigureAwait(false); } ); @@ -200,7 +191,7 @@ public EventStreamSubscription Subscribe(Channel channel, IDispatcher? dis /// A new subscription that can be used to unsubscribe public EventStreamSubscription Subscribe(Func action, IDispatcher? dispatcher = null) { - var sub = new EventStreamSubscription(this, dispatcher ?? Dispatchers.SynchronousDispatcher, action); + var sub = new EventStreamSubscription(this, action); Subscriptions.TryAdd(sub.Id, sub); return sub; @@ -217,7 +208,6 @@ public EventStreamSubscription Subscribe(Action action, IDispatch { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, msg => { if (msg is TMsg typed) @@ -249,7 +239,6 @@ public EventStreamSubscription Subscribe( { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, msg => { if (msg is TMsg typed && predicate(typed)) @@ -276,7 +265,6 @@ public EventStreamSubscription Subscribe(ISenderContext context, params { var sub = new EventStreamSubscription( this, - Dispatchers.SynchronousDispatcher, msg => { if (msg is TMsg) @@ -307,7 +295,6 @@ public EventStreamSubscription Subscribe(Func action, IDisp { var sub = new EventStreamSubscription( this, - dispatcher ?? Dispatchers.SynchronousDispatcher, msg => msg is TMsg typed ? action(typed) : Task.CompletedTask ); @@ -324,25 +311,20 @@ public virtual void Publish(T msg) { foreach (var sub in Subscriptions.Values) { - sub.Dispatcher.Schedule( - () => - { - try - { - sub.Action(msg); - } - catch (Exception ex) - { - ex.CheckFailFast(); - _logger.LogError(0, ex, "Exception has occurred when publishing a message"); - } - return Task.CompletedTask; - } - ); + try + { + sub.Action(msg); + } + catch (Exception ex) + { + ex.CheckFailFast(); + _logger.LogError(0, ex, "Exception has occurred when publishing a message"); + } } } + /// /// Remove a subscription by id /// @@ -366,16 +348,14 @@ public class EventStreamSubscription { private readonly EventStream _eventStream; - public EventStreamSubscription(EventStream eventStream, IDispatcher dispatcher, Func action) + public EventStreamSubscription(EventStream eventStream, Func action) { Id = Guid.NewGuid(); _eventStream = eventStream; - Dispatcher = dispatcher; Action = action; } public Guid Id { get; } - public IDispatcher Dispatcher { get; } public Func Action { get; } public void Unsubscribe() => _eventStream.Unsubscribe(Id);