Skip to content

Commit

Permalink
Merge branch 'main' of github.com:isdaniel/MessageWorkerPool
Browse files Browse the repository at this point in the history
  • Loading branch information
isdaniel committed Jan 5, 2025
2 parents ef6e462 + a0e2d4f commit cb5ccae
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/MessageWorkerPool/MessageWorkerPool.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<RepositoryType>git</RepositoryType>
<PackageProjectUrl>https://github.com/isdaniel/MessageWorkerPool</PackageProjectUrl>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<Version>0.0.0.8</Version>
<Version>0.0.0.9</Version>
<PackageReleaseNotes>Support for .net standerd 2.0, multiple-processes Messaging architecture extension</PackageReleaseNotes>
<PackageReadmeFile>README.md</PackageReadmeFile>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
Expand Down
42 changes: 26 additions & 16 deletions src/MessageWorkerPool/RabbitMq/RabbitMqWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class RabbitMqWorker : IWorker
public RabbitMqSetting Setting { get; }
protected AsyncEventHandler<BasicDeliverEventArgs> ReceiveEvent;
private AsyncEventingBasicConsumer _consumer;
volatile int _messageCount = 0;
internal IModel Channel { get; private set; }
protected IProcessWrapper Process { get; private set; }
private readonly WorkerPoolSetting _workerSetting;
Expand Down Expand Up @@ -113,8 +114,9 @@ public virtual async Task InitWorkerAsync(CancellationToken token)
//it should return, if the worker are processing GracefulShutDown.
return;
}

Interlocked.Increment(ref _messageCount);
await ProcessingMessage(e, correlationId, token).ConfigureAwait(false);
Interlocked.Decrement(ref _messageCount);
}
};
_consumer.Received += ReceiveEvent;
Expand Down Expand Up @@ -253,18 +255,28 @@ protected virtual async Task GracefulReleaseAsync(CancellationToken token)

public async Task GracefulShutDownAsync(CancellationToken token)
{
Logger.LogInformation("Executing GracefulShutDownAsync!");
Status = WorkerStatus.Stopping;

if (ReceiveEvent != null)
using (Logger.BeginScope($"[Pid: {Process.Id}]"))
{
_consumer.Received -= ReceiveEvent;
ReceiveEvent = null;
}
Logger.LogInformation("Executing GracefulShutDownAsync!");
Status = WorkerStatus.Stopping;

while (Interlocked.CompareExchange(ref _messageCount, 0, 0) != 0)
{
Logger.LogInformation($"Waiting for all messages to be processed. Current messageCount: {_messageCount}");
await Task.Delay(100, token).ConfigureAwait(false);
}

if (ReceiveEvent != null)
{
_consumer.Received -= ReceiveEvent;
ReceiveEvent = null;
}

CloseProcess();
Status = WorkerStatus.Stopped;
await GracefulReleaseAsync(token);
CloseProcess();
Status = WorkerStatus.Stopped;
await GracefulReleaseAsync(token);
}

this.Dispose();
Logger.LogInformation("RabbitMQ Conn Closed!!!!");
}
Expand All @@ -273,11 +285,9 @@ private void CloseProcess()
{
//Sending close message
Process.StandardInput.WriteLine(MessageCommunicate.CLOSED_SIGNAL);
using (Logger.BeginScope($"[Pid: {Process.Id}]")) {
Logger.LogInformation($"Begin WaitForExit free resource....");
Process.WaitForExit();
Logger.LogInformation($"End WaitForExit and free resource....");
}
Logger.LogInformation($"Begin WaitForExit free resource....");
Process.WaitForExit();
Logger.LogInformation($"End WaitForExit and free resource....");
}

public void Dispose()
Expand Down
3 changes: 1 addition & 2 deletions src/MessageWorkerPool/WorkerPoolService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ protected override async Task ExecuteAsync(CancellationToken token)
_workerPools.Add(workerPool);
}

token.WaitHandle.WaitOne();
_logger.LogInformation("ExecuteAsync Finish!");
_logger.LogInformation("WorkerPool initialization Finish!");
}

public override async Task StopAsync(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public async Task DoWorkAsync(Func<MessageInputTask, Task<MessageOutputTask>> pr
{
Console.Error.WriteLine($"Worker occur unexpected error: {ex.ToString()}");
//we could requeue to another queue (error queue..ect), when we support rpc.
Console.WriteLine(new MessageOutputTask() {
Message = ex.Message,
Status = MessageStatus.MESSAGE_DONE
}.ToJson());
// Console.WriteLine(new MessageOutputTask() {
// Message = ex.Message,
// Status = MessageStatus.MESSAGE_DONE
// }.ToJson());
}
}
}
Expand Down

0 comments on commit cb5ccae

Please sign in to comment.