diff --git a/config/config.go b/config/config.go index 76275b05eb..34331dc0eb 100644 --- a/config/config.go +++ b/config/config.go @@ -97,3 +97,107 @@ var ( EnableTxnLocalLatch = false TxnLocalLatchCapacity uint = 2048000 ) + +// RegionCache configurations. +var ( + RegionCacheBTreeDegree = 32 + RegionCacheTTL = time.Minute * 10 +) + +// RawKV configurations. +var ( + // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. + MaxRawKVScanLimit = 10240 + // RawBatchPutSize is the maximum size limit for rawkv each batch put request. + RawBatchPutSize = 16 * 1024 + // RawBatchPairCount is the maximum limit for rawkv each batch get/delete request. + RawBatchPairCount = 512 +) + +// RPC configurations. +var ( + // MaxConnectionCount is the max gRPC connections that will be established with + // each tikv-server. + MaxConnectionCount uint = 16 + + // GrpcKeepAliveTime is the duration of time after which if the client doesn't see + // any activity it pings the server to see if the transport is still alive. + GrpcKeepAliveTime = time.Duration(10) * time.Second + + // GrpcKeepAliveTimeout is the duration of time for which the client waits after having + // pinged for keepalive check and if no activity is seen even after that the connection + // is closed. + GrpcKeepAliveTimeout = time.Duration(3) * time.Second + + // MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than + // current value, an error will be reported from gRPC. + MaxSendMsgSize = 1<<31 - 1 + + // MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than + // current value, an error will be reported from gRPC. + MaxCallMsgSize = 1<<31 - 1 + + DialTimeout = 5 * time.Second + ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values. + ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region. + ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times. + GCTimeout = 5 * time.Minute + UnsafeDestroyRangeTimeout = 5 * time.Minute + + GrpcInitialWindowSize = 1 << 30 + GrpcInitialConnWindowSize = 1 << 30 +) + +// KV configurations. +var ( + // DefaultTxnMembufCap is the default transaction membuf capability. + DefaultTxnMembufCap = 4 * 1024 +) + +// Latch configurations. +var ( + LatchExpireDuration = 2 * time.Minute + LatchCheckInterval = 1 * time.Minute + LatchCheckCounter = 50000 + LatchListCount = 5 + LatchLockChanSize = 100 +) + +// Oracle configurations. +var ( + TsoSlowThreshold = 30 * time.Millisecond + // update oracle's lastTS every 2000ms. + OracleUpdateInterval = 2000 +) + +// Txn configurations. +var ( + // TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's + // Key+Value size below 16KB. + TxnCommitBatchSize = 16 * 1024 + + // By default, locks after 3000ms is considered unusual (the client created the + // lock might be dead). Other client may cleanup this kind of lock. + // For locks created recently, we will do backoff and retry. + TxnDefaultLockTTL uint64 = 3000 + // TODO: Consider if it's appropriate. + TxnMaxLockTTL uint64 = 120000 + // ttl = ttlFactor * sqrt(writeSizeInMiB) + TxnTTLFactor = 6000 + // TxnResolvedCacheSize is max number of cached txn status. + TxnResolvedCacheSize = 2048 + + // SafePoint. + // This is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb', + // save this to pd instead of tikv, because we can't use interface of table + // if the safepoint on tidb is expired. + GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point" + + GcSafePointCacheInterval = time.Second * 100 + GcCPUTimeInaccuracyBound = time.Second + GcSafePointUpdateInterval = time.Second * 10 + GcSafePointQuickRepeatInterval = time.Second + + TxnScanBatchSize = 256 + TxnBatchGetSize = 5120 +) diff --git a/locate/region_cache.go b/locate/region_cache.go index 8804b75a42..e932fbb2b0 100644 --- a/locate/region_cache.go +++ b/locate/region_cache.go @@ -27,15 +27,11 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/tikv/client-go/codec" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/metrics" "github.com/tikv/client-go/retry" ) -const ( - btreeDegree = 32 - rcDefaultRegionCacheTTL = time.Minute * 10 -) - // CachedRegion encapsulates {Region, TTL} type CachedRegion struct { region *Region @@ -45,7 +41,7 @@ type CachedRegion struct { func (c *CachedRegion) isValid() bool { lastAccess := atomic.LoadInt64(&c.lastAccess) lastAccessTime := time.Unix(lastAccess, 0) - return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL + return time.Since(lastAccessTime) < config.RegionCacheTTL } // RegionCache caches Regions loaded from PD. @@ -69,7 +65,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { pdClient: pdClient, } c.mu.regions = make(map[RegionVerID]*CachedRegion) - c.mu.sorted = btree.New(btreeDegree) + c.mu.sorted = btree.New(config.RegionCacheBTreeDegree) c.storeMu.stores = make(map[uint64]*Store) return c } diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index 2744379fb6..aa89898bfc 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -29,19 +29,10 @@ import ( ) var ( - // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. - MaxRawKVScanLimit = 10240 // ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large. ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit") ) -const ( - // rawBatchPutSize is the maximum size limit for rawkv each batch put request. - rawBatchPutSize = 16 * 1024 - // rawBatchPairCount is the maximum limit for rawkv each batch get/delete request. - rawBatchPairCount = 512 -) - // RawKVClient is a client of TiKV server which is used as a key-value storage, // only GET/PUT/DELETE commands are supported. type RawKVClient struct { @@ -278,7 +269,7 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v start := time.Now() defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }() - if limit > MaxRawKVScanLimit { + if limit > config.MaxRawKVScanLimit { return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded) } @@ -319,7 +310,7 @@ func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *loc if err != nil { return nil, nil, err } - resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) if err != nil { return nil, nil, err } @@ -346,7 +337,7 @@ func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType r var batches []batch for regionID, groupKeys := range groups { - batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) + batches = appendKeyBatches(batches, regionID, groupKeys, config.RawBatchPairCount) } bo, cancel := bo.Fork() ches := make(chan singleBatchResp, len(batches)) @@ -405,7 +396,7 @@ func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.C } sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) - resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort) batchResp := singleBatchResp{} if err != nil { @@ -473,7 +464,7 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.R }, } - resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) if err != nil { return nil, nil, err } @@ -504,7 +495,7 @@ func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) e var batches []batch // split the keys by size and RegionVerID for regionID, groupKeys := range groups { - batches = appendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize) + batches = appendBatches(batches, regionID, groupKeys, keyToValue, config.RawBatchPutSize) } bo, cancel := bo.Fork() ch := make(chan error, len(batches)) @@ -583,7 +574,7 @@ func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error { } sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) - resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort) if err != nil { return err } diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index d4b3fec546..45f839496b 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -20,6 +20,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/locate" "github.com/tikv/client-go/mockstore/mocktikv" "github.com/tikv/client-go/retry" @@ -178,7 +179,7 @@ func (s *testRawKVSuite) TestRawBatch(c *C) { size := 0 var testKeys [][]byte var testValues [][]byte - for i := 0; size/rawBatchPutSize < 4; i++ { + for i := 0; size/config.RawBatchPutSize < 4; i++ { key := fmt.Sprint("key", i) size += len(key) testKeys = append(testKeys, []byte(key)) diff --git a/rpc/client.go b/rpc/client.go index 8ebd97ade8..a3fa70658b 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -37,40 +37,6 @@ import ( gstatus "google.golang.org/grpc/status" ) -// MaxConnectionCount is the max gRPC connections that will be established with -// each tikv-server. -var MaxConnectionCount uint = 16 - -// GrpcKeepAliveTime is the duration of time after which if the client doesn't see -// any activity it pings the server to see if the transport is still alive. -var GrpcKeepAliveTime = time.Duration(10) * time.Second - -// GrpcKeepAliveTimeout is the duration of time for which the client waits after having -// pinged for keepalive check and if no activity is seen even after that the connection -// is closed. -var GrpcKeepAliveTimeout = time.Duration(3) * time.Second - -// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than -// current value, an error will be reported from gRPC. -var MaxSendMsgSize = 1<<31 - 1 - -// MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than -// current value, an error will be reported from gRPC. -var MaxCallMsgSize = 1<<31 - 1 - -// Timeout durations. -const ( - dialTimeout = 5 * time.Second - ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values. - ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region. - ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times. - GCTimeout = 5 * time.Minute - UnsafeDestroyRangeTimeout = 5 * time.Minute - - grpcInitialWindowSize = 1 << 30 - grpcInitialConnWindowSize = 1 << 30 -) - // Client is a client that sends RPC. // It should not be used after calling Close(). type Client interface { @@ -223,21 +189,21 @@ func (a *connArray) Init(addr string, security config.Security) error { allowBatch := config.MaxBatchSize > 0 for i := range a.v { - ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + ctx, cancel := context.WithTimeout(context.Background(), config.DialTimeout) conn, err := grpc.DialContext( ctx, addr, opt, - grpc.WithInitialWindowSize(grpcInitialWindowSize), - grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize), + grpc.WithInitialWindowSize(int32(config.GrpcInitialWindowSize)), + grpc.WithInitialConnWindowSize(int32(config.GrpcInitialConnWindowSize)), grpc.WithUnaryInterceptor(unaryInterceptor), grpc.WithStreamInterceptor(streamInterceptor), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallMsgSize)), - grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(config.MaxCallMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(config.MaxSendMsgSize)), grpc.WithBackoffMaxDelay(time.Second*3), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: GrpcKeepAliveTime, - Timeout: GrpcKeepAliveTimeout, + Time: config.GrpcKeepAliveTime, + Timeout: config.GrpcKeepAliveTimeout, PermitWithoutStream: true, }), ) @@ -489,7 +455,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) { array, ok := c.conns[addr] if !ok { var err error - array, err = newConnArray(MaxConnectionCount, addr, c.security) + array, err = newConnArray(config.MaxConnectionCount, addr, c.security) if err != nil { return nil, err } diff --git a/txnkv/kv/buffer_store.go b/txnkv/kv/buffer_store.go index 23e40df9e6..dd178140b6 100644 --- a/txnkv/kv/buffer_store.go +++ b/txnkv/kv/buffer_store.go @@ -14,18 +14,10 @@ package kv import ( + "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" ) -var ( - // DefaultTxnMembufCap is the default transaction membuf capability. - DefaultTxnMembufCap = 4 * 1024 - // ImportingTxnMembufCap is the capability of tidb importing data situation. - ImportingTxnMembufCap = 32 * 1024 - // TempTxnMemBufCap is the capability of temporary membuf. - TempTxnMemBufCap = 64 -) - // BufferStore wraps a Retriever for read and a MemBuffer for buffered write. // Common usage pattern: // bs := NewBufferStore(r) // use BufferStore to wrap a Retriever @@ -41,7 +33,7 @@ type BufferStore struct { // NewBufferStore creates a BufferStore using r for read. func NewBufferStore(r Retriever, cap int) *BufferStore { if cap <= 0 { - cap = DefaultTxnMembufCap + cap = config.DefaultTxnMembufCap } return &BufferStore{ r: r, diff --git a/txnkv/kv/buffer_store_test.go b/txnkv/kv/buffer_store_test.go index e671722d89..fe683e1fcb 100644 --- a/txnkv/kv/buffer_store_test.go +++ b/txnkv/kv/buffer_store_test.go @@ -19,6 +19,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" ) @@ -31,7 +32,7 @@ type testBufferStoreSuite struct{} var _ = Suite(testBufferStoreSuite{}) func (s testBufferStoreSuite) TestGetSet(c *C) { - bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap) + bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap) key := key.Key("key") _, err := bs.Get(key) c.Check(err, NotNil) @@ -45,7 +46,7 @@ func (s testBufferStoreSuite) TestGetSet(c *C) { } func (s testBufferStoreSuite) TestSaveTo(c *C) { - bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap) + bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap) var buf bytes.Buffer for i := 0; i < 10; i++ { fmt.Fprint(&buf, i) @@ -55,7 +56,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) { } bs.Set(key.Key("novalue"), nil) - mutator := NewMemDbBuffer(DefaultTxnMembufCap) + mutator := NewMemDbBuffer(config.DefaultTxnMembufCap) err := bs.SaveTo(mutator) c.Check(err, IsNil) diff --git a/txnkv/kv/mem_buffer_test.go b/txnkv/kv/mem_buffer_test.go index 9a892a3dc4..1896594766 100644 --- a/txnkv/kv/mem_buffer_test.go +++ b/txnkv/kv/mem_buffer_test.go @@ -21,6 +21,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/tikv/client-go/config" ) const ( @@ -37,11 +38,11 @@ type testKVSuite struct { func (s *testKVSuite) SetUpSuite(c *C) { s.bs = make([]MemBuffer, 1) - s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap) + s.bs[0] = NewMemDbBuffer(config.DefaultTxnMembufCap) } func (s *testKVSuite) ResetMembuffers() { - s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap) + s.bs[0] = NewMemDbBuffer(config.DefaultTxnMembufCap) } func insertData(c *C, buffer MemBuffer) { @@ -184,7 +185,7 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) { } func (s *testKVSuite) TestBufferLimit(c *C) { - buffer := NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer) + buffer := NewMemDbBuffer(config.DefaultTxnMembufCap).(*memDbBuffer) buffer.bufferSizeLimit = 1000 buffer.entrySizeLimit = 500 @@ -196,7 +197,7 @@ func (s *testKVSuite) TestBufferLimit(c *C) { err = buffer.Set([]byte("yz"), make([]byte, 499)) c.Assert(err, NotNil) // buffer size limit - buffer = NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer) + buffer = NewMemDbBuffer(config.DefaultTxnMembufCap).(*memDbBuffer) buffer.bufferLenLimit = 10 for i := 0; i < 10; i++ { err = buffer.Set([]byte{byte(i)}, []byte{byte(i)}) @@ -213,7 +214,7 @@ func BenchmarkMemDbBufferSequential(b *testing.B) { for i := 0; i < opCnt; i++ { data[i] = encodeInt(i) } - buffer := NewMemDbBuffer(DefaultTxnMembufCap) + buffer := NewMemDbBuffer(config.DefaultTxnMembufCap) benchmarkSetGet(b, buffer, data) b.ReportAllocs() } @@ -224,20 +225,20 @@ func BenchmarkMemDbBufferRandom(b *testing.B) { data[i] = encodeInt(i) } shuffle(data) - buffer := NewMemDbBuffer(DefaultTxnMembufCap) + buffer := NewMemDbBuffer(config.DefaultTxnMembufCap) benchmarkSetGet(b, buffer, data) b.ReportAllocs() } func BenchmarkMemDbIter(b *testing.B) { - buffer := NewMemDbBuffer(DefaultTxnMembufCap) + buffer := NewMemDbBuffer(config.DefaultTxnMembufCap) benchIterator(b, buffer) b.ReportAllocs() } func BenchmarkMemDbCreation(b *testing.B) { for i := 0; i < b.N; i++ { - NewMemDbBuffer(DefaultTxnMembufCap) + NewMemDbBuffer(config.DefaultTxnMembufCap) } b.ReportAllocs() } diff --git a/txnkv/kv/union_store.go b/txnkv/kv/union_store.go index 8bc9e5adcb..244282f9cf 100644 --- a/txnkv/kv/union_store.go +++ b/txnkv/kv/union_store.go @@ -14,6 +14,7 @@ package kv import ( + "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" ) @@ -73,7 +74,7 @@ type unionStore struct { // NewUnionStore builds a new UnionStore. func NewUnionStore(snapshot Snapshot) UnionStore { return &unionStore{ - BufferStore: NewBufferStore(snapshot, DefaultTxnMembufCap), + BufferStore: NewBufferStore(snapshot, config.DefaultTxnMembufCap), snapshot: snapshot, lazyConditionPairs: make(map[string]*conditionPair), opts: make(map[Option]interface{}), diff --git a/txnkv/kv/union_store_test.go b/txnkv/kv/union_store_test.go index aeaa4b84c7..4eb03e1f51 100644 --- a/txnkv/kv/union_store_test.go +++ b/txnkv/kv/union_store_test.go @@ -16,6 +16,7 @@ package kv import ( . "github.com/pingcap/check" "github.com/pkg/errors" + "github.com/tikv/client-go/config" ) var _ = Suite(&testUnionStoreSuite{}) @@ -26,7 +27,7 @@ type testUnionStoreSuite struct { } func (s *testUnionStoreSuite) SetUpTest(c *C) { - s.store = NewMemDbBuffer(DefaultTxnMembufCap) + s.store = NewMemDbBuffer(config.DefaultTxnMembufCap) s.us = NewUnionStore(&mockSnapshot{s.store}) } diff --git a/txnkv/latch/latch.go b/txnkv/latch/latch.go index 9f1b61d444..ab021b2aea 100644 --- a/txnkv/latch/latch.go +++ b/txnkv/latch/latch.go @@ -23,6 +23,7 @@ import ( "github.com/cznic/mathutil" log "github.com/sirupsen/logrus" "github.com/spaolacci/murmur3" + "github.com/tikv/client-go/config" ) type node struct { @@ -227,7 +228,7 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult { defer latch.Unlock() // Try to recycle to limit the memory usage. - if latch.count >= latchListCount { + if latch.count >= config.LatchListCount { latch.recycle(lock.startTS) } @@ -268,7 +269,7 @@ func (l *latch) recycle(currentTS uint64) int { fakeHead := node{next: l.queue} prev := &fakeHead for curr := prev.next; curr != nil; curr = curr.next { - if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil { + if tsoSub(currentTS, curr.maxCommitTS) >= config.LatchExpireDuration && curr.value == nil { l.count-- prev.next = curr.next total++ diff --git a/txnkv/latch/latch_test.go b/txnkv/latch/latch_test.go index cb22d4520d..096db1ec8e 100644 --- a/txnkv/latch/latch_test.go +++ b/txnkv/latch/latch_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/txnkv/oracle" ) @@ -142,7 +143,7 @@ func (s *testLatchSuite) TestRecycle(c *C) { } c.Assert(allEmpty, IsFalse) - currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3) + currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(config.LatchExpireDuration)), 3) latches.recycle(currentTS) for i := 0; i < len(latches.slots); i++ { diff --git a/txnkv/latch/scheduler.go b/txnkv/latch/scheduler.go index de19ade4c4..86134951a3 100644 --- a/txnkv/latch/scheduler.go +++ b/txnkv/latch/scheduler.go @@ -17,11 +17,10 @@ import ( "sync" "time" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/txnkv/oracle" ) -const lockChanSize = 100 - // LatchesScheduler is used to schedule latches for transactions. type LatchesScheduler struct { latches *Latches @@ -34,7 +33,7 @@ type LatchesScheduler struct { // NewScheduler create the LatchesScheduler. func NewScheduler(size uint) *LatchesScheduler { latches := NewLatches(size) - unlockCh := make(chan *Lock, lockChanSize) + unlockCh := make(chan *Lock, config.LatchLockChanSize) scheduler := &LatchesScheduler{ latches: latches, unlockCh: unlockCh, @@ -44,11 +43,6 @@ func NewScheduler(size uint) *LatchesScheduler { return scheduler } -const expireDuration = 2 * time.Minute -const checkInterval = 1 * time.Minute -const checkCounter = 50000 -const latchListCount = 5 - func (scheduler *LatchesScheduler) run() { var counter int wakeupList := make([]*Lock, 0) @@ -61,7 +55,7 @@ func (scheduler *LatchesScheduler) run() { if lock.commitTS > lock.startTS { currentTS := lock.commitTS elapsed := tsoSub(currentTS, scheduler.lastRecycleTime) - if elapsed > checkInterval || counter > checkCounter { + if elapsed > config.LatchCheckInterval || counter > config.LatchCheckCounter { go scheduler.latches.recycle(lock.commitTS) scheduler.lastRecycleTime = currentTS counter = 0 diff --git a/txnkv/oracle/oracles/pd.go b/txnkv/oracle/oracles/pd.go index 6f5f3fe90e..26f1bc1396 100644 --- a/txnkv/oracle/oracles/pd.go +++ b/txnkv/oracle/oracles/pd.go @@ -20,14 +20,13 @@ import ( "github.com/pingcap/pd/client" log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/metrics" "github.com/tikv/client-go/txnkv/oracle" ) var _ oracle.Oracle = &pdOracle{} -const slowDist = 30 * time.Millisecond - // pdOracle is an Oracle that uses a placement driver client as source. type pdOracle struct { c pd.Client @@ -103,7 +102,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context) (uint64, error) { return 0, err } dist := time.Since(now) - if dist > slowDist { + if dist > config.TsoSlowThreshold { log.Warnf("get timestamp too slow: %s", dist) } return oracle.ComposeTS(physical, logical), nil diff --git a/txnkv/store/batch.go b/txnkv/store/batch.go index 9e8ed1634f..575036a75f 100644 --- a/txnkv/store/batch.go +++ b/txnkv/store/batch.go @@ -15,10 +15,6 @@ package store import "github.com/tikv/client-go/locate" -// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's -// Key+Value size below 16KB. -const txnCommitBatchSize = 16 * 1024 - // batchKeys is a batch of keys in the same region. type batchKeys struct { region locate.RegionVerID diff --git a/txnkv/store/delete_range.go b/txnkv/store/delete_range.go index 0dfb993a37..66e8be7780 100644 --- a/txnkv/store/delete_range.go +++ b/txnkv/store/delete_range.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pkg/errors" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/retry" "github.com/tikv/client-go/rpc" ) @@ -79,7 +80,7 @@ func (t *DeleteRangeTask) Execute() error { }, } - resp, err := t.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium) + resp, err := t.store.SendReq(bo, req, loc.Region, config.ReadTimeoutMedium) if err != nil { return err } diff --git a/txnkv/store/lock_resolver.go b/txnkv/store/lock_resolver.go index 3ad8d9100c..698f7334ff 100644 --- a/txnkv/store/lock_resolver.go +++ b/txnkv/store/lock_resolver.go @@ -29,9 +29,6 @@ import ( "github.com/tikv/client-go/rpc" ) -// ResolvedCacheSize is max number of cached txn status. -const ResolvedCacheSize = 2048 - // LockResolver resolves locks and also caches resolved txn status. type LockResolver struct { store *TiKVStore @@ -76,17 +73,6 @@ func (s TxnStatus) IsCommitted() bool { return s > 0 } // CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true. func (s TxnStatus) CommitTS() uint64 { return uint64(s) } -// By default, locks after 3000ms is considered unusual (the client created the -// lock might be dead). Other client may cleanup this kind of lock. -// For locks created recently, we will do backoff and retry. -var defaultLockTTL uint64 = 3000 - -// TODO: Consider if it's appropriate. -var maxLockTTL uint64 = 120000 - -// ttl = ttlFactor * sqrt(writeSizeInMiB) -var ttlFactor = 6000 - // Lock represents a lock from tikv server. type Lock struct { Key []byte @@ -99,7 +85,7 @@ type Lock struct { func NewLock(l *kvrpcpb.LockInfo) *Lock { ttl := l.GetLockTtl() if ttl == 0 { - ttl = defaultLockTTL + ttl = config.TxnDefaultLockTTL } return &Lock{ Key: l.GetKey(), @@ -118,7 +104,7 @@ func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) { } lr.mu.resolved[txnID] = status lr.mu.recentResolved.PushBack(txnID) - if len(lr.mu.resolved) > ResolvedCacheSize { + if len(lr.mu.resolved) > config.TxnResolvedCacheSize { front := lr.mu.recentResolved.Front() delete(lr.mu.resolved, front.Value.(uint64)) lr.mu.recentResolved.Remove(front) @@ -185,7 +171,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo }, } startTime = time.Now() - resp, err := lr.store.SendReq(bo, req, loc, rpc.ReadTimeoutShort) + resp, err := lr.store.SendReq(bo, req, loc, config.ReadTimeoutShort) if err != nil { return false, err } @@ -296,7 +282,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary if err != nil { return status, err } - resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + resp, err := lr.store.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) if err != nil { return status, err } @@ -350,7 +336,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat if status.IsCommitted() { req.ResolveLock.CommitVersion = status.CommitTS() } - resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + resp, err := lr.store.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) if err != nil { return err } diff --git a/txnkv/store/safepoint.go b/txnkv/store/safepoint.go index 2c21dd8f32..c4e7ba69ac 100644 --- a/txnkv/store/safepoint.go +++ b/txnkv/store/safepoint.go @@ -24,22 +24,10 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/config" "google.golang.org/grpc" ) -// Safe point constants. -const ( - // This is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb', - // save this to pd instead of tikv, because we can't use interface of table - // if the safepoint on tidb is expired. - GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point" - - GcSafePointCacheInterval = time.Second * 100 - gcCPUTimeInaccuracyBound = time.Second - gcSafePointUpdateInterval = time.Second * 10 - gcSafePointQuickRepeatInterval = time.Second -) - // SafePointKV is used for a seamingless integration for mockTest and runtime. type SafePointKV interface { Put(k string, v string) error @@ -132,7 +120,7 @@ func (w *EtcdSafePointKV) Get(k string) (string, error) { func saveSafePoint(kv SafePointKV, key string, t uint64) error { s := strconv.FormatUint(t, 10) - err := kv.Put(GcSavedSafePoint, s) + err := kv.Put(config.GcSavedSafePoint, s) if err != nil { log.Error("save safepoint failed:", err) return err @@ -141,7 +129,7 @@ func saveSafePoint(kv SafePointKV, key string, t uint64) error { } func loadSafePoint(kv SafePointKV, key string) (uint64, error) { - str, err := kv.Get(GcSavedSafePoint) + str, err := kv.Get(config.GcSavedSafePoint) if err != nil { return 0, err diff --git a/txnkv/store/scan.go b/txnkv/store/scan.go index 50afa360aa..45fbe8f167 100644 --- a/txnkv/store/scan.go +++ b/txnkv/store/scan.go @@ -20,6 +20,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" "github.com/tikv/client-go/retry" "github.com/tikv/client-go/rpc" @@ -41,7 +42,7 @@ type Scanner struct { func newScanner(snapshot *TiKVSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) { // It must be > 1. Otherwise scanner won't skipFirst. if batchSize <= 1 { - batchSize = scanBatchSize + batchSize = config.TxnScanBatchSize } scanner := &Scanner{ snapshot: snapshot, @@ -174,7 +175,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { NotFillCache: s.snapshot.NotFillCache, }, } - resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium) + resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutMedium) if err != nil { return err } diff --git a/txnkv/store/snapshot.go b/txnkv/store/snapshot.go index 9e5fdf41ad..056aa9e295 100644 --- a/txnkv/store/snapshot.go +++ b/txnkv/store/snapshot.go @@ -24,6 +24,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" "github.com/tikv/client-go/metrics" "github.com/tikv/client-go/retry" @@ -31,11 +32,6 @@ import ( "github.com/tikv/client-go/txnkv/kv" ) -const ( - scanBatchSize = 256 - batchGetSize = 5120 -) - // TiKVSnapshot supports read from TiKV. type TiKVSnapshot struct { store *TiKVStore @@ -102,7 +98,7 @@ func (s *TiKVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, var batches []batchKeys for id, g := range groups { - batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, batchGetSize) + batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, config.TxnBatchGetSize) } if len(batches) == 0 { @@ -145,7 +141,7 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys NotFillCache: s.NotFillCache, }, } - resp, err := sender.SendReq(bo, req, batch.region, rpc.ReadTimeoutMedium) + resp, err := sender.SendReq(bo, req, batch.region, config.ReadTimeoutMedium) if err != nil { return err } @@ -230,7 +226,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) { if err != nil { return nil, err } - resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) if err != nil { return nil, err } @@ -273,7 +269,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) { // Iter returns a list of key-value pair after `k`. func (s *TiKVSnapshot) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) { - scanner, err := newScanner(s, k, upperBound, scanBatchSize) + scanner, err := newScanner(s, k, upperBound, config.TxnScanBatchSize) return scanner, err } diff --git a/txnkv/store/split_region.go b/txnkv/store/split_region.go index 9c352954bc..45d2149335 100644 --- a/txnkv/store/split_region.go +++ b/txnkv/store/split_region.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" "github.com/tikv/client-go/retry" "github.com/tikv/client-go/rpc" @@ -47,7 +48,7 @@ func SplitRegion(store *TiKVStore, splitKey key.Key) error { log.Infof("skip split_region region at %q", splitKey) return nil } - res, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + res, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) if err != nil { return err } diff --git a/txnkv/store/store.go b/txnkv/store/store.go index 7fbc6fb258..464ea24526 100644 --- a/txnkv/store/store.go +++ b/txnkv/store/store.go @@ -33,9 +33,6 @@ import ( "github.com/tikv/client-go/txnkv/oracle/oracles" ) -// update oracle's lastTS every 2000ms. -var oracleUpdateInterval = 2000 - // TiKVStore contains methods to interact with a TiKV cluster. type TiKVStore struct { clusterID uint64 @@ -67,7 +64,7 @@ func NewStore(pdAddrs []string, security config.Security) (*TiKVStore, error) { return nil, err } - oracle, err := oracles.NewPdOracle(pdCli, time.Duration(oracleUpdateInterval)*time.Millisecond) + oracle, err := oracles.NewPdOracle(pdCli, time.Duration(config.OracleUpdateInterval)*time.Millisecond) if err != nil { return nil, err } @@ -180,21 +177,21 @@ func (s *TiKVStore) GetTimestampWithRetry(bo *retry.Backoffer) (uint64, error) { } func (s *TiKVStore) runSafePointChecker() { - d := gcSafePointUpdateInterval + d := config.GcSafePointUpdateInterval for { select { case spCachedTime := <-time.After(d): - cachedSafePoint, err := loadSafePoint(s.spkv, GcSavedSafePoint) + cachedSafePoint, err := loadSafePoint(s.spkv, config.GcSavedSafePoint) if err == nil { metrics.LoadSafepointCounter.WithLabelValues("ok").Inc() s.spMutex.Lock() s.safePoint, s.spTime = cachedSafePoint, spCachedTime s.spMutex.Unlock() - d = gcSafePointUpdateInterval + d = config.GcSafePointUpdateInterval } else { metrics.LoadSafepointCounter.WithLabelValues("fail").Inc() log.Errorf("fail to load safepoint from pd: %v", err) - d = gcSafePointQuickRepeatInterval + d = config.GcSafePointQuickRepeatInterval } case <-s.Closed(): return @@ -211,7 +208,7 @@ func (s *TiKVStore) CheckVisibility(startTS uint64) error { s.spMutex.RUnlock() diff := time.Since(cachedTime) - if diff > (GcSafePointCacheInterval - gcCPUTimeInaccuracyBound) { + if diff > (config.GcSafePointCacheInterval - config.GcCPUTimeInaccuracyBound) { return errors.WithStack(ErrPDServerTimeout) } diff --git a/txnkv/store/txn_committer.go b/txnkv/store/txn_committer.go index d40c352fd5..fdf1da47bc 100644 --- a/txnkv/store/txn_committer.go +++ b/txnkv/store/txn_committer.go @@ -143,15 +143,15 @@ func txnLockTTL(startTime time.Time, txnSize int) uint64 { // The formula is `ttl = ttlFactor * sqrt(sizeInMiB)`. // When writeSize is less than 256KB, the base ttl is defaultTTL (3s); // When writeSize is 1MiB, 100MiB, or 400MiB, ttl is 6s, 60s, 120s correspondingly; - lockTTL := defaultLockTTL - if txnSize >= txnCommitBatchSize { + lockTTL := config.TxnDefaultLockTTL + if txnSize >= config.TxnCommitBatchSize { sizeMiB := float64(txnSize) / bytesPerMiB - lockTTL = uint64(float64(ttlFactor) * math.Sqrt(sizeMiB)) - if lockTTL < defaultLockTTL { - lockTTL = defaultLockTTL + lockTTL = uint64(float64(config.TxnTTLFactor) * math.Sqrt(sizeMiB)) + if lockTTL < config.TxnDefaultLockTTL { + lockTTL = config.TxnDefaultLockTTL } - if lockTTL > maxLockTTL { - lockTTL = maxLockTTL + if lockTTL > config.TxnMaxLockTTL { + lockTTL = config.TxnMaxLockTTL } } @@ -183,10 +183,10 @@ func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction, atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups))) } // Make sure the group that contains primary key goes first. - batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize) + batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, config.TxnCommitBatchSize) delete(groups, firstRegion) for id, g := range groups { - batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize) + batches = appendBatchBySize(batches, id, g, sizeFunc, config.TxnCommitBatchSize) } firstIsPrimary := bytes.Equal(keys[0], c.primary()) @@ -318,7 +318,7 @@ func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys) }, } for { - resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort) + resp, err := c.store.SendReq(bo, req, batch.region, config.ReadTimeoutShort) if err != nil { return err } @@ -399,7 +399,7 @@ func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) e req.Context.Priority = c.Priority sender := rpc.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetRPCClient()) - resp, err := sender.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, batch.region, config.ReadTimeoutShort) // If we fail to receive response for the request that commits primary key, it will be undetermined whether this // transaction has been successfully committed. @@ -470,7 +470,7 @@ func (c *TxnCommitter) cleanupSingleBatch(bo *retry.Backoffer, batch batchKeys) SyncLog: c.SyncLog, }, } - resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort) + resp, err := c.store.SendReq(bo, req, batch.region, config.ReadTimeoutShort) if err != nil { return err }