Skip to content

Commit

Permalink
feat: optimizing delay tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ifooth committed Nov 26, 2024
1 parent 946c18b commit c6758e0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
4 changes: 4 additions & 0 deletions bcs-common/common/task/brokers/etcd/delay_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (b *etcdBroker) handleDelayTask(ctx context.Context) {
return taskList[i].eta.Before(taskList[j].eta)
})

// 异步任务随时可能插入, 最多处理1分钟后重新获取任务列表(aka 异步任务到期后, 最多延迟1分钟放到pending队列)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

for _, task := range taskList {
// 超时控制
select {
Expand Down
13 changes: 9 additions & 4 deletions bcs-common/common/task/brokers/etcd/delay_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,24 @@ func TestMakeDelayTask(t *testing.T) {
}{
{
name: "delayed_task1",
input: "/machinery/v2/broker/delayed_tasks/eta-0/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
input: "/machinery/v2/broker/delayed_tasks/eta-1/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
taskKey: "machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
eta: time.UnixMilli(0),
eta: time.UnixMilli(1),
},
{
name: "delayed_task1",
name: "delayed_task2",
input: "/machinery/v2/broker/delayed_tasks/eta-0/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
taskKey: "machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
eta: time.UnixMilli(0),
},
{
name: "delayed_task3",
input: "/machinery/v2/broker/delayed_tasks/eta-1732356480593/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
taskKey: "machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
eta: time.UnixMilli(1732356480593),
},
{
name: "delayed_task4",
input: "/machinery/v2/broker/delayed_tasks/eta-1732356480583/machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
taskKey: "machinery_tasks/d30986b4-6634-4013-bf56-88c0463450c2-test-0",
eta: time.UnixMilli(1732356480583),
Expand All @@ -54,7 +60,6 @@ func TestMakeDelayTask(t *testing.T) {
kv := &mvccpb.KeyValue{Key: []byte(tt.input)}
task, err := makeDelayTask(kv)
require.NoError(t, err)

assert.Equal(t, tt.input, task.key)
assert.Equal(t, tt.taskKey, task.taskKey)
assert.Equal(t, tt.eta, task.eta)
Expand Down

0 comments on commit c6758e0

Please sign in to comment.