Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
filipbekic01 committed Sep 25, 2024
1 parent 9b2a815 commit 8acb11f
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 198 deletions.
3 changes: 2 additions & 1 deletion backend/ResQueue/ResQueue/Dtos/QueueDto.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ public class QueueDto
{
public string Id { get; set; } = null!;
public string RawData { get; set; } = null!;
public long TotalMessages { get; set; }
public int TotalMessages { get; set; }
public int Messages { get; set; }
public bool IsFavorite { get; set; }
public DateTime CreatedAt { get; set; }
}
5 changes: 3 additions & 2 deletions backend/ResQueue/ResQueue/Endpoints/QueueEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void MapQueueEndpoints(this IEndpointRouteBuilder routes)
search = search.Trim();
sortField = new[]
{
"inbox", "name", "messages"
"name", "messages"
}.Contains(sortField)
? sortField
: null;
Expand Down Expand Up @@ -116,7 +116,7 @@ public static void MapQueueEndpoints(this IEndpointRouteBuilder routes)
? Builders<Queue>.Sort.Ascending(x => x.RawData[sortField])
: Builders<Queue>.Sort.Descending(x => x.RawData[sortField]);

if (sortField.Equals("inbox", StringComparison.CurrentCultureIgnoreCase))
if (sortField.Equals("messages", StringComparison.CurrentCultureIgnoreCase))
{
secondarySort = sortOrder == 1
? Builders<Queue>.Sort.Ascending(x => x.TotalMessages)
Expand Down Expand Up @@ -146,6 +146,7 @@ public static void MapQueueEndpoints(this IEndpointRouteBuilder routes)
Id = x.Id.ToString(),
RawData = JsonHelper.ConvertBsonToJson(x.RawData),
TotalMessages = x.TotalMessages,
Messages = x.Messages,
IsFavorite = x.IsFavorite,
CreatedAt = x.CreatedAt
}).ToList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public async Task<OperationResult<SyncBrokerFeatureResponse>> ExecuteAsync(SyncB
if (!queue.RawData.Equals(newRawData))
{
queue.RawData = newRawData;
queue.TotalMessages = queue.Messages + queue.RawData["messages"].AsInt32;
queuesToUpdate.Add(queue);
}
}
Expand All @@ -112,7 +113,8 @@ public async Task<OperationResult<SyncBrokerFeatureResponse>> ExecuteAsync(SyncB
// Delete queue
foreach (var queue in queues)
{
if (queue.TotalMessages > 0)
// Skip if local messages exist
if (queue.Messages > 0)
{
continue;
}
Expand Down Expand Up @@ -206,7 +208,9 @@ public async Task<OperationResult<SyncBrokerFeatureResponse>> ExecuteAsync(SyncB
foreach (var queue in queuesToUpdate)
{
var updateFilter = Builders<Queue>.Filter.Eq(q => q.Id, queue.Id);
var updateDefinition = Builders<Queue>.Update.Set(q => q.RawData, queue.RawData);
var updateDefinition = Builders<Queue>.Update
.Set(q => q.RawData, queue.RawData)
.Set(q => q.TotalMessages, queue.TotalMessages);

var updateOneModel = new UpdateOneModel<Queue>(updateFilter, updateDefinition);
bulkOperations.Add(updateOneModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ public async Task<OperationResult<ArchiveMessagesFeatureResponse>> ExecuteAsync(
});
}

var queueId = ObjectId.Parse(request.Dto.QueueId);

var filter = Builders<Message>.Filter.And(
Builders<Message>.Filter.In(m => m.Id, request.Dto.Ids.Select(ObjectId.Parse)),
Builders<Message>.Filter.Eq(m => m.QueueId, ObjectId.Parse(request.Dto.QueueId)),
Builders<Message>.Filter.Eq(m => m.QueueId, queueId),
Builders<Message>.Filter.Eq(m => m.UserId, user.Id)
);

Expand All @@ -46,13 +48,42 @@ public async Task<OperationResult<ArchiveMessagesFeatureResponse>> ExecuteAsync(
using var session = await mongoClient.StartSessionAsync();
session.StartTransaction();

// Update many messages in the messages collection
var result = await messagesCollection.UpdateManyAsync(session, filter, update);

await queuesCollection.UpdateOneAsync(session, x => x.Id == ObjectId.Parse(request.Dto.QueueId),
Builders<Queue>.Update.Inc(x => x.TotalMessages, -result.ModifiedCount));
// Define the update pipeline
var updatePipeline = new[]
{
// First stage: Decrement Messages and ensure it's not less than 0
new BsonDocument("$set", new BsonDocument
{
{
"Messages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$subtract", new BsonArray { "$Messages", result.ModifiedCount })
})
}
}),
// Second stage: Update TotalMessages using the updated Messages and RawData.messages
new BsonDocument("$set", new BsonDocument
{
{
"TotalMessages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$add", new BsonArray { "$Messages", "$RawData.messages" })
})
}
})
};

