Skip to content

Commit

Permalink
chore(deps): upgrade RestSharp to 111.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Jul 22, 2024
1 parent 210b502 commit 247872d
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Client.Core/Client.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="NodaTime" Version="3.1.11" />
<PackageReference Include="NodaTime.Serialization.JsonNet" Version="3.1.0" />
<PackageReference Include="RestSharp" Version="110.1.0" />
<PackageReference Include="RestSharp" Version="111.4.0" />
</ItemGroup>

</Project>
96 changes: 62 additions & 34 deletions Client.Core/Internal/AbstractQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using InfluxDB.Client.Core.Flux.Internal;
using Newtonsoft.Json.Linq;
using RestSharp;
using RestSharp.Interceptors;

namespace InfluxDB.Client.Core.Internal
{
Expand All @@ -39,7 +40,7 @@ protected AbstractQueryClient(IFluxResultMapper mapper, FluxCsvParser csvParser)
_csvParser = csvParser;
}

protected Task Query(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
protected Task Query(RestRequest query,
FluxCsvParser.IFluxResponseConsumer responseConsumer,
Action<Exception> onError,
Action onComplete, CancellationToken cancellationToken)
Expand All @@ -56,10 +57,10 @@ void Consumer(Stream bufferedStream)
}
}

return Query(queryFn, Consumer, onError, onComplete, cancellationToken);
return Query(query, Consumer, onError, onComplete, cancellationToken);
}

protected Task QueryRaw(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
protected Task QueryRaw(RestRequest query,
Action<string> onResponse,
Action<Exception> onError,
Action onComplete, CancellationToken cancellationToken)
Expand All @@ -76,10 +77,10 @@ void Consumer(Stream bufferedStream)
}
}

return Query(queryFn, Consumer, onError, onComplete, cancellationToken);
return Query(query, Consumer, onError, onComplete, cancellationToken);
}

protected void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
protected void QuerySync(RestRequest query,
FluxCsvParser.IFluxResponseConsumer responseConsumer,
Action<Exception> onError,
Action onComplete,
Expand All @@ -97,21 +98,21 @@ void Consumer(CancellationToken cancellable, Stream bufferedStream)
}
}

QuerySync(queryFn, Consumer, onError, onComplete, cancellationToken);
QuerySync(query, Consumer, onError, onComplete, cancellationToken);
}

