Skip to content

Commit

Permalink
nats-io#636 - JetStream Batch Get Client support
Browse files Browse the repository at this point in the history
* Added implementation GetBatchDirectAsync
  • Loading branch information
Ivandemidov00 committed Nov 23, 2024
1 parent ae90e37 commit 075f749
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/NATS.Client.JetStream/INatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ ValueTask UpdateAsync(
/// </summary>
/// <param name="request">Batch message request.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default);

ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default);
}
29 changes: 29 additions & 0 deletions src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,27 @@ public ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INat
cancellationToken: cancellationToken);
}

/// <summary>
/// Request a direct batch message
/// </summary>
/// <param name="request">Batch message request.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="includeEob"><c>true</c> to send the last empty message with eobCode in the header; otherwise <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="InvalidOperationException">There was an issue, stream must have allow direct set.</exception>
public IAsyncEnumerable<NatsMsg<T>> GetBatchDirectAsync<T>(StreamMsgBatchGetRequest request, INatsDeserialize<T>? serializer = default, bool includeEob = false, CancellationToken cancellationToken = default)
{
ValidateStream();

return _context.Connection.RequestManyAsync<StreamMsgBatchGetRequest, T>(
subject: $"{_context.Opts.Prefix}.DIRECT.GET.{_name}",
data: request,
requestSerializer: NatsJSJsonSerializer<StreamMsgBatchGetRequest>.Default,
replySerializer: serializer,
replyOpts: new NatsSubOpts() { StopOnEmptyMsg = !includeEob, ThrowIfNoResponders = true },
cancellationToken: cancellationToken);
}

public ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default) =>
_context.JSRequestResponseAsync<StreamMsgGetRequest, StreamMsgGetResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.MSG.GET.{_name}",
Expand All @@ -192,4 +213,12 @@ private void ThrowIfDeleted()
if (_deleted)
throw new NatsJSException($"Stream '{_name}' is deleted");
}

private void ValidateStream()
{
if (!Info.Config.AllowDirect)
{
throw new InvalidOperationException("StreamMsgBatchGetRequest is not permitted when AllowDirect on stream disable");
}
}
}

0 comments on commit 075f749

Please sign in to comment.