diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5b63fca9d..9d8b20655 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,20 @@
## 4.17.0 [unreleased]
+### Breaking Changes
+
+#### API
+
+- `ApiResponse` headers has been changed to `IEnumerable<(string Name, string Value)>`
+
### Bug Fixes
1. [#649](https://github.com/influxdata/influxdb-client-csharp/pull/649): Use HttpCompletionOption.ResponseHeadersRead for asynchronous QueryApi
+### Dependencies
+Update dependencies:
+
+#### Build:
+- [#650](https://github.com/influxdata/influxdb-client-csharp/pull/650): `RestSharp` to `111.4.0`
+
## 4.16.0 [2024-06-24]
### Features:
diff --git a/Client.Core/Client.Core.csproj b/Client.Core/Client.Core.csproj
index 255d140f0..d9bd75a0e 100644
--- a/Client.Core/Client.Core.csproj
+++ b/Client.Core/Client.Core.csproj
@@ -40,7 +40,7 @@
-
+
diff --git a/Client.Core/Internal/AbstractQueryClient.cs b/Client.Core/Internal/AbstractQueryClient.cs
index 519586783..b184be46b 100644
--- a/Client.Core/Internal/AbstractQueryClient.cs
+++ b/Client.Core/Internal/AbstractQueryClient.cs
@@ -13,6 +13,7 @@
using InfluxDB.Client.Core.Flux.Internal;
using Newtonsoft.Json.Linq;
using RestSharp;
+using RestSharp.Interceptors;
namespace InfluxDB.Client.Core.Internal
{
@@ -39,7 +40,7 @@ protected AbstractQueryClient(IFluxResultMapper mapper, FluxCsvParser csvParser)
_csvParser = csvParser;
}
- protected Task Query(Func, RestRequest> queryFn,
+ protected Task Query(RestRequest query,
FluxCsvParser.IFluxResponseConsumer responseConsumer,
Action onError,
Action onComplete, CancellationToken cancellationToken)
@@ -56,10 +57,10 @@ void Consumer(Stream bufferedStream)
}
}
- return Query(queryFn, Consumer, onError, onComplete, cancellationToken);
+ return Query(query, Consumer, onError, onComplete, cancellationToken);
}
- protected Task QueryRaw(Func, RestRequest> queryFn,
+ protected Task QueryRaw(RestRequest query,
Action onResponse,
Action onError,
Action onComplete, CancellationToken cancellationToken)
@@ -76,10 +77,10 @@ void Consumer(Stream bufferedStream)
}
}
- return Query(queryFn, Consumer, onError, onComplete, cancellationToken);
+ return Query(query, Consumer, onError, onComplete, cancellationToken);
}
- protected void QuerySync(Func, RestRequest> queryFn,
+ protected void QuerySync(RestRequest query,
FluxCsvParser.IFluxResponseConsumer responseConsumer,
Action onError,
Action onComplete,
@@ -97,21 +98,21 @@ void Consumer(CancellationToken cancellable, Stream bufferedStream)
}
}
- QuerySync(queryFn, Consumer, onError, onComplete, cancellationToken);
+ QuerySync(query, Consumer, onError, onComplete, cancellationToken);
}
- private async Task Query(Func, RestRequest> queryFn,
+ private async Task Query(RestRequest query,
Action consumer,
Action onError, Action onComplete, CancellationToken cancellationToken)
{
- Arguments.CheckNotNull(queryFn, "queryFn");
+ Arguments.CheckNotNull(query, "query");
Arguments.CheckNotNull(consumer, "consumer");
Arguments.CheckNotNull(onError, "onError");
Arguments.CheckNotNull(onComplete, "onComplete");
try
{
- var query = queryFn.Invoke((response, request) =>
+ query.AdvancedResponseWriter = (response, request) =>
{
var result = GetStreamFromResponse(response, cancellationToken);
result = AfterIntercept((int)response.StatusCode,
@@ -122,7 +123,7 @@ private async Task Query(Func, RestRequest> queryFn,
+ private void QuerySync(RestRequest query,
Action consumer,
Action onError, Action onComplete, CancellationToken cancellationToken)
{
- Arguments.CheckNotNull(queryFn, "queryFn");
+ Arguments.CheckNotNull(query, "query");
Arguments.CheckNotNull(consumer, "consumer");
Arguments.CheckNotNull(onError, "onError");
Arguments.CheckNotNull(onComplete, "onComplete");
try
{
- var query = queryFn.Invoke((response, request) =>
+ query.AdvancedResponseWriter = (response, request) =>
{
var result = GetStreamFromResponse(response, cancellationToken);
result = AfterIntercept((int)response.StatusCode,
@@ -166,7 +167,7 @@ private void QuerySync(Func
consumer(cancellationToken, result);
return FromHttpResponseMessage(response, request);
- });
+ };
BeforeIntercept(query);
@@ -188,32 +189,21 @@ private void QuerySync(Func
}
protected async IAsyncEnumerable QueryEnumerable(
- Func, RestRequest> queryFn,
+ RestRequest query,
Func convert,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
- Arguments.CheckNotNull(queryFn, nameof(queryFn));
+ Arguments.CheckNotNull(query, nameof(query));
- Stream stream = null;
- var query = queryFn.Invoke((response, request) =>
+ query.Interceptors = new List
{
- stream = GetStreamFromResponse(response, cancellationToken);
- stream = AfterIntercept((int)response.StatusCode,
- () => response.Headers.ToHeaderParameters(response.Content.Headers), stream);
-
- RaiseForInfluxError(response, stream);
-
- return FromHttpResponseMessage(response, request);
- });
-
- BeforeIntercept(query);
-
- var restResponse = await RestClient.ExecuteAsync(query, cancellationToken).ConfigureAwait(false);
- if (restResponse.ErrorException != null)
- {
- throw restResponse.ErrorException;
- }
+ new RequestBeforeAfterInterceptor(
+ BeforeIntercept,
+ (statusCode, headers, body) => AfterIntercept(statusCode, headers, body)
+ )
+ };
+ var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
await foreach (var (_, record) in _csvParser
.ParseFluxResponseAsync(new StreamReader(stream), cancellationToken)
.ConfigureAwait(false))
@@ -409,4 +399,42 @@ private Stream GetStreamFromResponse(HttpResponseMessage response, CancellationT
return streamFromResponse;
}
}
+
+ ///
+ /// The interceptor that is called before and after the request.
+ ///
+ internal class RequestBeforeAfterInterceptor : Interceptor
+ {
+ private readonly Action _beforeRequest;
+ private readonly Action>, T> _afterRequest;
+
+ ///
+ /// Construct the interceptor.
+ ///
+ /// Intercept request before HTTP call
+ /// Intercept response before parsing resutlts
+ internal RequestBeforeAfterInterceptor(
+ Action beforeRequest = null,
+ Action>, T> afterRequest = null)
+ {
+ _beforeRequest = beforeRequest;
+ _afterRequest = afterRequest;
+ }
+
+ public override ValueTask BeforeRequest(RestRequest request, CancellationToken cancellationToken)
+ {
+ _beforeRequest?.Invoke(request);
+ return base.BeforeRequest(request, cancellationToken);
+ }
+
+ public override ValueTask AfterHttpRequest(HttpResponseMessage responseMessage,
+ CancellationToken cancellationToken)
+ {
+ _afterRequest?.Invoke(
+ (int)responseMessage.StatusCode,
+ () => responseMessage.Headers.ToHeaderParameters(responseMessage.Content.Headers),
+ default);
+ return base.AfterHttpRequest(responseMessage, cancellationToken);
+ }
+ }
}
\ No newline at end of file
diff --git a/Client.Core/Internal/RestSharpExtensions.cs b/Client.Core/Internal/RestSharpExtensions.cs
index fcb01b114..72e1c5541 100644
--- a/Client.Core/Internal/RestSharpExtensions.cs
+++ b/Client.Core/Internal/RestSharpExtensions.cs
@@ -1,11 +1,7 @@
-using System;
using System.Collections.Generic;
using System.Linq;
-using System.Net.Http;
using System.Net.Http.Headers;
-using System.Reflection;
using System.Threading;
-using System.Threading.Tasks;
using RestSharp;
namespace InfluxDB.Client.Core.Internal
@@ -27,20 +23,10 @@ internal static IEnumerable ToHeaderParameters(this HttpHeaders
.Select(x => new HeaderParameter(x.Key, x.y));
}
- internal static RestRequest AddAdvancedResponseHandler(this RestRequest restRequest,
- Func advancedResponseWriter)
- {
- var field = restRequest.GetType()
- .GetField("_advancedResponseHandler", BindingFlags.Instance | BindingFlags.NonPublic);
- field!.SetValue(restRequest, advancedResponseWriter);
-
- return restRequest;
- }
-
internal static RestResponse ExecuteSync(this RestClient client,
RestRequest request, CancellationToken cancellationToken = default)
{
- return client.Execute(request, cancellationToken);
+ return client.Execute(request);
}
}
}
\ No newline at end of file
diff --git a/Client.Legacy/FluxClient.cs b/Client.Legacy/FluxClient.cs
index 1c6e935dd..e5bfe6f0e 100644
--- a/Client.Legacy/FluxClient.cs
+++ b/Client.Legacy/FluxClient.cs
@@ -423,10 +423,9 @@ private RestRequest PingRequest()
return new RestRequest("ping");
}
- private Func, RestRequest> QueryRequest(string query)
+ private RestRequest QueryRequest(string query)
{
- return advancedResponseWriter => new RestRequest("api/v2/query", Method.Post)
- .AddAdvancedResponseHandler(advancedResponseWriter)
+ return new RestRequest("api/v2/query", Method.Post)
.AddParameter(new BodyParameter("application/json", query, "application/json"));
}
}
diff --git a/Client.Test/ItWriteQueryApiTest.cs b/Client.Test/ItWriteQueryApiTest.cs
index 56ec9e237..4f8d3bf67 100644
--- a/Client.Test/ItWriteQueryApiTest.cs
+++ b/Client.Test/ItWriteQueryApiTest.cs
@@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
+using System.Threading.Tasks;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core;
using InfluxDB.Client.Writes;
@@ -944,5 +945,37 @@ public async Task GzipWithLargeAmountOfData()
Assert.AreEqual(1000, tables.Count);
Assert.AreEqual(1, tables[0].Records.Count);
}
+
+ [Test]
+ public async Task QueryAsyncEnumerableGzip()
+ {
+ Client.EnableGzip();
+ Client.SetLogLevel(LogLevel.Body);
+
+ await Client.GetWriteApiAsync().WriteMeasurementsAsync(new[]
+ {
+ new H20Measurement
+ {
+ Location = "angel_bay", Level = 2.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(10))
+ },
+ new H20Measurement
+ {
+ Location = "angel_bay", Level = 1.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(20))
+ }
+ });
+
+ var query = $@"from(bucket: ""{_bucket.Name}"")
+ |> range(start: 0)
+ |> filter(fn: (r) => r[""location""] == ""angel_bay"")
+ |> pivot(rowKey:[""_time""], columnKey: [""_field""], valueColumn: ""_value"")";
+
+ var list = new List();
+ await foreach (var item in _queryApi.QueryAsyncEnumerable(query).ConfigureAwait(false))
+ list.Add(item);
+
+ Assert.AreEqual(2, list.Count);
+ Assert.AreEqual(1.927, list[0].Level);
+ Assert.AreEqual(2.927, list[1].Level);
+ }
}
}
\ No newline at end of file
diff --git a/Client/InfluxDB.Client.Api/Client/ApiResponse.cs b/Client/InfluxDB.Client.Api/Client/ApiResponse.cs
index 0849b4b39..d2942306b 100644
--- a/Client/InfluxDB.Client.Api/Client/ApiResponse.cs
+++ b/Client/InfluxDB.Client.Api/Client/ApiResponse.cs
@@ -43,7 +43,7 @@ public class ApiResponse
/// HTTP status code.
/// HTTP headers.
/// Data (parsed HTTP body)
- public ApiResponse(int statusCode, IEnumerable<(string Name, object Value)> headers, T data)
+ public ApiResponse(int statusCode, IEnumerable<(string Name, string Value)> headers, T data)
{
StatusCode = statusCode;
Headers = headers
diff --git a/Client/InvokableScriptsApi.cs b/Client/InvokableScriptsApi.cs
index c26d1d373..9e603d29d 100644
--- a/Client/InvokableScriptsApi.cs
+++ b/Client/InvokableScriptsApi.cs
@@ -300,14 +300,13 @@ private Task InvokeScript(string scriptId, FluxCsvParser.IFluxResponseConsumer c
cancellationToken);
}
- private Func, RestRequest> CreateRequest(string scriptId,
+ private RestRequest CreateRequest(string scriptId,
Dictionary bindParams = default)
{
Arguments.CheckNonEmptyString(scriptId, nameof(scriptId));
- return advancedResponseWriter => _service
- .PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams))
- .AddAdvancedResponseHandler(advancedResponseWriter);
+ return _service
+ .PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams));
}
}
}
\ No newline at end of file
diff --git a/Client/QueryApi.cs b/Client/QueryApi.cs
index 34b6a6725..2946ff26b 100644
--- a/Client/QueryApi.cs
+++ b/Client/QueryApi.cs
@@ -783,18 +783,18 @@ private Task QueryAsync(Query query, FluxCsvParser.IFluxResponseConsumer consume
cancellationToken);
}
- private Func, RestRequest> CreateRequest(Query query,
- string org = null)
+ private RestRequest CreateRequest(Query query, string org = null)
{
Arguments.CheckNotNull(query, nameof(query));
var optionsOrg = org ?? _options.Org;
Arguments.CheckNonEmptyString(optionsOrg, OrgArgumentValidation);
- return advancedResponseWriter => _service
+ var postQueryWithRestRequest = _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
- HttpCompletionOption.ResponseHeadersRead)
- .AddAdvancedResponseHandler(advancedResponseWriter);
+ HttpCompletionOption.ResponseHeadersRead);
+
+ return postQueryWithRestRequest;
}
internal static Query CreateQuery(string query, Dialect dialect = null)
diff --git a/Client/QueryApiSync.cs b/Client/QueryApiSync.cs
index dc0c30622..ce0baa26e 100644
--- a/Client/QueryApiSync.cs
+++ b/Client/QueryApiSync.cs
@@ -140,15 +140,11 @@ public List QuerySync(Query query, string org = null, CancellationToken ca
var consumer = new FluxResponseConsumerPoco(poco => { measurements.Add(poco); }, Mapper);
- RestRequest QueryFn(Func advancedResponseWriter)
- {
- return _service
- .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
- HttpCompletionOption.ResponseHeadersRead)
- .AddAdvancedResponseHandler(advancedResponseWriter);
- }
+ var restRequest = _service
+ .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
+ HttpCompletionOption.ResponseHeadersRead);
- QuerySync(QueryFn, consumer, ErrorConsumer, EmptyAction, cancellationToken);
+ QuerySync(restRequest, consumer, ErrorConsumer, EmptyAction, cancellationToken);
return measurements;
}
@@ -193,15 +189,11 @@ public List QuerySync(Query query, string org = null, CancellationTok
var optionsOrg = org ?? _options.Org;
Arguments.CheckNonEmptyString(optionsOrg, OrgArgumentValidation);
- RestRequest QueryFn(Func advancedResponseWriter)
- {
- return _service
- .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
- HttpCompletionOption.ResponseHeadersRead)
- .AddAdvancedResponseHandler(advancedResponseWriter);
- }
+ var restRequest = _service
+ .PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
+ HttpCompletionOption.ResponseHeadersRead);
- QuerySync(QueryFn, consumer, ErrorConsumer, EmptyAction, cancellationToken);
+ QuerySync(restRequest, consumer, ErrorConsumer, EmptyAction, cancellationToken);
return consumer.Tables;
}