Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add polling to get the number of active queues #155

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,18 @@ internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer(
.WithCleanupPollingConfiguration(
configure => configure
.Enabled(false)
.WithCronExpression("0 0/1 * 1/1 * ? *")
)
.WithRetryDurableActiveQueuesCountPollingConfiguration(
configure => configure
.Enabled(true)
.WithCronExpression("0 0/1 * 1/1 * ? *")
.Do((numberOfActiveQueues) =>
{
Console.Write($"Number of active queues {numberOfActiveQueues}");
})
)

))
.AddTypedHandlers(
handlers => handlers
Expand Down
12 changes: 10 additions & 2 deletions samples/KafkaFlow.Retry.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@ private static async Task Main()
var mongoDbDatabaseName = "kafka_flow_retry_durable_sample";
var mongoDbRetryQueueCollectionName = "RetryQueues";
var mongoDbRetryQueueItemCollectionName = "RetryQueueItems";
var sqlServerConnectionString =
"Server=localhost;Trusted_Connection=True; Pooling=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Sample";
var sqlServerConnectionString = string.Join(
string.Empty,
"Server=localhost;",
"Trusted_Connection=True;",
"Pooling=true;",
"Min Pool Size=1;",
"Max Pool Size=100;",
"MultipleActiveResultSets=true;",
"Application Name=KafkaFlow Retry Tests;"
);
var sqlServerDatabaseName = "kafka_flow_retry_durable_sample";
var topics = new[]
{
Expand Down
5 changes: 5 additions & 0 deletions src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems);
}

public Task<long> CountQueuesAsync(CountQueuesInput input)
{
throw new NotImplementedException();
}

public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
{
Guard.Argument(input, nameof(input)).NotNull();
Expand Down
5 changes: 5 additions & 0 deletions src/KafkaFlow.Retry.Postgres/RetryQueueDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
}
}

public Task<long> CountQueuesAsync(CountQueuesInput input)
{
throw new NotImplementedException();
}

public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
{
Guard.Argument(input, nameof(input)).NotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ internal interface IRetryQueueRepository

Task<RetryQueueDbo> GetQueueAsync(IDbConnection dbConnection, string queueGroupKey);

Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus);

Task<IList<RetryQueueDbo>> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top);

Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution);
Expand Down
24 changes: 21 additions & 3 deletions src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ public async Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retry
}
}

public async Task<long> CountQueueAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus)
{
using (var command = dbConnection.CreateCommand())
{
command.CommandType = CommandType.Text;
command.CommandText =
$@"SELECT COUNT(1)
FROM [{dbConnection.Schema}].[RetryQueues]
WHERE SearchGroupKey = @SearchGroupKey
AND IdStatus = @IdStatus";

command.Parameters.AddWithValue("SearchGroupKey", searchGroupKey);
command.Parameters.AddWithValue("IdStatus", (byte)retryQueueStatus);

return (Int32) await command.ExecuteScalarAsync().ConfigureAwait(false);
}
}

public async Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey,
RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete)
{
Expand Down Expand Up @@ -83,9 +101,9 @@ public async Task<RetryQueueDbo> GetQueueAsync(IDbConnection dbConnection, strin
command.CommandType = CommandType.Text;
command.CommandText =
$@"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution
FROM [{dbConnection.Schema}].[RetryQueues]
WHERE QueueGroupKey = @QueueGroupKey
ORDER BY Id";
FROM [{dbConnection.Schema}].[RetryQueues]
WHERE QueueGroupKey = @QueueGroupKey
ORDER BY Id";

command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey);

Expand Down
14 changes: 14 additions & 0 deletions src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ public async Task<QueuePendingItemsResult> CheckQueuePendingItemsAsync(QueuePend
}
}

public async Task<long> CountQueuesAsync(CountQueuesInput input)
{
Guard.Argument(input, nameof(input)).NotNull();

using (var dbConnection = _connectionProvider.Create(_sqlServerDbSettings))
{
return await _retryQueueRepository.CountQueueAsync(
dbConnection,
input.SearchGroupKey,
input.Status)
.ConfigureAwait(false);
}
}

