Skip to content

Commit

Permalink
remove dispatchers from eventstream
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Dec 10, 2023
1 parent e96602c commit 530810f
Showing 1 changed file with 24 additions and 44 deletions.
68 changes: 24 additions & 44 deletions src/Proto.Actor/EventStream/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,20 @@ public override void Publish(object msg)

foreach (var sub in Subscriptions.Values)
{
var action = () =>
void Action()
{
sub.Dispatcher.Schedule(
() =>
{
try
{
sub.Action(msg);
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}

return Task.CompletedTask;
}
);
};
try
{
sub.Action(msg);
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}
}

var runner = new EventStreamRunner(action);
var runner = new EventStreamRunner(Action);
_system.Root.Send(_pid, runner);
}
}
Expand Down Expand Up @@ -159,7 +152,6 @@ public EventStreamSubscription<T> Subscribe(Action<T> action, IDispatcher? dispa
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
x =>
{
action(x);
Expand All @@ -183,7 +175,6 @@ public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dis
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
async x => { await channel.Writer.WriteAsync(x).ConfigureAwait(false); }
);

Expand All @@ -200,7 +191,7 @@ public EventStreamSubscription<T> Subscribe(Channel<T> channel, IDispatcher? dis
/// <returns>A new subscription that can be used to unsubscribe</returns>
public EventStreamSubscription<T> Subscribe(Func<T, Task> action, IDispatcher? dispatcher = null)
{
var sub = new EventStreamSubscription<T>(this, dispatcher ?? Dispatchers.SynchronousDispatcher, action);
var sub = new EventStreamSubscription<T>(this, action);
Subscriptions.TryAdd(sub.Id, sub);

return sub;
Expand All @@ -217,7 +208,6 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Action<TMsg> action, IDispatch
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg typed)
Expand Down Expand Up @@ -249,7 +239,6 @@ public EventStreamSubscription<T> Subscribe<TMsg>(
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg typed && predicate(typed))
Expand All @@ -276,7 +265,6 @@ public EventStreamSubscription<T> Subscribe<TMsg>(ISenderContext context, params
{
var sub = new EventStreamSubscription<T>(
this,
Dispatchers.SynchronousDispatcher,
msg =>
{
if (msg is TMsg)
Expand Down Expand Up @@ -307,7 +295,6 @@ public EventStreamSubscription<T> Subscribe<TMsg>(Func<TMsg, Task> action, IDisp
{
var sub = new EventStreamSubscription<T>(
this,
dispatcher ?? Dispatchers.SynchronousDispatcher,
msg => msg is TMsg typed ? action(typed) : Task.CompletedTask
);

Expand All @@ -324,25 +311,20 @@ public virtual void Publish(T msg)
{
foreach (var sub in Subscriptions.Values)
{
sub.Dispatcher.Schedule(
() =>
{
try
{
sub.Action(msg);
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}

return Task.CompletedTask;
}
);
try
{
sub.Action(msg);
}
catch (Exception ex)
{
ex.CheckFailFast();
_logger.LogError(0, ex, "Exception has occurred when publishing a message");
}
}
}


/// <summary>
/// Remove a subscription by id
/// </summary>
Expand All @@ -366,16 +348,14 @@ public class EventStreamSubscription<T>
{
private readonly EventStream<T> _eventStream;

public EventStreamSubscription(EventStream<T> eventStream, IDispatcher dispatcher, Func<T, Task> action)
public EventStreamSubscription(EventStream<T> eventStream, Func<T, Task> action)
{
Id = Guid.NewGuid();
_eventStream = eventStream;
Dispatcher = dispatcher;
Action = action;
}

public Guid Id { get; }
public IDispatcher Dispatcher { get; }
public Func<T, Task> Action { get; }

public void Unsubscribe() => _eventStream.Unsubscribe(Id);
Expand Down

0 comments on commit 530810f

Please sign in to comment.