From 304b5cae526bb7e2209dc3b2716805a9aadb31e2 Mon Sep 17 00:00:00 2001
From: Nikolay Borisenko <22616990+nvborisenko@users.noreply.github.com>
Date: Tue, 28 Mar 2023 12:00:00 +0300
Subject: [PATCH] Less memory allocation while processing log requests (#138)

---
 .../Reporter/LaunchReporter.cs                |  15 +-
 .../Reporter/LogsReporter.cs                  | 143 ++++++++++--------
 .../Reporter/TestReporter.cs                  |  15 +-
 .../Reporter/ReporterBenchmark.cs             |  11 +-
 .../Reporter/AsyncLogsReporterFixture.cs      |   3 +-
 .../Reporter/LogsReporterFixture.cs           |   2 +-
 6 files changed, 104 insertions(+), 85 deletions(-)

diff --git a/src/ReportPortal.Shared/Reporter/LaunchReporter.cs b/src/ReportPortal.Shared/Reporter/LaunchReporter.cs
index 4b1eb557..3c4f4cc9 100644
--- a/src/ReportPortal.Shared/Reporter/LaunchReporter.cs
+++ b/src/ReportPortal.Shared/Reporter/LaunchReporter.cs
@@ -179,6 +179,7 @@ public void Finish(FinishLaunchRequest request)
 
             if (_logsReporter != null)
             {
+                _logsReporter.Finish();
                 dependentTasks.Add(_logsReporter.ProcessingTask);
             }
 
@@ -314,6 +315,8 @@ public void Log(CreateLogItemRequest createLogItemRequest)
 
         public void Sync()
         {
+            _logsReporter?.Sync();
+
             if (FinishTask != null)
             {
                 FinishTask.GetAwaiter().GetResult();
@@ -321,16 +324,14 @@ public void Sync()
             else
             {
                 StartTask?.GetAwaiter().GetResult();
+            }
 
-                if (ChildTestReporters != null)
+            if (ChildTestReporters != null)
+            {
+                foreach (var testNode in ChildTestReporters)
                 {
-                    foreach (var testNode in ChildTestReporters)
-                    {
-                        testNode.Sync();
-                    }
+                    testNode.Sync();
                 }
-
-                _logsReporter?.Sync();
             }
         }
 
diff --git a/src/ReportPortal.Shared/Reporter/LogsReporter.cs b/src/ReportPortal.Shared/Reporter/LogsReporter.cs
index dc964a41..4b0274b4 100644
--- a/src/ReportPortal.Shared/Reporter/LogsReporter.cs
+++ b/src/ReportPortal.Shared/Reporter/LogsReporter.cs
@@ -1,4 +1,4 @@
-using ReportPortal.Client.Abstractions;
+using ReportPortal.Client.Abstractions;
 using ReportPortal.Client.Abstractions.Requests;
 using ReportPortal.Shared.Configuration;
 using ReportPortal.Shared.Extensibility;
@@ -6,6 +6,7 @@
 using ReportPortal.Shared.Internal.Delegating;
 using ReportPortal.Shared.Internal.Logging;
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Threading.Tasks;
 
@@ -15,7 +16,7 @@ public class LogsReporter : ILogsReporter
     {
         private static ITraceLogger TraceLogger { get; } = TraceLogManager.Instance.GetLogger<LogsReporter>();
 
-        private readonly Queue<CreateLogItemRequest> _buffer = new Queue<CreateLogItemRequest>();
+        private readonly BlockingCollection<CreateLogItemRequest> _queue = new BlockingCollection<CreateLogItemRequest>();
 
         private readonly bool _asyncReporting;
         private readonly IReporter _reporter;
@@ -49,105 +50,115 @@ public LogsReporter(IReporter testReporter,
 
             if (batchCapacity < 1) throw new ArgumentException("Batch capacity for logs processing cannot be less than 1.", nameof(batchCapacity));
             BatchCapacity = batchCapacity;
-        }
 
-        private readonly object _syncObj = new object();
+            ProcessingTask = _reporter.StartTask.ContinueWith(async consumer =>
+            {
+                await ConsumeLogRequests();
+            }).Unwrap();
+        }
 
         public int BatchCapacity { get; }
 
         public void Log(CreateLogItemRequest logRequest)
         {
-            lock (_syncObj)
-            {
-                _buffer.Enqueue(logRequest);
-
-                var dependentTask = ProcessingTask ?? _reporter.StartTask;
-
-                ProcessingTask = dependentTask.ContinueWith(async (dt) =>
-                {
-                    try
-                    {
-                        // only if parent reporter is successful
-                        if (!_reporter.StartTask.IsFaulted && !_reporter.StartTask.IsCanceled)
-                        {
-                            var requests = GetBufferedLogRequests(batchCapacity: BatchCapacity);
-
-                            if (requests.Count != 0)
-                            {
-                                foreach (var logItemRequest in requests)
-                                {
-                                    _logRequestAmender.Amend(logItemRequest);
-                                }
-
-                                NotifySending(requests);
-
-                                await _requestExecuter
-                                    .ExecuteAsync(async () => _asyncReporting
-                                        ? await _service.AsyncLogItem.CreateAsync(requests.ToArray())
-                                        : await _service.LogItem.CreateAsync(requests.ToArray()),
-                                        null,
-                                        _reporter.StatisticsCounter.LogItemStatisticsCounter,
-                                        $"Sending {requests.Count} log items...")
-                                    .ConfigureAwait(false);
+            _queue.Add(logRequest);
+        }
 
-                                NotifySent(requests.AsReadOnly());
-                            }
-                        }
-                    }
-                    catch (Exception exp)
-                    {
-                        TraceLogger.Error($"Unexpected error occurred while processing buffered log requests. {exp}");
-                    }
-                }, TaskContinuationOptions.PreferFairness).Unwrap();
-            }
+        public void Finish()
+        {
+            _queue.CompleteAdding();
         }
 
         public void Sync()
         {
             try
             {
+                Finish();
+
                 ProcessingTask?.GetAwaiter().GetResult();
             }
             catch
             {
                 // we don't aware of failed requests for sending log messages (for now)
             }
-
         }
 
-        private List<CreateLogItemRequest> GetBufferedLogRequests(int batchCapacity)
+        private async Task ConsumeLogRequests()
         {
-            var requests = new List<CreateLogItemRequest>();
-
-            var batchContainsItemWithAttachment = false;
-
-            lock (_syncObj)
+            try
             {
-                for (int i = 0; i < batchCapacity; i++)
+                foreach (var logRequest in _queue.GetConsumingEnumerable())
                 {
-                    if (_buffer.Count > 0)
+                    if (logRequest.Attach != null)
                     {
-                        var logItemRequest = _buffer.Peek();
+                        await SendLogRequests(new List<CreateLogItemRequest> { logRequest });
+                    }
+                    else
+                    {
+                        var buffer = new List<CreateLogItemRequest>
+                            {
+                                logRequest
+                            };
 
-                        if (logItemRequest.Attach != null && batchContainsItemWithAttachment)
-                        {
-                            break;
-                        }
-                        else
+                        for (int i = 0; i < BatchCapacity - 1; i++)
                         {
-                            if (logItemRequest.Attach != null)
+                            if (_queue.TryTake(out var nextLogRequest))
                             {
-                                batchContainsItemWithAttachment = true;
+                                if (nextLogRequest.Attach != null)
+                                {
+                                    await SendLogRequests(buffer);
+
+                                    buffer.Clear();
+
+                                    await SendLogRequests(new List<CreateLogItemRequest> { nextLogRequest });
+                                }
+                                else
+                                {
+                                    buffer.Add(nextLogRequest);
+                                }
                             }
+                        }
 
-                            requests.Add(_buffer.Dequeue());
+                        if (buffer.Count > 0)
+                        {
+                            await SendLogRequests(buffer);
                         }
                     }
                 }
-
             }
+            catch (Exception ex)
+            {
+                TraceLogger.Error($"Unexpected error occurred while processing buffered log requests. {ex}");
+            }
+        }
+
+        private async Task SendLogRequests(List<CreateLogItemRequest> logRequests)
+        {
+            // only if parent reporter is successful
+            if (!_reporter.StartTask.IsFaulted && !_reporter.StartTask.IsCanceled)
+            {
+                try
+                {
+                    foreach (var logItemRequest in logRequests)
+                    {
+                        _logRequestAmender.Amend(logItemRequest);
+                    }
+
+                    NotifySending(logRequests);
 
-            return requests;
+                    await _requestExecuter
+                        .ExecuteAsync(async () => _asyncReporting
+                            ? await _service.AsyncLogItem.CreateAsync(logRequests.ToArray())
+                            : await _service.LogItem.CreateAsync(logRequests.ToArray()), null, _reporter.StatisticsCounter.LogItemStatisticsCounter)
+                        .ConfigureAwait(false);
+
+                    NotifySent(logRequests.AsReadOnly());
+                }
+                catch (Exception ex)
+                {
+                    TraceLogger.Error($"Unexpected error occurred while sending log requests. {ex}");
+                }
+            }
         }
 
         private BeforeLogsSendingEventArgs NotifySending(IList<CreateLogItemRequest> requests)
diff --git a/src/ReportPortal.Shared/Reporter/TestReporter.cs b/src/ReportPortal.Shared/Reporter/TestReporter.cs
index 356f1b9b..03b1c576 100644
--- a/src/ReportPortal.Shared/Reporter/TestReporter.cs
+++ b/src/ReportPortal.Shared/Reporter/TestReporter.cs
@@ -169,6 +169,7 @@ public void Finish(FinishTestItemRequest request)
 
             if (_logsReporter != null)
             {
+                _logsReporter.Finish();
                 dependentTasks.Add(_logsReporter.ProcessingTask);
             }
 
@@ -311,6 +312,8 @@ public void Log(CreateLogItemRequest request)
 
         public void Sync()
         {
+            _logsReporter?.Sync();
+
             if (FinishTask != null)
             {
                 FinishTask.GetAwaiter().GetResult();
@@ -318,16 +321,14 @@ public void Sync()
             else
             {
                 StartTask?.GetAwaiter().GetResult();
+            }
 
-                if (ChildTestReporters != null)
+            if (ChildTestReporters != null)
+            {
+                foreach (var testNode in ChildTestReporters)
                 {
-                    foreach (var testNode in ChildTestReporters)
-                    {
-                        testNode.Sync();
-                    }
+                    testNode.Sync();
                 }
-
-                _logsReporter?.Sync();
             }
         }
 
diff --git a/test/ReportPortal.Shared.Benchmark/Reporter/ReporterBenchmark.cs b/test/ReportPortal.Shared.Benchmark/Reporter/ReporterBenchmark.cs
index 2e09f271..347f8e86 100644
--- a/test/ReportPortal.Shared.Benchmark/Reporter/ReporterBenchmark.cs
+++ b/test/ReportPortal.Shared.Benchmark/Reporter/ReporterBenchmark.cs
@@ -15,11 +15,13 @@ public class ReporterBenchmark
         [Params(1, 100000)]
         public int SuitesCount { get; set; }
 
+        [Params(0, 100)]
+        public int LogsCount { get; set; }
+
         [Benchmark]
         public void LaunchReporter()
         {
-            var configuration = new Configuration.ConfigurationBuilder().Build();
-            configuration.Properties[ConfigurationPath.AsyncReporting] = true;
+            var configuration = new ConfigurationBuilder().Build();
 
             var nopService = new NopService();
             var launchReporter = new LaunchReporter(nopService, configuration, null, new ExtensionManager());
@@ -42,6 +44,11 @@ public void LaunchReporter()
                     Type = TestItemType.Suite
                 });
 
+                for (int j = 0; j < LogsCount; j++)
+                {
+                    suiteNode.Log(new CreateLogItemRequest { Text = "abc" });
+                }
+
                 suiteNode.Finish(new FinishTestItemRequest
                 {
                     EndTime = launchDateTime,
diff --git a/test/ReportPortal.Shared.Tests/Reporter/AsyncLogsReporterFixture.cs b/test/ReportPortal.Shared.Tests/Reporter/AsyncLogsReporterFixture.cs
index bde125e3..aa4d6043 100644
--- a/test/ReportPortal.Shared.Tests/Reporter/AsyncLogsReporterFixture.cs
+++ b/test/ReportPortal.Shared.Tests/Reporter/AsyncLogsReporterFixture.cs
@@ -9,7 +9,6 @@
 using ReportPortal.Shared.Tests.Helpers;
 using System;
 using System.Collections.Generic;
-using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
@@ -199,7 +198,7 @@ public void ShouldSendAsSeparateRequestPerLogWithAttachmentIncludingWithoutAttac
 
             logsReporter.Sync();
 
-            service.Verify(s => s.AsyncLogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(2));
+            service.Verify(s => s.AsyncLogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(4));
         }
 
         [Fact]
diff --git a/test/ReportPortal.Shared.Tests/Reporter/LogsReporterFixture.cs b/test/ReportPortal.Shared.Tests/Reporter/LogsReporterFixture.cs
index e35d2b14..4e84a488 100644
--- a/test/ReportPortal.Shared.Tests/Reporter/LogsReporterFixture.cs
+++ b/test/ReportPortal.Shared.Tests/Reporter/LogsReporterFixture.cs
@@ -157,7 +157,7 @@ public void ShouldSendAsSeparateRequestPerLogWithAttachmentIncludingWithoutAttac
 
             logsReporter.Sync();
 
-            service.Verify(s => s.LogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(2));
+            service.Verify(s => s.LogItem.CreateAsync(It.IsAny<CreateLogItemRequest[]>(), default), Times.Exactly(4));
         }
 
         [Fact]