Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate exceptions raised by subscriber message handlers #262

Merged
merged 16 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,15 @@ public virtual async Task CanTolerateSubscriberFailureAsync() {
return;

try {
var countdown = new AsyncCountdownEvent(2);
var countdown = new AsyncCountdownEvent(4);
await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
Assert.Equal("Hello", msg.Data);
countdown.Signal();
});
await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
Assert.Equal("Hello", msg.Data);
countdown.Signal();
});
await messageBus.SubscribeAsync<SimpleMessageA>(msg => {
throw new Exception();
});
Expand Down
14 changes: 9 additions & 5 deletions src/Foundatio/Messaging/InMemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ public void ResetMessagesSent() {
_messageCounts.Clear();
}

protected override Task PublishImplAsync(string messageType, object message, TimeSpan? delay, CancellationToken cancellationToken) {
protected override async Task PublishImplAsync(string messageType, object message, TimeSpan? delay, CancellationToken cancellationToken) {
Interlocked.Increment(ref _messagesSent);
_messageCounts.AddOrUpdate(messageType, t => 1, (t, c) => c + 1);
Type mappedType = GetMappedMessageType(messageType);

if (_subscribers.IsEmpty)
return Task.CompletedTask;
return;

bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
if (delay.HasValue && delay.Value > TimeSpan.Zero) {
if (isTraceLogLevelEnabled)
_logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, delay.Value.TotalMilliseconds);
SendDelayedMessage(mappedType, message, delay.Value);
return Task.CompletedTask;
return;
}

var body = SerializeMessageBody(messageType, message);
Expand All @@ -54,8 +54,12 @@ protected override Task PublishImplAsync(string messageType, object message, Tim
Data = body
};

SendMessageToSubscribers(messageData);
return Task.CompletedTask;
try {
await SendMessageToSubscribers(messageData);
} catch (Exception ex) {
// swallow exceptions from subscriber handlers for the in memory bus
jcono marked this conversation as resolved.
Show resolved Hide resolved
_logger.LogWarning(ex, "Error sending message to subscribers: {ErrorMessage}", ex.Message);
}
}
}
}
38 changes: 21 additions & 17 deletions src/Foundatio/Messaging/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected virtual object DeserializeMessageBody(string messageType, byte[] data)
return body;
}

protected void SendMessageToSubscribers(IMessage message) {
protected async Task SendMessageToSubscribers(IMessage message) {
bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
var subscribers = GetMessageSubscribers(message);

Expand All @@ -167,19 +167,19 @@ protected void SendMessageToSubscribers(IMessage message) {
return;
}

foreach (var subscriber in subscribers) {
var subscriberHandlers = subscribers.Select(subscriber => {
if (subscriber.CancellationToken.IsCancellationRequested) {
if (_subscribers.TryRemove(subscriber.Id, out var _)) {
if (_subscribers.TryRemove(subscriber.Id, out _)) {
if (isTraceLogLevelEnabled)
_logger.LogTrace("Removed cancelled subscriber: {SubscriberId}", subscriber.Id);
} else if (isTraceLogLevelEnabled) {
_logger.LogTrace("Unable to remove cancelled subscriber: {SubscriberId}", subscriber.Id);
}

continue;
return Task.CompletedTask;
}

Task.Factory.StartNew(async () => {
return Task.Run(async () => {
if (subscriber.CancellationToken.IsCancellationRequested) {
if (isTraceLogLevelEnabled)
_logger.LogTrace("The cancelled subscriber action will not be called: {SubscriberId}", subscriber.Id);
Expand All @@ -190,19 +190,23 @@ protected void SendMessageToSubscribers(IMessage message) {
if (isTraceLogLevelEnabled)
_logger.LogTrace("Calling subscriber action: {SubscriberId}", subscriber.Id);

try {
if (subscriber.Type == typeof(IMessage))
await subscriber.Action(message, subscriber.CancellationToken).AnyContext();
else
await subscriber.Action(body.Value, subscriber.CancellationToken).AnyContext();

if (isTraceLogLevelEnabled)
_logger.LogTrace("Finished calling subscriber action: {SubscriberId}", subscriber.Id);
} catch (Exception ex) {
if (_logger.IsEnabled(LogLevel.Warning))
_logger.LogWarning(ex, "Error sending message to subscriber: {ErrorMessage}", ex.Message);
}
if (subscriber.Type == typeof(IMessage))
await subscriber.Action(message, subscriber.CancellationToken).AnyContext();
else
await subscriber.Action(body.Value, subscriber.CancellationToken).AnyContext();

if (isTraceLogLevelEnabled)
_logger.LogTrace("Finished calling subscriber action: {SubscriberId}", subscriber.Id);
});
});

try {
await Task.WhenAll(subscriberHandlers.ToArray());
} catch (Exception ex) {
if (_logger.IsEnabled(LogLevel.Warning))
_logger.LogWarning(ex, "Error sending message to subscribers: {ErrorMessage}", ex.Message);

throw;
ejsmith marked this conversation as resolved.
Show resolved Hide resolved
}

if (isTraceLogLevelEnabled)
Expand Down