diff --git a/Providers/Resgrid.Providers.Bus.Rabbit/RabbitInboundQueueProvider.cs b/Providers/Resgrid.Providers.Bus.Rabbit/RabbitInboundQueueProvider.cs index 79347012..34ab2314 100644 --- a/Providers/Resgrid.Providers.Bus.Rabbit/RabbitInboundQueueProvider.cs +++ b/Providers/Resgrid.Providers.Bus.Rabbit/RabbitInboundQueueProvider.cs @@ -46,246 +46,294 @@ private async Task StartMonitoring() { if (SystemBehaviorConfig.ServiceBusType == ServiceBusTypes.Rabbit) { - var callQueueReceivedConsumer = new EventingBasicConsumer(_channel); - callQueueReceivedConsumer.Received += async (model, ea) => + if (CallQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var callQueueReceivedConsumer = new EventingBasicConsumer(_channel); + callQueueReceivedConsumer.Received += async (model, ea) => { - CallQueueItem cqi = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - cqi = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - _channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + CallQueueItem cqi = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + cqi = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + _channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (cqi != null) + try { - if (CallQueueReceived != null) + if (cqi != null) { - await CallQueueReceived.Invoke(cqi); - _channel.BasicAck(ea.DeliveryTag, false); + if (CallQueueReceived != null) + { + await CallQueueReceived.Invoke(cqi); + _channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + Logging.LogException(ex); + if (RetryQueueItem(ea, ex)) + _channel.BasicNack(ea.DeliveryTag, false, false); + else + _channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - Logging.LogException(ex); - if (RetryQueueItem(ea, ex)) - _channel.BasicNack(ea.DeliveryTag, false, false); - else - _channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; - var messageQueueReceivedConsumer = new EventingBasicConsumer(_channel); - messageQueueReceivedConsumer.Received += async (model, ea) => + String callQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.CallBroadcastQueueName), + autoAck: false, + consumer: callQueueReceivedConsumer); + } + + if (MessageQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var messageQueueReceivedConsumer = new EventingBasicConsumer(_channel); + messageQueueReceivedConsumer.Received += async (model, ea) => { - MessageQueueItem mqi = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - mqi = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - _channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + MessageQueueItem mqi = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + mqi = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + _channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (mqi != null) + try { - if (MessageQueueReceived != null) + if (mqi != null) { - await MessageQueueReceived.Invoke(mqi); - _channel.BasicAck(ea.DeliveryTag, false); + if (MessageQueueReceived != null) + { + await MessageQueueReceived.Invoke(mqi); + _channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + Logging.LogException(ex); + if (RetryQueueItem(ea, ex)) + _channel.BasicAck(ea.DeliveryTag, false); + else + _channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - Logging.LogException(ex); - if (RetryQueueItem(ea, ex)) - _channel.BasicAck(ea.DeliveryTag, false); - else - _channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; - var distributionListQueueReceivedConsumer = new EventingBasicConsumer(_channel); - distributionListQueueReceivedConsumer.Received += async (model, ea) => + String messageQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.MessageBroadcastQueueName), + autoAck: false, + consumer: messageQueueReceivedConsumer); + } + + if (DistributionListQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var distributionListQueueReceivedConsumer = new EventingBasicConsumer(_channel); + distributionListQueueReceivedConsumer.Received += async (model, ea) => { - DistributionListQueueItem dlqi = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - dlqi = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - _channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + DistributionListQueueItem dlqi = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + dlqi = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + _channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (dlqi != null) + try { - if (DistributionListQueueReceived != null) + if (dlqi != null) { - await DistributionListQueueReceived.Invoke(dlqi); - _channel.BasicAck(ea.DeliveryTag, false); + if (DistributionListQueueReceived != null) + { + await DistributionListQueueReceived.Invoke(dlqi); + _channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + Logging.LogException(ex); + if (RetryQueueItem(ea, ex)) + _channel.BasicAck(ea.DeliveryTag, false); + else + _channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - Logging.LogException(ex); - if (RetryQueueItem(ea, ex)) - _channel.BasicAck(ea.DeliveryTag, false); - else - _channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; - var notificationQueueReceivedConsumer = new EventingBasicConsumer(_channel); - notificationQueueReceivedConsumer.Received += async (model, ea) => + String distributionListQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.EmailBroadcastQueueName), + autoAck: false, + consumer: distributionListQueueReceivedConsumer); + } + + if (NotificationQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var notificationQueueReceivedConsumer = new EventingBasicConsumer(_channel); + notificationQueueReceivedConsumer.Received += async (model, ea) => { - NotificationItem ni = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - ni = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - _channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + NotificationItem ni = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + ni = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + _channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (ni != null) + try { - if (NotificationQueueReceived != null) + if (ni != null) { - await NotificationQueueReceived.Invoke(ni); - _channel.BasicAck(ea.DeliveryTag, false); + if (NotificationQueueReceived != null) + { + await NotificationQueueReceived.Invoke(ni); + _channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + Logging.LogException(ex); + if (RetryQueueItem(ea, ex)) + _channel.BasicAck(ea.DeliveryTag, false); + else + _channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - Logging.LogException(ex); - if (RetryQueueItem(ea, ex)) - _channel.BasicAck(ea.DeliveryTag, false); - else - _channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; - var shiftNotificationQueueReceivedConsumer = new EventingBasicConsumer(_channel); - shiftNotificationQueueReceivedConsumer.Received += async (model, ea) => + String notificationQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.NotificaitonBroadcastQueueName), + autoAck: false, + consumer: notificationQueueReceivedConsumer); + } + + if (ShiftNotificationQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var shiftNotificationQueueReceivedConsumer = new EventingBasicConsumer(_channel); + shiftNotificationQueueReceivedConsumer.Received += async (model, ea) => { - ShiftQueueItem sqi = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - sqi = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) - { - _channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } - - try + if (ea != null && ea.Body.Length > 0) { + ShiftQueueItem sqi = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + sqi = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + _channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - if (sqi != null) + try { - if (ShiftNotificationQueueReceived != null) + + if (sqi != null) { - await ShiftNotificationQueueReceived.Invoke(sqi); - _channel.BasicAck(ea.DeliveryTag, false); + if (ShiftNotificationQueueReceived != null) + { + await ShiftNotificationQueueReceived.Invoke(sqi); + _channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + Logging.LogException(ex); + if (RetryQueueItem(ea, ex)) + _channel.BasicAck(ea.DeliveryTag, false); + else + _channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - Logging.LogException(ex); - if (RetryQueueItem(ea, ex)) - _channel.BasicAck(ea.DeliveryTag, false); - else - _channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; + + String shiftNotificationQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.ShiftNotificationsQueueName), + autoAck: false, + consumer: shiftNotificationQueueReceivedConsumer); + } - var cqrsEventQueueReceivedConsumer = new EventingBasicConsumer(_channel); - cqrsEventQueueReceivedConsumer.Received += async (model, ea) => + if (CqrsEventQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var cqrsEventQueueReceivedConsumer = new EventingBasicConsumer(_channel); + cqrsEventQueueReceivedConsumer.Received += async (model, ea) => { - CqrsEvent cqrs = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - cqrs = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - _channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + CqrsEvent cqrs = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + cqrs = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + _channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (cqrs != null) + try { - if (CqrsEventQueueReceived != null) + if (cqrs != null) { - await CqrsEventQueueReceived.Invoke(cqrs); - _channel.BasicAck(ea.DeliveryTag, false); + if (CqrsEventQueueReceived != null) + { + await CqrsEventQueueReceived.Invoke(cqrs); + _channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + Logging.LogException(ex); + if (RetryQueueItem(ea, ex)) + _channel.BasicAck(ea.DeliveryTag, false); + else + _channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - Logging.LogException(ex); - if (RetryQueueItem(ea, ex)) - _channel.BasicAck(ea.DeliveryTag, false); - else - _channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; + + String cqrsEventQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.SystemQueueName), + autoAck: false, + consumer: cqrsEventQueueReceivedConsumer); + } if (PaymentEventQueueReceived != null) { @@ -335,166 +383,145 @@ private async Task StartMonitoring() consumer: paymentEventQueueReceivedConsumer); } - var auditEventQueueReceivedConsumer = new EventingBasicConsumer(_channel); - auditEventQueueReceivedConsumer.Received += async (model, ea) => + if (AuditEventQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var auditEventQueueReceivedConsumer = new EventingBasicConsumer(_channel); + auditEventQueueReceivedConsumer.Received += async (model, ea) => { - AuditEvent audit = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - audit = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - _channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + AuditEvent audit = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + audit = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + _channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (audit != null) + try { - if (AuditEventQueueReceived != null) + if (audit != null) { - await AuditEventQueueReceived.Invoke(audit); - _channel.BasicAck(ea.DeliveryTag, false); + if (AuditEventQueueReceived != null) + { + await AuditEventQueueReceived.Invoke(audit); + _channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + Logging.LogException(ex); + if (RetryQueueItem(ea, ex)) + _channel.BasicAck(ea.DeliveryTag, false); + else + _channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - Logging.LogException(ex); - if (RetryQueueItem(ea, ex)) - _channel.BasicAck(ea.DeliveryTag, false); - else - _channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; + + String auditEventQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.AuditQueueName), + autoAck: false, + consumer: auditEventQueueReceivedConsumer); + } - var unitLocationQueueReceivedConsumer = new EventingBasicConsumer(_channel); - unitLocationQueueReceivedConsumer.Received += async (model, ea) => + if (UnitLocationEventQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var unitLocationQueueReceivedConsumer = new EventingBasicConsumer(_channel); + unitLocationQueueReceivedConsumer.Received += async (model, ea) => { - UnitLocationEvent unitLocation = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - unitLocation = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - //_channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + UnitLocationEvent unitLocation = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + unitLocation = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + //_channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (unitLocation != null) + try { - if (UnitLocationEventQueueReceived != null) + if (unitLocation != null) { - await UnitLocationEventQueueReceived.Invoke(unitLocation); - //_channel.BasicAck(ea.DeliveryTag, false); + if (UnitLocationEventQueueReceived != null) + { + await UnitLocationEventQueueReceived.Invoke(unitLocation); + //_channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + // Discard unit location events. + Logging.LogException(ex); + //_channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - // Discard unit location events. - Logging.LogException(ex); - //_channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; + }; + + String unitLocationEventQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.UnitLoactionQueueName), + autoAck: true, + consumer: unitLocationQueueReceivedConsumer); + } - var personnelLocationQueueReceivedConsumer = new EventingBasicConsumer(_channel); - personnelLocationQueueReceivedConsumer.Received += async (model, ea) => + if (UnitLocationEventQueueReceived != null) { - if (ea != null && ea.Body.Length > 0) + var personnelLocationQueueReceivedConsumer = new EventingBasicConsumer(_channel); + personnelLocationQueueReceivedConsumer.Received += async (model, ea) => { - PersonnelLocationEvent personnelLocation = null; - try - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body.ToArray()); - personnelLocation = ObjectSerialization.Deserialize(message); - } - catch (Exception ex) + if (ea != null && ea.Body.Length > 0) { - //_channel.BasicNack(ea.DeliveryTag, false, false); - Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); - } + PersonnelLocationEvent personnelLocation = null; + try + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body.ToArray()); + personnelLocation = ObjectSerialization.Deserialize(message); + } + catch (Exception ex) + { + //_channel.BasicNack(ea.DeliveryTag, false, false); + Logging.LogException(ex, Encoding.UTF8.GetString(ea.Body.ToArray())); + } - try - { - if (personnelLocation != null) + try { - if (UnitLocationEventQueueReceived != null) + if (personnelLocation != null) { - await PersonnelLocationEventQueueReceived.Invoke(personnelLocation); - //_channel.BasicAck(ea.DeliveryTag, false); + if (UnitLocationEventQueueReceived != null) + { + await PersonnelLocationEventQueueReceived.Invoke(personnelLocation); + //_channel.BasicAck(ea.DeliveryTag, false); + } } } + catch (Exception ex) + { + // Discard unit location events. + Logging.LogException(ex); + //_channel.BasicNack(ea.DeliveryTag, false, true); + } } - catch (Exception ex) - { - // Discard unit location events. - Logging.LogException(ex); - //_channel.BasicNack(ea.DeliveryTag, false, true); - } - } - }; - - String callQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.CallBroadcastQueueName), - autoAck: false, - consumer: callQueueReceivedConsumer); - - String messageQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.MessageBroadcastQueueName), - autoAck: false, - consumer: messageQueueReceivedConsumer); - - String distributionListQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.EmailBroadcastQueueName), - autoAck: false, - consumer: distributionListQueueReceivedConsumer); - - String notificationQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.NotificaitonBroadcastQueueName), - autoAck: false, - consumer: notificationQueueReceivedConsumer); - - String shiftNotificationQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.ShiftNotificationsQueueName), - autoAck: false, - consumer: shiftNotificationQueueReceivedConsumer); - - String cqrsEventQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.SystemQueueName), - autoAck: false, - consumer: cqrsEventQueueReceivedConsumer); - - String auditEventQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.AuditQueueName), - autoAck: false, - consumer: auditEventQueueReceivedConsumer); - - String unitLocationEventQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.UnitLoactionQueueName), - autoAck: true, - consumer: unitLocationQueueReceivedConsumer); - - String personnelLocationEventQueueReceivedConsumerTag = _channel.BasicConsume( - queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.PersonnelLoactionQueueName), - autoAck: true, - consumer: personnelLocationQueueReceivedConsumer); + }; + + String personnelLocationEventQueueReceivedConsumerTag = _channel.BasicConsume( + queue: RabbitConnection.SetQueueNameForEnv(ServiceBusConfig.PersonnelLoactionQueueName), + autoAck: true, + consumer: personnelLocationQueueReceivedConsumer); + } } }