Skip to content

Commit

Permalink
Add connection provider
Browse files Browse the repository at this point in the history
  • Loading branch information
filipbekic01 committed Oct 29, 2024
1 parent bbb2a0c commit 16387d9
Show file tree
Hide file tree
Showing 14 changed files with 222 additions and 133 deletions.
11 changes: 1 addition & 10 deletions backend/ResQueue/ResQueue/Endpoints/AuthEndpoints.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using Microsoft.Extensions.Options;
using ResQueue.Dtos;

namespace ResQueue.Endpoints;

public static class AuthEndpoints
Expand All @@ -9,12 +6,6 @@ public static void MapAuthEndpoints(this IEndpointRouteBuilder routes)
{
RouteGroupBuilder group = routes.MapGroup("auth");

group.MapGet("", (IOptions<ResQueueOptions> options) => Results.Ok(new AuthDto(
SqlEngine: options.Value.SqlEngine,
Username: options.Value.Username,
Database: options.Value.Database,
Schema: options.Value.Schema,
Port: options.Value.Port
)));
group.MapGet("", () => Results.Ok());
}
}
27 changes: 14 additions & 13 deletions backend/ResQueue/ResQueue/Endpoints/QueuesEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using ResQueue.Enums;
using ResQueue.Factories;
using ResQueue.Features.Messages.PurgeQueue;
using ResQueue.Providers.DbConnectionProvider;

namespace ResQueue.Endpoints;

