From 000fc5f1ec6547f95118f5d81d810fc8dc7f0f1f Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 25 Dec 2024 14:21:35 +0900 Subject: [PATCH] check commit ts in oracle Signed-off-by: you06 --- integration_tests/store_test.go | 4 +-- oracle/oracles/pd.go | 24 +++++++++++++++--- tikv/kv.go | 44 +++++++++++++++++---------------- txnkv/transaction/2pc.go | 16 +++--------- txnkv/transaction/test_probe.go | 3 ++- util/misc.go | 13 ++++++++++ 6 files changed, 65 insertions(+), 39 deletions(-) diff --git a/integration_tests/store_test.go b/integration_tests/store_test.go index f7c77b4b36..33e4a0b261 100644 --- a/integration_tests/store_test.go +++ b/integration_tests/store_test.go @@ -49,7 +49,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv" - "github.com/tikv/client-go/v2/txnkv/transaction" + "github.com/tikv/client-go/v2/util" ) func TestStore(t *testing.T) { @@ -196,7 +196,7 @@ func (s *testStoreSuite) TestFailBusyServerKV() { } func testUpdateLatestCommitInfo(require *require.Assertions, store tikv.StoreProbe, mode string) { - doTxn := func() *transaction.CommitInfo { + doTxn := func() *util.CommitInfo { txn, err := store.Begin() require.Nil(err) switch mode { diff --git a/oracle/oracles/pd.go b/oracle/oracles/pd.go index 60f0ee7cb3..db7d2ca672 100644 --- a/oracle/oracles/pd.go +++ b/oracle/oracles/pd.go @@ -47,6 +47,7 @@ import ( "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/clients/tso" "go.uber.org/zap" @@ -153,6 +154,9 @@ type pdOracle struct { // we don't require the ts for validation to be strictly the latest one. // Note that the result can't be reused for different txnScopes. The txnScope is used as the key. 
tsForValidation singleflight.Group + + // lastCommitTxnInfo stores the last commit info of the store; its validity is checked against every timestamp arriving from PD. + lastCommitTxnInfo *atomic.Pointer[util.CommitInfo] } // lastTSO stores the last timestamp oracle gets from PD server and the local time when the TSO is fetched. @@ -166,6 +170,8 @@ type PDOracleOptions struct { UpdateInterval time.Duration // Disable the background periodic update of the last ts. This is for test purposes only. NoUpdateTS bool + // LastCommitTxnInfo is the last commit info of the store. + LastCommitTxnInfo *atomic.Pointer[util.CommitInfo] } // NewPdOracle create an Oracle that uses a pd client source. @@ -182,6 +188,7 @@ func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, e c: pdClient, quit: make(chan struct{}), lastTSUpdateInterval: atomic.Int64{}, + lastCommitTxnInfo: options.LastCommitTxnInfo, } o.adaptiveUpdateIntervalState.shrinkIntervalCh = make(chan time.Duration, 1) o.lastTSUpdateInterval.Store(int64(options.UpdateInterval)) @@ -228,8 +235,9 @@ func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, err type tsFuture struct { tso.TSFuture - o *pdOracle - txnScope string + o *pdOracle + txnScope string + toBeVerifyCommitInfo *util.CommitInfo } // Wait implements the oracle.Future interface. 
@@ -241,12 +249,22 @@ func (f *tsFuture) Wait() (uint64, error) { return 0, errors.WithStack(err) } ts := oracle.ComposeTS(physical, logical) + if f.toBeVerifyCommitInfo != nil { + if ts < f.toBeVerifyCommitInfo.CommitTS || ts <= f.toBeVerifyCommitInfo.StartTS { + msg := fmt.Sprintf("transaction with invalid ts found, ts: %d, txnInfo: %s", ts, f.toBeVerifyCommitInfo.String()) + panic(msg) + } + } f.o.setLastTS(ts, f.txnScope) return ts, nil } func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future { - return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope} + var commitInfo *util.CommitInfo + if o.lastCommitTxnInfo != nil { + commitInfo = o.lastCommitTxnInfo.Load() + } + return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope, commitInfo} } func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, error) { diff --git a/tikv/kv.go b/tikv/kv.go index da0c621aed..2830027622 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -145,7 +145,7 @@ type KVStore struct { gP Pool // lastCommitTxnInfo is used to store the commit info of the latest committed transaction. - lastCommitTxnInfo atomic.Pointer[transaction.CommitInfo] + lastCommitTxnInfo atomic.Pointer[util.CommitInfo] } var _ Storage = (*KVStore)(nil) @@ -264,8 +264,10 @@ func requestHealthFeedbackFromKVClient(ctx context.Context, addr string, tikvCli // NewKVStore creates a new TiKV store instance. 
func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) { + var store KVStore o, err := oracles.NewPdOracle(pdClient, &oracles.PDOracleOptions{ - UpdateInterval: defaultOracleUpdateInterval, + UpdateInterval: defaultOracleUpdateInterval, + LastCommitTxnInfo: &store.lastCommitTxnInfo, }) if err != nil { return nil, err @@ -274,31 +276,31 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl regionCache := locate.NewRegionCache(pdClient, locate.WithRequestHealthFeedbackCallback(func(ctx context.Context, addr string) error { return requestHealthFeedbackFromKVClient(ctx, addr, tikvclient) })) - store := &KVStore{ - clusterID: pdClient.GetClusterID(context.TODO()), - uuid: uuid, - oracle: o, - pdClient: pdClient, - regionCache: regionCache, - kv: spkv, - safePoint: 0, - spTime: time.Now(), - replicaReadSeed: rand.Uint32(), - ctx: ctx, - cancel: cancel, - gP: NewSpool(128, 10*time.Second), - } + + store.clusterID = pdClient.GetClusterID(context.TODO()) + store.uuid = uuid + store.oracle = o + store.pdClient = pdClient + store.regionCache = regionCache + store.kv = spkv + store.safePoint = 0 + store.spTime = time.Now() + store.replicaReadSeed = rand.Uint32() + store.ctx = ctx + store.cancel = cancel + store.gP = NewSpool(128, 10*time.Second) + store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) store.clientMu.client.SetEventListener(regionCache.GetClientEventListener()) - store.lockResolver = txnlock.NewLockResolver(store) - loadOption(store, opt...) + store.lockResolver = txnlock.NewLockResolver(&store) + loadOption(&store, opt...) store.wg.Add(2) go store.runSafePointChecker() go store.safeTSUpdater() - return store, nil + return &store, nil } // NewPDClient returns an unwrapped pd client. 
@@ -862,7 +864,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { } // SetLastCommitInfo sets the last committed transaction's information. -func (s *KVStore) SetLastCommitInfo(ci *transaction.CommitInfo) { +func (s *KVStore) SetLastCommitInfo(ci *util.CommitInfo) { for { old := s.lastCommitTxnInfo.Load() if old != nil && old.CommitTS > ci.CommitTS { @@ -875,7 +877,7 @@ func (s *KVStore) SetLastCommitInfo(ci *transaction.CommitInfo) { } // GetLastCommitInfo get the last committed transaction's information. -func (s *KVStore) GetLastCommitInfo() *transaction.CommitInfo { +func (s *KVStore) GetLastCommitInfo() *util.CommitInfo { return s.lastCommitTxnInfo.Load() } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 6805d942d9..6f83e9a207 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -2376,7 +2376,7 @@ func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) CommitterMutations { return &res } -func (c *twoPhaseCommitter) getCommitInfo() *CommitInfo { +func (c *twoPhaseCommitter) getCommitInfo() *util.CommitInfo { var txnType string if c.isAsyncCommit() { txnType = "async" @@ -2385,7 +2385,7 @@ func (c *twoPhaseCommitter) getCommitInfo() *CommitInfo { } else { txnType = "2pc" } - return &CommitInfo{ + return &util.CommitInfo{ TxnType: txnType, StartTS: c.startTS, CommitTS: atomic.LoadUint64(&c.commitTS), @@ -2399,16 +2399,8 @@ func (c *twoPhaseCommitter) updateStoreCommitInfo() { c.store.SetLastCommitInfo(c.getCommitInfo()) } -type CommitInfo struct { - TxnType string - StartTS uint64 - CommitTS uint64 - MutationLen int - TxnSize int -} - type storeCommitInfo interface { - SetLastCommitInfo(*CommitInfo) + SetLastCommitInfo(*util.CommitInfo) // GetLastCommitInfo get the last committed transaction's information. 
- GetLastCommitInfo() *CommitInfo + GetLastCommitInfo() *util.CommitInfo } diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index 696ae06ed5..8fe8f6fef1 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" + "github.com/tikv/client-go/v2/util" ) // TxnProbe wraps a txn and exports internal states for testing purpose. @@ -380,7 +381,7 @@ func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []by } // GetCommitInfo expose CommitInfo of committer for testing purpose. -func (c CommitterProbe) GetCommitInfo() *CommitInfo { +func (c CommitterProbe) GetCommitInfo() *util.CommitInfo { return c.getCommitInfo() } diff --git a/util/misc.go b/util/misc.go index 88500f582e..1713de07c7 100644 --- a/util/misc.go +++ b/util/misc.go @@ -226,3 +226,16 @@ func None[T interface{}]() Option[T] { func (o Option[T]) Inner() *T { return o.inner } + +type CommitInfo struct { + TxnType string + StartTS uint64 + CommitTS uint64 + MutationLen int + TxnSize int +} + +func (c *CommitInfo) String() string { + return fmt.Sprintf("TxnType: %s, StartTS: %d, CommitTS: %d, MutationLen: %d, TxnSize: %d", + c.TxnType, c.StartTS, c.CommitTS, c.MutationLen, c.TxnSize) +}