Skip to content

Commit

Permalink
Migrated to MQTTnet NuGet for code consistency with UA Cloud Publishe…
Browse files Browse the repository at this point in the history
…r and enabled WebSocket support. Also updated Kafka NuGet.
  • Loading branch information
barnstee committed May 10, 2024
1 parent 5d26157 commit 9092035
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 64 deletions.
275 changes: 213 additions & 62 deletions MQTTClient.cs
Original file line number Diff line number Diff line change
@@ -1,84 +1,242 @@

namespace Opc.Ua.Cloud.Commander
{
using MQTTnet;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using Opc.Ua;
using Serilog;
using System;
using System.Net.Security;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

public class MQTTClient
{
private ApplicationConfiguration _appConfig = null;
private MqttClient _mqttClient = null;
private IMqttClient _client = null;

public MQTTClient(ApplicationConfiguration appConfig)
private readonly ApplicationConfiguration _uAApplication;

private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

public MQTTClient(ApplicationConfiguration uAApplication)
{
_appConfig = appConfig;
_uAApplication = uAApplication;
}

public class MqttClientCertificatesProvider : IMqttClientCertificatesProvider
{
private readonly ApplicationConfiguration _uAApplication;

public MqttClientCertificatesProvider(ApplicationConfiguration uAApplication)
{
_uAApplication = uAApplication;
}

X509CertificateCollection IMqttClientCertificatesProvider.GetCertificates()
{
X509Certificate2 appCert = _uAApplication.SecurityConfiguration.ApplicationCertificate.Certificate;
if (appCert == null)
{
throw new Exception($"Cannot access OPC UA application certificate!");
}

return new X509CertificateCollection() { appCert };
}
}

public void Connect()
{
// create MQTT client
string brokerName = Environment.GetEnvironmentVariable("BROKERNAME");
string brokerName = Environment.GetEnvironmentVariable("BROKERNAME");
int brokerPort = int.Parse(Environment.GetEnvironmentVariable("BROKERPORT"));
string clientName = Environment.GetEnvironmentVariable("CLIENTNAME");
string userName = Environment.GetEnvironmentVariable("USERNAME");
string password = Environment.GetEnvironmentVariable("PASSWORD");
string topic = Environment.GetEnvironmentVariable("TOPIC");
_mqttClient = new MqttClient(brokerName, 8883, true, MqttSslProtocols.TLSv1_2, CertificateValidationCallback, null);
bool createBrokerSASToken = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("CREATE_SAS_PASSWORD"));
bool useTLS = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("USE_TLS"));
bool useUACertAuth = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("USE_UA_CERT_AUTH"));

if (Environment.GetEnvironmentVariable("CREATE_SAS_PASSWORD") != null)
try
{
// create SAS token as password
TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
int week = 60 * 60 * 24 * 7;
string expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + week);
string stringToSign = HttpUtility.UrlEncode(brokerName + "/devices/" + clientName) + "\n" + expiry;
HMACSHA256 hmac = new HMACSHA256(Convert.FromBase64String(password));
string signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
password = "SharedAccessSignature sr=" + HttpUtility.UrlEncode(brokerName + "/devices/" + clientName) + "&sig=" + HttpUtility.UrlEncode(signature) + "&se=" + expiry;
}
// disconnect if still connected
if ((_client != null) && _client.IsConnected)
{
_client.DisconnectAsync().GetAwaiter().GetResult();

// register publish received and disconnect handler callbacks
_mqttClient.MqttMsgPublishReceived += PublishReceived;
_mqttClient.ConnectionClosed += ConnectionClosed;
_cancellationTokenSource.Cancel();
}

// subscribe to all our topics
_mqttClient.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
if (string.IsNullOrEmpty(brokerName))
{
// no broker URL configured = nothing to connect to!
Log.Logger.Error("Broker URL not configured. Cannot connect to broker!");
return;
}

// connect to MQTT broker
byte returnCode = _mqttClient.Connect(clientName, userName, password, false, 5);
if (returnCode != MqttMsgConnack.CONN_ACCEPTED)
{
Log.Logger.Error("Connection to MQTT broker failed with " + returnCode.ToString() + "!");
// create MQTT password
if (createBrokerSASToken)
{
// create SAS token as password
TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
int week = 60 * 60 * 24 * 7;
string expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + week);
string stringToSign = HttpUtility.UrlEncode(brokerName + "/devices/" + clientName) + "\n" + expiry;
HMACSHA256 hmac = new HMACSHA256(Convert.FromBase64String(password));
string signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
password = "SharedAccessSignature sr=" + HttpUtility.UrlEncode(brokerName + "/devices/" + clientName) + "&sig=" + HttpUtility.UrlEncode(signature) + "&se=" + expiry;
}