private async Task Query(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
private async Task Query(RestRequest query,
Action<Stream> consumer,
Action<Exception> onError, Action onComplete, CancellationToken cancellationToken)
{
Arguments.CheckNotNull(queryFn, "queryFn");
Arguments.CheckNotNull(query, "query");
Arguments.CheckNotNull(consumer, "consumer");
Arguments.CheckNotNull(onError, "onError");
Arguments.CheckNotNull(onComplete, "onComplete");

try
{
var query = queryFn.Invoke((response, request) =>
query.AdvancedResponseWriter = (response, request) =>
{
var result = GetStreamFromResponse(response, cancellationToken);
result = AfterIntercept((int)response.StatusCode,
Expand All @@ -122,7 +123,7 @@ private async Task Query(Func<Func<HttpResponseMessage, RestRequest, RestRespons
consumer(result);

return FromHttpResponseMessage(response, request);
});
};

BeforeIntercept(query);

Expand All @@ -144,18 +145,18 @@ private async Task Query(Func<Func<HttpResponseMessage, RestRequest, RestRespons
}
}

private void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
private void QuerySync(RestRequest query,
Action<CancellationToken, Stream> consumer,
Action<Exception> onError, Action onComplete, CancellationToken cancellationToken)
{
Arguments.CheckNotNull(queryFn, "queryFn");
Arguments.CheckNotNull(query, "query");
Arguments.CheckNotNull(consumer, "consumer");
Arguments.CheckNotNull(onError, "onError");
Arguments.CheckNotNull(onComplete, "onComplete");

try
{
var query = queryFn.Invoke((response, request) =>
query.AdvancedResponseWriter = (response, request) =>
{
var result = GetStreamFromResponse(response, cancellationToken);
result = AfterIntercept((int)response.StatusCode,
Expand All @@ -166,7 +167,7 @@ private void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>
consumer(cancellationToken, result);

return FromHttpResponseMessage(response, request);
});
};

BeforeIntercept(query);

Expand All @@ -188,32 +189,21 @@ private void QuerySync(Func<Func<HttpResponseMessage, RestRequest, RestResponse>
}

protected async IAsyncEnumerable<T> QueryEnumerable<T>(
Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> queryFn,
RestRequest query,
Func<FluxRecord, T> convert,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
Arguments.CheckNotNull(queryFn, nameof(queryFn));
Arguments.CheckNotNull(query, nameof(query));

Stream stream = null;
var query = queryFn.Invoke((response, request) =>
query.Interceptors = new List<Interceptor>
{
stream = GetStreamFromResponse(response, cancellationToken);
stream = AfterIntercept((int)response.StatusCode,
() => response.Headers.ToHeaderParameters(response.Content.Headers), stream);

RaiseForInfluxError(response, stream);

return FromHttpResponseMessage(response, request);
});

BeforeIntercept(query);

var restResponse = await RestClient.ExecuteAsync(query, cancellationToken).ConfigureAwait(false);
if (restResponse.ErrorException != null)
{
throw restResponse.ErrorException;
}
new RequestBeforeAfterInterceptor<T>(
BeforeIntercept,
(statusCode, headers, body) => AfterIntercept(statusCode, headers, body)
)
};

var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
await foreach (var (_, record) in _csvParser
.ParseFluxResponseAsync(new StreamReader(stream), cancellationToken)
.ConfigureAwait(false))
Expand Down Expand Up @@ -409,4 +399,42 @@ private Stream GetStreamFromResponse(HttpResponseMessage response, CancellationT
return streamFromResponse;
}
}

/// <summary>
/// The interceptor that is called before and after the request.
/// </summary>
internal class RequestBeforeAfterInterceptor<T> : Interceptor
{
private readonly Action<RestRequest> _beforeRequest;
private readonly Action<int, Func<IEnumerable<HeaderParameter>>, T> _afterRequest;

/// <summary>
/// Construct the interceptor.
/// </summary>
/// <param name="beforeRequest">Intercept request before HTTP call</param>
/// <param name="afterRequest">Intercept response before parsing resutlts</param>
internal RequestBeforeAfterInterceptor(
Action<RestRequest> beforeRequest = null,
Action<int, Func<IEnumerable<HeaderParameter>>, T> afterRequest = null)
{
_beforeRequest = beforeRequest;
_afterRequest = afterRequest;
}

public override ValueTask BeforeRequest(RestRequest request, CancellationToken cancellationToken)
{
_beforeRequest?.Invoke(request);
return base.BeforeRequest(request, cancellationToken);
}

public override ValueTask AfterHttpRequest(HttpResponseMessage responseMessage,
CancellationToken cancellationToken)
{
_afterRequest?.Invoke(
(int)responseMessage.StatusCode,
() => responseMessage.Headers.ToHeaderParameters(responseMessage.Content.Headers),
default);
return base.AfterHttpRequest(responseMessage, cancellationToken);
}
}
}
16 changes: 1 addition & 15 deletions Client.Core/Internal/RestSharpExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using RestSharp;

namespace InfluxDB.Client.Core.Internal
Expand All @@ -27,20 +23,10 @@ internal static IEnumerable<HeaderParameter> ToHeaderParameters(this HttpHeaders
.Select(x => new HeaderParameter(x.Key, x.y));
}

internal static RestRequest AddAdvancedResponseHandler(this RestRequest restRequest,
Func<HttpResponseMessage, RestRequest, RestResponse> advancedResponseWriter)
{
var field = restRequest.GetType()
.GetField("_advancedResponseHandler", BindingFlags.Instance | BindingFlags.NonPublic);
field!.SetValue(restRequest, advancedResponseWriter);

return restRequest;
}

internal static RestResponse ExecuteSync(this RestClient client,
RestRequest request, CancellationToken cancellationToken = default)
{
return client.Execute(request, cancellationToken);
return client.Execute(request);
}
}
}
5 changes: 2 additions & 3 deletions Client.Legacy/FluxClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,9 @@ private RestRequest PingRequest()
return new RestRequest("ping");
}

private Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> QueryRequest(string query)
private RestRequest QueryRequest(string query)
{
return advancedResponseWriter => new RestRequest("api/v2/query", Method.Post)
.AddAdvancedResponseHandler(advancedResponseWriter)
return new RestRequest("api/v2/query", Method.Post)
.AddParameter(new BodyParameter("application/json", query, "application/json"));
}
}
Expand Down
33 changes: 33 additions & 0 deletions Client.Test/ItWriteQueryApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core;
using InfluxDB.Client.Writes;
Expand Down Expand Up @@ -944,5 +945,37 @@ public async Task GzipWithLargeAmountOfData()
Assert.AreEqual(1000, tables.Count);
Assert.AreEqual(1, tables[0].Records.Count);
}

[Test]
public async Task QueryAsyncEnumerableGzip()
{
Client.EnableGzip();
Client.SetLogLevel(LogLevel.Body);

await Client.GetWriteApiAsync().WriteMeasurementsAsync(new[]
{
new H20Measurement
{
Location = "angel_bay", Level = 2.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(10))
},
new H20Measurement
{
Location = "angel_bay", Level = 1.927, Time = DateTime.UtcNow.Add(-TimeSpan.FromSeconds(20))
}
});