await queuesCollection.UpdateOneAsync(session, x => x.Id == ObjectId.Parse(request.Dto.QueueId),
Builders<Queue>.Update.Max(x => x.TotalMessages, 0));
// Apply the update pipeline to the queues collection
await queuesCollection.UpdateOneAsync(
session,
x => x.Id == queueId,
Builders<Queue>.Update.Pipeline(updatePipeline)
);

await session.CommitTransactionAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public record CloneMessagesFeatureResponse();

public class CloneMessageFeature(
UserManager<User> userManager,
IMongoCollection<Message> messagesCollection
IMongoCollection<Message> messagesCollection,
IMongoCollection<Queue> queuesCollection,
IMongoClient mongoClient
) : ICloneMessageFeature
{
public async Task<OperationResult<CloneMessagesFeatureResponse>> ExecuteAsync(CloneMessagesFeatureRequest request)
Expand Down Expand Up @@ -49,7 +51,41 @@ public async Task<OperationResult<CloneMessagesFeatureResponse>> ExecuteAsync(Cl
message.UpdatedAt = null;
message.CreatedAt = DateTime.UtcNow;

await messagesCollection.InsertOneAsync(message);
using var session = await mongoClient.StartSessionAsync();
session.StartTransaction();

// Insert the message into the messages collection
await messagesCollection.InsertOneAsync(session, message);

// Define the update pipeline
var updatePipeline = new[]
{
// First stage: Increment Messages by 1
new BsonDocument("$set", new BsonDocument
{
{ "Messages", new BsonDocument("$add", new BsonArray { "$Messages", 1 }) }
}),
// Second stage: Update TotalMessages using updated Messages and RawData.messages
new BsonDocument("$set", new BsonDocument
{
{
"TotalMessages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$add", new BsonArray { "$Messages", "$RawData.messages" })
})
}
})
};

// Apply the update pipeline to the queues collection
await queuesCollection.UpdateOneAsync(
session,
x => x.Id == message.QueueId,
Builders<Queue>.Update.Pipeline(updatePipeline)
);

await session.CommitTransactionAsync();

return OperationResult<CloneMessagesFeatureResponse>.Success(new CloneMessagesFeatureResponse());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,36 @@ public async Task<OperationResult<CreateMessageFeatureResponse>> ExecuteAsync(
using var session = await mongoClient.StartSessionAsync();
session.StartTransaction();

// Insert the message into the messages collection
await messagesCollection.InsertOneAsync(session, message);
await queuesCollection.UpdateOneAsync(session, x => x.Id == queue.Id,
Builders<Queue>.Update.Inc(x => x.TotalMessages, 1));

// Define the update pipeline with multiple stages
var updatePipeline = new[]
{
// First stage: Increment Messages by 1
new BsonDocument("$set", new BsonDocument
{
{ "Messages", new BsonDocument("$add", new BsonArray { "$Messages", 1 }) }
}),
// Second stage: Update TotalMessages using the updated value of Messages
new BsonDocument("$set", new BsonDocument
{
{
"TotalMessages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$add", new BsonArray { "$Messages", "$RawData.messages" })
})
}
})
};

// Apply the update pipeline to the queue
await queuesCollection.UpdateOneAsync(
session,
x => x.Id == message.QueueId,
Builders<Queue>.Update.Pipeline(updatePipeline)
);