// create MQTT client
_client = new MqttFactory().CreateMqttClient();
_client.ApplicationMessageReceivedAsync += msg => HandleMessageAsync(msg);

MqttClientOptionsBuilder clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(brokerName, brokerPort)
.WithClientId(clientName)
.WithTlsOptions(new MqttClientTlsOptions { UseTls = useTLS })
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithTimeout(TimeSpan.FromSeconds(10))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(100))
.WithCleanSession(true) // clear existing subscriptions
.WithCredentials(userName, password);

if (brokerPort == 443)
{
clientOptions = new MqttClientOptionsBuilder()
.WithWebSocketServer( o => o.WithUri(brokerName))
.WithClientId(clientName)
.WithTlsOptions(new MqttClientTlsOptions { UseTls = useTLS })
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithTimeout(TimeSpan.FromSeconds(10))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(100))
.WithCleanSession(true) // clear existing subscriptions
.WithCredentials(userName, password);
}

if (useUACertAuth)
{
clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(brokerName)
.WithClientId(clientName)
.WithTlsOptions(new MqttClientTlsOptions
{
UseTls = true,
AllowUntrustedCertificates = true,
IgnoreCertificateChainErrors = true,
ClientCertificatesProvider = new MqttClientCertificatesProvider(_uAApplication)
})
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithTimeout(TimeSpan.FromSeconds(10))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(100))
.WithCleanSession(true) // clear existing subscriptions
.WithCredentials(clientName, string.Empty);
}

// setup disconnection handling
_client.DisconnectedAsync += disconnectArgs =>
{
Log.Logger.Warning($"Disconnected from MQTT broker: {disconnectArgs.Reason}");

// wait a 5 seconds, then simply reconnect again, if needed
Task.Delay(TimeSpan.FromSeconds(5)).GetAwaiter().GetResult();

MqttClientConnectResult connectResult = _client.ConnectAsync(clientOptions.Build(), _cancellationTokenSource.Token).GetAwaiter().GetResult();
if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
string status = GetStatus(connectResult.UserProperties)?.ToString("x4");
throw new Exception($"Connection to MQTT broker failed. Status: {connectResult.ResultCode}; status: {status}");
}

return Task.CompletedTask;
};

try
{
_cancellationTokenSource.Dispose();
_cancellationTokenSource = new CancellationTokenSource();

MqttClientConnectResult connectResult = _client.ConnectAsync(clientOptions.Build(), _cancellationTokenSource.Token).GetAwaiter().GetResult();
if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
string status = GetStatus(connectResult.UserProperties)?.ToString("x4");
throw new Exception($"Connection to MQTT broker failed. Status: {connectResult.ResultCode}; status: {status}");
}

if (!string.IsNullOrEmpty(topic))
{
MqttClientSubscribeResult subscribeResult = _client.SubscribeAsync(
new MqttTopicFilter
{
Topic = topic,
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
}).GetAwaiter().GetResult();

// make sure subscriptions were successful
if (subscribeResult.Items.Count != 1 || subscribeResult.Items.ElementAt(0).ResultCode != MqttClientSubscribeResultCode.GrantedQoS0)
{
throw new ApplicationException("Failed to subscribe");
}
}

Log.Logger.Information("Connected to MQTT broker.");
}
catch (MqttConnectingFailedException ex)
{
Log.Logger.Error($"Failed to connect with reason {ex.ResultCode} and message: {ex.Message}");
if (ex.Result?.UserProperties != null)
{
foreach (var prop in ex.Result.UserProperties)
{
Log.Logger.Error($"{prop.Name}: {prop.Value}");
}
}
}
}
else
catch (Exception ex)
{
Log.Logger.Information("Connected to MQTT broker.");
Log.Logger.Error("Failed to connect to MQTT broker: " + ex.Message);
}
}

private void ConnectionClosed(object sender, EventArgs e)
private MqttApplicationMessage BuildResponse(string status, string id, byte[] payload)
{
string responseTopic = Environment.GetEnvironmentVariable("RESPONSE_TOPIC");

return new MqttApplicationMessageBuilder()
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithTopic($"{responseTopic}/{status}/{id}")
.WithPayload(payload)
.Build();
}

// parses status from packet properties
private int? GetStatus(List<MqttUserProperty> properties)
{
Log.Logger.Warning("Disconnected from MQTT broker.");
var status = properties.FirstOrDefault(up => up.Name == "status");
if (status == null)
{
return null;
}

// simply reconnect again
Connect();
return int.Parse(status.Value, NumberStyles.HexNumber, CultureInfo.InvariantCulture);
}

