diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b63fca9d..9d8b20655 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,20 @@ ## 4.17.0 [unreleased] +### Breaking Changes + +#### API + +- `ApiResponse` headers has been changed to `IEnumerable<(string Name, string Value)>` + ### Bug Fixes 1. [#649](https://github.com/influxdata/influxdb-client-csharp/pull/649): Use HttpCompletionOption.ResponseHeadersRead for asynchronous QueryApi +### Dependencies +Update dependencies: + +#### Build: +- [#650](https://github.com/influxdata/influxdb-client-csharp/pull/650): `RestSharp` to `111.4.0` + ## 4.16.0 [2024-06-24] ### Features: diff --git a/Client.Core/Client.Core.csproj b/Client.Core/Client.Core.csproj index 255d140f0..d9bd75a0e 100644 --- a/Client.Core/Client.Core.csproj +++ b/Client.Core/Client.Core.csproj @@ -40,7 +40,7 @@ - + diff --git a/Client.Core/Internal/AbstractQueryClient.cs b/Client.Core/Internal/AbstractQueryClient.cs index 519586783..b184be46b 100644 --- a/Client.Core/Internal/AbstractQueryClient.cs +++ b/Client.Core/Internal/AbstractQueryClient.cs @@ -13,6 +13,7 @@ using InfluxDB.Client.Core.Flux.Internal; using Newtonsoft.Json.Linq; using RestSharp; +using RestSharp.Interceptors; namespace InfluxDB.Client.Core.Internal { @@ -39,7 +40,7 @@ protected AbstractQueryClient(IFluxResultMapper mapper, FluxCsvParser csvParser) _csvParser = csvParser; } - protected Task Query(Func, RestRequest> queryFn, + protected Task Query(RestRequest query, FluxCsvParser.IFluxResponseConsumer responseConsumer, Action onError, Action onComplete, CancellationToken cancellationToken) @@ -56,10 +57,10 @@ void Consumer(Stream bufferedStream) } } - return Query(queryFn, Consumer, onError, onComplete, cancellationToken); + return Query(query, Consumer, onError, onComplete, cancellationToken); } - protected Task QueryRaw(Func, RestRequest> queryFn, + protected Task QueryRaw(RestRequest query, Action onResponse, Action onError, Action onComplete, CancellationToken cancellationToken) @@ -76,10 +77,10 @@ void Consumer(Stream bufferedStream) } } - return Query(queryFn, Consumer, onError, onComplete, cancellationToken); + return Query(query, Consumer, onError, onComplete, cancellationToken); } - protected void QuerySync(Func, RestRequest> queryFn, + protected void QuerySync(RestRequest query, FluxCsvParser.IFluxResponseConsumer responseConsumer, Action onError, Action onComplete, @@ -97,21 +98,21 @@ void Consumer(CancellationToken cancellable, Stream bufferedStream) } } - QuerySync(queryFn, Consumer, onError, onComplete, cancellationToken); + QuerySync(query, Consumer, onError, onComplete, cancellationToken); } - private async Task Query(Func, RestRequest> queryFn, + private async Task Query(RestRequest query, Action consumer, Action onError, Action onComplete, CancellationToken cancellationToken) { - Arguments.CheckNotNull(queryFn, "queryFn"); + Arguments.CheckNotNull(query, "query"); Arguments.CheckNotNull(consumer, "consumer"); Arguments.CheckNotNull(onError, "onError"); Arguments.CheckNotNull(onComplete, "onComplete"); try { - var query = queryFn.Invoke((response, request) => + query.AdvancedResponseWriter = (response, request) => { var result = GetStreamFromResponse(response, cancellationToken); result = AfterIntercept((int)response.StatusCode, @@ -122,7 +123,7 @@ private async Task Query(Func, RestRequest> queryFn, + private void QuerySync(RestRequest query, Action consumer, Action onError, Action onComplete, CancellationToken cancellationToken) { - Arguments.CheckNotNull(queryFn, "queryFn"); + Arguments.CheckNotNull(query, "query"); Arguments.CheckNotNull(consumer, "consumer"); Arguments.CheckNotNull(onError, "onError"); Arguments.CheckNotNull(onComplete, "onComplete"); try { - var query = queryFn.Invoke((response, request) => + query.AdvancedResponseWriter = (response, request) => { var result = GetStreamFromResponse(response, cancellationToken); result = AfterIntercept((int)response.StatusCode, @@ -166,7 +167,7 @@ private void QuerySync(Func consumer(cancellationToken, result); return FromHttpResponseMessage(response, request); - }); + }; BeforeIntercept(query); @@ -188,32 +189,21 @@ private void QuerySync(Func } protected async IAsyncEnumerable QueryEnumerable( - Func, RestRequest> queryFn, + RestRequest query, Func convert, [EnumeratorCancellation] CancellationToken cancellationToken) { - Arguments.CheckNotNull(queryFn, nameof(queryFn)); + Arguments.CheckNotNull(query, nameof(query)); - Stream stream = null; - var query = queryFn.Invoke((response, request) => + query.Interceptors = new List { - stream = GetStreamFromResponse(response, cancellationToken); - stream = AfterIntercept((int)response.StatusCode, - () => response.Headers.ToHeaderParameters(response.Content.Headers), stream); - - RaiseForInfluxError(response, stream); - - return FromHttpResponseMessage(response, request); - }); - - BeforeIntercept(query); - - var restResponse = await RestClient.ExecuteAsync(query, cancellationToken).ConfigureAwait(false); - if (restResponse.ErrorException != null) - { - throw restResponse.ErrorException; - } + new RequestBeforeAfterInterceptor( + BeforeIntercept, + (statusCode, headers, body) => AfterIntercept(statusCode, headers, body) + ) + }; + var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false); await foreach (var (_, record) in _csvParser .ParseFluxResponseAsync(new StreamReader(stream), cancellationToken) .ConfigureAwait(false)) @@ -409,4 +399,42 @@ private Stream GetStreamFromResponse(HttpResponseMessage response, CancellationT return streamFromResponse; } } + + /// + /// The interceptor that is called before and after the request. + /// + internal class RequestBeforeAfterInterceptor : Interceptor + { + private readonly Action _beforeRequest; + private readonly Action>, T> _afterRequest; + + /// + /// Construct the interceptor. + /// + /// Intercept request before HTTP call + /// Intercept response before parsing resutlts + internal RequestBeforeAfterInterceptor( + Action beforeRequest = null, + Action>, T> afterRequest = null) + { + _beforeRequest = beforeRequest; + _afterRequest = afterRequest; + } + + public override ValueTask BeforeRequest(RestRequest request, CancellationToken cancellationToken) + { + _beforeRequest?.Invoke(request); + return base.BeforeRequest(request, cancellationToken); + } + + public override ValueTask AfterHttpRequest(HttpResponseMessage responseMessage, + CancellationToken cancellationToken) + { + _afterRequest?.Invoke( + (int)responseMessage.StatusCode, + () => responseMessage.Headers.ToHeaderParameters(responseMessage.Content.Headers), + default); + return base.AfterHttpRequest(responseMessage, cancellationToken); + } + } } \ No newline at end of file diff --git a/Client.Core/Internal/RestSharpExtensions.cs b/Client.Core/Internal/RestSharpExtensions.cs index fcb01b114..72e1c5541 100644 --- a/Client.Core/Internal/RestSharpExtensions.cs +++ b/Client.Core/Internal/RestSharpExtensions.cs @@ -1,11 +1,7 @@ -using System; using System.Collections.Generic; using System.Linq; -using System.Net.Http; using System.Net.Http.Headers; -using System.Reflection; using System.Threading; -using System.Threading.Tasks; using RestSharp; namespace InfluxDB.Client.Core.Internal @@ -27,20 +23,10 @@ internal static IEnumerable ToHeaderParameters(this HttpHeaders .Select(x => new HeaderParameter(x.Key, x.y)); } - internal static RestRequest AddAdvancedResponseHandler(this RestRequest restRequest, - Func advancedResponseWriter) - { - var field = restRequest.GetType() - .GetField("_advancedResponseHandler", BindingFlags.Instance | BindingFlags.NonPublic); - field!.SetValue(restRequest, advancedResponseWriter); - - return restRequest; - } - internal static RestResponse ExecuteSync(this RestClient client, RestRequest request, CancellationToken cancellationToken = default) { - return client.Execute(request, cancellationToken); + return client.Execute(request); } } } \ No newline at end of file diff --git a/Client.Legacy/FluxClient.cs b/Client.Legacy/FluxClient.cs index 1c6e935dd..e5bfe6f0e 100644 --- a/Client.Legacy/FluxClient.cs +++ b/Client.Legacy/FluxClient.cs @@ -423,10 +423,9 @@ private RestRequest PingRequest() return new RestRequest("ping"); } - private Func, RestRequest> QueryRequest(string query) + private RestRequest QueryRequest(string query) { - return advancedResponseWriter => new RestRequest("api/v2/query", Method.Post) - .AddAdvancedResponseHandler(advancedResponseWriter) + return new RestRequest("api/v2/query", Method.Post) .AddParameter(new BodyParameter("application/json", query, "application/json")); } } diff --git a/Client.Test/ItWriteQueryApiTest.cs b/Client.Test/ItWriteQueryApiTest.cs index 56ec9e237..4f8d3bf67 100644 --- a/Client.Test/ItWriteQueryApiTest.cs +++ b/Client.Test/ItWriteQueryApiTest.cs @@ -4,6 +4,7 @@ using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Tasks; using InfluxDB.Client.Api.Domain; using InfluxDB.Client.Core; using InfluxDB.Client.Writes; @@ -944,5 +945,37 @@ public async Task GzipWithLargeAmountOfData() Assert.AreEqual(1000, tables.Count); Assert.AreEqual(1, tables[0].Records.Count); } + + [Test] + public async Task QueryAsyncEnumerableGzip() + { + Client.EnableGzip(); + Client.SetLogLevel(LogLevel.Body); + + await Client.GetWriteApiAsync().WriteMeasurementsAsync(new[] + { + new H20Measurement + { + Location = "angel_bay", Level = 2.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(10)) + }, + new H20Measurement + { + Location = "angel_bay", Level = 1.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(20)) + } + }); + + var query = $@"from(bucket: ""{_bucket.Name}"") + |> range(start: 0) + |> filter(fn: (r) => r[""location""] == ""angel_bay"") + |> pivot(rowKey:[""_time""], columnKey: [""_field""], valueColumn: ""_value"")"; + + var list = new List(); + await foreach (var item in _queryApi.QueryAsyncEnumerable(query).ConfigureAwait(false)) + list.Add(item); + + Assert.AreEqual(2, list.Count); + Assert.AreEqual(1.927, list[0].Level); + Assert.AreEqual(2.927, list[1].Level); + } } } \ No newline at end of file diff --git a/Client/InfluxDB.Client.Api/Client/ApiResponse.cs b/Client/InfluxDB.Client.Api/Client/ApiResponse.cs index 0849b4b39..d2942306b 100644 --- a/Client/InfluxDB.Client.Api/Client/ApiResponse.cs +++ b/Client/InfluxDB.Client.Api/Client/ApiResponse.cs @@ -43,7 +43,7 @@ public class ApiResponse /// HTTP status code. /// HTTP headers. /// Data (parsed HTTP body) - public ApiResponse(int statusCode, IEnumerable<(string Name, object Value)> headers, T data) + public ApiResponse(int statusCode, IEnumerable<(string Name, string Value)> headers, T data) { StatusCode = statusCode; Headers = headers diff --git a/Client/InvokableScriptsApi.cs b/Client/InvokableScriptsApi.cs index c26d1d373..9e603d29d 100644 --- a/Client/InvokableScriptsApi.cs +++ b/Client/InvokableScriptsApi.cs @@ -300,14 +300,13 @@ private Task InvokeScript(string scriptId, FluxCsvParser.IFluxResponseConsumer c cancellationToken); } - private Func, RestRequest> CreateRequest(string scriptId, + private RestRequest CreateRequest(string scriptId, Dictionary bindParams = default) { Arguments.CheckNonEmptyString(scriptId, nameof(scriptId)); - return advancedResponseWriter => _service - .PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams)) - .AddAdvancedResponseHandler(advancedResponseWriter); + return _service + .PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams)); } } } \ No newline at end of file diff --git a/Client/QueryApi.cs b/Client/QueryApi.cs index 34b6a6725..2946ff26b 100644 --- a/Client/QueryApi.cs +++ b/Client/QueryApi.cs @@ -783,18 +783,18 @@ private Task QueryAsync(Query query, FluxCsvParser.IFluxResponseConsumer consume cancellationToken); } - private Func, RestRequest> CreateRequest(Query query, - string org = null) + private RestRequest CreateRequest(Query query, string org = null) { Arguments.CheckNotNull(query, nameof(query)); var optionsOrg = org ?? _options.Org; Arguments.CheckNonEmptyString(optionsOrg, OrgArgumentValidation); - return advancedResponseWriter => _service + var postQueryWithRestRequest = _service .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query, - HttpCompletionOption.ResponseHeadersRead) - .AddAdvancedResponseHandler(advancedResponseWriter); + HttpCompletionOption.ResponseHeadersRead); + + return postQueryWithRestRequest; } internal static Query CreateQuery(string query, Dialect dialect = null) diff --git a/Client/QueryApiSync.cs b/Client/QueryApiSync.cs index dc0c30622..ce0baa26e 100644 --- a/Client/QueryApiSync.cs +++ b/Client/QueryApiSync.cs @@ -140,15 +140,11 @@ public List QuerySync(Query query, string org = null, CancellationToken ca var consumer = new FluxResponseConsumerPoco(poco => { measurements.Add(poco); }, Mapper); - RestRequest QueryFn(Func advancedResponseWriter) - { - return _service - .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query, - HttpCompletionOption.ResponseHeadersRead) - .AddAdvancedResponseHandler(advancedResponseWriter); - } + var restRequest = _service + .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query, + HttpCompletionOption.ResponseHeadersRead); - QuerySync(QueryFn, consumer, ErrorConsumer, EmptyAction, cancellationToken); + QuerySync(restRequest, consumer, ErrorConsumer, EmptyAction, cancellationToken); return measurements; } @@ -193,15 +189,11 @@ public List QuerySync(Query query, string org = null, CancellationTok var optionsOrg = org ?? _options.Org; Arguments.CheckNonEmptyString(optionsOrg, OrgArgumentValidation); - RestRequest QueryFn(Func advancedResponseWriter) - { - return _service - .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query, - HttpCompletionOption.ResponseHeadersRead) - .AddAdvancedResponseHandler(advancedResponseWriter); - } + var restRequest = _service + .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query, + HttpCompletionOption.ResponseHeadersRead); - QuerySync(QueryFn, consumer, ErrorConsumer, EmptyAction, cancellationToken); + QuerySync(restRequest, consumer, ErrorConsumer, EmptyAction, cancellationToken); return consumer.Tables; }