Skip to content

Commit

Permalink
fix mco status update race condition (#1265)
Browse files Browse the repository at this point in the history
Signed-off-by: Thibault Mange <[email protected]>
  • Loading branch information
thibaultmg authored Jan 25, 2024
1 parent f54dbda commit 61cfcab
Showing 1 changed file with 61 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -22,59 +23,74 @@ import (
)

var (
stopStatusUpdate = make(chan struct{})
stopCheckReady = make(chan struct{})
requeueStatusUpdate = make(chan struct{})
updateStatusIsRunnning = false
updateReadyStatusIsRunnning = false
stopStatusUpdate = make(chan struct{})
stopCheckReady = make(chan struct{})
requeueStatusUpdate = make(chan struct{})
updateStatusIsRunnning = false
muUpdateStatusIsRunnning sync.Mutex
updateReadyStatusIsRunnning = false
muUpdateReadyStatusIsRunnning sync.Mutex
)

// Start goroutines to update MCO status
func StartStatusUpdate(c client.Client, instance *mcov1beta2.MultiClusterObservability) {
if !updateStatusIsRunnning {
go func() {
updateStatusIsRunnning = true
// defer close(stopStatusUpdate)
// defer close(requeueStatusUpdate)
for {
select {
case <-stopStatusUpdate:
updateStatusIsRunnning = false
close(stopCheckReady)
log.V(1).Info("status update goroutine is stopped.")
return
case <-requeueStatusUpdate:
log.V(1).Info("status update goroutine is triggered.")
updateStatus(c)
if updateReadyStatusIsRunnning && checkReadyStatus(c, instance) {
log.V(1).Info("send singal to stop status check ready goroutine because MCO status is ready")
stopCheckReady <- struct{}{}
}
muUpdateStatusIsRunnning.Lock()
if updateStatusIsRunnning {
muUpdateStatusIsRunnning.Unlock()
return
}
updateStatusIsRunnning = true
muUpdateStatusIsRunnning.Unlock()

// init the stop ready check channel
stopCheckReady = make(chan struct{})

go func() {
for {
select {
case <-stopStatusUpdate:
muUpdateStatusIsRunnning.Lock()
updateStatusIsRunnning = false
muUpdateStatusIsRunnning.Unlock()
close(stopCheckReady)
log.V(1).Info("status update goroutine is stopped.")
return
case <-requeueStatusUpdate:
log.V(1).Info("status update goroutine is triggered.")
updateStatus(c)
if updateReadyStatusIsRunnning && checkReadyStatus(c, instance) {
log.V(1).Info("send singal to stop status check ready goroutine because MCO status is ready")
stopCheckReady <- struct{}{}
}
}
}()
if !updateReadyStatusIsRunnning {
// init the stop ready check channel
stopCheckReady = make(chan struct{})
go func() {
updateReadyStatusIsRunnning = true
// defer close(stopCheckReady)
for {
select {
case <-stopCheckReady:
updateReadyStatusIsRunnning = false
log.V(1).Info("check status ready goroutine is stopped.")
return
case <-time.After(2 * time.Second):
log.V(1).Info("check status ready goroutine is triggered.")
if checkReadyStatus(c, instance) {
requeueStatusUpdate <- struct{}{}
}
}
}
}()
}
}()

muUpdateReadyStatusIsRunnning.Lock()
if updateReadyStatusIsRunnning {
muUpdateReadyStatusIsRunnning.Unlock()
return
}
updateReadyStatusIsRunnning = true
muUpdateReadyStatusIsRunnning.Unlock()

go func() {
for {
select {
case <-stopCheckReady:
muUpdateReadyStatusIsRunnning.Lock()
updateReadyStatusIsRunnning = false
muUpdateReadyStatusIsRunnning.Unlock()
log.V(1).Info("check status ready goroutine is stopped.")
return
case <-time.After(2 * time.Second):
log.V(1).Info("check status ready goroutine is triggered.")
if checkReadyStatus(c, instance) {
requeueStatusUpdate <- struct{}{}
}
}
}
}()
}

// updateStatus override UpdateStatus interface
Expand Down

0 comments on commit 61cfcab

Please sign in to comment.