await session.CommitTransactionAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,44 @@ public async Task<OperationResult<SyncMessagesFeatureResponse>> ExecuteAsync(Syn
using var session = await mongoClient.StartSessionAsync();
session.StartTransaction();

// Insert the message into the messages collection
await messagesCollection.InsertOneAsync(session, message);

await queuesCollection.UpdateOneAsync(session, x => x.Id == queue.Id,
Builders<Queue>.Update.Inc(x => x.RawData["messages"], -1));

await queuesCollection.UpdateOneAsync(session, x => x.Id == queue.Id,
Builders<Queue>.Update.Max(x => x.RawData["messages"], 0));

await queuesCollection.UpdateOneAsync(session, x => x.Id == queue.Id,
Builders<Queue>.Update.Inc(x => x.TotalMessages, 1));
// Prepare the update pipeline
var updatePipeline = new[]
{
// First stage: Update RawData.messages and Messages
new BsonDocument("$set", new BsonDocument
{
{
"RawData.messages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$subtract", new BsonArray { "$RawData.messages", 1 })
})
},
{ "Messages", new BsonDocument("$add", new BsonArray { "$Messages", 1 }) }
}),
// Second stage: Update TotalMessages using updated values
new BsonDocument("$set", new BsonDocument
{
{
"TotalMessages", new BsonDocument("$max", new BsonArray
{
0,
new BsonDocument("$add", new BsonArray
{
"$Messages",
"$RawData.messages"
})
})
}
})
};

// Apply the update to the queue
await queuesCollection.UpdateOneAsync(session, x => x.Id == message.QueueId,
Builders<Queue>.Update.Pipeline(updatePipeline));

await session.CommitTransactionAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,45 @@ public async Task StartAsync(CancellationToken cancellationToken)
{
await using var scope = serviceProvider.CreateAsyncScope();
var collection = scope.ServiceProvider.GetRequiredService<IMongoCollection<BrokerInvitation>>();
var queuesCollection = scope.ServiceProvider.GetRequiredService<IMongoCollection<Queue>>();

await CreateBrokerInvitationTtlIndex(collection, cancellationToken);
await CreateBrokerInvitationIndexes(collection, cancellationToken);
await CreateQueueIndexes(queuesCollection, cancellationToken);
}

private static async Task CreateBrokerInvitationTtlIndex(
private static async Task CreateQueueIndexes(
IMongoCollection<Queue> collection,
CancellationToken cancellationToken
)
{
// Add index on TotalMessages
var totalMessagesIndex = Builders<Queue>.IndexKeys.Ascending(q => q.TotalMessages);
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<Queue>(totalMessagesIndex),
cancellationToken: cancellationToken
);

// Add index on IsFavorite
var isFavoriteIndex = Builders<Queue>.IndexKeys.Ascending(q => q.IsFavorite);
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<Queue>(isFavoriteIndex),
cancellationToken: cancellationToken
);

// Add index on BrokerId
var brokerIdIndex = Builders<Queue>.IndexKeys.Ascending(q => q.BrokerId);
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<Queue>(brokerIdIndex),
cancellationToken: cancellationToken
);
}

private static async Task CreateBrokerInvitationIndexes(
IMongoCollection<BrokerInvitation> collection,
CancellationToken cancellationToken
)
{
// Add index on ExpiresAt
var indexKeysDefinition = Builders<BrokerInvitation>.IndexKeys.Ascending(x => x.ExpiresAt);
var indexOptions = new CreateIndexOptions { ExpireAfter = TimeSpan.Zero };
var indexModel = new CreateIndexModel<BrokerInvitation>(indexKeysDefinition, indexOptions);
Expand Down
1 change: 1 addition & 0 deletions backend/ResQueue/ResQueue/Models/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class Queue
[BsonId] public ObjectId Id { get; set; }
public ObjectId BrokerId { get; set; }
public int TotalMessages { get; set; }
public int Messages { get; set; }
public bool IsFavorite { get; set; }
public BsonDocument RawData { get; set; } = null!;
public DateTime CreatedAt { get; set; }
Expand Down
Loading

0 comments on commit 8acb11f

Please sign in to comment.