Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce actor to publish events in eventstream #2078

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Proto.Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Proto.Diagnostics;
using Proto.Extensions;
using Proto.Future;
using Proto.Mailbox;
using Proto.Metrics;
using Proto.Utils;

Expand Down Expand Up @@ -49,8 +50,8 @@ public ActorSystem(ActorSystemConfig config)
DeadLetter = dl.Configure();
ProcessRegistry.TryAdd("$deadletter", DeadLetter);
Guardians = new Guardians(this);
EventStream = new EventStream(this);
Metrics = new ProtoMetrics(config.MetricsEnabled);
EventStream = new EventStream(this);
var eventStream = new EventStreamProcess(this).Configure();
ProcessRegistry.TryAdd("$eventstream", eventStream);
Extensions = new ActorSystemExtensions(this);
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/EventStream/DeadLetter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace Proto;

/// <summary>
/// A wrapper for a message that could not be delivered to the original recipient. Such message is wrapped in
/// a <see cref="DeadLetterEvent{T}" /> by the <see cref="DeadLetterProcess" /> and forwarded
/// to the <see cref="EventStream{T}" />
/// a <see cref="DeadLetterEvent" /> by the <see cref="DeadLetterProcess" /> and forwarded
/// to the <see cref="EventStream" />
/// </summary>
[PublicAPI]
public class DeadLetterEvent
Expand Down
124 changes: 88 additions & 36 deletions src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand All @@ -24,10 +25,19 @@ namespace Proto;
[PublicAPI]
public class EventStream : EventStream<object>
{
#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger<EventStream>();
private readonly PID _pid;
private readonly ActorSystem _system;
#pragma warning restore CS0618 // Type or member is obsolete

internal EventStream(ActorSystem system)
{
_system = system;
var props = Props.FromProducer(() => new EventStreamPublisherActor());
_pid = system.Root.SpawnNamedSystem( props,"$eventstream-actor");


if (!_logger.IsEnabled(LogLevel.Information))
{
return;
Expand Down Expand Up @@ -60,6 +70,59 @@ internal EventStream(ActorSystem system)
}
);
}

public override void Publish(object msg)
{
if (_pid == null)
{
SpinWait.SpinUntil(() => _pid != null);
}

foreach (var sub in Subscriptions.Values)
{
void Action()
{
try
{
sub.Action(msg);
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}
}

var runner = new EventStreamRunner(Action);
_system.Root.Send(_pid, runner);
}
}
}


public record EventStreamRunner(Action Run);
public class EventStreamPublisherActor : IActor
{
#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger<EventStreamPublisherActor>();
#pragma warning restore CS0618 // Type or member is obsolete
public Task ReceiveAsync(IContext context)
{
if (context.Message is EventStreamRunner runner)
{
try
{
runner.Run();
}
catch(Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}
}

return Task.CompletedTask;
}
}

/// <summary>
Expand All @@ -69,9 +132,11 @@ internal EventStream(ActorSystem system)
[PublicAPI]
public class EventStream<T>
{
#pragma warning disable CS0618 // Type or member is obsolete
private readonly ILogger _logger = Log.CreateLogger<EventStream<T>>();
#pragma warning restore CS0618 // Type or member is obsolete

private readonly ConcurrentDictionary<Guid, EventStreamSubscription<T>> _subscriptions = new();
public ConcurrentDictionary<Guid, EventStreamSubscription<T>> Subscriptions { get; } = new();

internal EventStream()
{
Expand All @@ -87,7 +152,6 @@ public EventStreamSubscription<T> Subscribe(Action<T> action, IDispatcher? dispa
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
x =>
{
action(x);
Expand All @@ -96,7 +160,7 @@ public EventStreamSubscription<T> Subscribe(Action<T> action, IDispatcher? dispa
}
);

_subscriptions.TryAdd(sub.Id, sub);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
}
Expand All @@ -111,11 +175,10 @@ public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dis
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
async x => { await channel.Writer.WriteAsync(x).ConfigureAwait(false); }
);

_subscriptions.TryAdd(sub.Id, sub);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
}
Expand All @@ -128,8 +191,8 @@ public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dis
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe(Func<T, Task> action, IDispatcher? dispatcher = null)
{
var sub = new EventStreamSubscription<T>(this, dispatcher ?? Dispatchers.SynchronousDispatcher, action);
_subscriptions.TryAdd(sub.Id, sub);
var sub = new EventStreamSubscription<T>(this, action);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
}
Expand All @@ -145,7 +208,6 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Action<TMsg> action, IDispatch
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg typed)
Expand All @@ -157,7 +219,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Action<TMsg> action, IDispatch
}
);