Expand All @@ -15,9 +16,9 @@ public static void MapQueueEndpoints(this IEndpointRouteBuilder routes)
{
RouteGroupBuilder group = routes.MapGroup("queues");

group.MapGet("view", async (IDatabaseConnectionFactory connectionFactory, IOptions<ResQueueOptions> options) =>
group.MapGet("view", async (IDatabaseConnectionFactory connectionFactory, IDbConnectionProvider conn) =>
{
var sql = options.Value.SqlEngine switch
var sql = conn.SqlEngine switch
{
ResQueueSqlEngine.Postgres => $"""
SELECT
Expand All @@ -33,7 +34,7 @@ public static void MapQueueEndpoints(this IEndpointRouteBuilder routes)
dead_letter_count AS DeadLetterCount,
'' AS CountStartTime,
count_duration AS CountDuration
FROM {options.Value.Schema}.queues;
FROM {conn.Schema}.queues;
""",
ResQueueSqlEngine.SqlServer => $"""
SELECT
Expand All @@ -49,7 +50,7 @@ count_duration AS CountDuration
DeadLetterCount,
CountStartTime,
CountDuration
FROM {options.Value.Schema}.Queues;
FROM {conn.Schema}.Queues;
""",
_ => throw new NotSupportedException("Unsupported SQL engine")
};
Expand All @@ -62,9 +63,9 @@ count_duration AS CountDuration
});

group.MapGet("view/{queueName}", async (IDatabaseConnectionFactory connectionFactory,
IOptions<ResQueueOptions> options, string queueName) =>
IDbConnectionProvider conn, string queueName) =>
{
var sql = options.Value.SqlEngine switch
var sql = conn.SqlEngine switch
{
ResQueueSqlEngine.Postgres => $"""
SELECT
Expand All @@ -80,7 +81,7 @@ count_duration AS CountDuration
dead_letter_count AS DeadLetterCount,
'' AS CountStartTime,
count_duration AS CountDuration
FROM {options.Value.Schema}.queues
FROM {conn.Schema}.queues
WHERE queue_name = @QueueName;
""",
ResQueueSqlEngine.SqlServer => $"""
Expand All @@ -97,7 +98,7 @@ count_duration AS CountDuration
DeadLetterCount,
CountStartTime,
CountDuration
FROM {options.Value.Schema}.Queues
FROM {conn.Schema}.Queues
WHERE QueueName = @QueueName;
""",
_ => throw new NotSupportedException("Unsupported SQL engine")
Expand All @@ -112,9 +113,9 @@ count_duration AS CountDuration

group.MapGet("",
async ([FromQuery] string queueName, IDatabaseConnectionFactory connectionFactory,
IOptions<ResQueueOptions> options) =>
IDbConnectionProvider conn) =>
{
var sql = options.Value.SqlEngine switch
var sql = conn.SqlEngine switch
{
ResQueueSqlEngine.Postgres => $"""
SELECT
Expand All @@ -123,7 +124,7 @@ count_duration AS CountDuration
name AS Name,
type AS Type,
auto_delete AS AutoDelete
FROM {options.Value.Schema}.queue
FROM {conn.Schema}.queue
WHERE name = @QueueName;
""",
ResQueueSqlEngine.SqlServer => $"""
Expand All @@ -133,7 +134,7 @@ auto_delete AS AutoDelete
Name,
CAST(type AS INT) AS Type,
AutoDelete
FROM {options.Value.Schema}.Queue
FROM {conn.Schema}.Queue
WHERE Name = @QueueName;
""",
_ => throw new NotSupportedException("Unsupported SQL engine")
Expand All @@ -147,7 +148,7 @@ auto_delete AS AutoDelete
});

group.MapPost("purge",
async (IPurgeQueueFeature feature, IOptions<ResQueueOptions> options, [FromBody] PurgeQueueDto dto) =>
async (IPurgeQueueFeature feature, IDbConnectionProvider conn, [FromBody] PurgeQueueDto dto) =>
{
var result = await feature.ExecuteAsync(new PurgeQueueRequest(dto));

Expand Down
14 changes: 6 additions & 8 deletions backend/ResQueue/ResQueue/Factories/DatabaseConnectionFactory.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
using System.Data.Common;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Options;
using Npgsql;
using ResQueue.Enums;
using ResQueue.Providers.DbConnectionProvider;

namespace ResQueue.Factories;

public class DatabaseConnectionFactory(
IOptions<ResQueueOptions> options
IDbConnectionProvider conn
) : IDatabaseConnectionFactory
{
private readonly ResQueueOptions _options = options.Value;

public DbConnection CreateConnection() => _options.SqlEngine switch
public DbConnection CreateConnection() => conn.SqlEngine switch
{
ResQueueSqlEngine.Postgres => new NpgsqlConnection(_options.ConnectionString),
ResQueueSqlEngine.SqlServer => new SqlConnection(_options.ConnectionString),
_ => throw new NotSupportedException($"The SQL engine '{_options.SqlEngine}' is not supported.")
ResQueueSqlEngine.Postgres => new NpgsqlConnection(conn.ConnectionString),
ResQueueSqlEngine.SqlServer => new SqlConnection(conn.ConnectionString),
_ => throw new NotSupportedException()
};

public DbCommand CreateCommand(string commandText, DbConnection connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using ResQueue.Dtos.Messages;
using ResQueue.Enums;
using ResQueue.Factories;
using ResQueue.Providers.DbConnectionProvider;

namespace ResQueue.Features.Messages.DeleteMessages;

Expand All @@ -15,7 +16,7 @@ public record DeleteMessagesResponse();

public class DeleteMessagesFeature(
IDatabaseConnectionFactory connectionFactory,
IOptions<ResQueueOptions> options
IDbConnectionProvider conn
) : IDeleteMessagesFeature
{
public async Task<OperationResult<DeleteMessagesResponse>> ExecuteAsync(DeleteMessagesRequest request)
Expand Down Expand Up @@ -51,15 +52,15 @@ private async Task CallRoutineAsync(long messageDeliveryId, DbConnection connect
var parameters = new DynamicParameters();
string commandText;

switch (options.Value.SqlEngine)
switch (conn.SqlEngine)
{
case ResQueueSqlEngine.Postgres:
commandText = $"SELECT {options.Value.Schema}._resqueue_delete_message(@message_delivery_id)";
commandText = $"SELECT {conn.Schema}._resqueue_delete_message(@message_delivery_id)";
parameters.Add("message_delivery_id", messageDeliveryId);
break;

case ResQueueSqlEngine.SqlServer:
commandText = $"EXEC {options.Value.Schema}._ResQueue_DeleteMessage @messageDeliveryId";
commandText = $"EXEC {conn.Schema}._ResQueue_DeleteMessage @messageDeliveryId";
parameters.Add("messageDeliveryId", messageDeliveryId);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using ResQueue.Dtos.Messages;
using ResQueue.Enums;
using ResQueue.Factories;
using ResQueue.Providers.DbConnectionProvider;

namespace ResQueue.Features.Messages.GetMessages;

Expand All @@ -17,7 +18,7 @@ PaginatedResult<MessageDeliveryDto> Messages

public class GetMessagesFeature(
IDatabaseConnectionFactory connectionFactory,
IOptions<ResQueueOptions> options
IDbConnectionProvider conn
) : IGetMessagesFeature
{
public async Task<OperationResult<GetMessagesResponse>> ExecuteAsync(GetMessagesRequest request)
Expand Down Expand Up @@ -60,19 +61,19 @@ public async Task<OperationResult<GetMessagesResponse>> ExecuteAsync(GetMessages

private string GetSqlQueryCountText()
{
return options.Value.SqlEngine switch
return conn.SqlEngine switch
{
ResQueueSqlEngine.Postgres =>
$"SELECT COUNT(*) FROM {options.Value.Schema}.message_delivery WHERE queue_id = @QueueId",
$"SELECT COUNT(*) FROM {conn.Schema}.message_delivery WHERE queue_id = @QueueId",
ResQueueSqlEngine.SqlServer =>
$"SELECT COUNT(*) FROM {options.Value.Schema}.MessageDelivery WHERE QueueId = @QueueId",
$"SELECT COUNT(*) FROM {conn.Schema}.MessageDelivery WHERE QueueId = @QueueId",
_ => throw new NotSupportedException("Unsupported SQL Engine")
};
}

private string GetSqlQueryText()
{
return options.Value.SqlEngine switch
return conn.SqlEngine switch
{
ResQueueSqlEngine.Postgres => $"""
SELECT
Expand Down Expand Up @@ -108,8 +109,8 @@ private string GetSqlQueryText()
md.max_delivery_count AS MaxDeliveryCount,
md.last_delivered AS LastDelivered,
md.transport_headers AS TransportHeaders
FROM {options.Value.Schema}.message_delivery md
LEFT JOIN {options.Value.Schema}.message m ON m.transport_message_id = md.transport_message_id
FROM {conn.Schema}.message_delivery md
LEFT JOIN {conn.Schema}.message m ON m.transport_message_id = md.transport_message_id
WHERE md.queue_id = @QueueId
ORDER BY md.message_delivery_id
LIMIT @PageSize OFFSET @Offset
Expand Down Expand Up @@ -148,8 +149,8 @@ LIMIT @PageSize OFFSET @Offset
md.MaxDeliveryCount,
md.LastDelivered,
md.TransportHeaders
FROM {options.Value.Schema}.MessageDelivery md
LEFT JOIN {options.Value.Schema}.Message m ON m.TransportMessageId = md.TransportMessageId
FROM {conn.Schema}.MessageDelivery md
LEFT JOIN {conn.Schema}.Message m ON m.TransportMessageId = md.TransportMessageId
WHERE md.QueueId = @QueueId
ORDER BY md.MessageDeliveryId
OFFSET @Offset ROWS FETCH NEXT @PageSize ROWS ONLY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using ResQueue.Dtos.Messages;
using ResQueue.Enums;
using ResQueue.Factories;
using ResQueue.Providers.DbConnectionProvider;

namespace ResQueue.Features.Messages.PurgeQueue;

Expand All @@ -15,7 +16,7 @@ public record PurgeQueueResponse();

public class PurgeQueueFeature(
IDatabaseConnectionFactory connectionFactory,
IOptions<ResQueueOptions> options
IDbConnectionProvider conn
) : IPurgeQueueFeature
{
public async Task<OperationResult<PurgeQueueResponse>> ExecuteAsync(PurgeQueueRequest request)
Expand All @@ -34,15 +35,15 @@ private async Task CallRoutineAsync(PurgeQueueRequest request, DbConnection conn
var parameters = new DynamicParameters();
string commandText;

switch (options.Value.SqlEngine)
switch (conn.SqlEngine)
{
case ResQueueSqlEngine.Postgres:
commandText = $"SELECT {options.Value.Schema}._resqueue_purge_queue_by_id(@queue_id)";
commandText = $"SELECT {conn.Schema}._resqueue_purge_queue_by_id(@queue_id)";
parameters.Add("queue_id", request.Dto.QueueId);
break;

case ResQueueSqlEngine.SqlServer:
commandText = $"EXEC {options.Value.Schema}._ResQueue_PurgeQueueById @queueId";
commandText = $"EXEC {conn.Schema}._ResQueue_PurgeQueueById @queueId";
parameters.Add("queueId", request.Dto.QueueId);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using ResQueue.Dtos.Messages;
using ResQueue.Enums;
using ResQueue.Factories;
using ResQueue.Providers.DbConnectionProvider;

namespace ResQueue.Features.Messages.RequeueMessages;

Expand All @@ -17,7 +18,7 @@ public record RequeueMessagesResponse();

public class RequeueMessagesFeature(
IDatabaseConnectionFactory connectionFactory,
IOptions<ResQueueOptions> options
IDbConnectionProvider conn
) : IRequeueMessagesFeature
{
public async Task<OperationResult<RequeueMessagesResponse>> ExecuteAsync(RequeueMessagesRequest request)
Expand All @@ -36,11 +37,11 @@ private async Task CallRoutineAsync(RequeueMessagesRequest request, DbConnection
var parameters = new DynamicParameters();
string commandText;

switch (options.Value.SqlEngine)
switch (conn.SqlEngine)
{
case ResQueueSqlEngine.Postgres:
commandText =
$"SELECT {options.Value.Schema}.requeue_messages(@queue_name, @source_queue_type, @target_queue_type, @message_count, @delay::text::interval, @redelivery_count)";
$"SELECT {conn.Schema}.requeue_messages(@queue_name, @source_queue_type, @target_queue_type, @message_count, @delay::text::interval, @redelivery_count)";
parameters.Add("queue_name", request.Dto.QueueName);
parameters.Add("source_queue_type", request.Dto.SourceQueueType);
parameters.Add("target_queue_type", request.Dto.TargetQueueType);
Expand All @@ -51,7 +52,7 @@ private async Task CallRoutineAsync(RequeueMessagesRequest request, DbConnection

case ResQueueSqlEngine.SqlServer:
commandText =
$"EXEC {options.Value.Schema}.RequeueMessages @queueName, @sourceQueueType, @targetQueueType, @messageCount, @delay, @redeliveryCount";
$"EXEC {conn.Schema}.RequeueMessages @queueName, @sourceQueueType, @targetQueueType, @messageCount, @delay, @redeliveryCount";
parameters.Add("queueName", request.Dto.QueueName);
parameters.Add("sourceQueueType", request.Dto.SourceQueueType);
parameters.Add("targetQueueType", request.Dto.TargetQueueType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using ResQueue.Dtos.Messages;
using ResQueue.Enums;
using ResQueue.Factories;
using ResQueue.Providers.DbConnectionProvider;

namespace ResQueue.Features.Messages.RequeueSpecificMessages;

Expand All @@ -17,7 +18,7 @@ public record RequeueSpecificMessagesResponse();

public class RequeueSpecificMessagesFeature(
IDatabaseConnectionFactory connectionFactory,
IOptions<ResQueueOptions> options
IDbConnectionProvider conn
) : IRequeueSpecificMessagesFeature
{
public async Task<OperationResult<RequeueSpecificMessagesResponse>> ExecuteAsync(
Expand Down Expand Up @@ -57,11 +58,11 @@ private async Task CallRoutineAsync(RequeueSpecificMessagesRequest request, long
var parameters = new DynamicParameters();
string commandText;

switch (options.Value.SqlEngine)
switch (conn.SqlEngine)
{
case ResQueueSqlEngine.Postgres:
commandText =
$"SELECT {options.Value.Schema}.requeue_message(@message_delivery_id, @target_queue_type, @delay::text::interval, @redelivery_count)";
$"SELECT {conn.Schema}.requeue_message(@message_delivery_id, @target_queue_type, @delay::text::interval, @redelivery_count)";
parameters.Add("message_delivery_id", deliveryMessageId);
parameters.Add("target_queue_type", request.Dto.TargetQueueType);
parameters.Add("delay", request.Dto.Delay);
Expand All @@ -70,7 +71,7 @@ private async Task CallRoutineAsync(RequeueSpecificMessagesRequest request, long

case ResQueueSqlEngine.SqlServer:
commandText =
$"EXEC {options.Value.Schema}.RequeueMessage @messageDeliveryId, @targetQueueType, @delay, @redeliveryCount";
$"EXEC {conn.Schema}.RequeueMessage @messageDeliveryId, @targetQueueType, @delay, @redeliveryCount";
parameters.Add("messageDeliveryId", deliveryMessageId);
parameters.Add("targetQueueType", request.Dto.TargetQueueType);
parameters.Add("delay", request.Dto.Delay);
Expand Down
Loading

0 comments on commit 16387d9

Please sign in to comment.