Skip to content

Commit

Permalink
Merge branch 'master' into nack
Browse files Browse the repository at this point in the history
  • Loading branch information
dionjansen committed Dec 23, 2020
2 parents 163ea45 + 8f09af7 commit 6fc4550
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 15 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.10.1] - 2020-12-23

### Added

- MessageId implements IComparable\<MessageId\>

### Fixed

- Do not throw exceptions when disposing consumers, readers or producers

## [0.10.0] - 2020-12-16

### Added
Expand Down
2 changes: 1 addition & 1 deletion src/DotPulsar/DotPulsar.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
<Version>0.10.0</Version>
<Version>0.10.1</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>DanskeCommodities;dblank</Authors>
Expand Down
1 change: 0 additions & 1 deletion src/DotPulsar/Internal/ConsumerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public async ValueTask DisposeAsync()

private async ValueTask SendFlow(CancellationToken cancellationToken)
{
//TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
await _connection.Send(_cachedCommandFlow, cancellationToken).ConfigureAwait(false);

if (_firstFlow)
Expand Down
6 changes: 3 additions & 3 deletions src/DotPulsar/Internal/NotReadyChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel, IReade
public ValueTask DisposeAsync()
=> new ValueTask();

public ValueTask ClosedByClient(CancellationToken cancellationToken)
=> new ValueTask();

public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
=> throw GetException();

Expand All @@ -51,9 +54,6 @@ public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancella
public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
=> throw GetException();

public ValueTask ClosedByClient(CancellationToken cancellationToken)
=> throw GetException();

private static Exception GetException()
=> new ChannelNotReadyException();
}
Expand Down
34 changes: 33 additions & 1 deletion src/DotPulsar/MessageId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace DotPulsar
/// <summary>
/// Unique identifier of a single message.
/// </summary>
public sealed class MessageId : IEquatable<MessageId>
public sealed class MessageId : IEquatable<MessageId>, IComparable<MessageId>
{
static MessageId()
{
Expand Down Expand Up @@ -72,6 +72,38 @@ public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex)
/// </summary>
public int BatchIndex => Data.BatchIndex;

public int CompareTo(MessageId? other)
{
if (other is null)
return 1;

var result = LedgerId.CompareTo(other.LedgerId);
if (result != 0)
return result;

result = EntryId.CompareTo(other.EntryId);
if (result != 0)
return result;

result = Partition.CompareTo(other.Partition);
if (result != 0)
return result;

return BatchIndex.CompareTo(other.BatchIndex);
}

public static bool operator >(MessageId x, MessageId y)
=> x is not null && x.CompareTo(y) >= 1;

public static bool operator <(MessageId x, MessageId y)
=> x is not null ? x.CompareTo(y) <= -1 : y is not null;

public static bool operator >=(MessageId x, MessageId y)
=> x is not null ? x.CompareTo(y) >= 0 : y is null;

public static bool operator <=(MessageId x, MessageId y)
=> x is not null ? x.CompareTo(y) <= 0 : true;

public override bool Equals(object? o)
=> o is MessageId id && Equals(id);

Expand Down
96 changes: 87 additions & 9 deletions tests/DotPulsar.Tests/MessageIdTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
* limitations under the License.
*/

// We cannot assume consumers of this library will obey the nullability guarantees
#nullable disable

namespace DotPulsar.Tests
Expand All @@ -23,10 +22,83 @@ namespace DotPulsar.Tests

public class MessageIdTests
{
[Fact]
public void CompareTo_GivenTheSameValues_ShouldBeEqual()
{
var m1 = new MessageId(1, 2, 3, 4);
var m2 = new MessageId(1, 2, 3, 4);

m1.CompareTo(m2).Should().Be(0);
(m1 < m2).Should().BeFalse();
(m1 > m2).Should().BeFalse();
(m1 >= m2).Should().BeTrue();
(m1 <= m2).Should().BeTrue();
}

[Fact]
public void CompareTo_GivenAllNull_ShouldBeEqual()
{
MessageId m1 = null;
MessageId m2 = null;

(m1 < m2).Should().BeFalse();
(m1 > m2).Should().BeFalse();
(m1 <= m2).Should().BeTrue();
(m1 >= m2).Should().BeTrue();
}

[Fact]
public void CompareTo_GivenOneNull_ShouldFollowNull()
{
var m1 = new MessageId(1, 2, 3, 4);
MessageId m2 = null;

m1.CompareTo(m2).Should().BePositive();
(m1 < m2).Should().BeFalse();
(m1 > m2).Should().BeTrue();
(m1 <= m2).Should().BeFalse();
(m1 >= m2).Should().BeTrue();

(m2 < m1).Should().BeTrue();
(m2 > m1).Should().BeFalse();
(m2 <= m1).Should().BeTrue();
(m2 >= m1).Should().BeFalse();
}

[Theory]
[InlineData(2, 2, 3, 4)] // LegderId is greater
[InlineData(1, 3, 3, 4)] // EntryId is greater
[InlineData(1, 2, 4, 4)] // Partition is greater
[InlineData(1, 2, 3, 5)] // BatchIndex is greater
public void CompareTo_GivenCurrentFollowsArgument_ShouldReturnPositive(ulong ledgerId, ulong entryId, int partition, int batchIndex)
{
var m1 = new MessageId(ledgerId, entryId, partition, batchIndex);
var m2 = new MessageId(1, 2, 3, 4);

m1.CompareTo(m2).Should().BePositive();
(m1 > m2).Should().BeTrue();
(m1 < m2).Should().BeFalse();
}

[Theory]
[InlineData(0, 2, 3, 4)] // LegderId is less
[InlineData(1, 1, 3, 4)] // EntryId is less
[InlineData(1, 2, 2, 4)] // Partition is less
[InlineData(1, 2, 3, 3)] // BatchIndex is less
public void CompareTo_GivenCurrentPrecedesArgument_ShouldReturnNegative(ulong ledgerId, ulong entryId, int partition, int batchIndex)
{
var m1 = new MessageId(ledgerId, entryId, partition, batchIndex);
var m2 = new MessageId(1, 2, 3, 4);

m1.CompareTo(m2).Should().BeNegative();
(m1 < m2).Should().BeTrue();
(m1 > m2).Should().BeFalse();
}

[Fact]
public void Equals_GivenTheSameObject_ShouldBeEqual()
{
var m1 = new MessageId(1234, 5678, 9876, 5432);
var m1 = new MessageId(1, 2, 3, 4);
var m2 = m1;

m1.Equals(m2).Should().BeTrue();
Expand All @@ -37,19 +109,23 @@ public void Equals_GivenTheSameObject_ShouldBeEqual()
[Fact]
public void Equals_GivenTheSameValues_ShouldBeEqual()
{
var m1 = new MessageId(1234, 5678, 9876, 5432);
var m2 = new MessageId(1234, 5678, 9876, 5432);
var m1 = new MessageId(1, 2, 3, 4);
var m2 = new MessageId(1, 2, 3, 4);

m1.Equals(m2).Should().BeTrue();
(m1 == m2).Should().BeTrue();
(m1 != m2).Should().BeFalse();
}

[Fact]
public void Equals_GivenDifferentValues_ShouldNotBeEqual()
[Theory]
[InlineData(0, 2, 3, 4)] // LegerId not the same
[InlineData(1, 0, 3, 4)] // EntryId not the same
[InlineData(1, 2, 0, 4)] // Partition not the same
[InlineData(1, 2, 3, 0)] // BatchIndex not the same
public void Equals_GivenDifferentValues_ShouldNotBeEqual(ulong ledgerId, ulong entryId, int partition, int batchIndex)
{
var m1 = new MessageId(1234, 5678, 9876, 5432);
var m2 = new MessageId(9876, 6432, 1234, 6678);
var m1 = new MessageId(ledgerId, entryId, partition, batchIndex);
var m2 = new MessageId(1, 2, 3, 4);

m1.Equals(m2).Should().BeFalse();
(m1 == m2).Should().BeFalse();
Expand All @@ -70,7 +146,7 @@ public void Equals_GivenAllNull_ShouldBeEqual()
[Fact]
public void Equals_GivenOneNull_ShouldNotBeEqual()
{
var m1 = new MessageId(1234, 5678, 9876, 5432);
var m1 = new MessageId(1, 2, 3, 4);
MessageId m2 = null;

(m1 == null).Should().BeFalse();
Expand All @@ -80,3 +156,5 @@ public void Equals_GivenOneNull_ShouldNotBeEqual()
}
}
}

#nullable enable

0 comments on commit 6fc4550

Please sign in to comment.