private void PublishReceived(object sender, MqttMsgPublishEventArgs e)
// handles all incoming messages
private async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs args)
{
Log.Logger.Information($"Received cloud command with topic: {e.Topic} and payload: {Encoding.UTF8.GetString(e.Message)}");
Log.Logger.Information($"Received cloud command with topic: {args.ApplicationMessage.Topic} and payload: {args.ApplicationMessage.ConvertPayloadToString()}");

string requestTopic = Environment.GetEnvironmentVariable("TOPIC");
string responseTopic = Environment.GetEnvironmentVariable("RESPONSE_TOPIC");
string requestID = e.Topic.Substring(e.Topic.IndexOf("?"));
string requestID = args.ApplicationMessage.Topic.Substring(args.ApplicationMessage.Topic.IndexOf("?"));

ResponseModel response = new()
{
Expand All @@ -87,7 +245,7 @@ private void PublishReceived(object sender, MqttMsgPublishEventArgs e)

try
{
string requestPayload = Encoding.UTF8.GetString(e.Message);
string requestPayload = args.ApplicationMessage.ConvertPayloadToString();

// parse the message
RequestModel request = JsonConvert.DeserializeObject<RequestModel>(requestPayload);
Expand All @@ -102,52 +260,45 @@ private void PublishReceived(object sender, MqttMsgPublishEventArgs e)
response.CorrelationId = request.CorrelationId;

// route this to the right handler
if (e.Topic.StartsWith(requestTopic.TrimEnd('#') + "MethodCall"))
if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "MethodCall"))
{
new UAClient().ExecuteUACommand(_appConfig, requestPayload);
new UAClient().ExecuteUACommand(_uAApplication, requestPayload);
response.Success = true;
}
else if (e.Topic.StartsWith(requestTopic.TrimEnd('#') + "Read"))
else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "Read"))
{
response.Status = new UAClient().ReadUAVariable(_appConfig, requestPayload);
response.Status = new UAClient().ReadUAVariable(_uAApplication, requestPayload);
response.Success = true;
}
else if (e.Topic.StartsWith(requestTopic.TrimEnd('#') + "HistoryRead"))
else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "HistoryRead"))
{
response.Status = new UAClient().ReadUAHistory(_appConfig, requestPayload);
response.Status = new UAClient().ReadUAHistory(_uAApplication, requestPayload);
response.Success = true;
}
else if (e.Topic.StartsWith(requestTopic.TrimEnd('#') + "Write"))
else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "Write"))
{
new UAClient().WriteUAVariable(_appConfig, requestPayload);
new UAClient().WriteUAVariable(_uAApplication, requestPayload);
response.Success = true;
}
else
{
Log.Logger.Error("Unknown command received: " + e.Topic);
response.Status = "Unkown command " + e.Topic;
Log.Logger.Error("Unknown command received: " + args.ApplicationMessage.Topic);
response.Status = "Unkown command " + args.ApplicationMessage.Topic;
response.Success = false;
}

// send reponse to MQTT broker
_mqttClient.Publish(responseTopic + "/200/" + requestID, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response)), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);

await _client.PublishAsync(BuildResponse("200", requestID, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response))), _cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
Log.Logger.Error(ex, "MQTTBrokerPublishReceived");
Log.Logger.Error(ex, "HandleMessageAsync");
response.Status = ex.Message;
response.Success = false;

// send error to MQTT broker
_mqttClient.Publish(responseTopic + "/500/" + requestID, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response)), MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
await _client.PublishAsync(BuildResponse("500", requestID, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response))), _cancellationTokenSource.Token).ConfigureAwait(false);
}
}

private bool CertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
// always trust the MQTT broker certificate
return true;
}
}
}
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ A cross-platform, cloud-based OPC UA command & control reference implementation
The following environment variables are REQUIRED:

* BROKERNAME - Broker name to connect to
* BROKERPORT = Broker port to connect to. When 443 is specified, Websockets are used.
* CLIENTNAME - Client name, for example the device ID UA Cloud Commander is running on. If running as an Azure IoT Edge module, this is `<deviceID>/<moduleID>`
* TOPIC - Topic to subscribe to in the syntax `<YourTopicName>/#`. `Read`, `Write` and `Command` must be sub-topics of this topic. For IoT Hub, the topic is `$iothub/methods/POST/#`
* RESPONSE_TOPIC - Topic to send responses to, for IoT Hub, this is `$iothub/methods/res/`
Expand All @@ -16,6 +17,8 @@ The following environment variables are REQUIRED:
The following environment variables are optional:

* CREATE_SAS_PASSWORD - Create a SAS token from the password, this is for example needed when using IoT Hub as the MQTT broker
* USE_TLS - Use TLS
* USE_UA_CERT_AUTH - Use the UA Certificate to authenticate with the MQTT broker
* UA_USERNAME - Username for the OPC UA server to connect to
* UA_PASSWORD - Password for the OPC UA server to connect to
* USE_KAFKA - Use Kafka instead of MQTT for communication
Expand Down
Loading

0 comments on commit 9092035

Please sign in to comment.