diff --git a/Foundatio.RabbitMQ.sln b/Foundatio.RabbitMQ.sln index 7b1d511..6d38288 100644 --- a/Foundatio.RabbitMQ.sln +++ b/Foundatio.RabbitMQ.sln @@ -12,6 +12,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution README.md = README.md docker-compose.yml = docker-compose.yml tests\Directory.Build.props = tests\Directory.Build.props + build\Dockerfile = build\Dockerfile EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Foundatio.RabbitMQ", "src\Foundatio.RabbitMQ\Foundatio.RabbitMQ.csproj", "{EAE3607D-73A1-4D02-BDAA-24A37DDA15CB}" diff --git a/build/Dockerfile b/build/Dockerfile index f1d3113..e7557aa 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -1,4 +1,4 @@ -FROM rabbitmq:3.10-management +FROM rabbitmq:4.0.3-management -COPY rabbitmq_delayed_message_exchange-3.10.0.ez /opt/rabbitmq/plugins +COPY rabbitmq_delayed_message_exchange-4.0.2.ez /opt/rabbitmq/plugins RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange diff --git a/build/common.props b/build/common.props index dbc0634..59ea214 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - netstandard2.0 + netstandard2.1 Foundatio Pluggable foundation blocks for building distributed apps. https://github.com/FoundatioFx/Foundatio.RabbitMQ diff --git a/build/rabbitmq_delayed_message_exchange-3.10.0.ez b/build/rabbitmq_delayed_message_exchange-3.10.0.ez deleted file mode 100644 index 4f7d6a7..0000000 Binary files a/build/rabbitmq_delayed_message_exchange-3.10.0.ez and /dev/null differ diff --git a/build/rabbitmq_delayed_message_exchange-4.0.2.ez b/build/rabbitmq_delayed_message_exchange-4.0.2.ez new file mode 100644 index 0000000..c6dbbdf Binary files /dev/null and b/build/rabbitmq_delayed_message_exchange-4.0.2.ez differ diff --git a/docker-compose.yml b/docker-compose.yml index ae8179c..a259cf2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: rabbitmq: - image: rabbitmq:3.10-management + image: rabbitmq:4.0.3-management ports: - "5672:5672" - "8080:15672" # management ui - login with guest:guest diff --git a/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs b/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs index f0fd963..deba7e2 100644 --- a/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs +++ b/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs @@ -25,4 +25,10 @@ public static ConfiguredTaskAwaitable AnyContext(this Awaitabl { return task.ConfigureAwait(continueOnCapturedContext: false); } + + [DebuggerStepThrough] + public static ConfiguredValueTaskAwaitable AnyContext(this ValueTask task) + { + return task.ConfigureAwait(continueOnCapturedContext: false); + } } diff --git a/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj b/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj index 0d4c794..06e5cab 100644 --- a/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj +++ b/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj @@ -1,6 +1,6 @@ - + diff --git a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs index e17a0df..a2b1349 100644 --- a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs +++ b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs @@ -13,15 +13,16 @@ namespace Foundatio.Messaging; -public class RabbitMQMessageBus : MessageBusBase +public class RabbitMQMessageBus : MessageBusBase, IAsyncDisposable { private readonly AsyncLock _lock = new(); private readonly ConnectionFactory _factory; private IConnection _publisherConnection; private IConnection _subscriberConnection; - private IModel _publisherModel; - private IModel _subscriberModel; + private IChannel _publisherChannel; + private IChannel _subscriberChannel; private bool? _delayedExchangePluginEnabled; + private bool _isDisposed; public RabbitMQMessageBus(RabbitMQMessageBusOptions options) : base(options) { @@ -32,12 +33,11 @@ public RabbitMQMessageBus(RabbitMQMessageBusOptions options) : base(options) // in case the server is restarted or there has been any network failures // Topology ( queues, exchanges, bindings and consumers) recovery "TopologyRecoveryEnabled" is already enabled // by default so no need to initialize it. NetworkRecoveryInterval is also by default set to 5 seconds. - // it can always be fine tuned if needed. + // it can always be fine-tuned if needed. _factory = new ConnectionFactory { Uri = new Uri(options.ConnectionString), - AutomaticRecoveryEnabled = true, - DispatchConsumersAsync = true + AutomaticRecoveryEnabled = true }; } @@ -48,58 +48,54 @@ protected override Task RemoveTopicSubscriptionAsync() { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("RemoveTopicSubscriptionAsync"); - CloseSubscriberConnection(); - return Task.CompletedTask; + + return CloseSubscriberConnectionAsync(); } protected override async Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) { - if (_subscriberModel != null) + if (_subscriberChannel != null) return; await EnsureTopicCreatedAsync(cancellationToken).AnyContext(); using (await _lock.LockAsync().AnyContext()) { - if (_subscriberModel != null) + if (_subscriberChannel != null) return; - _subscriberConnection = CreateConnection(); - _subscriberModel = _subscriberConnection.CreateModel(); + _subscriberConnection = await CreateConnectionAsync().AnyContext(); + _subscriberChannel = await _subscriberConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); - // If InitPublisher is called first, then we will never come in this if clause. - if (!CreateDelayedExchange(_subscriberModel)) + // If InitPublisher is called first, then we will never come in this if-clause. + if (!await CreateDelayedExchangeAsync(_subscriberChannel).AnyContext()) { - _subscriberModel.Close(); - _subscriberModel.Abort(); - _subscriberModel.Dispose(); - - _subscriberConnection.Close(); - _subscriberConnection.Dispose(); + await _subscriberChannel.DisposeAsync().AnyContext(); + await _subscriberConnection.DisposeAsync().AnyContext(); - _subscriberConnection = CreateConnection(); - _subscriberModel = _subscriberConnection.CreateModel(); - CreateRegularExchange(_subscriberModel); + _subscriberConnection = await CreateConnectionAsync().AnyContext(); + _subscriberChannel = await _subscriberConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); + await CreateRegularExchangeAsync(_subscriberChannel).AnyContext(); } - string queueName = CreateQueue(_subscriberModel); - var consumer = new AsyncEventingBasicConsumer(_subscriberModel); - consumer.Received += OnMessage; - consumer.Shutdown += OnConsumerShutdown; + string queueName = await CreateQueueAsync(_subscriberChannel).AnyContext(); + var consumer = new AsyncEventingBasicConsumer(_subscriberChannel); + consumer.ReceivedAsync += OnMessageAsync; + consumer.ShutdownAsync += OnConsumerShutdownAsync; - _subscriberModel.BasicConsume(queueName, _options.AcknowledgementStrategy == AcknowledgementStrategy.FireAndForget, consumer); + await _subscriberChannel.BasicConsumeAsync(queueName, _options.AcknowledgementStrategy == AcknowledgementStrategy.FireAndForget, consumer, cancellationToken: cancellationToken).AnyContext(); if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("The unique channel number for the subscriber is : {ChannelNumber}", _subscriberModel.ChannelNumber); + _logger.LogTrace("The unique channel number for the subscriber is : {ChannelNumber}", _subscriberChannel.ChannelNumber); } } - private Task OnConsumerShutdown(object sender, ShutdownEventArgs e) + private Task OnConsumerShutdownAsync(object sender, ShutdownEventArgs e) { _logger.LogInformation("Consumer shutdown. Reply Code: {ReplyCode} Reason: {ReplyText}", e.ReplyCode, e.ReplyText); return Task.CompletedTask; } - private async Task OnMessage(object sender, BasicDeliverEventArgs envelope) + private async Task OnMessageAsync(object sender, BasicDeliverEventArgs envelope) { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("OnMessageAsync({MessageId})", envelope.BasicProperties?.MessageId); @@ -110,7 +106,7 @@ private async Task OnMessage(object sender, BasicDeliverEventArgs envelope) _logger.LogTrace("No subscribers ({MessageId})", envelope.BasicProperties?.MessageId); if (_options.AcknowledgementStrategy == AcknowledgementStrategy.Automatic) - _subscriberModel.BasicReject(envelope.DeliveryTag, true); + await _subscriberChannel.BasicRejectAsync(envelope.DeliveryTag, true).AnyContext(); return; } @@ -121,14 +117,14 @@ private async Task OnMessage(object sender, BasicDeliverEventArgs envelope) await SendMessageToSubscribersAsync(message).AnyContext(); if (_options.AcknowledgementStrategy == AcknowledgementStrategy.Automatic) - _subscriberModel.BasicAck(envelope.DeliveryTag, false); + await _subscriberChannel.BasicAckAsync(envelope.DeliveryTag, false).AnyContext(); } catch (Exception ex) { _logger.LogError(ex, "Error handling message ({MessageId}): {Message}", envelope.BasicProperties?.MessageId, ex.Message); if (_options.AcknowledgementStrategy == AcknowledgementStrategy.Automatic) - _subscriberModel.BasicReject(envelope.DeliveryTag, true); + await _subscriberChannel.BasicRejectAsync(envelope.DeliveryTag, true).AnyContext(); } } @@ -161,43 +157,39 @@ protected virtual IMessage ConvertToMessage(BasicDeliverEventArgs envelope) protected override async Task EnsureTopicCreatedAsync(CancellationToken cancellationToken) { - if (_publisherModel != null) + if (_publisherChannel != null) return; using (await _lock.LockAsync().AnyContext()) { - if (_publisherModel != null) + if (_publisherChannel != null) return; // Create the client connection, channel, declares the exchange, queue and binds // the exchange with the publisher queue. It requires the name of our exchange, exchange type, durability and auto-delete. - // For now we are using same autoDelete for both exchange and queue ( it will survive a server restart ) - _publisherConnection = CreateConnection(); - _publisherModel = _publisherConnection.CreateModel(); + // For now, we are using same autoDelete for both exchange and queue ( it will survive a server restart ) + _publisherConnection = await CreateConnectionAsync().AnyContext(); + _publisherChannel = await _publisherConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); // We first attempt to create "x-delayed-type". For this plugin should be installed. - // However, we plugin is not installed this will throw an exception. In that case + // However, we plug in is not installed this will throw an exception. In that case // we attempt to create regular exchange. If regular exchange also throws and exception // then trouble shoot the problem. - if (!CreateDelayedExchange(_publisherModel)) + if (!await CreateDelayedExchangeAsync(_publisherChannel).AnyContext()) { // if the initial exchange creation was not successful then we must close the previous connection // and establish the new client connection and model otherwise you will keep receiving failure in creation // of the regular exchange too. - _publisherModel.Close(); - _publisherModel.Abort(); - _publisherModel.Dispose(); + await _publisherChannel.DisposeAsync().AnyContext(); + await _publisherConnection.DisposeAsync().AnyContext(); - _publisherConnection.Close(); - _publisherConnection.Dispose(); - - _publisherConnection = CreateConnection(); - _publisherModel = _publisherConnection.CreateModel(); - CreateRegularExchange(_publisherModel); + _publisherConnection = await CreateConnectionAsync().AnyContext(); + _publisherChannel = await _publisherConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); + await CreateRegularExchangeAsync(_publisherChannel).AnyContext(); } if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("The unique channel number for the publisher is : {ChannelNumber}", _publisherModel.ChannelNumber); + _logger.LogTrace("The unique channel number for the publisher is : {ChannelNumber}", _publisherChannel.ChannelNumber); } } @@ -212,7 +204,7 @@ protected override async Task EnsureTopicCreatedAsync(CancellationToken cancella /// The rule of thumb is: avoid sharing channels across threads. /// Publishers in your application that publish from separate threads should use their own channels. /// The same is a good idea for consumers. - protected override Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) + protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) { byte[] data = SerializeMessageBody(messageType, message); @@ -223,13 +215,16 @@ protected override Task PublishImplAsync(string messageType, object message, Mes if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds); - return AddDelayedMessageAsync(mappedType, message, options.DeliveryDelay.Value); + await AddDelayedMessageAsync(mappedType, message, options.DeliveryDelay.Value).AnyContext(); + return; } - var basicProperties = _publisherModel.CreateBasicProperties(); - basicProperties.MessageId = options.UniqueId ?? Guid.NewGuid().ToString("N"); - basicProperties.CorrelationId = options.CorrelationId; - basicProperties.Type = messageType; + var basicProperties = new BasicProperties + { + MessageId = options.UniqueId ?? Guid.NewGuid().ToString("N"), + CorrelationId = options.CorrelationId, + Type = messageType, + }; if (_options.IsDurable) basicProperties.Persistent = true; @@ -246,7 +241,7 @@ protected override Task PublishImplAsync(string messageType, object message, Mes // RabbitMQ only supports delayed messages with a third party plugin called "rabbitmq_delayed_message_exchange" if (_delayedExchangePluginEnabled.Value && options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) { - // Its necessary to typecast long to int because RabbitMQ on the consumer side is reading the + // It's necessary to typecast long to int because RabbitMQ on the consumer side is reading the // data back as signed (using BinaryReader#ReadInt64). You will see the value to be negative // and the data will be delivered immediately. basicProperties.Headers = new Dictionary { { "x-delay", Convert.ToInt32(options.DeliveryDelay.Value.TotalMilliseconds) } }; @@ -258,30 +253,27 @@ protected override Task PublishImplAsync(string messageType, object message, Mes if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Message publish type {MessageType} {MessageId}", messageType, basicProperties.MessageId); } - // The publication occurs with mandatory=false - lock (_publisherModel) - _publisherModel.BasicPublish(_options.Topic, String.Empty, basicProperties, data); + using (await _lock.LockAsync().AnyContext()) + await _publisherChannel.BasicPublishAsync(_options.Topic, String.Empty, mandatory: false, basicProperties, data, cancellationToken: cancellationToken).AnyContext(); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Done publishing type {MessageType} {MessageId}", messageType, basicProperties.MessageId); - - return Task.CompletedTask; } /// /// Connect to a broker - RabbitMQ /// /// - private IConnection CreateConnection() + private Task CreateConnectionAsync() { - return _factory.CreateConnection(); + return _factory.CreateConnectionAsync(); } /// /// Attempts to create the delayed exchange. /// - /// + /// /// true if the delayed exchange was successfully declared. Which means plugin was installed. - private bool CreateDelayedExchange(IModel model) + private async Task CreateDelayedExchangeAsync(IChannel channel) { bool success = true; if (_delayedExchangePluginEnabled.HasValue) @@ -293,24 +285,21 @@ private bool CreateDelayedExchange(IModel model) // Disclaimer : https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ // Please read the *Performance Impact* of the delayed exchange type. var args = new Dictionary { { "x-delayed-type", ExchangeType.Fanout } }; - model.ExchangeDeclare(_options.Topic, "x-delayed-message", _options.IsDurable, false, args); + await channel.ExchangeDeclareAsync(_options.Topic, "x-delayed-message", _options.IsDurable, false, args).AnyContext(); } catch (OperationInterruptedException ex) { - if (ex.ShutdownReason.ReplyCode == 503) - { - if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation(ex, "Not able to create x-delayed-type exchange"); - success = false; - } + _logger.LogInformation(ex, "Unable to create x-delayed-type exchange: {Message}", ex.Message); + success = false; } _delayedExchangePluginEnabled = success; return _delayedExchangePluginEnabled.Value; } - private void CreateRegularExchange(IModel model) + private Task CreateRegularExchangeAsync(IChannel channel) { - model.ExchangeDeclare(_options.Topic, ExchangeType.Fanout, _options.IsDurable, false, null); + return channel.ExchangeDeclareAsync(_options.Topic, ExchangeType.Fanout, _options.IsDurable, false); } /// @@ -319,17 +308,17 @@ private void CreateRegularExchange(IModel model) /// receiver attached which will process the message. We’ll initiate a dedicated message /// exchange and not use the default one. Note that a queue can be dedicated to one or more routing keys. /// - /// channel - private string CreateQueue(IModel model) + /// channel + private async Task CreateQueueAsync(IChannel channel) { - // Setup the queue where the messages will reside - it requires the queue name and durability. + // Set up the queue where the messages will reside - it requires the queue name and durability. // Durable (the queue will survive a broker restart) // Arguments (some brokers use it to implement additional features like message TTL) - var result = model.QueueDeclare(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, _options.Arguments); + var result = await channel.QueueDeclareAsync(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, _options.Arguments).AnyContext(); string queueName = result.QueueName; // bind the queue with the exchange. - model.QueueBind(queueName, _options.Topic, ""); + await channel.QueueBindAsync(queueName, _options.Topic, "").AnyContext(); return queueName; } @@ -338,11 +327,34 @@ public override void Dispose() { base.Dispose(); + if (_isDisposed) + return; + + _isDisposed = true; + if (_factory != null) _factory.AutomaticRecoveryEnabled = false; ClosePublisherConnection(); CloseSubscriberConnection(); + GC.SuppressFinalize(this); + } + + public async ValueTask DisposeAsync() + { + base.Dispose(); + + if (_isDisposed) + return; + + _isDisposed = true; + + if (_factory != null) + _factory.AutomaticRecoveryEnabled = false; + + await ClosePublisherConnectionAsync().AnyContext(); + await CloseSubscriberConnectionAsync().AnyContext(); + GC.SuppressFinalize(this); } private void ClosePublisherConnection() @@ -355,23 +367,44 @@ private void ClosePublisherConnection() if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("ClosePublisherConnection"); - if (_publisherModel != null) + if (_publisherChannel != null) { - _publisherModel.Close(); - _publisherModel.Abort(); - _publisherModel.Dispose(); - _publisherModel = null; + _publisherChannel.Dispose(); + _publisherChannel = null; } if (_publisherConnection != null) { - _publisherConnection.Close(); _publisherConnection.Dispose(); _publisherConnection = null; } } } + private async Task ClosePublisherConnectionAsync() + { + if (_publisherConnection == null) + return; + + using (await _lock.LockAsync().AnyContext()) + { + if (_logger.IsEnabled(LogLevel.Trace)) + _logger.LogTrace("ClosePublisherConnectionAsync"); + + if (_publisherChannel != null) + { + _publisherChannel.DisposeAsync().AnyContext(); + _publisherChannel = null; + } + + if (_publisherConnection != null) + { + await _publisherConnection.DisposeAsync().AnyContext(); + _publisherConnection = null; + } + } + } + private void CloseSubscriberConnection() { if (_subscriberConnection == null) @@ -382,20 +415,41 @@ private void CloseSubscriberConnection() if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("CloseSubscriberConnection"); - if (_subscriberModel != null) + if (_subscriberChannel != null) { - _subscriberModel.Close(); - _subscriberModel.Abort(); - _subscriberModel.Dispose(); - _subscriberModel = null; + _subscriberChannel.Dispose(); + _subscriberChannel = null; } if (_subscriberConnection != null) { - _subscriberConnection.Close(); _subscriberConnection.Dispose(); _subscriberConnection = null; } } } + + private async Task CloseSubscriberConnectionAsync() + { + if (_subscriberConnection == null) + return; + + using (await _lock.LockAsync().AnyContext()) + { + if (_logger.IsEnabled(LogLevel.Trace)) + _logger.LogTrace("CloseSubscriberConnectionAsync"); + + if (_subscriberChannel != null) + { + await _subscriberChannel.DisposeAsync().AnyContext(); + _subscriberChannel = null; + } + + if (_subscriberConnection != null) + { + await _subscriberConnection.DisposeAsync().AnyContext(); + _subscriberConnection = null; + } + } + } }