Skip to content

Commit

Permalink
Fix NextPublishSeqNo when retrieved concurrently
Browse files Browse the repository at this point in the history
Discovered while updating `rabbitmq/rabbitmq-tutorials` to version `7.0.0-rc.8` of this library.

* Add `trackConfirmations` argument to `ConfirmSelectAsync` to allow disabling internal confirm tracking.

* Increase CI timeouts since GHA Windows runners are slow (actions/runner-images#7320)
  • Loading branch information
lukebakken committed Sep 6, 2024
1 parent f63c9c8 commit a583dd1
Show file tree
Hide file tree
Showing 10 changed files with 418 additions and 81 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Integration Tests
timeout-minutes: 15
timeout-minutes: 25
run: |
$tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
Start-Sleep -Seconds 1; `
Expand Down Expand Up @@ -115,7 +115,7 @@ jobs:
id: install-start-rabbitmq
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
- name: Sequential Integration Tests
timeout-minutes: 15
timeout-minutes: 25
run: dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
Expand Down
1 change: 1 addition & 0 deletions Build.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<ProjectReference Include="projects/Test/Common/Common.csproj" />
<ProjectReference Include="projects/Test/Applications/CreateChannel/CreateChannel.csproj" />
<ProjectReference Include="projects/Test/Applications/MassPublish/MassPublish.csproj" />
<ProjectReference Include="projects/Test/Applications/PublisherConfirms/PublisherConfirms.csproj" />
<ProjectReference Include="projects/Test/Integration/Integration.csproj" />
<ProjectReference Include="projects/Test/SequentialIntegration/SequentialIntegration.csproj" />
<ProjectReference Include="projects/Test/Unit/Unit.csproj" />
Expand Down
9 changes: 8 additions & 1 deletion RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "project
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Test\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -102,6 +104,10 @@ Global
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -117,6 +123,7 @@ Global
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,6 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.BasicRejectAsync(ulong deliveryTag, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
~RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.ShutdownEventArgs reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.CloseAsync(ushort replyCode, string replyText, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.ConfirmSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.ConsumerCountAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
~RabbitMQ.Client.IChannel.ExchangeBindAsync(string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.ExchangeDeclareAsync(string exchange, string type, bool durable, bool autoDelete, System.Collections.Generic.IDictionary<string, object> arguments = null, bool passive = false, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
Expand Down Expand Up @@ -897,3 +896,4 @@ RabbitMQ.Client.ICredentialsProvider.GetCredentialsAsync(System.Threading.Cancel
RabbitMQ.Client.ICredentialsProvider.Name.get -> string!
RabbitMQ.Client.PlainMechanism.HandleChallengeAsync(byte[]? challenge, RabbitMQ.Client.ConnectionConfig! config, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<byte[]!>!
readonly RabbitMQ.Client.ConnectionConfig.CredentialsProvider -> RabbitMQ.Client.ICredentialsProvider!
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
8 changes: 6 additions & 2 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,13 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
Task CloseAsync(ShutdownEventArgs reason, bool abort,
CancellationToken cancellationToken = default);

/// <summary>Asynchronously enable publisher confirmations.</summary>
/// <summary>
/// Asynchronously enable publisher confirmations.
/// </summary>
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcks"/> and <see cref="BasicNacks"/> yourself.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
Task ConfirmSelectAsync(CancellationToken cancellationToken = default);
Task ConfirmSelectAsync(bool trackConfirmations = true,
CancellationToken cancellationToken = default);

/// <summary>Asynchronously declare an exchange.</summary>
/// <param name="exchange">The name of the exchange.</param>
Expand Down
8 changes: 5 additions & 3 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
private bool _usesPublisherConfirms;
private bool _tracksPublisherConfirmations;
private bool _usesTransactions;

internal IConsumerDispatcher ConsumerDispatcher => InnerChannel.ConsumerDispatcher;
Expand Down Expand Up @@ -177,7 +178,7 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken)

if (_usesPublisherConfirms)
{
await newChannel.ConfirmSelectAsync(cancellationToken)
await newChannel.ConfirmSelectAsync(_tracksPublisherConfirmations, cancellationToken)
.ConfigureAwait(false);
}

Expand Down Expand Up @@ -334,11 +335,12 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken);
}

public async Task ConfirmSelectAsync(CancellationToken cancellationToken)
public async Task ConfirmSelectAsync(bool trackConfirmations = true, CancellationToken cancellationToken = default)
{
await InnerChannel.ConfirmSelectAsync(cancellationToken)
await InnerChannel.ConfirmSelectAsync(trackConfirmations, cancellationToken)
.ConfigureAwait(false);
_usesPublisherConfirms = true;
_tracksPublisherConfirmations = trackConfirmations;
}

public async Task ExchangeBindAsync(string destination, string source, string routingKey,
Expand Down
Loading

0 comments on commit a583dd1

Please sign in to comment.