public async Task<DeleteQueuesResult> DeleteQueuesAsync(DeleteQueuesInput input)
{
Guard.Argument(input, nameof(input)).NotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using Dawn;
using KafkaFlow.Retry.Durable.Definitions.Builders.Polling;
using KafkaFlow.Retry.Durable.Definitions.Polling;

namespace KafkaFlow.Retry;
Expand All @@ -11,12 +12,14 @@ public class PollingDefinitionsAggregatorBuilder
private readonly CleanupPollingDefinitionBuilder _cleanupPollingDefinitionBuilder;
private readonly List<PollingDefinition> _pollingDefinitions;
private readonly RetryDurablePollingDefinitionBuilder _retryDurablePollingDefinitionBuilder;
private readonly RetryDurableActiveQueuesCountPollingDefinitionBuilder _retryDurableActiveQueuesCountPollingDefinitionBuilder;
private string _schedulerId;

public PollingDefinitionsAggregatorBuilder()
{
_cleanupPollingDefinitionBuilder = new CleanupPollingDefinitionBuilder();
_retryDurablePollingDefinitionBuilder = new RetryDurablePollingDefinitionBuilder();
_retryDurableActiveQueuesCountPollingDefinitionBuilder = new RetryDurableActiveQueuesCountPollingDefinitionBuilder();

_pollingDefinitions = new List<PollingDefinition>();
}
Expand Down Expand Up @@ -47,6 +50,19 @@ public PollingDefinitionsAggregatorBuilder WithRetryDurablePollingConfiguration(
return this;
}

public PollingDefinitionsAggregatorBuilder WithRetryDurableActiveQueuesCountPollingConfiguration(
Action<RetryDurableActiveQueuesCountPollingDefinitionBuilder> configure)
{
Guard.Argument(configure, nameof(configure)).NotNull();

configure(_retryDurableActiveQueuesCountPollingDefinitionBuilder);
var etryDurableActiveQueuesCountPollingDefinition = _retryDurableActiveQueuesCountPollingDefinitionBuilder.Build();

_pollingDefinitions.Add(etryDurableActiveQueuesCountPollingDefinition);

return this;
}

public PollingDefinitionsAggregatorBuilder WithSchedulerId(string schedulerId)
{
_schedulerId = schedulerId;
Expand All @@ -65,6 +81,11 @@ internal PollingDefinitionsAggregator Build()
ValidateRequiredPollingDefinition(PollingJobType.Cleanup);
}

if (_retryDurableActiveQueuesCountPollingDefinitionBuilder.Required)
{
ValidateRequiredPollingDefinition(PollingJobType.RetryDurableActiveQueuesCount);
}

return new PollingDefinitionsAggregator(_schedulerId, _pollingDefinitions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using KafkaFlow.Retry.Durable.Definitions.Polling;

namespace KafkaFlow.Retry.Durable.Definitions.Builders.Polling;
public class RetryDurableActiveQueuesCountPollingDefinitionBuilder : PollingDefinitionBuilder<RetryDurableActiveQueuesCountPollingDefinitionBuilder>
{
protected Action<long> ActionToPerform = null;

Check notice on line 7 in src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurableActiveQueuesCountPollingDefinitionBuilder.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurableActiveQueuesCountPollingDefinitionBuilder.cs#L7

Remove this initialization to 'ActionToPerform', the compiler will do that for you.

internal override bool Required => false;

public RetryDurableActiveQueuesCountPollingDefinitionBuilder Do(Action<long> actionToPerform)
{
ActionToPerform = actionToPerform;
return this;
}

internal RetryDurableActiveQueuesCountPollingDefinition Build()
{
return new RetryDurableActiveQueuesCountPollingDefinition(
IsEnabled,
CronExpression,
ActionToPerform
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ internal enum PollingJobType
Unknown = 0,

RetryDurable = 1,
Cleanup = 2
Cleanup = 2,
RetryDurableActiveQueuesCount = 3
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using Dawn;

namespace KafkaFlow.Retry.Durable.Definitions.Polling;
internal class RetryDurableActiveQueuesCountPollingDefinition : PollingDefinition
{
public RetryDurableActiveQueuesCountPollingDefinition(
bool enabled,
string cronExpression,
Action<long> activeQueues)
: base(enabled, cronExpression)
{
Guard.Argument(activeQueues, nameof(activeQueues)).NotNull();
ActiveQueues = activeQueues;
}

public override PollingJobType PollingJobType => PollingJobType.RetryDurableActiveQueuesCount;

public Action<long> ActiveQueues { get; }
}
15 changes: 14 additions & 1 deletion src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public JobDataProvidersFactory(

public IEnumerable<IJobDataProvider> Create(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler)
{
var jobDataProviders = new List<IJobDataProvider>(2);
var jobDataProviders = new List<IJobDataProvider>(3);

if (TryGetPollingDefinition<RetryDurablePollingDefinition>(PollingJobType.RetryDurable,
out var retryDurablePollingDefinition))
Expand Down Expand Up @@ -71,6 +71,19 @@ public IEnumerable<IJobDataProvider> Create(IMessageProducer retryDurableMessage
);
}

if (TryGetPollingDefinition<RetryDurableActiveQueuesCountPollingDefinition>(PollingJobType.RetryDurableActiveQueuesCount, out var retryDurableActiveQueuesCountPollingDefinition))
{
jobDataProviders.Add(
new RetryDurableActiveQueuesCountJobDataProvider(
retryDurableActiveQueuesCountPollingDefinition,
GetTrigger(retryDurableActiveQueuesCountPollingDefinition),
_pollingDefinitionsAggregator.SchedulerId,
_retryDurableQueueRepository,
logHandler
)
);
}

return jobDataProviders;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace KafkaFlow.Retry.Durable.Polling.Jobs;

[DisallowConcurrentExecutionAttribute]
[DisallowConcurrentExecution]
internal class CleanupPollingJob : IJob
{
public async Task Execute(IJobExecutionContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ internal static class PollingJobConstants
public const string RetryDurableQueueRepository = "RetryDurableQueueRepository";
public const string SchedulerId = "SchedulerId";
public const string Utf8Encoder = "Utf8Encoder";
public const string RetryDurableActiveQueuesCountPollingDefinition = "RetryDurableActiveQueuesCountPollingDefinition";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Threading.Tasks;
using KafkaFlow.Retry.Durable.Definitions.Polling;
using KafkaFlow.Retry.Durable.Polling.Extensions;
using KafkaFlow.Retry.Durable.Repository;
using KafkaFlow.Retry.Durable.Repository.Actions.Delete;
using KafkaFlow.Retry.Durable.Repository.Actions.Read;
using KafkaFlow.Retry.Durable.Repository.Model;
using Quartz;

namespace KafkaFlow.Retry.Durable.Polling.Jobs;

[DisallowConcurrentExecution]
internal class RetryDurableActiveQueuesCountJob : IJob
{
public async Task Execute(IJobExecutionContext context)
{
var jobDataMap = context.JobDetail.JobDataMap;

var retryDurableActiveQueuesCountPollingDefinition =
jobDataMap.GetValidValue<RetryDurableActiveQueuesCountPollingDefinition>(PollingJobConstants.RetryDurableActiveQueuesCountPollingDefinition,
nameof(RetryDurableActiveQueuesCountJob));
var schedulerId = jobDataMap.GetValidStringValue(PollingJobConstants.SchedulerId, nameof(RetryDurableActiveQueuesCountJob));
var retryDurableQueueRepository =
jobDataMap.GetValidValue<IRetryDurableQueueRepository>(PollingJobConstants.RetryDurableQueueRepository,
nameof(RetryDurableActiveQueuesCountJob));
var logHandler =
jobDataMap.GetValidValue<ILogHandler>(PollingJobConstants.LogHandler, nameof(RetryDurableActiveQueuesCountJob));

try
{
logHandler.Info(
$"{nameof(RetryDurableActiveQueuesCountJob)} starts execution",
new
{
context.Trigger.Key.Name
}
);

var countQueuesInput = new CountQueuesInput(RetryQueueStatus.Active)
{
SearchGroupKey = schedulerId
};

var countQueuesResult =
await retryDurableQueueRepository.CountRetryQueuesAsync(countQueuesInput).ConfigureAwait(false);

retryDurableActiveQueuesCountPollingDefinition.ActiveQueues(countQueuesResult);

logHandler.Info(
$"{nameof(RetryDurableActiveQueuesCountJob)} executed successfully.",
new
{
SearchGroupKey = schedulerId,
NumberOfActiveQueues = countQueuesResult
});
}
catch (Exception ex)
{
logHandler.Error($"Exception on {nameof(RetryDurableActiveQueuesCountJob)} execution", ex, null);
}
}
}
Loading
Loading