_subscriptions.TryAdd(sub.Id, sub);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
}
Expand All @@ -177,7 +239,6 @@ public EventStreamSubscription<T> Subscribe<TMsg>(
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg typed && predicate(typed))
Expand All @@ -189,7 +250,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(
}
);

_subscriptions.TryAdd(sub.Id, sub);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
}
Expand All @@ -204,7 +265,6 @@ public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params
{
var sub = new EventStreamSubscription<T>(
this,
Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg)
Expand All @@ -219,7 +279,7 @@ public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params
}
);

_subscriptions.TryAdd(sub.Id, sub);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
}
Expand All @@ -235,11 +295,10 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Func<TMsg, Task> action, IDisp
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
msg => msg is TMsg typed ? action(typed) : Task.CompletedTask
);

_subscriptions.TryAdd(sub.Id, sub);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
}
Expand All @@ -248,34 +307,29 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Func<TMsg, Task> action, IDisp
/// Publish a message to the event stream
/// </summary>
/// <param name="msg">A message to publish</param>
public void Publish(T msg)
public virtual void Publish(T msg)
{
foreach (var sub in _subscriptions.Values)
foreach (var sub in Subscriptions.Values)
{
sub.Dispatcher.Schedule(
() =>
{
try
{
sub.Action(msg);
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}

return Task.CompletedTask;
}
);
try
{
sub.Action(msg);
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}
}
}


/// <summary>
/// Remove a subscription by id
/// </summary>
/// <param name="id">Subscription id</param>
public void Unsubscribe(Guid id) => _subscriptions.TryRemove(id, out _);
public void Unsubscribe(Guid id) => Subscriptions.TryRemove(id, out _);

/// <summary>
/// Remove a subscription
Expand All @@ -294,16 +348,14 @@ public class EventStreamSubscription<T>
{
private readonly EventStream<T> _eventStream;

public EventStreamSubscription(EventStream<T> eventStream, IDispatcher dispatcher, Func<T, Task> action)
public EventStreamSubscription(EventStream<T> eventStream, Func<T, Task> action)
{
Id = Guid.NewGuid();
_eventStream = eventStream;
Dispatcher = dispatcher;
Action = action;
}

public Guid Id { get; }
public IDispatcher Dispatcher { get; }
public Func<T, Task> Action { get; }

public void Unsubscribe() => _eventStream.Unsubscribe(Id);
Expand Down
14 changes: 14 additions & 0 deletions tests/Proto.Actor.Tests/EventStreamTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Proto.Mailbox;
using Xunit;
Expand All @@ -17,6 +18,9 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes()

eventStream.Subscribe<string>(theString => received = theString);
eventStream.Publish("hello");

await Task.Delay(1000);

Assert.Equal("hello", received);
}

Expand All @@ -32,6 +36,9 @@ public async Task EventStream_CanSubscribeToAllEventTypes()
eventStream.Publish("hello");
eventStream.Publish(1);
eventStream.Publish(true);

await Task.Delay(1000);

Assert.Equal(3, receivedEvents.Count);
}

Expand All @@ -46,6 +53,9 @@ public async Task EventStream_CanUnsubscribeFromEvents()
eventStream.Publish("first message");
subscription.Unsubscribe();
eventStream.Publish("second message");

await Task.Delay(1000);

Assert.Single(receivedEvents);
}

Expand All @@ -59,6 +69,9 @@ public async Task EventStream_OnlyReceiveSubscribedToEventTypes()
var eventsReceived = new List<object>();
eventStream.Subscribe<int>(@event => eventsReceived.Add(@event));
eventStream.Publish("not an int");

await Task.Delay(1000);

Assert.Empty(eventsReceived);
}

Expand All @@ -79,5 +92,6 @@ public async Task EventStream_CanSubscribeToSpecificEventTypes_Async()
);

eventStream.Publish("hello");
await Task.Delay(1000);
}
}
Loading