Skip to content

Commit

Permalink
fix: make invokeId thread safe (svn@421)
Browse files Browse the repository at this point in the history
If you had one BACnetClient instance and you sent multiple requests in parallel, it was possible that the responses could get mixed up due to using same invokeId and/or not checking the sender address.
  • Loading branch information
gralin committed Mar 19, 2024
1 parent 3411ed7 commit 96b580d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 41 deletions.
60 changes: 28 additions & 32 deletions BACnetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*********************************************************************/
using System.Collections.Concurrent;

namespace System.IO.BACnet;

public delegate void MessageRecievedHandler(IBacnetTransport sender, byte[] buffer, int offset, int msgLength, BacnetAddress remoteAddress);
Expand All @@ -33,7 +35,7 @@ namespace System.IO.BACnet;
public class BacnetClient : IDisposable
{
private int _retries;
private byte _invokeId;
private int _invokeId;

private readonly LastSegmentAck _lastSegmentAck = new();
private uint _writepriority;
Expand All @@ -43,7 +45,7 @@ public class BacnetClient : IDisposable
/// TODO: invoke-id should be PER (remote) DEVICE!
/// </summary>
private Dictionary<byte, List<Tuple<byte, byte[]>>> _segmentsPerInvokeId = new();
private Dictionary<byte, object> _locksPerInvokeId = new();
private ConcurrentDictionary<byte, object> _locksPerInvokeId = new();
private Dictionary<byte, byte> _expectedSegmentsPerInvokeId = new();

public const int DEFAULT_UDP_PORT = 0xBAC0;
Expand Down Expand Up @@ -676,13 +678,7 @@ protected void ProcessSegmentAck(BacnetAddress adr, BacnetPduTypes type, byte or

private void ProcessSegment(BacnetAddress address, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, BacnetMaxSegments maxSegments, BacnetMaxAdpu maxAdpu, bool server, byte sequenceNumber, byte proposedWindowNumber, byte[] buffer, int offset, int length)
{
if (!_locksPerInvokeId.TryGetValue(invokeId, out var lockObj))
{
lockObj = new object();
_locksPerInvokeId[invokeId] = lockObj;
}

lock (lockObj)
lock (_locksPerInvokeId.GetOrAdd(invokeId, () => new object()))
{
ProcessSegmentLocked(address, type, service, invokeId, maxSegments, maxAdpu, server, sequenceNumber,
proposedWindowNumber, buffer, offset, length);
Expand Down Expand Up @@ -1303,7 +1299,7 @@ public IAsyncResult BeginWriteFileRequest(BacnetAddress adr, BacnetObjectId obje
{
Log.Debug("Sending AtomicWriteFileRequest");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -1342,7 +1338,7 @@ public IAsyncResult BeginReadFileRequest(BacnetAddress adr, BacnetObjectId objec
{
Log.Debug("Sending AtomicReadFileRequest");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

//encode
var buffer = GetEncodeBuffer(Transport.HeaderLength);
Expand Down Expand Up @@ -1423,7 +1419,7 @@ private IAsyncResult BeginReadRangeRequestCore(BacnetAddress adr, BacnetObjectId
{
Log.Debug("Sending ReadRangeRequest");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

//encode
var buffer = GetEncodeBuffer(Transport.HeaderLength);
Expand Down Expand Up @@ -1529,7 +1525,7 @@ public IAsyncResult BeginSubscribeCOVRequest(BacnetAddress adr, BacnetObjectId o
{
Log.Debug($"Sending SubscribeCOVRequest {objectId}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -1579,7 +1575,7 @@ public IAsyncResult BeginSendConfirmedEventNotificationRequest(BacnetAddress adr
{
Log.Debug($"Sending Confirmed Event Notification {eventData.eventType} {eventData.eventObjectIdentifier}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr, source);
Expand Down Expand Up @@ -1628,7 +1624,7 @@ public IAsyncResult BeginSubscribePropertyRequest(BacnetAddress adr, BacnetObjec
{
Log.Debug($"Sending SubscribePropertyRequest {objectId}.{monitoredProperty}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -1696,7 +1692,7 @@ public IAsyncResult BeginReadPropertyRequest(BacnetAddress address, BacnetObject
{
Log.Debug($"Sending ReadPropertyRequest {objectId} {propertyId}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, address.RoutedSource, address.RoutedDestination);
Expand Down Expand Up @@ -1775,7 +1771,7 @@ public IAsyncResult BeginWritePropertyRequest(BacnetAddress adr, BacnetObjectId
{
Log.Debug($"Sending WritePropertyRequest {objectId} {propertyId}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand All @@ -1792,7 +1788,7 @@ public IAsyncResult BeginWritePropertyRequest(BacnetAddress adr, BacnetObjectId
public IAsyncResult BeginWritePropertyMultipleRequest(BacnetAddress adr, BacnetObjectId objectId, ICollection<BacnetPropertyValue> valueList, bool waitForTransmit, byte invokeId = 0)
{
Log.Debug($"Sending WritePropertyMultipleRequest {objectId}");
if (invokeId == 0) invokeId = unchecked(_invokeId++);
if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
//BacnetNpduControls.PriorityNormalMessage
Expand Down Expand Up @@ -1845,7 +1841,7 @@ public IAsyncResult BeginWritePropertyMultipleRequest(BacnetAddress adr, ICollec
Log.Debug($"Sending WritePropertyMultipleRequest {objectIds}");

if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
//BacnetNpduControls.PriorityNormalMessage
Expand Down Expand Up @@ -1909,7 +1905,7 @@ public IAsyncResult BeginReadPropertyMultipleRequest(BacnetAddress adr, BacnetOb
var propertyIds = string.Join(", ", propertyIdAndArrayIndex.Select(v => (BacnetPropertyIds)v.propertyIdentifier));
Log.Debug($"Sending ReadPropertyMultipleRequest {objectId} {propertyIds}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -1950,7 +1946,7 @@ public IAsyncResult BeginReadPropertyMultipleRequest(BacnetAddress adr, IList<Ba
var objectIds = string.Join(", ", properties.Select(v => v.objectIdentifier));
Log.Debug($"Sending ReadPropertyMultipleRequest {objectIds}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -2016,7 +2012,7 @@ public bool CreateObjectRequest(BacnetAddress adr, BacnetObjectId objectId, ICol
public IAsyncResult BeginCreateObjectRequest(BacnetAddress adr, BacnetObjectId objectId, ICollection<BacnetPropertyValue> valueList, bool waitForTransmit, byte invokeId = 0)
{
Log.Debug("Sending CreateObjectRequest");
if (invokeId == 0) invokeId = unchecked(_invokeId++);
if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);

Expand Down Expand Up @@ -2065,7 +2061,7 @@ public bool DeleteObjectRequest(BacnetAddress adr, BacnetObjectId objectId, byte
public IAsyncResult BeginDeleteObjectRequest(BacnetAddress adr, BacnetObjectId objectId, bool waitForTransmit, byte invokeId = 0)
{
Log.Debug("Sending DeleteObjectRequest");
if (invokeId == 0) invokeId = unchecked(_invokeId++);
if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);

Expand Down Expand Up @@ -2139,7 +2135,7 @@ public IAsyncResult BeginRemoveListElementRequest(BacnetAddress adr, BacnetObjec
{
Log.Debug("Sending RemoveListElementRequest");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand All @@ -2157,7 +2153,7 @@ public IAsyncResult BeginAddListElementRequest(BacnetAddress adr, BacnetObjectId
{
Log.Debug($"Sending AddListElementRequest {objectId} {(BacnetPropertyIds)reference.propertyIdentifier}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -2211,7 +2207,7 @@ public IAsyncResult BeginRawEncodedDecodedPropertyConfirmedRequest(BacnetAddress
{
Log.Debug("Sending RawEncodedRequest");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -2293,7 +2289,7 @@ public IAsyncResult BeginDeviceCommunicationControlRequest(BacnetAddress adr, ui
{
Log.Debug("Sending DeviceCommunicationControlRequest");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -2356,7 +2352,7 @@ public IAsyncResult BeginGetAlarmSummaryOrEventRequest(BacnetAddress adr, bool g
{
Log.Debug("Sending Alarm summary request");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -2438,7 +2434,7 @@ public IAsyncResult BeginAlarmAcknowledgement(BacnetAddress adr, BacnetObjectId
{
Log.Debug("Sending AlarmAcknowledgement");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -2482,7 +2478,7 @@ public IAsyncResult BeginReinitializeRequest(BacnetAddress adr, BacnetReinitiali
{
Log.Debug("Sending ReinitializeRequest");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand All @@ -2509,7 +2505,7 @@ public void EndReinitializeRequest(IAsyncResult result, out Exception ex)
public IAsyncResult BeginConfirmedNotify(BacnetAddress adr, uint subscriberProcessIdentifier, uint initiatingDeviceIdentifier, BacnetObjectId monitoredObjectIdentifier, uint timeRemaining, IList<BacnetPropertyValue> values, bool waitForTransmit, byte invokeId = 0)
{
Log.Debug("Sending Notify (confirmed)");
if (invokeId == 0) invokeId = unchecked(_invokeId++);
if (invokeId == 0) invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, adr.RoutedSource, adr.RoutedDestination);
Expand Down Expand Up @@ -2587,7 +2583,7 @@ public IAsyncResult BeginLifeSafetyOperationRequest(BacnetAddress address, Bacne
{
Log.Debug($"Sending {ToTitleCase(operation)} {objectId}");
if (invokeId == 0)
invokeId = unchecked(_invokeId++);
invokeId = (byte)Interlocked.Increment(ref _invokeId);

var buffer = GetEncodeBuffer(Transport.HeaderLength);
NPDU.Encode(buffer, BacnetNpduControls.PriorityNormalMessage | BacnetNpduControls.ExpectingReply, address.RoutedSource, address.RoutedDestination);
Expand Down
19 changes: 10 additions & 9 deletions BacnetAsyncResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ public class BacnetAsyncResult : IAsyncResult, IDisposable
private readonly bool _waitForTransmit;
private readonly int _transmitTimeout;
private ManualResetEvent _waitHandle;
private readonly BacnetAddress _address;

public bool Segmented { get; private set; }
public byte[] Result { get; private set; }
public object AsyncState { get; set; }
public bool CompletedSynchronously { get; private set; }
public WaitHandle AsyncWaitHandle => _waitHandle;
public bool IsCompleted => _waitHandle.WaitOne(0);
public BacnetAddress Address { get; }
public BacnetAddress Address => _address;

public Exception Error
{
Expand All @@ -33,7 +34,7 @@ public Exception Error
public BacnetAsyncResult(BacnetClient comm, BacnetAddress adr, byte invokeId, byte[] transmitBuffer, int transmitLength, bool waitForTransmit, int transmitTimeout)
{
_transmitTimeout = transmitTimeout;
Address = adr;
_address = adr;
_waitForTransmit = waitForTransmit;
_transmitBuffer = transmitBuffer;
_transmitLength = transmitLength;
Expand All @@ -52,7 +53,7 @@ public void Resend()
{
try
{
if (_comm.Transport.Send(_transmitBuffer, _comm.Transport.HeaderLength, _transmitLength, Address, _waitForTransmit, _transmitTimeout) < 0)
if (_comm.Transport.Send(_transmitBuffer, _comm.Transport.HeaderLength, _transmitLength, _address, _waitForTransmit, _transmitTimeout) < 0)
{
Error = new IOException("Write Timeout");
}
Expand All @@ -65,7 +66,7 @@ public void Resend()

private void OnSegment(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, BacnetMaxSegments maxSegments, BacnetMaxAdpu maxAdpu, byte sequenceNumber, byte[] buffer, int offset, int length)
{
if (invokeId != _waitInvokeId)
if (invokeId != _waitInvokeId || !adr.Equals(_address))
return;

Segmented = true;
Expand All @@ -74,39 +75,39 @@ private void OnSegment(BacnetClient sender, BacnetAddress adr, BacnetPduTypes ty

private void OnSimpleAck(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, byte[] data, int dataOffset, int dataLength)
{
if (invokeId != _waitInvokeId)
if (invokeId != _waitInvokeId || !adr.Equals(_address))
return;

_waitHandle.Set();
}

private void OnAbort(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, byte invokeId, BacnetAbortReason reason, byte[] buffer, int offset, int length)
{
if (invokeId != _waitInvokeId)
if (invokeId != _waitInvokeId || !adr.Equals(_address))
return;

Error = new Exception($"Abort from device, reason: {reason}");
}

private void OnReject(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, byte invokeId, BacnetRejectReason reason, byte[] buffer, int offset, int length)
{
if (invokeId != _waitInvokeId)
if (invokeId != _waitInvokeId || !adr.Equals(_address))
return;

Error = new Exception($"Reject from device, reason: {reason}");
}

private void OnError(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, BacnetErrorClasses errorClass, BacnetErrorCodes errorCode, byte[] buffer, int offset, int length)
{
if (invokeId != _waitInvokeId)
if (invokeId != _waitInvokeId || !adr.Equals(_address))
return;

Error = new Exception($"Error from device: {errorClass} - {errorCode}");
}

private void OnComplexAck(BacnetClient sender, BacnetAddress adr, BacnetPduTypes type, BacnetConfirmedServices service, byte invokeId, byte[] buffer, int offset, int length)
{
if (invokeId != _waitInvokeId)
if (invokeId != _waitInvokeId || !adr.Equals(_address))

This comment has been minimized.

Copy link
@g1n93r

g1n93r May 27, 2024

Beware I got some false positive "addresses unequal" on some Schneider BACnet Devices.
image
image

For some reason adr.RoutedSource != _address.RoutedSource even though it is the same address. Removing the condition fixed my problem.

I only observed this behavior on some specific Schneider Device. I do not know whether the device or the library is to blame.

return;

Segmented = false;
Expand Down

0 comments on commit 96b580d

Please sign in to comment.