Skip to content

Commit

Permalink
message consumer add retry failed action
Browse files Browse the repository at this point in the history
  • Loading branch information
wanlitao committed Jul 21, 2022
1 parent 310fb3b commit 39aa3d9
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Version>1.2.4</Version>
<AssemblyVersion>1.2.4.0</AssemblyVersion>
<FileVersion>1.2.4.0</FileVersion>
<Version>1.2.5</Version>
<AssemblyVersion>1.2.5.0</AssemblyVersion>
<FileVersion>1.2.5.0</FileVersion>
<Authors>wanlitao</Authors>
<Description>RocketMQ XTask DependencyInjection by HEF</Description>
<Company>GreatBillows</Company>
Expand Down
23 changes: 14 additions & 9 deletions src/HEF.XTask.RocketMQ/Consumer/RocketTaskMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,32 @@ public async Task<bool> Consume(MQTypedMessage<MessageExt, RocketMessage<TMessag
if (result) //延迟任务执行成功,无需其它操作
return true;

if (!scheduleContext.CheckStartRetry()) //没有重试 则直接通知任务执行失败
if (!scheduleContext.IsRetrying()) //首次执行失败 通知任务执行失败
{
await OnConsumeFailed(typedMessage.Message, rocketMessage);
return true;
}

if (scheduleContext.IsRetryEnd()) //重试结束后依然执行失败 才通知任务执行失败
if (scheduleContext.CheckStartRetry())
{
await OnConsumeFailed(typedMessage.Message, rocketMessage);
return true;
if (scheduleContext.IsRetryEnd()) //重试结束后依然执行失败 通知任务重试失败
{
await OnRetryFailed(typedMessage.Message, rocketMessage);
}
else
{
//任务执行失败 进行重试
var retryTask = new XRocketTask<TMessageBody>(rocketMessage);
RocketTaskScheduler.Schedule(retryTask, rocketMessage.Context);
}
}

//任务执行失败 进行重试
var retryTask = new XRocketTask<TMessageBody>(rocketMessage);
RocketTaskScheduler.Schedule(retryTask, rocketMessage.Context);

return true;
}

protected abstract Task<bool> Consume(MessageExt messageExt, RocketMessage<TMessageBody> rocketMessage);

protected abstract Task OnConsumeFailed(MessageExt messageExt, RocketMessage<TMessageBody> rocketMessage);

protected abstract Task OnRetryFailed(MessageExt messageExt, RocketMessage<TMessageBody> rocketMessage);
}
}
6 changes: 3 additions & 3 deletions src/HEF.XTask.RocketMQ/HEF.XTask.RocketMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Version>1.2.4</Version>
<AssemblyVersion>1.2.4.0</AssemblyVersion>
<FileVersion>1.2.4.0</FileVersion>
<Version>1.2.5</Version>
<AssemblyVersion>1.2.5.0</AssemblyVersion>
<FileVersion>1.2.5.0</FileVersion>
<Authors>wanlitao</Authors>
<Description>RocketMQ XTask by HEF</Description>
<Company>GreatBillows</Company>
Expand Down
6 changes: 3 additions & 3 deletions src/HEF.XTask/HEF.XTask.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Version>1.2.4</Version>
<AssemblyVersion>1.2.4.0</AssemblyVersion>
<FileVersion>1.2.4.0</FileVersion>
<Version>1.2.5</Version>
<AssemblyVersion>1.2.5.0</AssemblyVersion>
<FileVersion>1.2.5.0</FileVersion>
<Authors>wanlitao</Authors>
<Description>XTask by HEF</Description>
<Company>GreatBillows</Company>
Expand Down

0 comments on commit 39aa3d9

Please sign in to comment.