Skip to content

Commit

Permalink
fix: catch more information about the items inserted with the same so…
Browse files Browse the repository at this point in the history
…rt (#156)

* fix: catch more information about the items inserted with the same sort
  • Loading branch information
martinhonovais authored Feb 21, 2025
1 parent 1376184 commit 4dbf40c
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
run: dotnet build --no-restore -c Release KafkaFlow.Retry.sln

- name: Start SqlServer
run: docker run -d -p 1433:1433 -e ACCEPT_EULA=${{ env.ACCEPT_EULA }} -e SA_PASSWORD=${{ env.SQLSERVER_SA_PASSWORD }} -e MSSQL_PID=Developer mcr.microsoft.com/mssql/server:2017-latest
run: docker run -d -p 1433:1433 -e ACCEPT_EULA=${{ env.ACCEPT_EULA }} -e SA_PASSWORD=${{ env.SQLSERVER_SA_PASSWORD }} -e MSSQL_PID=Developer mcr.microsoft.com/mssql/server:2022-latest

- name: Start Postgres
run: docker run -d -p 5432:5432 -e POSTGRES_USER=${{ env.POSTGRES_SA_USER }} -e POSTGRES_PASSWORD=${{ env.POSTGRES_SA_PASSWORD }} postgres:latest
Expand Down
10 changes: 9 additions & 1 deletion src/KafkaFlow.Retry.MongoDb/Adapters/QueuesAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using Dawn;
using KafkaFlow.Retry.Durable;
using KafkaFlow.Retry.Durable.Repository.Model;
using KafkaFlow.Retry.MongoDb.Adapters.Interfaces;
using KafkaFlow.Retry.MongoDb.Model;
Expand Down Expand Up @@ -35,7 +36,14 @@ public IEnumerable<RetryQueue> Adapt(IEnumerable<RetryQueueDbo> queuesDbo, IEnum
Guard.Argument(queuesDictionary.ContainsKey(itemDbo.RetryQueueId), nameof(itemDbo.RetryQueueId))
.True($"{nameof(itemDbo.RetryQueueId)} not found in queues list.");

queuesDictionary[itemDbo.RetryQueueId].AddItem(_itemAdapter.Adapt(itemDbo));
var item = _itemAdapter.Adapt(itemDbo);

if (!queuesDictionary[itemDbo.RetryQueueId].TryAddItem(item))
{
throw new RetryDurableException(
new RetryError(RetryErrorCode.DataProviderGetRetryQueuesSameItemSort),
$"RetryQueueId:{itemDbo.RetryQueueId} RetryQueueItemId:{item.Id} RetryQueueItemSort:{item.Sort}");
}
}

return queuesDictionary.Values;
Expand Down
11 changes: 11 additions & 0 deletions src/KafkaFlow.Retry/Durable/Repository/Model/RetryQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,15 @@ public void AddItem(RetryQueueItem item)
{
_itemsList.Add(item.Sort, item);
}

public bool TryAddItem(RetryQueueItem item)
{
if (_itemsList.TryGetValue(item.Sort, out RetryQueueItem _))
{
return false;
}

_itemsList.Add(item.Sort, item);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ public async Task<IEnumerable<RetryQueue>> GetRetryQueuesAsync(GetQueuesInput ge

return getQueuesResult?.RetryQueues ?? Enumerable.Empty<RetryQueue>();
}
catch (Exception ex) when (ex is RetryDurableException)
{
throw;
}
catch (Exception ex)
{
var kafkaException = new RetryDurableException(
throw new RetryDurableException(
new RetryError(RetryErrorCode.DataProviderGetRetryQueues),
"An error ocurred getting the retry queues", ex);

//this.policyBuilder.OnDataProviderException(kafkaException);

throw kafkaException;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/KafkaFlow.Retry/Durable/RetryErrorCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ public enum RetryErrorCode
DataProviderCheckQueuePendingItems = 0204,
DataProviderGetRetryQueues = 0205,
DataProviderUpdateItem = 0206,
DataProviderGetRetryQueuesSameItemSort = 0207,
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"RetryQueueItemCollectionName": "RetryQueueItems"
},
"SqlServerRepository": {
"ConnectionString": "Server=localhost; User ID=SA; Password=SqlSever123123; Pooling=true; Trusted_Connection=true; Integrated Security=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Tests; Encrypt=false;",
"ConnectionString": "Server=localhost; User ID=sa; Password=SqlSever123123; Pooling=true; Trusted_Connection=false; TrustServerCertificate=true; Integrated Security=false; Min Pool Size=1; Max Pool Size=100; Application Name=KafkaFlow Retry Tests; Encrypt=false;",
"DatabaseName": "kafka_flow_retry_durable_test",
"Schema": "dbo"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using KafkaFlow.Retry.Durable;
using KafkaFlow.Retry.Durable.Common;
using KafkaFlow.Retry.Durable.Repository.Model;
using KafkaFlow.Retry.MongoDb.Adapters;
using KafkaFlow.Retry.MongoDb.Adapters.Interfaces;
using KafkaFlow.Retry.MongoDb.Model;
using Moq;

namespace KafkaFlow.Retry.UnitTests.Repositories.MongoDb.Adapters;

public class QueuesAdapterTests
{
[Fact]
public void Adapt_SameSortOnDiffItems_ThrowsException()
{
//Arrange
var retryQueueId = Guid.Parse("A278590F-299B-4F4C-88F0-1EA3C4588786");

var mockIItemAdapter = new Mock<IItemAdapter>();
mockIItemAdapter
.Setup(x => x.Adapt(It.IsAny<RetryQueueItemDbo>()))
.Returns(
new Queue<RetryQueueItem>(new List<RetryQueueItem>
{
new RetryQueueItem(Guid.Parse("A42311CF-2156-4A0C-AD81-EA235AA31B79"),1,DateTime.UtcNow,0,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"1"),
new RetryQueueItem(Guid.Parse("131DEACB-47D5-41BE-9755-9D8C7ED5576B"),1,DateTime.UtcNow,1,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"2"),
new RetryQueueItem(Guid.Parse("4DF0634D-3207-485D-9C5E-100BFF4607C5"),1,DateTime.UtcNow,1,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"3"),
new RetryQueueItem(Guid.Parse("7A1F3DA3-EFFD-46EA-A476-E60881246D5E"),1,DateTime.UtcNow,2,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"4"),
new RetryQueueItem(Guid.Parse("43220795-5BF1-4023-ADA1-789A391B0997"),1,DateTime.UtcNow,3,null,null,RetryQueueItemStatus.Waiting,SeverityLevel.Unknown,"5"),
}).Dequeue);

var adapter = new QueuesAdapter(mockIItemAdapter.Object);

IEnumerable<RetryQueueDbo> queuesDbo = new List<RetryQueueDbo>
{
new RetryQueueDbo
{
Id = retryQueueId,
CreationDate = DateTime.UtcNow,
LastExecution = DateTime.UtcNow,
QueueGroupKey = "QueueGroupKey",
SearchGroupKey = "SearchGroupKey",
Status = RetryQueueStatus.Active,
}
};
IEnumerable<RetryQueueItemDbo> itemsDbo = new List<RetryQueueItemDbo>
{
new RetryQueueItemDbo { RetryQueueId = retryQueueId },
new RetryQueueItemDbo { RetryQueueId = retryQueueId },
new RetryQueueItemDbo { RetryQueueId = retryQueueId },
new RetryQueueItemDbo { RetryQueueId = retryQueueId },
new RetryQueueItemDbo { RetryQueueId = retryQueueId },
};

// Act
Action act = () => adapter.Adapt(queuesDbo, itemsDbo);

// Assert
act.Should()
.Throw<RetryDurableException>()
.WithMessage("RetryQueueId:a278590f-299b-4f4c-88f0-1ea3c4588786 RetryQueueItemId:4df0634d-3207-485d-9c5e-100bff4607c5 RetryQueueItemSort:1");
}
}

0 comments on commit 4dbf40c

Please sign in to comment.