var query = $@"from(bucket: ""{_bucket.Name}"")
|> range(start: 0)
|> filter(fn: (r) => r[""location""] == ""angel_bay"")
|> pivot(rowKey:[""_time""], columnKey: [""_field""], valueColumn: ""_value"")";

var list = new List<H20Measurement>();
await foreach (var item in _queryApi.QueryAsyncEnumerable<H20Measurement>(query).ConfigureAwait(false))
list.Add(item);

Assert.AreEqual(2, list.Count);
Assert.AreEqual(1.927, list[0].Level);
Assert.AreEqual(2.927, list[1].Level);
}
}
}
2 changes: 1 addition & 1 deletion Client/InfluxDB.Client.Api/Client/ApiResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ApiResponse<T>
/// <param name="statusCode">HTTP status code.</param>
/// <param name="headers">HTTP headers.</param>
/// <param name="data">Data (parsed HTTP body)</param>
public ApiResponse(int statusCode, IEnumerable<(string Name, object Value)> headers, T data)
public ApiResponse(int statusCode, IEnumerable<(string Name, string Value)> headers, T data)
{
StatusCode = statusCode;
Headers = headers
Expand Down
7 changes: 3 additions & 4 deletions Client/InvokableScriptsApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,13 @@ private Task InvokeScript(string scriptId, FluxCsvParser.IFluxResponseConsumer c
cancellationToken);
}

private Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> CreateRequest(string scriptId,
private RestRequest CreateRequest(string scriptId,
Dictionary<string, object> bindParams = default)
{
Arguments.CheckNonEmptyString(scriptId, nameof(scriptId));

return advancedResponseWriter => _service
.PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams))
.AddAdvancedResponseHandler(advancedResponseWriter);
return _service
.PostScriptsIDInvokeWithRestRequest(scriptId, new ScriptInvocationParams(bindParams));
}
}
}
11 changes: 6 additions & 5 deletions Client/QueryApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -783,17 +783,18 @@ private Task QueryAsync(Query query, FluxCsvParser.IFluxResponseConsumer consume
cancellationToken);
}

private Func<Func<HttpResponseMessage, RestRequest, RestResponse>, RestRequest> CreateRequest(Query query,
string org = null)
private RestRequest CreateRequest(Query query, string org = null)
{
Arguments.CheckNotNull(query, nameof(query));

var optionsOrg = org ?? _options.Org;
Arguments.CheckNonEmptyString(optionsOrg, OrgArgumentValidation);

return advancedResponseWriter => _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query)
.AddAdvancedResponseHandler(advancedResponseWriter);
var postQueryWithRestRequest = _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
HttpCompletionOption.ResponseHeadersRead);

return postQueryWithRestRequest;
}

internal static Query CreateQuery(string query, Dialect dialect = null)
Expand Down
24 changes: 8 additions & 16 deletions Client/QueryApiSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,11 @@ public List<T> QuerySync<T>(Query query, string org = null, CancellationToken ca

var consumer = new FluxResponseConsumerPoco<T>(poco => { measurements.Add(poco); }, Mapper);

RestRequest QueryFn(Func<HttpResponseMessage, RestRequest, RestResponse> advancedResponseWriter)
{
return _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
HttpCompletionOption.ResponseHeadersRead)
.AddAdvancedResponseHandler(advancedResponseWriter);
}
var restRequest = _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
HttpCompletionOption.ResponseHeadersRead);

QuerySync(QueryFn, consumer, ErrorConsumer, EmptyAction, cancellationToken);
QuerySync(restRequest, consumer, ErrorConsumer, EmptyAction, cancellationToken);

return measurements;
}
Expand Down Expand Up @@ -193,15 +189,11 @@ public List<FluxTable> QuerySync(Query query, string org = null, CancellationTok
var optionsOrg = org ?? _options.Org;
Arguments.CheckNonEmptyString(optionsOrg, OrgArgumentValidation);

RestRequest QueryFn(Func<HttpResponseMessage, RestRequest, RestResponse> advancedResponseWriter)
{
return _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
HttpCompletionOption.ResponseHeadersRead)
.AddAdvancedResponseHandler(advancedResponseWriter);
}
var restRequest = _service
.PostQueryWithRestRequest(null, "application/json", null, optionsOrg, null, query,
HttpCompletionOption.ResponseHeadersRead);

QuerySync(QueryFn, consumer, ErrorConsumer, EmptyAction, cancellationToken);
QuerySync(restRequest, consumer, ErrorConsumer, EmptyAction, cancellationToken);

return consumer.Tables;
}
Expand Down

0 comments on commit 247872d

Please sign in to comment.