Skip to content

Commit 392ee20

Browse files
Lara Araujocrazycs520
authored and
GitHub Enterprise
committed
fix issue that store's liveness may incorrectly marked as unreachable when the store restarts with label changed (tikv#1402) (tikv#1417) (tikv#8)
* fix issue that store's liveness may incorrectly marked as unreachable when the store restarts with label changed (tikv#1407) --------- * fix * fix ci --------- Signed-off-by: crazycs520 <[email protected]> Co-authored-by: crazycs <[email protected]>
1 parent 08f655d commit 392ee20

File tree

3 files changed

+63
-16
lines changed

3 files changed

+63
-16
lines changed

integration_tests/pd_api_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
212212
// Try to get the minimum resolved timestamp of the cluster from PD.
213213
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
214214
retryCount = 0
215-
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
215+
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == 0 {
216216
time.Sleep(100 * time.Millisecond)
217217
if retryCount > 5 {
218218
break

internal/locate/region_cache.go

+20-15
Original file line numberDiff line numberDiff line change
@@ -2691,14 +2691,25 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
26912691
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
26922692
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
26932693
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
2694-
newStore.unreachableSince = s.unreachableSince
2694+
if newStore.getLivenessState() != reachable {
2695+
newStore.unreachableSince = s.unreachableSince
2696+
go newStore.checkUntilHealth(c, newStore.getLivenessState(), storeReResolveInterval)
2697+
}
26952698
c.storeMu.Lock()
26962699
if s.addr == addr {
26972700
newStore.slowScore = s.slowScore
26982701
}
26992702
c.storeMu.stores[newStore.storeID] = newStore
27002703
c.storeMu.Unlock()
27012704
s.setResolveState(deleted)
2705+
logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted",
2706+
zap.Uint64("store", s.storeID),
2707+
zap.String("old-addr", s.addr),
2708+
zap.Any("old-labels", s.labels),
2709+
zap.String("old-liveness", s.getLivenessState().String()),
2710+
zap.String("new-addr", newStore.addr),
2711+
zap.Any("new-labels", newStore.labels),
2712+
zap.String("new-liveness", newStore.getLivenessState().String()))
27022713
return false, nil
27032714
}
27042715
s.changeResolveStateTo(needCheck, resolved)
@@ -2850,6 +2861,8 @@ func (s livenessState) String() string {
28502861
}
28512862
}
28522863

2864+
var storeReResolveInterval = 30 * time.Second
2865+
28532866
func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
28542867
// This mechanism doesn't support non-TiKV stores currently.
28552868
if s.storeType != tikvrpc.TiKV {
@@ -2861,7 +2874,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt
28612874
// It may be already started by another thread.
28622875
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
28632876
s.unreachableSince = time.Now()
2864-
reResolveInterval := 30 * time.Second
2877+
reResolveInterval := storeReResolveInterval
28652878
if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
28662879
if dur, err := time.ParseDuration(val.(string)); err == nil {
28672880
reResolveInterval = dur
@@ -2889,26 +2902,18 @@ func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResol
28892902
case <-c.ctx.Done():
28902903
return
28912904
case <-ticker.C:
2905+
if s.getResolveState() == deleted {
2906+
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
2907+
return
2908+
}
28922909
if time.Since(lastCheckPDTime) > reResolveInterval {
28932910
lastCheckPDTime = time.Now()
28942911

28952912
valid, err := s.reResolve(c)
28962913
if err != nil {
28972914
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
28982915
} else if !valid {
2899-
if s.getResolveState() == deleted {
2900-
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
2901-
c.storeMu.RLock()
2902-
newStore := c.storeMu.stores[s.storeID]
2903-
c.storeMu.RUnlock()
2904-
logutil.BgLogger().Info("[health check] store meta changed",
2905-
zap.Uint64("storeID", s.storeID),
2906-
zap.String("oldAddr", s.addr),
2907-
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
2908-
zap.String("newAddr", newStore.addr),
2909-
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
2910-
go newStore.checkUntilHealth(c, liveness, reResolveInterval)
2911-
}
2916+
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
29122917
return
29132918
}
29142919
}

internal/locate/region_cache_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -1982,3 +1982,45 @@ func (s *testRegionCacheSuite) TestScanRegionsWithGaps() {
19821982
})
19831983
s.Equal(scanRegionRes, regions)
19841984
}
1985+
1986+
func (s *testRegionCacheSuite) TestIssue1401() {
1987+
// init region cache
1988+
s.cache.LocateKey(s.bo, []byte("a"))
1989+
1990+
store1 := s.cache.getStoreByStoreID(s.store1)
1991+
s.Require().NotNil(store1)
1992+
s.Require().Equal(resolved, store1.getResolveState())
1993+
// change store1 label.
1994+
labels := store1.labels
1995+
labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"})
1996+
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...)
1997+
1998+
// mark the store is unreachable and need check.
1999+
atomic.StoreUint32(&store1.livenessState, uint32(unreachable))
2000+
store1.setResolveState(needCheck)
2001+
2002+
// setup mock liveness func
2003+
tf := func(s *Store, bo *retry.Backoffer) livenessState {
2004+
return reachable
2005+
}
2006+
s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
2007+
2008+
// start health check loop
2009+
go store1.checkUntilHealth(s.cache, unreachable, time.Second*30)
2010+
2011+
// mock asyncCheckAndResolveLoop worker to check and resolve store.
2012+
s.cache.checkAndResolve(nil, func(s *Store) bool {
2013+
return s.getResolveState() == needCheck
2014+
})
2015+
2016+
// assert that the old store should be deleted.
2017+
s.Eventually(func() bool {
2018+
return store1.getResolveState() == deleted
2019+
}, 3*time.Second, time.Second)
2020+
// assert the new store should be added and it should be resolved and reachable.
2021+
newStore1 := s.cache.getStoreByStoreID(s.store1)
2022+
s.Eventually(func() bool {
2023+
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
2024+
}, 3*time.Second, time.Second)
2025+
s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161"))
2026+
}

0 commit comments

Comments
 (0)