Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): upgrade RestSharp to 111.4.0 #650

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
## 4.17.0 [unreleased]

### Breaking Changes

#### API

- `ApiResponse` headers has been changed to `IEnumerable<(string Name, string Value)>`

### Bug Fixes
1. [#649](https://github.com/influxdata/influxdb-client-csharp/pull/649): Use HttpCompletionOption.ResponseHeadersRead for asynchronous QueryApi

### Dependencies
Update dependencies:

#### Build:
- [#650](https://github.com/influxdata/influxdb-client-csharp/pull/650): `RestSharp` to `111.4.0`

## 4.16.0 [2024-06-24]

### Features:
Expand Down
2 changes: 1 addition & 1 deletion Client.Core/Client.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="NodaTime" Version="3.1.11" />
<PackageReference Include="NodaTime.Serialization.JsonNet" Version="3.1.0" />
<PackageReference Include="RestSharp" Version="110.1.0" />
<PackageReference Include="RestSharp" Version="111.4.0" />
</ItemGroup>

</Project>
96 changes: 62 additions & 34 deletions Client.Core/Internal/AbstractQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using InfluxDB.Client.Core.Flux.Internal;
using Newtonsoft.Json.Linq;
using RestSharp;
using RestSharp.Interceptors;

namespace InfluxDB.Client.Core.Internal
{
Expand All @@ -39,7 +40,7 @@ protected AbstractQueryClient(IFluxResultMapper mapper, FluxCsvParser csvParser)
_csvParser = csvParser;
}

protected Task Query(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
protected Task Query(RestRequest query,
FluxCsvParser.IFluxResponseConsumer responseConsumer,
Action<Exception> onError,
Action onComplete, CancellationToken cancellationToken)
Expand All @@ -56,10 +57,10 @@ void Consumer(Stream bufferedStream)
}
}

return Query(queryFn, Consumer, onError, onComplete, cancellationToken);
return Query(query, Consumer, onError, onComplete, cancellationToken);
}

protected Task QueryRaw(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
protected Task QueryRaw(RestRequest query,
Action<string> onResponse,
Action<Exception> onError,
Action onComplete, CancellationToken cancellationToken)
Expand All @@ -76,10 +77,10 @@ void Consumer(Stream bufferedStream)
}
}

return Query(queryFn, Consumer, onError, onComplete, cancellationToken);
return Query(query, Consumer, onError, onComplete, cancellationToken);
}

protected void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
protected void QuerySync(RestRequest query,
FluxCsvParser.IFluxResponseConsumer responseConsumer,
Action<Exception> onError,
Action onComplete,
Expand All @@ -97,21 +98,21 @@ void Consumer(CancellationToken cancellable, Stream bufferedStream)
}
}

QuerySync(queryFn, Consumer, onError, onComplete, cancellationToken);
QuerySync(query, Consumer, onError, onComplete, cancellationToken);
}

