Skip to content

Commit

Permalink
Updated NuGet packages and fixed consumers/readers sometimes throwing…
Browse files Browse the repository at this point in the history
… an exception if disposed right after being created
  • Loading branch information
blankensteiner committed Apr 3, 2024
1 parent c15379a commit 71ef474
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 55 deletions.
6 changes: 3 additions & 3 deletions benchmarks/Compression/Compression.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.12" />
<PackageReference Include="DotNetZip" Version="1.16.0" />
<PackageReference Include="Google.Protobuf" Version="3.25.3" />
<PackageReference Include="Google.Protobuf" Version="3.26.1" />
<PackageReference Include="Grpc.Tools" Version="2.62.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="IronSnappy" Version="1.3.1" />
<PackageReference Include="K4os.Compression.LZ4" Version="1.3.6" />
<PackageReference Include="K4os.Compression.LZ4" Version="1.3.8" />
<PackageReference Include="ZstdNet" Version="1.4.5" />
<PackageReference Include="ZstdSharp.Port" Version="0.7.5" />
<PackageReference Include="ZstdSharp.Port" Version="0.7.6" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/DotPulsar/DotPulsar.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<ItemGroup>
<PackageReference Include="HashDepot" Version="2.0.3" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.2" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.3" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" IncludeAssets="runtime; build; native; contentfiles; analyzers; buildtransitive" />
<PackageReference Include="protobuf-net" Version="3.2.30" />
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
Expand Down
51 changes: 27 additions & 24 deletions src/DotPulsar/Internal/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ public async ValueTask DisposeAsync()
await _processManager.DisposeAsync().ConfigureAwait(false);

foreach (var subConsumer in _subConsumers)
await subConsumer.DisposeAsync().ConfigureAwait(false);
{
if (subConsumer is not null)
await subConsumer.DisposeAsync().ConfigureAwait(false);
}

await _lock.DisposeAsync().ConfigureAwait(false);
_state.SetState(ConsumerState.Closed);
Expand All @@ -187,32 +190,32 @@ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellatio

var iterations = 0;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
while (true)
{
iterations++;
_subConsumerIndex++;
if (_subConsumerIndex == _subConsumers.Length)
_subConsumerIndex = 0;

var receiveTask = _receiveTasks[_subConsumerIndex];
if (receiveTask == _emptyTaskCompletionSource.Task)
while (true)
{
var receiveTaskValueTask = _subConsumers[_subConsumerIndex].Receive(cancellationToken);
if (receiveTaskValueTask.IsCompleted)
return receiveTaskValueTask.Result;
_receiveTasks[_subConsumerIndex] = receiveTaskValueTask.AsTask();
}
else
{
if (receiveTask.IsCompleted)
iterations++;
_subConsumerIndex++;
if (_subConsumerIndex == _subConsumers.Length)
_subConsumerIndex = 0;

var receiveTask = _receiveTasks[_subConsumerIndex];
if (receiveTask == _emptyTaskCompletionSource.Task)
{
_receiveTasks[_subConsumerIndex] = _emptyTaskCompletionSource.Task;
return receiveTask.Result;
var receiveTaskValueTask = _subConsumers[_subConsumerIndex].Receive(cancellationToken);
if (receiveTaskValueTask.IsCompleted)
return receiveTaskValueTask.Result;
_receiveTasks[_subConsumerIndex] = receiveTaskValueTask.AsTask();
}
else
{
if (receiveTask.IsCompleted)
{
_receiveTasks[_subConsumerIndex] = _emptyTaskCompletionSource.Task;
return receiveTask.Result;
}
}
if (iterations == _subConsumers.Length)
await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
}
if (iterations == _subConsumers.Length)
await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
}
}

public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
Expand Down Expand Up @@ -355,7 +358,7 @@ public async ValueTask<IEnumerable<MessageId>> GetLastMessageIds(CancellationTok
await Guard(cancellationToken).ConfigureAwait(false);

if (!_isPartitionedTopic)
return new[] { await _subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false) };
return [await _subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false)];

var getLastMessageIdsTasks = new List<Task<MessageId>>(_numberOfPartitions);

Expand Down
49 changes: 26 additions & 23 deletions src/DotPulsar/Internal/Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,32 +188,32 @@ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellatio

var iterations = 0;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
while (true)
{
iterations++;
_subReaderIndex++;
if (_subReaderIndex == _subReaders.Length)
_subReaderIndex = 0;

var receiveTask = _receiveTasks[_subReaderIndex];
if (receiveTask == _emptyTaskCompletionSource.Task)
{
var receiveTaskValueTask = _subReaders[_subReaderIndex].Receive(cancellationToken);
if (receiveTaskValueTask.IsCompleted)
return receiveTaskValueTask.Result;
_receiveTasks[_subReaderIndex] = receiveTaskValueTask.AsTask();
}
else
while (true)
{
if (receiveTask.IsCompleted)
iterations++;
_subReaderIndex++;
if (_subReaderIndex == _subReaders.Length)
_subReaderIndex = 0;

var receiveTask = _receiveTasks[_subReaderIndex];
if (receiveTask == _emptyTaskCompletionSource.Task)
{
var receiveTaskValueTask = _subReaders[_subReaderIndex].Receive(cancellationToken);
if (receiveTaskValueTask.IsCompleted)
return receiveTaskValueTask.Result;
_receiveTasks[_subReaderIndex] = receiveTaskValueTask.AsTask();
}
else
{
_receiveTasks[_subReaderIndex] = _emptyTaskCompletionSource.Task;
return receiveTask.Result;
if (receiveTask.IsCompleted)
{
_receiveTasks[_subReaderIndex] = _emptyTaskCompletionSource.Task;
return receiveTask.Result;
}
}
if (iterations == _subReaders.Length)
await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
}
if (iterations == _subReaders.Length)
await Task.WhenAny(_receiveTasks).ConfigureAwait(false);
}
}

public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
Expand Down Expand Up @@ -265,7 +265,10 @@ public async ValueTask DisposeAsync()
await _processManager.DisposeAsync().ConfigureAwait(false);

foreach (var subReader in _subReaders)
await subReader.DisposeAsync().ConfigureAwait(false);
{
if (subReader is not null)
await subReader.DisposeAsync().ConfigureAwait(false);
}

await _lock.DisposeAsync().ConfigureAwait(false);
_state.SetState(ReaderState.Closed);
Expand Down
8 changes: 4 additions & 4 deletions tests/DotPulsar.Tests/DotPulsar.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@
<PackageReference Include="DotNetZip" Version="1.16.0" />
<PackageReference Include="FluentAssertions" Version="6.12.0" />
<PackageReference Include="IronSnappy" Version="1.3.1" />
<PackageReference Include="K4os.Compression.LZ4" Version="1.3.6" />
<PackageReference Include="K4os.Compression.LZ4" Version="1.3.8" />
<PackageReference Include="NSubstitute" Version="5.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="Testcontainers" Version="3.7.0" />
<PackageReference Include="Testcontainers" Version="3.8.0" />
<PackageReference Include="ToxiproxyNetCore" Version="1.0.35" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.7">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="6.0.1">
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="ZstdNet" Version="1.4.5" />
<PackageReference Include="ZstdSharp.Port" Version="0.7.5" />
<PackageReference Include="ZstdSharp.Port" Version="0.7.6" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 71ef474

Please sign in to comment.