From 4dbf40ceb5cde75c5cf9f1352235836b90228b8d Mon Sep 17 00:00:00 2001 From: Martinho Novais Date: Fri, 21 Feb 2025 17:17:36 +0000 Subject: [PATCH] fix: catch more information about the items inserted with the same sort (#156) * fix: catch more information about the items inserted with the same sort --- .github/workflows/build.yml | 2 +- .../Adapters/QueuesAdapter.cs | 10 ++- .../Durable/Repository/Model/RetryQueue.cs | 11 ++++ .../Repository/RetryDurableQueueRepository.cs | 10 +-- src/KafkaFlow.Retry/Durable/RetryErrorCode.cs | 1 + .../conf/appsettings.json | 2 +- .../MongoDb/Adapters/QueuesAdapterTests.cs | 65 +++++++++++++++++++ 7 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 tests/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/QueuesAdapterTests.cs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 36283b53..2f2ff56d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -57,7 +57,7 @@ jobs: run: dotnet build --no-restore -c Release KafkaFlow.Retry.sln - name: Start SqlServer - run: docker run -d -p 1433:1433 -e ACCEPT_EULA=${{ env.ACCEPT_EULA }} -e SA_PASSWORD=${{ env.SQLSERVER_SA_PASSWORD }} -e MSSQL_PID=Developer mcr.microsoft.com/mssql/server:2017-latest + run: docker run -d -p 1433:1433 -e ACCEPT_EULA=${{ env.ACCEPT_EULA }} -e SA_PASSWORD=${{ env.SQLSERVER_SA_PASSWORD }} -e MSSQL_PID=Developer mcr.microsoft.com/mssql/server:2022-latest - name: Start Postgres run: docker run -d -p 5432:5432 -e POSTGRES_USER=${{ env.POSTGRES_SA_USER }} -e POSTGRES_PASSWORD=${{ env.POSTGRES_SA_PASSWORD }} postgres:latest diff --git a/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs b/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs index 1fd27a90..b181fc39 100644 --- a/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs +++ b/src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using Dawn; +using KafkaFlow.Retry.Durable; using KafkaFlow.Retry.Durable.Repository.Model; using KafkaFlow.Retry.MongoDb.Adapters.Interfaces; using KafkaFlow.Retry.MongoDb.Model; @@ -35,7 +36,14 @@ public IEnumerable Adapt(IEnumerable queuesDbo, IEnum Guard.Argument(queuesDictionary.ContainsKey(itemDbo.RetryQueueId), nameof(itemDbo.RetryQueueId)) .True($"{nameof(itemDbo.RetryQueueId)} not found in queues list."); - queuesDictionary[itemDbo.RetryQueueId].AddItem(_itemAdapter.Adapt(itemDbo)); + var item = _itemAdapter.Adapt(itemDbo); + + if (!queuesDictionary[itemDbo.RetryQueueId].TryAddItem(item)) + { + throw new RetryDurableException( + new RetryError(RetryErrorCode.DataProviderGetRetryQueuesSameItemSort), + $"RetryQueueId:{itemDbo.RetryQueueId} RetryQueueItemId:{item.Id} RetryQueueItemSort:{item.Sort}"); + } } return queuesDictionary.Values; diff --git a/src/KafkaFlow.Retry/Durable/Repository/Model/RetryQueue.cs b/src/KafkaFlow.Retry/Durable/Repository/Model/RetryQueue.cs index 9daa276e..7217e751 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/Model/RetryQueue.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/Model/RetryQueue.cs @@ -57,4 +57,15 @@ public void AddItem(RetryQueueItem item) { _itemsList.Add(item.Sort, item); } + + public bool TryAddItem(RetryQueueItem item) + { + if (_itemsList.TryGetValue(item.Sort, out RetryQueueItem _)) + { + return false; + } + + _itemsList.Add(item.Sort, item); + return true; + } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs index 07d1eaaf..ece29a6e 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs @@ -149,15 +149,15 @@ public async Task> GetRetryQueuesAsync(GetQueuesInput ge return getQueuesResult?.RetryQueues ?? Enumerable.Empty(); } + catch (Exception ex) when (ex is RetryDurableException) + { + throw; + } catch (Exception ex) { - var kafkaException = new RetryDurableException( + throw new RetryDurableException( new RetryError(RetryErrorCode.DataProviderGetRetryQueues), "An error ocurred getting the retry queues", ex); - - //this.policyBuilder.OnDataProviderException(kafkaException); - - throw kafkaException; } } diff --git a/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs b/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs index a18bf15f..3abf5c47 100644 --- a/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs +++ b/src/KafkaFlow.Retry/Durable/RetryErrorCode.cs @@ -31,4 +31,5 @@ public enum RetryErrorCode DataProviderCheckQueuePendingItems = 0204, DataProviderGetRetryQueues = 0205, DataProviderUpdateItem = 0206, + DataProviderGetRetryQueuesSameItemSort = 0207, } \ No newline at end of file diff --git a/tests/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json b/tests/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json index 60f0c793..8a117d77 100644 --- a/tests/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json +++ b/tests/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json @@ -10,7 +10,7 @@ "RetryQueueItemCollectionName": "RetryQueueItems" }, "SqlServerRepository": { - "ConnectionString": "Server=localhost; User ID=SA; Password=SqlSever123123; Pooling=true; Trusted_Connection=true; Integrated Security=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Tests; Encrypt=false;", + "ConnectionString": "Server=localhost; User ID=sa; Password=SqlSever123123; Pooling=true; Trusted_Connection=false; TrustServerCertificate=true; Integrated Security=false; Min Pool Size=1; Max Pool Size=100; Application Name=KafkaFlow Retry Tests; Encrypt=false;", "DatabaseName": "kafka_flow_retry_durable_test", "Schema": "dbo" }, diff --git a/tests/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/QueuesAdapterTests.cs b/tests/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/QueuesAdapterTests.cs new file mode 100644 index 00000000..c4567961 --- /dev/null +++ b/tests/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/QueuesAdapterTests.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using KafkaFlow.Retry.Durable; +using KafkaFlow.Retry.Durable.Common; +using KafkaFlow.Retry.Durable.Repository.Model; +using KafkaFlow.Retry.MongoDb.Adapters; +using KafkaFlow.Retry.MongoDb.Adapters.Interfaces; +using KafkaFlow.Retry.MongoDb.Model; +using Moq; + +namespace KafkaFlow.Retry.UnitTests.Repositories.MongoDb.Adapters; + +public class QueuesAdapterTests +{ + [Fact] + public void Adapt_SameSortOnDiffItems_ThrowsException() + { + //Arrange + var retryQueueId = Guid.Parse("A278590F-299B-4F4C-88F0-1EA3C4588786"); + + var mockIItemAdapter = new Mock(); + mockIItemAdapter + .Setup(x => x.Adapt(It.IsAny())) + .Returns( + new Queue(new List + { + new RetryQueueItem(Guid.Parse("A42311CF-2156-4A0C-AD81-EA235AA31B79"),1,DateTime.UtcNow,0,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"1"), + new RetryQueueItem(Guid.Parse("131DEACB-47D5-41BE-9755-9D8C7ED5576B"),1,DateTime.UtcNow,1,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"2"), + new RetryQueueItem(Guid.Parse("4DF0634D-3207-485D-9C5E-100BFF4607C5"),1,DateTime.UtcNow,1,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"3"), + new RetryQueueItem(Guid.Parse("7A1F3DA3-EFFD-46EA-A476-E60881246D5E"),1,DateTime.UtcNow,2,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"4"), + new RetryQueueItem(Guid.Parse("43220795-5BF1-4023-ADA1-789A391B0997"),1,DateTime.UtcNow,3,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"5"), + }).Dequeue); + + var adapter = new QueuesAdapter(mockIItemAdapter.Object); + + IEnumerable queuesDbo = new List + { + new RetryQueueDbo + { + Id = retryQueueId, + CreationDate = DateTime.UtcNow, + LastExecution = DateTime.UtcNow, + QueueGroupKey = "QueueGroupKey", + SearchGroupKey = "SearchGroupKey", + Status = RetryQueueStatus.Active, + } + }; + IEnumerable itemsDbo = new List + { + new RetryQueueItemDbo { RetryQueueId = retryQueueId }, + new RetryQueueItemDbo { RetryQueueId = retryQueueId }, + new RetryQueueItemDbo { RetryQueueId = retryQueueId }, + new RetryQueueItemDbo { RetryQueueId = retryQueueId }, + new RetryQueueItemDbo { RetryQueueId = retryQueueId }, + }; + + // Act + Action act = () => adapter.Adapt(queuesDbo, itemsDbo); + + // Assert + act.Should() + .Throw() + .WithMessage("RetryQueueId:a278590f-299b-4f4c-88f0-1ea3c4588786 RetryQueueItemId:4df0634d-3207-485d-9c5e-100bff4607c5 RetryQueueItemSort:1"); + } +} \ No newline at end of file