Skip to content

Commit

Permalink
Less memory allocation while processing log requests (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
nvborisenko authored Mar 28, 2023
1 parent 84bcf85 commit 304b5ca
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 85 deletions.
15 changes: 8 additions & 7 deletions src/ReportPortal.Shared/Reporter/LaunchReporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void Finish(FinishLaunchRequest request)

if (_logsReporter != null)
{
_logsReporter.Finish();
dependentTasks.Add(_logsReporter.ProcessingTask);
}

Expand Down Expand Up @@ -314,23 +315,23 @@ public void Log(CreateLogItemRequest createLogItemRequest)

public void Sync()
{
_logsReporter?.Sync();

if (FinishTask != null)
{
FinishTask.GetAwaiter().GetResult();
}
else
{
StartTask?.GetAwaiter().GetResult();
}

if (ChildTestReporters != null)
if (ChildTestReporters != null)
{
foreach (var testNode in ChildTestReporters)
{
foreach (var testNode in ChildTestReporters)
{
testNode.Sync();
}
testNode.Sync();
}

_logsReporter?.Sync();
}
}

Expand Down
143 changes: 77 additions & 66 deletions src/ReportPortal.Shared/Reporter/LogsReporter.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using ReportPortal.Client.Abstractions;
using ReportPortal.Client.Abstractions;
using ReportPortal.Client.Abstractions.Requests;
using ReportPortal.Shared.Configuration;
using ReportPortal.Shared.Extensibility;
using ReportPortal.Shared.Extensibility.ReportEvents.EventArgs;
using ReportPortal.Shared.Internal.Delegating;
using ReportPortal.Shared.Internal.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

Expand All @@ -15,7 +16,7 @@ public class LogsReporter : ILogsReporter
{
private static ITraceLogger TraceLogger { get; } = TraceLogManager.Instance.GetLogger<LogsReporter>();

private readonly Queue<CreateLogItemRequest> _buffer = new Queue<CreateLogItemRequest>();
private readonly BlockingCollection<CreateLogItemRequest> _queue = new BlockingCollection<CreateLogItemRequest>();

private readonly bool _asyncReporting;
private readonly IReporter _reporter;
Expand Down Expand Up @@ -49,105 +50,115 @@ public LogsReporter(IReporter testReporter,

if (batchCapacity < 1) throw new ArgumentException("Batch capacity for logs processing cannot be less than 1.", nameof(batchCapacity));
BatchCapacity = batchCapacity;
}

private readonly object _syncObj = new object();
ProcessingTask = _reporter.StartTask.ContinueWith(async consumer =>
{
await ConsumeLogRequests();
}).Unwrap();
}

public int BatchCapacity { get; }

public void Log(CreateLogItemRequest logRequest)
{
lock (_syncObj)
{
_buffer.Enqueue(logRequest);

var dependentTask = ProcessingTask ?? _reporter.StartTask;

ProcessingTask = dependentTask.ContinueWith(async (dt) =>
{
try
{
// only if parent reporter is successful
if (!_reporter.StartTask.IsFaulted && !_reporter.StartTask.IsCanceled)
{
var requests = GetBufferedLogRequests(batchCapacity: BatchCapacity);

if (requests.Count != 0)
{
foreach (var logItemRequest in requests)
{
_logRequestAmender.Amend(logItemRequest);
}

NotifySending(requests);

await _requestExecuter
.ExecuteAsync(async () => _asyncReporting
? await _service.AsyncLogItem.CreateAsync(requests.ToArray())
: await _service.LogItem.CreateAsync(requests.ToArray()),
null,
_reporter.StatisticsCounter.LogItemStatisticsCounter,
$"Sending {requests.Count} log items...")
.ConfigureAwait(false);
_queue.Add(logRequest);
}

NotifySent(requests.AsReadOnly());
}
}
}
catch (Exception exp)
{
TraceLogger.Error($"Unexpected error occurred while processing buffered log requests. {exp}");
}
}, TaskContinuationOptions.PreferFairness).Unwrap();
}
public void Finish()
{
_queue.CompleteAdding();
}

public void Sync()
{
try
{
Finish();

ProcessingTask?.GetAwaiter().GetResult();
}
catch
{
// we don't aware of failed requests for sending log messages (for now)
}

}

private List<CreateLogItemRequest> GetBufferedLogRequests(int batchCapacity)
private async Task ConsumeLogRequests()
{
var requests = new List<CreateLogItemRequest>();

var batchContainsItemWithAttachment = false;

lock (_syncObj)
try
{
for (int i = 0; i < batchCapacity; i++)
foreach (var logRequest in _queue.GetConsumingEnumerable())
{
if (_buffer.Count > 0)
if (logRequest.Attach != null)
{
var logItemRequest = _buffer.Peek();
await SendLogRequests(new List<CreateLogItemRequest> { logRequest });
}
else
{
var buffer = new List<CreateLogItemRequest>
{
logRequest
};

if (logItemRequest.Attach != null && batchContainsItemWithAttachment)
{
break;
}
else
for (int i = 0; i < BatchCapacity - 1; i++)
{
if (logItemRequest.Attach != null)
if (_queue.TryTake(out var nextLogRequest))
{
batchContainsItemWithAttachment = true;
if (nextLogRequest.Attach != null)
{
await SendLogRequests(buffer);

buffer.Clear();

await SendLogRequests(new List<CreateLogItemRequest> { nextLogRequest });
}
else
{
buffer.Add(nextLogRequest);
}
}
}

requests.Add(_buffer.Dequeue());
if (buffer.Count > 0)
{
await SendLogRequests(buffer);
}
}
}

}
catch (Exception ex)
{
TraceLogger.Error($"Unexpected error occurred while processing buffered log requests. {ex}");
}
}

private async Task SendLogRequests(List<CreateLogItemRequest> logRequests)
{
// only if parent reporter is successful
if (!_reporter.StartTask.IsFaulted && !_reporter.StartTask.IsCanceled)
{
try
{
foreach (var logItemRequest in logRequests)
{
_logRequestAmender.Amend(logItemRequest);
}

NotifySending(logRequests);

return requests;
await _requestExecuter
.ExecuteAsync(async () => _asyncReporting
? await _service.AsyncLogItem.CreateAsync(logRequests.ToArray())
: await _service.LogItem.CreateAsync(logRequests.ToArray()), null, _reporter.StatisticsCounter.LogItemStatisticsCounter)
.ConfigureAwait(false);

NotifySent(logRequests.AsReadOnly());
}
catch (Exception ex)
{
TraceLogger.Error($"Unexpected error occurred while sending log requests. {ex}");
}
}
}

private BeforeLogsSendingEventArgs NotifySending(IList<CreateLogItemRequest> requests)
Expand Down
15 changes: 8 additions & 7 deletions src/ReportPortal.Shared/Reporter/TestReporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public void Finish(FinishTestItemRequest request)

if (_logsReporter != null)
{
_logsReporter.Finish();
dependentTasks.Add(_logsReporter.ProcessingTask);
}

Expand Down Expand Up @@ -311,23 +312,23 @@ public void Log(CreateLogItemRequest request)

public void Sync()
{
_logsReporter?.Sync();

if (FinishTask != null)
{
FinishTask.GetAwaiter().GetResult();
}
else
{
StartTask?.GetAwaiter().GetResult();
}

if (ChildTestReporters != null)
if (ChildTestReporters != null)
{
foreach (var testNode in ChildTestReporters)
{
foreach (var testNode in ChildTestReporters)
{
testNode.Sync();
}
testNode.Sync();
}

_logsReporter?.Sync();
}
}

Expand Down
11 changes: 9 additions & 2 deletions test/ReportPortal.Shared.Benchmark/Reporter/ReporterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ public class ReporterBenchmark
[Params(1, 100000)]
public int SuitesCount { get; set; }

[Params(0, 100)]
public int LogsCount { get; set; }

[Benchmark]
public void LaunchReporter()
{
var configuration = new Configuration.ConfigurationBuilder().Build();
configuration.Properties[ConfigurationPath.AsyncReporting] = true;
var configuration = new ConfigurationBuilder().Build();

var nopService = new NopService();
var launchReporter = new LaunchReporter(nopService, configuration, null, new ExtensionManager());
Expand All @@ -42,6 +44,11 @@ public void LaunchReporter()
Type = TestItemType.Suite
});

for (int j = 0; j < LogsCount; j++)
{
suiteNode.Log(new CreateLogItemRequest { Text = "abc" });
}

suiteNode.Finish(new FinishTestItemRequest
{
EndTime = launchDateTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using ReportPortal.Shared.Tests.Helpers;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -199,7 +198,7 @@ public void ShouldSendAsSeparateRequestPerLogWithAttachmentIncludingWithoutAttac

logsReporter.Sync();

service.Verify(s => s.AsyncLogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(2));
service.Verify(s => s.AsyncLogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(4));
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void ShouldSendAsSeparateRequestPerLogWithAttachmentIncludingWithoutAttac

logsReporter.Sync();

service.Verify(s => s.LogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(2));
service.Verify(s => s.LogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(4));
}

[Fact]
Expand Down

0 comments on commit 304b5ca

Please sign in to comment.