From fc5d159b0293a80cf74744c5ec2575a54fb01636 Mon Sep 17 00:00:00 2001 From: Yuriy Losev Date: Thu, 9 Nov 2023 18:57:55 +0400 Subject: [PATCH] Fix race condition (#552) * Avoid data race --- pkg/task/task.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/task/task.go b/pkg/task/task.go index 7dd38efe..95b875f2 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -2,6 +2,7 @@ package task import ( "fmt" + "sync" "time" uuid "github.com/gofrs/uuid/v5" @@ -41,8 +42,10 @@ type BaseTask struct { QueueName string QueuedAt time.Time + Props map[string]interface{} + + lock sync.RWMutex Metadata interface{} - Props map[string]interface{} } func NewTask(taskType TaskType) *BaseTask { @@ -67,7 +70,9 @@ func (t *BaseTask) WithQueueName(name string) *BaseTask { } func (t *BaseTask) WithMetadata(metadata interface{}) *BaseTask { + t.lock.Lock() t.Metadata = metadata + t.lock.Unlock() return t } @@ -97,11 +102,15 @@ func (t *BaseTask) WithQueuedAt(queuedAt time.Time) Task { } func (t *BaseTask) GetMetadata() interface{} { + t.lock.RLock() + defer t.lock.RUnlock() return t.Metadata } func (t *BaseTask) UpdateMetadata(meta interface{}) { + t.lock.Lock() t.Metadata = meta + t.lock.Unlock() } func (t *BaseTask) GetProp(key string) interface{} { @@ -126,9 +135,11 @@ func (t *BaseTask) UpdateFailureMessage(msg string) { func (t *BaseTask) GetDescription() string { metaDescription := "" + t.lock.RLock() if descriptor, ok := t.Metadata.(MetadataDescriptable); ok { metaDescription = ":" + descriptor.GetDescription() } + t.lock.RUnlock() failDescription := "" if t.FailureCount > 0 { failDescription = fmt.Sprintf(":failures %d", t.FailureCount)