diff --git a/src/Client/IMqttClient.cs b/src/Client/IMqttClient.cs index 6ec40753..930d96ed 100644 --- a/src/Client/IMqttClient.cs +++ b/src/Client/IMqttClient.cs @@ -7,26 +7,26 @@ namespace System.Net.Mqtt /// public interface IMqttClient : IDisposable { - /// - /// Event raised when the Client gets disconnected. - /// The Client disconnection could be caused by a protocol disconnect, - /// an error or a remote disconnection produced by the Server. - /// See for more details on the disconnection information - /// + /// + /// Event raised when the Client gets disconnected. + /// The Client disconnection could be caused by a protocol disconnect, + /// an error or a remote disconnection produced by the Server. + /// See for more details on the disconnection information + /// event EventHandler Disconnected; - /// - /// Id of the connected Client. - /// This Id correspond to the parameter passed to - /// method - /// + /// + /// Id of the connected Client. + /// This Id correspond to the parameter passed to + /// method + /// string Id { get; } - /// - /// Indicates if the Client is connected by protocol. - /// This means that a CONNECT packet has been sent, - /// by calling method - /// + /// + /// Indicates if the Client is connected by protocol. + /// This means that a CONNECT packet has been sent, + /// by calling method + /// bool IsConnected { get; } /// @@ -67,7 +67,7 @@ public interface IMqttClient : IDisposable /// See MQTT Connect /// for more details about the protocol connection /// - Task ConnectAsync (MqttClientCredentials credentials, MqttLastWill will = null, bool cleanSession = false); + Task ConnectAsync(MqttClientCredentials credentials, MqttLastWill will = null, bool cleanSession = false); /// /// Represents the protocol connection, which consists of sending a CONNECT packet @@ -88,7 +88,7 @@ public interface IMqttClient : IDisposable /// See MQTT Connect /// for more details about the protocol connection /// - Task ConnectAsync (MqttLastWill will = null); + Task ConnectAsync(MqttLastWill will = null); /// /// Represents the protocol subscription, which consists of sending a SUBSCRIBE packet @@ -108,55 +108,55 @@ public interface IMqttClient : IDisposable /// See MQTT Subscribe /// for more details about the protocol subscription /// - Task SubscribeAsync (string topicFilter, MqttQualityOfService qos); + Task SubscribeAsync(string topicFilter, MqttQualityOfService qos); - /// - /// Represents the protocol publish, which consists of sending a PUBLISH packet - /// and awaiting the corresponding ACK packet, if applies, based on the QoS defined - /// - /// - /// The application message to publish to the Server. - /// See for more details about the application messages - /// - /// - /// The Quality Of Service (QoS) associated to the application message, which determines - /// the sequence of acknowledgements that Client and Server should send each other to consider the message as delivered - /// See for more details about the QoS values - /// - /// - /// Indicates if the application message should be retained by the Server for future subscribers. - /// Only the last message of each topic is retained - /// - /// - /// See MQTT Publish - /// for more details about the protocol publish - /// - Task PublishAsync (MqttApplicationMessage message, MqttQualityOfService qos, bool retain = false); + /// + /// Represents the protocol publish, which consists of sending a PUBLISH packet + /// and awaiting the corresponding ACK packet, if applies, based on the QoS defined + /// + /// + /// The application message to publish to the Server. + /// See for more details about the application messages + /// + /// + /// The Quality Of Service (QoS) associated to the application message, which determines + /// the sequence of acknowledgements that Client and Server should send each other to consider the message as delivered + /// See for more details about the QoS values + /// + /// + /// Indicates if the application message should be retained by the Server for future subscribers. + /// Only the last message of each topic is retained + /// + /// + /// See MQTT Publish + /// for more details about the protocol publish + /// + Task PublishAsync(MqttApplicationMessage message, MqttQualityOfService qos, bool retain = false); - /// - /// Represents the protocol unsubscription, which consists of sending an UNSUBSCRIBE packet - /// and awaiting the corresponding UNSUBACK packet from the Server - /// - /// - /// The list of topics to unsubscribe from - /// Once the unsubscription completes, no more application messages for those topics will arrive to - /// - /// MqttClientException - /// - /// See MQTT Unsubscribe - /// for more details about the protocol unsubscription - /// - Task UnsubscribeAsync (params string[] topics); + /// + /// Represents the protocol unsubscription, which consists of sending an UNSUBSCRIBE packet + /// and awaiting the corresponding UNSUBACK packet from the Server + /// + /// + /// The list of topics to unsubscribe from + /// Once the unsubscription completes, no more application messages for those topics will arrive to + /// + /// MqttClientException + /// + /// See MQTT Unsubscribe + /// for more details about the protocol unsubscription + /// + Task UnsubscribeAsync(params string[] topics); - /// - /// Represents the protocol disconnection, which consists of sending a DISCONNECT packet to the Server - /// No acknowledgement is sent by the Server on the disconnection - /// Once the client is successfully disconnected, the event will be fired - /// - /// - /// See MQTT Disconnect - /// for more details about the protocol disconnection - /// - Task DisconnectAsync (); + /// + /// Represents the protocol disconnection, which consists of sending a DISCONNECT packet to the Server + /// No acknowledgement is sent by the Server on the disconnection + /// Once the client is successfully disconnected, the event will be fired + /// + /// + /// See MQTT Disconnect + /// for more details about the protocol disconnection + /// + Task DisconnectAsync(); } } diff --git a/src/Client/Sdk/MqttClientImpl.cs b/src/Client/Sdk/MqttClientImpl.cs index 551cf084..bc1476b0 100644 --- a/src/Client/Sdk/MqttClientImpl.cs +++ b/src/Client/Sdk/MqttClientImpl.cs @@ -12,7 +12,7 @@ namespace System.Net.Mqtt.Sdk { internal class MqttClientImpl : IMqttClient { - static readonly ITracer tracer = Tracer.Get (); + static readonly ITracer tracer = Tracer.Get(); bool disposed; bool isProtocolConnected; @@ -27,7 +27,7 @@ internal class MqttClientImpl : IMqttClient readonly MqttConfiguration configuration; readonly TaskRunner clientSender; - internal MqttClientImpl (IPacketChannelFactory channelFactory, + internal MqttClientImpl(IPacketChannelFactory channelFactory, IProtocolFlowProvider flowProvider, IRepositoryProvider repositoryProvider, IPacketIdProvider packetIdProvider, @@ -36,11 +36,11 @@ internal MqttClientImpl (IPacketChannelFactory channelFactory, receiver = new Subject(); this.channelFactory = channelFactory; this.flowProvider = flowProvider; - sessionRepository = repositoryProvider.GetRepository (); + sessionRepository = repositoryProvider.GetRepository(); this.packetIdProvider = packetIdProvider; this.configuration = configuration; - clientSender = TaskRunner.Get (); - } + clientSender = TaskRunner.Get(); + } public event EventHandler Disconnected = (sender, args) => { }; @@ -50,7 +50,7 @@ public bool IsConnected { get { - CheckUnderlyingConnection (); + CheckUnderlyingConnection(); return isProtocolConnected && Channel.IsConnected; } @@ -62,359 +62,413 @@ private set public IObservable MessageStream { get { return receiver; } } - internal IMqttChannel Channel { get; private set; } + internal IMqttChannel Channel { get; private set; } - public async Task ConnectAsync (MqttClientCredentials credentials, MqttLastWill will = null, bool cleanSession = false) + public async Task ConnectAsync(MqttClientCredentials credentials, MqttLastWill will = null, bool cleanSession = false) { - if (disposed) { - throw new ObjectDisposedException (GetType ().FullName); + if (disposed) + { + throw new ObjectDisposedException(GetType().FullName); } - try { - if (IsConnected) { - throw new MqttClientException (string.Format (Properties.Resources.Client_AlreadyConnected, Id)); + try + { + if (IsConnected) + { + throw new MqttClientException(string.Format(Properties.Resources.Client_AlreadyConnected, Id)); } - if (string.IsNullOrEmpty (credentials.ClientId) && !cleanSession) { - throw new MqttClientException (Properties.Resources.Client_AnonymousClientWithoutCleanSession); + if (string.IsNullOrEmpty(credentials.ClientId) && !cleanSession) + { + throw new MqttClientException(Properties.Resources.Client_AnonymousClientWithoutCleanSession); } - Id = string.IsNullOrEmpty (credentials.ClientId) ? - MqttClient.GetAnonymousClientId () : + Id = string.IsNullOrEmpty(credentials.ClientId) ? + MqttClient.GetAnonymousClientId() : credentials.ClientId; - OpenClientSession (cleanSession); + OpenClientSession(cleanSession); - await InitializeChannelAsync ().ConfigureAwait (continueOnCapturedContext: false); + await InitializeChannelAsync().ConfigureAwait(continueOnCapturedContext: false); - var connect = new Connect (Id, cleanSession) { + var connect = new Connect(Id, cleanSession) + { UserName = credentials.UserName, Password = credentials.Password, Will = will, KeepAlive = configuration.KeepAliveSecs }; - await SendPacketAsync (connect) - .ConfigureAwait (continueOnCapturedContext: false); + await SendPacketAsync(connect) + .ConfigureAwait(continueOnCapturedContext: false); - var connectTimeout = TimeSpan.FromSeconds (configuration.WaitTimeoutSecs); + var connectTimeout = TimeSpan.FromSeconds(configuration.WaitTimeoutSecs); var ack = await packetListener - .PacketStream - .ObserveOn (NewThreadScheduler.Default) - .OfType () - .FirstOrDefaultAsync () - .Timeout (connectTimeout); + .PacketStream + .ObserveOn(NewThreadScheduler.Default) + .OfType() + .FirstOrDefaultAsync() + .Timeout(connectTimeout); - if (ack == null) { + if (ack == null) + { var message = string.Format(Properties.Resources.Client_ConnectionDisconnected, Id); - throw new MqttClientException (message); + throw new MqttClientException(message); } - if (ack.Status != MqttConnectionStatus.Accepted) { - throw new MqttConnectionException (ack.Status); + if (ack.Status != MqttConnectionStatus.Accepted) + { + throw new MqttConnectionException(ack.Status); } IsConnected = true; return ack.SessionPresent ? SessionState.SessionPresent : SessionState.CleanSession; - } catch (TimeoutException timeEx) { - Close (timeEx); - throw new MqttClientException (string.Format (Properties.Resources.Client_ConnectionTimeout, Id), timeEx); - } catch (MqttConnectionException connectionEx) { - Close (connectionEx); + } + catch (TimeoutException timeEx) + { + Close(timeEx); + throw new MqttClientException(string.Format(Properties.Resources.Client_ConnectionTimeout, Id), timeEx); + } + catch (MqttConnectionException connectionEx) + { + Close(connectionEx); - var message = string.Format (Properties.Resources.Client_ConnectNotAccepted, Id, connectionEx.ReturnCode); + var message = string.Format(Properties.Resources.Client_ConnectNotAccepted, Id, connectionEx.ReturnCode); - throw new MqttClientException (message, connectionEx); - } catch (MqttClientException clientEx) { - Close (clientEx); + throw new MqttClientException(message, connectionEx); + } + catch (MqttClientException clientEx) + { + Close(clientEx); throw; - } catch (Exception ex) { - Close (ex); - throw new MqttClientException (string.Format (Properties.Resources.Client_ConnectionError, Id), ex); + } + catch (Exception ex) + { + Close(ex); + throw new MqttClientException(string.Format(Properties.Resources.Client_ConnectionError, Id), ex); } } - public Task ConnectAsync (MqttLastWill will = null) => - ConnectAsync (new MqttClientCredentials (), will, cleanSession: true); + public Task ConnectAsync(MqttLastWill will = null) => + ConnectAsync(new MqttClientCredentials(), will, cleanSession: true); - public async Task SubscribeAsync (string topicFilter, MqttQualityOfService qos) + public async Task SubscribeAsync(string topicFilter, MqttQualityOfService qos) { - if (disposed) { - throw new ObjectDisposedException (GetType ().FullName); + if (disposed) + { + throw new ObjectDisposedException(GetType().FullName); } - try { - var packetId = packetIdProvider.GetPacketId (); - var subscribe = new Subscribe (packetId, new Subscription (topicFilter, qos)); + try + { + var packetId = packetIdProvider.GetPacketId(); + var subscribe = new Subscribe(packetId, new Subscription(topicFilter, qos)); - var ack = default (SubscribeAck); - var subscribeTimeout = TimeSpan.FromSeconds (configuration.WaitTimeoutSecs); + var ack = default(SubscribeAck); + var subscribeTimeout = TimeSpan.FromSeconds(configuration.WaitTimeoutSecs); - await SendPacketAsync (subscribe) - .ConfigureAwait (continueOnCapturedContext: false); + await SendPacketAsync(subscribe) + .ConfigureAwait(continueOnCapturedContext: false); ack = await packetListener - .PacketStream - .ObserveOn (NewThreadScheduler.Default) - .OfType () - .FirstOrDefaultAsync (x => x.PacketId == packetId) - .Timeout (subscribeTimeout); + .PacketStream + .ObserveOn(NewThreadScheduler.Default) + .OfType() + .FirstOrDefaultAsync(x => x.PacketId == packetId) + .Timeout(subscribeTimeout); - if (ack == null) { + if (ack == null) + { var message = string.Format(Properties.Resources.Client_SubscriptionDisconnected, Id, topicFilter); - tracer.Error (message); + tracer.Error(message); - throw new MqttClientException (message); + throw new MqttClientException(message); } - if (ack.ReturnCodes.FirstOrDefault () == SubscribeReturnCode.Failure) { - var message = string.Format(Properties.Resources.Client_SubscriptionRejected, Id, topicFilter); + if (ack.ReturnCodes.FirstOrDefault() == SubscribeReturnCode.Failure) + { + var message = string.Format(Properties.Resources.Client_SubscriptionRejected, Id, topicFilter); - tracer.Error(message); + tracer.Error(message); - throw new MqttClientException (message); - } - } catch (TimeoutException timeEx) { - Close (timeEx); + throw new MqttClientException(message); + } + } + catch (TimeoutException timeEx) + { + Close(timeEx); - var message = string.Format (Properties.Resources.Client_SubscribeTimeout, Id, topicFilter); + var message = string.Format(Properties.Resources.Client_SubscribeTimeout, Id, topicFilter); - throw new MqttClientException (message, timeEx); - } catch (MqttClientException clientEx) { - Close (clientEx); + throw new MqttClientException(message, timeEx); + } + catch (MqttClientException clientEx) + { + Close(clientEx); throw; - } catch (Exception ex) { - Close (ex); + } + catch (Exception ex) + { + Close(ex); - var message = string.Format (Properties.Resources.Client_SubscribeError, Id, topicFilter); + var message = string.Format(Properties.Resources.Client_SubscribeError, Id, topicFilter); - throw new MqttClientException (message, ex); + throw new MqttClientException(message, ex); } } - public async Task PublishAsync (MqttApplicationMessage message, MqttQualityOfService qos, bool retain = false) + public async Task PublishAsync(MqttApplicationMessage message, MqttQualityOfService qos, bool retain = false) { - if (disposed) { - throw new ObjectDisposedException (GetType ().FullName); + if (disposed) + { + throw new ObjectDisposedException(GetType().FullName); } - try { - ushort? packetId = qos == MqttQualityOfService.AtMostOnce ? null : (ushort?)packetIdProvider.GetPacketId (); - var publish = new Publish (message.Topic, qos, retain, duplicated: false, packetId: packetId) + try + { + ushort? packetId = qos == MqttQualityOfService.AtMostOnce ? null : (ushort?)packetIdProvider.GetPacketId(); + var publish = new Publish(message.Topic, qos, retain, duplicated: false, packetId: packetId) { Payload = message.Payload }; - var senderFlow = flowProvider.GetFlow (); + var senderFlow = flowProvider.GetFlow(); - await clientSender.Run (async () => { - await senderFlow.SendPublishAsync (Id, publish, Channel) - .ConfigureAwait (continueOnCapturedContext: false); - }).ConfigureAwait (continueOnCapturedContext: false); - } catch (Exception ex) { - Close (ex); + await clientSender.Run(async () => + { + await senderFlow.SendPublishAsync(Id, publish, Channel) + .ConfigureAwait(continueOnCapturedContext: false); + }).ConfigureAwait(continueOnCapturedContext: false); + } + catch (Exception ex) + { + Close(ex); throw; } } - public async Task UnsubscribeAsync (params string[] topics) + public async Task UnsubscribeAsync(params string[] topics) { - if (disposed) { - throw new ObjectDisposedException (GetType ().FullName); + if (disposed) + { + throw new ObjectDisposedException(GetType().FullName); } - try { - topics = topics ?? new string[] { }; + try + { + topics = topics ?? new string[] { }; - var packetId = packetIdProvider.GetPacketId (); + var packetId = packetIdProvider.GetPacketId(); var unsubscribe = new Unsubscribe(packetId, topics); - var ack = default (UnsubscribeAck); + var ack = default(UnsubscribeAck); var unsubscribeTimeout = TimeSpan.FromSeconds(configuration.WaitTimeoutSecs); - await SendPacketAsync (unsubscribe) - .ConfigureAwait (continueOnCapturedContext: false); + await SendPacketAsync(unsubscribe) + .ConfigureAwait(continueOnCapturedContext: false); ack = await packetListener - .PacketStream - .ObserveOn (NewThreadScheduler.Default) - .OfType () - .FirstOrDefaultAsync (x => x.PacketId == packetId) - .Timeout (unsubscribeTimeout); + .PacketStream + .ObserveOn(NewThreadScheduler.Default) + .OfType() + .FirstOrDefaultAsync(x => x.PacketId == packetId) + .Timeout(unsubscribeTimeout); - if (ack == null) { + if (ack == null) + { var message = string.Format(Properties.Resources.Client_UnsubscribeDisconnected, Id, string.Join(", ", topics)); - tracer.Error (message); + tracer.Error(message); - throw new MqttClientException (message); + throw new MqttClientException(message); } - } catch (TimeoutException timeEx) { - Close (timeEx); + } + catch (TimeoutException timeEx) + { + Close(timeEx); - var message = string.Format (Properties.Resources.Client_UnsubscribeTimeout, Id, string.Join(", ", topics)); + var message = string.Format(Properties.Resources.Client_UnsubscribeTimeout, Id, string.Join(", ", topics)); - tracer.Error (message); + tracer.Error(message); - throw new MqttClientException (message, timeEx); - } catch (MqttClientException clientEx) { - Close (clientEx); + throw new MqttClientException(message, timeEx); + } + catch (MqttClientException clientEx) + { + Close(clientEx); throw; - } catch (Exception ex) { - Close (ex); + } + catch (Exception ex) + { + Close(ex); - var message = string.Format (Properties.Resources.Client_UnsubscribeError, Id, string.Join(", ", topics)); + var message = string.Format(Properties.Resources.Client_UnsubscribeError, Id, string.Join(", ", topics)); - tracer.Error (message); + tracer.Error(message); - throw new MqttClientException (message, ex); + throw new MqttClientException(message, ex); } } - public async Task DisconnectAsync () + public async Task DisconnectAsync() { - try { - if (!IsConnected) { - throw new MqttClientException (Properties.Resources.Client_AlreadyDisconnected); + try + { + if (!IsConnected) + { + throw new MqttClientException(Properties.Resources.Client_AlreadyDisconnected); } - packetsSubscription?.Dispose (); + packetsSubscription?.Dispose(); - await SendPacketAsync (new Disconnect ()) - .ConfigureAwait (continueOnCapturedContext: false); + await SendPacketAsync(new Disconnect()) + .ConfigureAwait(continueOnCapturedContext: false); await packetListener .PacketStream - .LastOrDefaultAsync (); + .LastOrDefaultAsync(); - Close (DisconnectedReason.SelfDisconnected); - } catch (Exception ex) { - Close (ex); + Close(DisconnectedReason.SelfDisconnected); + } + catch (Exception ex) + { + Close(ex); } } - void IDisposable.Dispose () + void IDisposable.Dispose() { - DisposeAsync (disposing: true).Wait (); - GC.SuppressFinalize (this); + DisposeAsync(disposing: true).Wait(); + GC.SuppressFinalize(this); } - protected virtual async Task DisposeAsync (bool disposing) + protected virtual async Task DisposeAsync(bool disposing) { if (disposed) return; - if (disposing) { - if (IsConnected) { - await DisconnectAsync ().ConfigureAwait (continueOnCapturedContext: false); + if (disposing) + { + if (IsConnected) + { + await DisconnectAsync().ConfigureAwait(continueOnCapturedContext: false); } - (clientSender as IDisposable)?.Dispose (); + (clientSender as IDisposable)?.Dispose(); disposed = true; } } - void Close (Exception ex) + void Close(Exception ex) { - tracer.Error (ex); - Close (DisconnectedReason.Error, ex.Message); + tracer.Error(ex); + Close(DisconnectedReason.Error, ex.Message); } - void Close (DisconnectedReason reason, string message = null) + void Close(DisconnectedReason reason, string message = null) { - tracer.Info (Properties.Resources.Client_Closing, Id, reason); + tracer.Info(Properties.Resources.Client_Closing, Id, reason); - CloseClientSession (); - packetsSubscription?.Dispose (); - packetListener?.Dispose (); + CloseClientSession(); + packetsSubscription?.Dispose(); + packetListener?.Dispose(); ResetReceiver(); - Channel?.Dispose (); + Channel?.Dispose(); IsConnected = false; - Id = null; + Id = null; - Disconnected (this, new MqttEndpointDisconnected (reason, message)); + Disconnected(this, new MqttEndpointDisconnected(reason, message)); } - async Task InitializeChannelAsync () + async Task InitializeChannelAsync() { Channel = await channelFactory - .CreateAsync () - .ConfigureAwait (continueOnCapturedContext: false); + .CreateAsync() + .ConfigureAwait(continueOnCapturedContext: false); - packetListener = new ClientPacketListener (Channel, flowProvider, configuration); - packetListener.Listen (); - ObservePackets (); + packetListener = new ClientPacketListener(Channel, flowProvider, configuration); + packetListener.Listen(); + ObservePackets(); } - void OpenClientSession (bool cleanSession) + void OpenClientSession(bool cleanSession) { - var session = string.IsNullOrEmpty (Id) ? default (ClientSession) : sessionRepository.Read (Id); + var session = string.IsNullOrEmpty(Id) ? default(ClientSession) : sessionRepository.Read(Id); var sessionPresent = cleanSession ? false : session != null; - if (cleanSession && session != null) { - sessionRepository.Delete (session.Id); + if (cleanSession && session != null) + { + sessionRepository.Delete(session.Id); session = null; - tracer.Info (Properties.Resources.Client_CleanedOldSession, Id); + tracer.Info(Properties.Resources.Client_CleanedOldSession, Id); } - if (session == null) { - session = new ClientSession (Id, cleanSession); + if (session == null) + { + session = new ClientSession(Id, cleanSession); - sessionRepository.Create (session); + sessionRepository.Create(session); - tracer.Info (Properties.Resources.Client_CreatedSession, Id); + tracer.Info(Properties.Resources.Client_CreatedSession, Id); } } - void CloseClientSession () + void CloseClientSession() { - var session = string.IsNullOrEmpty (Id) ? default (ClientSession) : sessionRepository.Read (Id); + var session = string.IsNullOrEmpty(Id) ? default(ClientSession) : sessionRepository.Read(Id); - if (session == null) { + if (session == null) + { return; } - if (session.Clean) { - sessionRepository.Delete (session.Id); + if (session.Clean) + { + sessionRepository.Delete(session.Id); - tracer.Info (Properties.Resources.Client_DeletedSessionOnDisconnect, Id); + tracer.Info(Properties.Resources.Client_DeletedSessionOnDisconnect, Id); } } - async Task SendPacketAsync (IPacket packet) + async Task SendPacketAsync(IPacket packet) { - await clientSender.Run (async () => await Channel.SendAsync (packet).ConfigureAwait (continueOnCapturedContext: false)) - .ConfigureAwait (continueOnCapturedContext: false); + await clientSender.Run(async () => await Channel.SendAsync(packet).ConfigureAwait(continueOnCapturedContext: false)) + .ConfigureAwait(continueOnCapturedContext: false); } - void CheckUnderlyingConnection () + void CheckUnderlyingConnection() { - if (isProtocolConnected && !Channel.IsConnected) { - Close (DisconnectedReason.Error, Properties.Resources.Client_UnexpectedChannelDisconnection); + if (isProtocolConnected && !Channel.IsConnected) + { + Close(DisconnectedReason.Error, Properties.Resources.Client_UnexpectedChannelDisconnection); } } - void ObservePackets () + void ObservePackets() { packetsSubscription = packetListener - .PacketStream - .ObserveOn (NewThreadScheduler.Default) - .Subscribe (packet => { - if (packet.Type == MqttPacketType.Publish) { + .PacketStream + .ObserveOn(NewThreadScheduler.Default) + .Subscribe(packet => + { + if (packet.Type == MqttPacketType.Publish) + { var publish = packet as Publish; - var message = new MqttApplicationMessage (publish.Topic, publish.Payload); + var message = new MqttApplicationMessage(publish.Topic, publish.Payload); - receiver.OnNext (message); - tracer.Info (Properties.Resources.Client_NewApplicationMessageReceived, Id, publish.Topic); + receiver.OnNext(message); + tracer.Info(Properties.Resources.Client_NewApplicationMessageReceived, Id, publish.Topic); } - }, ex => { - Close (ex); - }, () => { - tracer.Warn (Properties.Resources.Client_PacketsObservableCompleted); - Close (DisconnectedReason.RemoteDisconnected); + }, ex => + { + Close(ex); + }, () => + { + tracer.Warn(Properties.Resources.Client_PacketsObservableCompleted); + Close(DisconnectedReason.RemoteDisconnected); }); }