Skip to content

Commit

Permalink
Lock function
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Jan 28, 2024
1 parent 15bf6ea commit 3efe37c
Showing 1 changed file with 9 additions and 49 deletions.
58 changes: 9 additions & 49 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ public async ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken ca

public async ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellationToken)
{
if (!await LockAsync(cancellationToken).ConfigureAwait(false))
{
return;
}
await LockAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand All @@ -162,10 +159,7 @@ public async ValueTask PingAsync(PingCommand pingCommand, CancellationToken canc

public async ValueTask PongAsync(CancellationToken cancellationToken = default)
{
if (!await LockAsync(cancellationToken).ConfigureAwait(false))
{
return;
}
await LockAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -205,10 +199,7 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,

public async ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken)
{
if (!await LockAsync(cancellationToken).ConfigureAwait(false))
{
return;
}
await LockAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand All @@ -229,10 +220,7 @@ public async ValueTask SubscribeAsync(int sid, string subject, string? queueGrou

public async ValueTask UnsubscribeAsync(int sid, int? maxMsgs, CancellationToken cancellationToken)
{
if (!await LockAsync(cancellationToken).ConfigureAwait(false))
{
return;
}
await LockAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -267,10 +255,7 @@ private PipeWriter GetWriter()
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
private async ValueTask PublishLockedAsync(string subject, string? replyTo, NatsPooledBufferWriter<byte> payloadBuffer, NatsPooledBufferWriter<byte>? headersBuffer, CancellationToken cancellationToken)
{
if (!await LockAsync(cancellationToken).ConfigureAwait(false))
{
return;
}
await LockAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -302,41 +287,16 @@ private async ValueTask PublishLockedAsync(string subject, string? replyTo, Nat
}
}

private async Task UnLockAsync(CancellationToken cancellationToken)
private ValueTask<int> UnLockAsync(CancellationToken cancellationToken)
{
while (!_channelLock.Reader.TryRead(out _))
{
try
{
await _channelLock.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
}

Interlocked.Decrement(ref _counter.PendingMessages);
return _channelLock.Reader.ReadAsync(cancellationToken);
}

private async Task<bool> LockAsync(CancellationToken cancellationToken)
private ValueTask LockAsync(CancellationToken cancellationToken)
{
Interlocked.Increment(ref _counter.PendingMessages);

try
{
await _channelLock.Writer.WriteAsync(1, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return false;
}
catch (ChannelClosedException)
{
return false;
}

return true;
return _channelLock.Writer.WriteAsync(1, cancellationToken);
}

private async Task ReaderLoopAsync()
Expand Down

0 comments on commit 3efe37c

Please sign in to comment.