private async Task Query(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
private async Task Query(RestRequest query,
Action<Stream> consumer,
Action<Exception> onError, Action onComplete, CancellationToken cancellationToken)
{
Arguments.CheckNotNull(queryFn, "queryFn");
Arguments.CheckNotNull(query, "query");
Arguments.CheckNotNull(consumer, "consumer");
Arguments.CheckNotNull(onError, "onError");
Arguments.CheckNotNull(onComplete, "onComplete");

try
{
var query = queryFn.Invoke((response, request) =>
query.AdvancedResponseWriter = (response, request) =>
{
var result = GetStreamFromResponse(response, cancellationToken);
result = AfterIntercept((int)response.StatusCode,
Expand All @@ -122,7 +123,7 @@ private async Task Query(Func<Func<HttpResponseMessage, RestRequest, RestRespons
consumer(result);

return FromHttpResponseMessage(response, request);
});
};

BeforeIntercept(query);

Expand All @@ -144,18 +145,18 @@ private async Task Query(Func<Func<HttpResponseMessage, RestRequest, RestRespons
}
}

private void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
private void QuerySync(RestRequest query,
Action<CancellationToken, Stream> consumer,
Action<Exception> onError, Action onComplete, CancellationToken cancellationToken)
{
Arguments.CheckNotNull(queryFn, "queryFn");
Arguments.CheckNotNull(query, "query");
Arguments.CheckNotNull(consumer, "consumer");
Arguments.CheckNotNull(onError, "onError");
Arguments.CheckNotNull(onComplete, "onComplete");

try
{
var query = queryFn.Invoke((response, request) =>
query.AdvancedResponseWriter = (response, request) =>
{
var result = GetStreamFromResponse(response, cancellationToken);
result = AfterIntercept((int)response.StatusCode,
Expand All @@ -166,7 +167,7 @@ private void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>
consumer(cancellationToken, result);

return FromHttpResponseMessage(response, request);
});
};

BeforeIntercept(query);

Expand All @@ -188,32 +189,21 @@ private void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>
}

protected async IAsyncEnumerable<T> QueryEnumerable<T>(
Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
RestRequest query,
Func<FluxRecord, T> convert,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
Arguments.CheckNotNull(queryFn, nameof(queryFn));
Arguments.CheckNotNull(query, nameof(query));

Stream stream = null;
var query = queryFn.Invoke((response, request) =>
query.Interceptors = new List<Interceptor>
{
stream = GetStreamFromResponse(response, cancellationToken);
stream = AfterIntercept((int)response.StatusCode,
() => response.Headers.ToHeaderParameters(response.Content.Headers), stream);

RaiseForInfluxError(response, stream);

return FromHttpResponseMessage(response, request);
});

BeforeIntercept(query);

var restResponse = await RestClient.ExecuteAsync(query, cancellationToken).ConfigureAwait(false);
if (restResponse.ErrorException != null)
{
throw restResponse.ErrorException;
}
new RequestBeforeAfterInterceptor<T>(
BeforeIntercept,
(statusCode, headers, body) => AfterIntercept(statusCode, headers, body)
)
};

var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
await foreach (var (_, record) in _csvParser
.ParseFluxResponseAsync(new StreamReader(stream), cancellationToken)
.ConfigureAwait(false))
Expand Down Expand Up @@ -409,4 +399,42 @@ private Stream GetStreamFromResponse(HttpResponseMessage response, CancellationT
return streamFromResponse;
}
}

/// <summary>
/// The interceptor that is called before and after the request.
/// </summary>
internal class RequestBeforeAfterInterceptor<T> : Interceptor
{
private readonly Action<RestRequest> _beforeRequest;
private readonly Action<int, Func<IEnumerable<HeaderParameter>>, T> _afterRequest;

/// <summary>
/// Construct the interceptor.
/// </summary>
/// <param name="beforeRequest">Intercept request before HTTP call</param>
/// <param name="afterRequest">Intercept response before parsing resutlts</param>
internal RequestBeforeAfterInterceptor(
Action<RestRequest> beforeRequest = null,
Action<int, Func<IEnumerable<HeaderParameter>>, T> afterRequest = null)
{
_beforeRequest = beforeRequest;
_afterRequest = afterRequest;
}

public override ValueTask BeforeRequest(RestRequest request, CancellationToken cancellationToken)
{
_beforeRequest?.Invoke(request);
return base.BeforeRequest(request, cancellationToken);
}

public override ValueTask AfterHttpRequest(HttpResponseMessage responseMessage,
CancellationToken cancellationToken)
{
_afterRequest?.Invoke(
(int)responseMessage.StatusCode,
() => responseMessage.Headers.ToHeaderParameters(responseMessage.Content.Headers),
default);
return base.AfterHttpRequest(responseMessage, cancellationToken);
}
}
}
16 changes: 1 addition & 15 deletions Client.Core/Internal/RestSharpExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using RestSharp;

namespace InfluxDB.Client.Core.Internal
Expand All @@ -27,20 +23,10 @@ internal static IEnumerable<HeaderParameter> ToHeaderParameters(this HttpHeaders
.Select(x => new HeaderParameter(x.Key, x.y));
}

internal static RestRequest AddAdvancedResponseHandler(this RestRequest restRequest,
Func<HttpResponseMessage, RestRequest, RestResponse> advancedResponseWriter)
{
var field = restRequest.GetType()
.GetField("_advancedResponseHandler", BindingFlags.Instance | BindingFlags.NonPublic);
field!.SetValue(restRequest, advancedResponseWriter);

return restRequest;
}

internal static RestResponse ExecuteSync(this RestClient client,
RestRequest request, CancellationToken cancellationToken = default)
{
return client.Execute(request, cancellationToken);
return client.Execute(request);
}
}
}
5 changes: 2 additions & 3 deletions Client.Legacy/FluxClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,9 @@ private RestRequest PingRequest()
return new RestRequest("ping");
}

private Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> QueryRequest(string query)
private RestRequest QueryRequest(string query)
{
return advancedResponseWriter => new RestRequest("api/v2/query", Method.Post)
.AddAdvancedResponseHandler(advancedResponseWriter)
return new RestRequest("api/v2/query", Method.Post)
.AddParameter(new BodyParameter("application/json", query, "application/json"));
}
}
Expand Down
33 changes: 33 additions & 0 deletions Client.Test/ItWriteQueryApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core;
using InfluxDB.Client.Writes;
Expand Down Expand Up @@ -944,5 +945,37 @@ public async Task GzipWithLargeAmountOfData()
Assert.AreEqual(1000, tables.Count);
Assert.AreEqual(1, tables[0].Records.Count);
}

[Test]
public async Task QueryAsyncEnumerableGzip()
{
Client.EnableGzip();
Client.SetLogLevel(LogLevel.Body);

await Client.GetWriteApiAsync().WriteMeasurementsAsync(new[]
{
new H20Measurement
{
Location = "angel_bay", Level = 2.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(10))
},
new H20Measurement
{
Location = "angel_bay", Level = 1.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(20))
}
});

var query = $@"from(bucket: ""{_bucket.Name}"")
|> range(start: 0)
|> filter(fn: (r) => r[""location""] == ""angel_bay"")
|> pivot(rowKey:[""_time""], columnKey: [""_field""], valueColumn: ""_value"")";

var list = new List<H20Measurement>();
await foreach (var item in _queryApi.QueryAsyncEnumerable<H20Measurement>(query).ConfigureAwait(false))
list.Add(item);

Assert.AreEqual(2, list.Count);
Assert.AreEqual(1.927, list[0].Level);
Assert.AreEqual(2.927, list[1].Level);
}
}
}
2 changes: 1 addition & 1 deletion Client/InfluxDB.Client.Api/Client/ApiResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ApiResponse<T>
/// <param name="statusCode">HTTP status code.</param>
/// <param name="headers">HTTP headers.</param>
/// <param name="data">Data (parsed HTTP body)</param>
public ApiResponse(int statusCode, IEnumerable<(string Name, object Value)> headers, T data)
public ApiResponse(int statusCode, IEnumerable<(string Name, string Value)> headers, T data)
{
StatusCode = statusCode;
Headers = headers
Expand Down
7 changes: 3 additions & 4 deletions Client/InvokableScriptsApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,13 @@ private Task InvokeScript(string scriptId, FluxCsvParser.IFluxResponseConsumer c
cancellationToken);
}

private Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> CreateRequest(string scriptId,
private RestRequest CreateRequest(string scriptId,
Dictionary<string, object> bindParams = default)
{
Arguments.CheckNonEmptyString(scriptId, nameof(scriptId));

return advancedResponseWriter => _service
.PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams))
.AddAdvancedResponseHandler(advancedResponseWriter);
return _service
.PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams));
}
}
}
10 changes: 5 additions & 5 deletions Client/QueryApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -783,18 +783,18 @@ private Task QueryAsync(Query query, FluxCsvParser.IFluxResponseConsumer consume
cancellationToken);
}

private Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> CreateRequest(Query query,
string org = null)
private RestRequest CreateRequest(Query query, string org = null)
{
Arguments.CheckNotNull(query, nameof(query));

var optionsOrg = org ?? _options.Org;
Arguments.CheckNonEmptyString(optionsOrg, OrgArgumentValidation);

return advancedResponseWriter => _service
var postQueryWithRestRequest = _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
HttpCompletionOption.ResponseHeadersRead)
.AddAdvancedResponseHandler(advancedResponseWriter);
HttpCompletionOption.ResponseHeadersRead);

return postQueryWithRestRequest;
}

internal static Query CreateQuery(string query, Dialect dialect = null)
Expand Down
Loading