From 292bd56404b5e620d6855eeecb775e3141a107a1 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 18 Apr 2019 17:29:22 +0800 Subject: [PATCH 1/4] config: cleanup and pass config as parameter Signed-off-by: disksing --- config/config.go | 197 +++------------------------------- config/raw.go | 35 ++++++ config/regioncache.go | 30 ++++++ config/rpc.go | 168 +++++++++++++++++++++++++++++ config/txn.go | 125 +++++++++++++++++++++ examples/rawkv/rawkv.go | 2 +- examples/txnkv/txnkv.go | 2 +- locate/region_cache.go | 12 ++- rawkv/rawkv.go | 28 ++--- rawkv/rawkv_test.go | 6 +- rpc/client.go | 70 ++++++------ txnkv/client.go | 4 +- txnkv/kv/buffer_store.go | 7 +- txnkv/kv/buffer_store_test.go | 8 +- txnkv/kv/mem_buffer_test.go | 23 ++-- txnkv/kv/memdb_buffer.go | 16 +-- txnkv/kv/union_store.go | 13 +-- txnkv/kv/union_store_test.go | 5 +- txnkv/latch/latch.go | 17 +-- txnkv/latch/latch_test.go | 10 +- txnkv/latch/scheduler.go | 10 +- txnkv/latch/scheduler_test.go | 5 +- txnkv/oracle/oracles/pd.go | 16 +-- txnkv/store/delete_range.go | 5 +- txnkv/store/lock_resolver.go | 18 ++-- txnkv/store/safepoint.go | 5 +- txnkv/store/scan.go | 8 +- txnkv/store/snapshot.go | 18 ++-- txnkv/store/split_region.go | 4 +- txnkv/store/store.go | 41 ++++--- txnkv/store/txn_committer.go | 39 +++---- txnkv/txn.go | 2 +- 32 files changed, 595 insertions(+), 354 deletions(-) create mode 100644 config/raw.go create mode 100644 config/regioncache.go create mode 100644 config/rpc.go create mode 100644 config/txn.go diff --git a/config/config.go b/config/config.go index 34331dc0eb..88027489fa 100644 --- a/config/config.go +++ b/config/config.go @@ -13,191 +13,20 @@ package config -import ( - "crypto/tls" - "crypto/x509" - "io/ioutil" - "time" - - "github.com/pkg/errors" -) - -// Security is SSL configuration. -type Security struct { - SSLCA string `toml:"ssl-ca" json:"ssl-ca"` - SSLCert string `toml:"ssl-cert" json:"ssl-cert"` - SSLKey string `toml:"ssl-key" json:"ssl-key"` +// Config contains configurations for tikv client. +type Config struct { + RPC RPC + Raw Raw + Txn Txn + RegionCache RegionCache } -// ToTLSConfig generates tls's config based on security section of the config. -func (s *Security) ToTLSConfig() (*tls.Config, error) { - var tlsConfig *tls.Config - if len(s.SSLCA) != 0 { - var certificates = make([]tls.Certificate, 0) - if len(s.SSLCert) != 0 && len(s.SSLKey) != 0 { - // Load the client certificates from disk - certificate, err := tls.LoadX509KeyPair(s.SSLCert, s.SSLKey) - if err != nil { - return nil, errors.Errorf("could not load client key pair: %s", err) - } - certificates = append(certificates, certificate) - } - - // Create a certificate pool from the certificate authority - certPool := x509.NewCertPool() - ca, err := ioutil.ReadFile(s.SSLCA) - if err != nil { - return nil, errors.Errorf("could not read ca certificate: %s", err) - } - - // Append the certificates from the CA - if !certPool.AppendCertsFromPEM(ca) { - return nil, errors.New("failed to append ca certs") - } - - tlsConfig = &tls.Config{ - Certificates: certificates, - RootCAs: certPool, - } +// Default returns the default config. +func Default() Config { + return Config{ + RPC: DefaultRPC(), + Raw: DefaultRaw(), + Txn: DefaultTxn(), + RegionCache: DefaultRegionCache(), } - - return tlsConfig, nil } - -// EnableOpenTracing is the flag to enable open tracing. -var EnableOpenTracing = false - -var ( - // OverloadThreshold is a threshold of TiKV load. - // If TiKV load is greater than this, TiDB will wait for a while to avoid little batch. - OverloadThreshold uint = 200 - // BatchWaitSize is the max wait size for batch. - BatchWaitSize uint = 8 - // MaxBatchSize is the max batch size when calling batch commands API. - MaxBatchSize uint = 128 - // MaxBatchWaitTime in nanosecond is the max wait time for batch. - MaxBatchWaitTime time.Duration -) - -// Those limits are enforced to make sure the transaction can be well handled by TiKV. -var ( - // TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)). - TxnEntrySizeLimit = 6 * 1024 * 1024 - // TxnEntryCountLimit is a limit of the number of entries in the MemBuffer. - TxnEntryCountLimit uint64 = 300 * 1000 - // TxnTotalSizeLimit is limit of the sum of all entry size. - TxnTotalSizeLimit = 100 * 1024 * 1024 - // MaxTxnTimeUse is the max time a transaction can run. - MaxTxnTimeUse = 590 -) - -// Local latches for transactions. Enable it when -// there are lots of conflicts between transactions. -var ( - EnableTxnLocalLatch = false - TxnLocalLatchCapacity uint = 2048000 -) - -// RegionCache configurations. -var ( - RegionCacheBTreeDegree = 32 - RegionCacheTTL = time.Minute * 10 -) - -// RawKV configurations. -var ( - // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. - MaxRawKVScanLimit = 10240 - // RawBatchPutSize is the maximum size limit for rawkv each batch put request. - RawBatchPutSize = 16 * 1024 - // RawBatchPairCount is the maximum limit for rawkv each batch get/delete request. - RawBatchPairCount = 512 -) - -// RPC configurations. -var ( - // MaxConnectionCount is the max gRPC connections that will be established with - // each tikv-server. - MaxConnectionCount uint = 16 - - // GrpcKeepAliveTime is the duration of time after which if the client doesn't see - // any activity it pings the server to see if the transport is still alive. - GrpcKeepAliveTime = time.Duration(10) * time.Second - - // GrpcKeepAliveTimeout is the duration of time for which the client waits after having - // pinged for keepalive check and if no activity is seen even after that the connection - // is closed. - GrpcKeepAliveTimeout = time.Duration(3) * time.Second - - // MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than - // current value, an error will be reported from gRPC. - MaxSendMsgSize = 1<<31 - 1 - - // MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than - // current value, an error will be reported from gRPC. - MaxCallMsgSize = 1<<31 - 1 - - DialTimeout = 5 * time.Second - ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values. - ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region. - ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times. - GCTimeout = 5 * time.Minute - UnsafeDestroyRangeTimeout = 5 * time.Minute - - GrpcInitialWindowSize = 1 << 30 - GrpcInitialConnWindowSize = 1 << 30 -) - -// KV configurations. -var ( - // DefaultTxnMembufCap is the default transaction membuf capability. - DefaultTxnMembufCap = 4 * 1024 -) - -// Latch configurations. -var ( - LatchExpireDuration = 2 * time.Minute - LatchCheckInterval = 1 * time.Minute - LatchCheckCounter = 50000 - LatchListCount = 5 - LatchLockChanSize = 100 -) - -// Oracle configurations. -var ( - TsoSlowThreshold = 30 * time.Millisecond - // update oracle's lastTS every 2000ms. - OracleUpdateInterval = 2000 -) - -// Txn configurations. -var ( - // TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's - // Key+Value size below 16KB. - TxnCommitBatchSize = 16 * 1024 - - // By default, locks after 3000ms is considered unusual (the client created the - // lock might be dead). Other client may cleanup this kind of lock. - // For locks created recently, we will do backoff and retry. - TxnDefaultLockTTL uint64 = 3000 - // TODO: Consider if it's appropriate. - TxnMaxLockTTL uint64 = 120000 - // ttl = ttlFactor * sqrt(writeSizeInMiB) - TxnTTLFactor = 6000 - // TxnResolvedCacheSize is max number of cached txn status. - TxnResolvedCacheSize = 2048 - - // SafePoint. - // This is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb', - // save this to pd instead of tikv, because we can't use interface of table - // if the safepoint on tidb is expired. - GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point" - - GcSafePointCacheInterval = time.Second * 100 - GcCPUTimeInaccuracyBound = time.Second - GcSafePointUpdateInterval = time.Second * 10 - GcSafePointQuickRepeatInterval = time.Second - - TxnScanBatchSize = 256 - TxnBatchGetSize = 5120 -) diff --git a/config/raw.go b/config/raw.go new file mode 100644 index 0000000000..9b613db1e9 --- /dev/null +++ b/config/raw.go @@ -0,0 +1,35 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +// Raw is rawkv configurations. +type Raw struct { + // MaxScanLimit is the maximum scan limit for rawkv Scan. + MaxScanLimit int + + // MaxBatchPutSize is the maximum size limit for rawkv each batch put request. + MaxBatchPutSize int + + // BatchPairCount is the maximum limit for rawkv each batch get/delete request. + BatchPairCount int +} + +// DefaultRaw returns default rawkv configuration. +func DefaultRaw() Raw { + return Raw{ + MaxScanLimit: 10240, + MaxBatchPutSize: 16 * 1024, + BatchPairCount: 512, + } +} diff --git a/config/regioncache.go b/config/regioncache.go new file mode 100644 index 0000000000..81b60b06ae --- /dev/null +++ b/config/regioncache.go @@ -0,0 +1,30 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import "time" + +// RegionCache contains the configurations for region cache. +type RegionCache struct { + BTreeDegree int + CacheTTL time.Duration +} + +// DefaultRegionCache returns the default region cache config. +func DefaultRegionCache() RegionCache { + return RegionCache{ + BTreeDegree: 32, + CacheTTL: 10 * time.Minute, + } +} diff --git a/config/rpc.go b/config/rpc.go new file mode 100644 index 0000000000..7b0846c969 --- /dev/null +++ b/config/rpc.go @@ -0,0 +1,168 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "time" + + "github.com/pkg/errors" +) + +// RPC configurations. +type RPC struct { + // MaxConnectionCount is the max gRPC connections that will be established with + // each tikv-server. + MaxConnectionCount uint + + // GrpcKeepAliveTime is the duration of time after which if the client doesn't see + // any activity it pings the server to see if the transport is still alive. + GrpcKeepAliveTime time.Duration + + // GrpcKeepAliveTimeout is the duration of time for which the client waits after having + // pinged for keepalive check and if no activity is seen even after that the connection + // is closed. + GrpcKeepAliveTimeout time.Duration + + // MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than + // current value, an error will be reported from gRPC. + MaxSendMsgSize int + + // MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than + // current value, an error will be reported from gRPC. + MaxCallMsgSize int + + // The value for initial window size on a gRPC stream. + GrpcInitialWindowSize int + + // The value for initial windows size on a gRPC connection. + GrpcInitialConnWindowSize int32 + + // The max time to establish a gRPC connection. + DialTimeout time.Duration + + // For requests that read/write several key-values. + ReadTimeoutShort time.Duration + + // For requests that may need scan region. + ReadTimeoutMedium time.Duration + + // For requests that may need scan region multiple times. + ReadTimeoutLong time.Duration + + // The flag to enable open tracing. + EnableOpenTracing bool + + // Batch system configurations. + Batch Batch + + Security Security +} + +// DefaultRPC returns the default RPC config. +func DefaultRPC() RPC { + return RPC{ + MaxConnectionCount: 16, + GrpcKeepAliveTime: 10 * time.Second, + GrpcKeepAliveTimeout: 3 * time.Second, + MaxSendMsgSize: 1<<31 - 1, + MaxCallMsgSize: 1<<31 - 1, + GrpcInitialWindowSize: 1 << 30, + GrpcInitialConnWindowSize: 1 << 30, + DialTimeout: 5 * time.Second, + ReadTimeoutShort: 20 * time.Second, + ReadTimeoutMedium: 60 * time.Second, + ReadTimeoutLong: 150 * time.Second, + EnableOpenTracing: false, + + Batch: DefaultBatch(), + Security: DefaultSecurity(), + } +} + +// Batch contains configurations for message batch. +type Batch struct { + // MaxBatchSize is the max batch size when calling batch commands API. Set 0 to + // turn off message batch. + MaxBatchSize uint + + // OverloadThreshold is a threshold of TiKV load. If TiKV load is greater than + // this, TiDB will wait for a while to avoid little batch. + OverloadThreshold uint + + // MaxWaitSize is the max wait size for batch. + MaxWaitSize uint + + // MaxWaitTime is the max wait time for batch. + MaxWaitTime time.Duration +} + +// DefaultBatch returns the default Batch config. +func DefaultBatch() Batch { + return Batch{ + MaxBatchSize: 0, + OverloadThreshold: 200, + MaxWaitSize: 8, + MaxWaitTime: 0, + } +} + +// Security is SSL configuration. +type Security struct { + SSLCA string `toml:"ssl-ca" json:"ssl-ca"` + SSLCert string `toml:"ssl-cert" json:"ssl-cert"` + SSLKey string `toml:"ssl-key" json:"ssl-key"` +} + +// ToTLSConfig generates tls's config based on security section of the config. +func (s *Security) ToTLSConfig() (*tls.Config, error) { + var tlsConfig *tls.Config + if len(s.SSLCA) != 0 { + var certificates = make([]tls.Certificate, 0) + if len(s.SSLCert) != 0 && len(s.SSLKey) != 0 { + // Load the client certificates from disk + certificate, err := tls.LoadX509KeyPair(s.SSLCert, s.SSLKey) + if err != nil { + return nil, errors.Errorf("could not load client key pair: %s", err) + } + certificates = append(certificates, certificate) + } + + // Create a certificate pool from the certificate authority + certPool := x509.NewCertPool() + ca, err := ioutil.ReadFile(s.SSLCA) + if err != nil { + return nil, errors.Errorf("could not read ca certificate: %s", err) + } + + // Append the certificates from the CA + if !certPool.AppendCertsFromPEM(ca) { + return nil, errors.New("failed to append ca certs") + } + + tlsConfig = &tls.Config{ + Certificates: certificates, + RootCAs: certPool, + } + } + + return tlsConfig, nil +} + +// DefaultSecurity returns the default Security config. +func DefaultSecurity() Security { + return Security{} +} diff --git a/config/txn.go b/config/txn.go new file mode 100644 index 0000000000..fc7c01a3a0 --- /dev/null +++ b/config/txn.go @@ -0,0 +1,125 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import "time" + +// Txn contains the configurations for transactional kv. +type Txn struct { + // EntrySizeLimit is limit of single entry size (len(key) + len(value)). + EntrySizeLimit int + + // EntryCountLimit is a limit of the number of entries in the MemBuffer. + EntryCountLimit int + + // TotalSizeLimit is limit of the sum of all entry size. + TotalSizeLimit int + + // MaxTimeUse is the max time a transaction can run. + MaxTimeUse int + + // DefaultMembufCap is the default transaction membuf capability. + DefaultMembufCap int + + // TiKV recommends each RPC packet should be less than ~1MB. We keep each + // packet's Key+Value size below 16KB by default. + CommitBatchSize int + + // ScanBatchSize is the limit of an iterator's scan request. + ScanBatchSize int + + // BatchGetSize is the max number of keys in a BatchGet request. + BatchGetSize int + + // By default, locks after 3000ms is considered unusual (the client created the + // lock might be dead). Other client may cleanup this kind of lock. + // For locks created recently, we will do backoff and retry. + DefaultLockTTL uint64 + + // The maximum value of a txn's lock TTL. + MaxLockTTL uint64 + + // ttl = ttlFactor * sqrt(writeSizeInMiB) + TTLFactor int + + // ResolveCacheSize is max number of cached txn status. + ResolveCacheSize int + + GcSavedSafePoint string + GcSafePointCacheInterval time.Duration + GcCPUTimeInaccuracyBound time.Duration + GcSafePointUpdateInterval time.Duration + GcSafePointQuickRepeatInterval time.Duration + + GCTimeout time.Duration + UnsafeDestroyRangeTimeout time.Duration + + TsoSlowThreshold time.Duration + OracleUpdateInterval time.Duration + + Latch Latch +} + +// DefaultTxn returns the default txn config. +func DefaultTxn() Txn { + return Txn{ + EntrySizeLimit: 6 * 1024 * 1024, + EntryCountLimit: 300 * 1000, + TotalSizeLimit: 100 * 1024 * 1024, + MaxTimeUse: 590, + DefaultMembufCap: 4 * 1024, + CommitBatchSize: 16 * 1024, + ScanBatchSize: 256, + BatchGetSize: 5120, + DefaultLockTTL: 3000, + MaxLockTTL: 120000, + TTLFactor: 6000, + ResolveCacheSize: 2048, + GcSavedSafePoint: "/tidb/store/gcworker/saved_safe_point", + GcSafePointCacheInterval: time.Second * 100, + GcCPUTimeInaccuracyBound: time.Second, + GcSafePointUpdateInterval: time.Second * 10, + GcSafePointQuickRepeatInterval: time.Second, + GCTimeout: 5 * time.Minute, + UnsafeDestroyRangeTimeout: 5 * time.Minute, + TsoSlowThreshold: 30 * time.Millisecond, + OracleUpdateInterval: 2 * time.Second, + Latch: DefaultLatch(), + } +} + +// Latch is the configuration for local latch. +type Latch struct { + // Enable it when there are lots of conflicts between transactions. + Enable bool + Capacity uint + ExpireDuration time.Duration + CheckInterval time.Duration + CheckCounter int + ListCount int + LockChanSize int +} + +// DefaultLatch returns the default Latch config. +func DefaultLatch() Latch { + return Latch{ + Enable: false, + Capacity: 2048000, + ExpireDuration: 2 * time.Minute, + CheckInterval: time.Minute, + CheckCounter: 50000, + ListCount: 5, + LockChanSize: 100, + } +} diff --git a/examples/rawkv/rawkv.go b/examples/rawkv/rawkv.go index 018a2fb0d3..2ea33c1488 100644 --- a/examples/rawkv/rawkv.go +++ b/examples/rawkv/rawkv.go @@ -21,7 +21,7 @@ import ( ) func main() { - cli, err := rawkv.NewClient([]string{"127.0.0.1:2379"}, config.Security{}) + cli, err := rawkv.NewClient([]string{"127.0.0.1:2379"}, config.Default()) if err != nil { panic(err) } diff --git a/examples/txnkv/txnkv.go b/examples/txnkv/txnkv.go index d32765e880..d3169fb7ef 100644 --- a/examples/txnkv/txnkv.go +++ b/examples/txnkv/txnkv.go @@ -41,7 +41,7 @@ var ( // Init initializes information. func initStore() { var err error - client, err = txnkv.NewClient([]string{*pdAddr}, config.Security{}) + client, err = txnkv.NewClient([]string{*pdAddr}, config.Default()) if err != nil { panic(err) } diff --git a/locate/region_cache.go b/locate/region_cache.go index e932fbb2b0..5a0df79129 100644 --- a/locate/region_cache.go +++ b/locate/region_cache.go @@ -38,14 +38,15 @@ type CachedRegion struct { lastAccess int64 } -func (c *CachedRegion) isValid() bool { +func (c *CachedRegion) isValid(ttl time.Duration) bool { lastAccess := atomic.LoadInt64(&c.lastAccess) lastAccessTime := time.Unix(lastAccess, 0) - return time.Since(lastAccessTime) < config.RegionCacheTTL + return time.Since(lastAccessTime) < ttl } // RegionCache caches Regions loaded from PD. type RegionCache struct { + conf *config.RegionCache pdClient pd.Client mu struct { @@ -60,12 +61,13 @@ type RegionCache struct { } // NewRegionCache creates a RegionCache. -func NewRegionCache(pdClient pd.Client) *RegionCache { +func NewRegionCache(pdClient pd.Client, conf *config.RegionCache) *RegionCache { c := &RegionCache{ + conf: conf, pdClient: pdClient, } c.mu.regions = make(map[RegionVerID]*CachedRegion) - c.mu.sorted = btree.New(config.RegionCacheBTreeDegree) + c.mu.sorted = btree.New(conf.BTreeDegree) c.storeMu.stores = make(map[uint64]*Store) return c } @@ -279,7 +281,7 @@ func (c *RegionCache) getCachedRegion(id RegionVerID) *Region { if !ok { return nil } - if cachedRegion.isValid() { + if cachedRegion.isValid(c.conf.CacheTTL) { atomic.StoreInt64(&cachedRegion.lastAccess, time.Now().Unix()) return cachedRegion.region } diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index e9150cb707..8426a59851 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -37,26 +37,28 @@ var ( // only GET/PUT/DELETE commands are supported. type Client struct { clusterID uint64 + conf *config.Config regionCache *locate.RegionCache pdClient pd.Client rpcClient rpc.Client } // NewClient creates a client with PD cluster addrs. -func NewClient(pdAddrs []string, security config.Security) (*Client, error) { +func NewClient(pdAddrs []string, conf config.Config) (*Client, error) { pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ - CAPath: security.SSLCA, - CertPath: security.SSLCert, - KeyPath: security.SSLKey, + CAPath: conf.RPC.Security.SSLCA, + CertPath: conf.RPC.Security.SSLCert, + KeyPath: conf.RPC.Security.SSLKey, }) if err != nil { return nil, err } return &Client{ clusterID: pdCli.GetClusterID(context.TODO()), - regionCache: locate.NewRegionCache(pdCli), + conf: &conf, + regionCache: locate.NewRegionCache(pdCli, &conf.RegionCache), pdClient: pdCli, - rpcClient: rpc.NewRPCClient(security), + rpcClient: rpc.NewRPCClient(&conf.RPC), }, nil } @@ -269,7 +271,7 @@ func (c *Client) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values start := time.Now() defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }() - if limit > config.MaxRawKVScanLimit { + if limit > c.conf.Raw.MaxScanLimit { return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded) } @@ -310,7 +312,7 @@ func (c *Client) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *locate.K if err != nil { return nil, nil, err } - resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, loc.Region, c.conf.RPC.ReadTimeoutShort) if err != nil { return nil, nil, err } @@ -337,7 +339,7 @@ func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType rpc.Cm var batches []batch for regionID, groupKeys := range groups { - batches = appendKeyBatches(batches, regionID, groupKeys, config.RawBatchPairCount) + batches = appendKeyBatches(batches, regionID, groupKeys, c.conf.Raw.BatchPairCount) } bo, cancel := bo.Fork() ches := make(chan singleBatchResp, len(batches)) @@ -396,7 +398,7 @@ func (c *Client) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.CmdTyp } sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) - resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, batch.regionID, c.conf.RPC.ReadTimeoutShort) batchResp := singleBatchResp{} if err != nil { @@ -464,7 +466,7 @@ func (c *Client) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.Respon }, } - resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, loc.Region, c.conf.RPC.ReadTimeoutShort) if err != nil { return nil, nil, err } @@ -495,7 +497,7 @@ func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error var batches []batch // split the keys by size and RegionVerID for regionID, groupKeys := range groups { - batches = appendBatches(batches, regionID, groupKeys, keyToValue, config.RawBatchPutSize) + batches = appendBatches(batches, regionID, groupKeys, keyToValue, c.conf.Raw.MaxBatchPutSize) } bo, cancel := bo.Fork() ch := make(chan error, len(batches)) @@ -574,7 +576,7 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch batch) error { } sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) - resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, batch.regionID, c.conf.RPC.ReadTimeoutShort) if err != nil { return err } diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index af3f4770e7..a7e2dc77c9 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -43,9 +43,11 @@ func (s *testRawKVSuite) SetUpTest(c *C) { mocktikv.BootstrapWithSingleStore(s.cluster) pdClient := mocktikv.NewPDClient(s.cluster) mvccStore := mocktikv.MustNewMVCCStore() + conf := config.Default() s.client = &Client{ + conf: &conf, clusterID: 0, - regionCache: locate.NewRegionCache(pdClient), + regionCache: locate.NewRegionCache(pdClient, &conf.RegionCache), pdClient: pdClient, rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore), } @@ -179,7 +181,7 @@ func (s *testRawKVSuite) TestRawBatch(c *C) { size := 0 var testKeys [][]byte var testValues [][]byte - for i := 0; size/config.RawBatchPutSize < 4; i++ { + for i := 0; size/s.client.conf.Raw.MaxBatchPutSize < 4; i++ { key := fmt.Sprint("key", i) size += len(key) testKeys = append(testKeys, []byte(key)) diff --git a/rpc/client.go b/rpc/client.go index a3fa70658b..444a398767 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -47,6 +47,7 @@ type Client interface { } type connArray struct { + conf *config.RPC index uint32 v []*grpc.ClientConn // Bind with a background goroutine to process coprocessor streaming timeout. @@ -59,6 +60,7 @@ type connArray struct { } type batchCommandsClient struct { + conf *config.Batch conn *grpc.ClientConn client tikvpb.Tikv_BatchCommandsClient batched sync.Map @@ -142,32 +144,33 @@ func (c *batchCommandsClient) batchRecvLoop() { } transportLayerLoad := resp.GetTransportLayerLoad() - if transportLayerLoad > 0.0 && config.MaxBatchWaitTime > 0 { + if transportLayerLoad > 0.0 && c.conf.MaxWaitTime > 0 { // We need to consider TiKV load only if batch-wait strategy is enabled. atomic.StoreUint64(c.transportLayerLoad, transportLayerLoad) } } } -func newConnArray(maxSize uint, addr string, security config.Security) (*connArray, error) { +func newConnArray(addr string, conf *config.RPC) (*connArray, error) { a := &connArray{ + conf: conf, index: 0, - v: make([]*grpc.ClientConn, maxSize), + v: make([]*grpc.ClientConn, conf.MaxCallMsgSize), streamTimeout: make(chan *Lease, 1024), - batchCommandsCh: make(chan *batchCommandsEntry, config.MaxBatchSize), - batchCommandsClients: make([]*batchCommandsClient, 0, maxSize), + batchCommandsCh: make(chan *batchCommandsEntry, conf.Batch.MaxBatchSize), + batchCommandsClients: make([]*batchCommandsClient, 0, conf.MaxCallMsgSize), transportLayerLoad: 0, } - if err := a.Init(addr, security); err != nil { + if err := a.Init(addr); err != nil { return nil, err } return a, nil } -func (a *connArray) Init(addr string, security config.Security) error { +func (a *connArray) Init(addr string) error { opt := grpc.WithInsecure() - if len(security.SSLCA) != 0 { - tlsConfig, err := security.ToTLSConfig() + if len(a.conf.Security.SSLCA) != 0 { + tlsConfig, err := a.conf.Security.ToTLSConfig() if err != nil { return err } @@ -176,7 +179,7 @@ func (a *connArray) Init(addr string, security config.Security) error { unaryInterceptor := grpc_prometheus.UnaryClientInterceptor streamInterceptor := grpc_prometheus.StreamClientInterceptor - if config.EnableOpenTracing { + if a.conf.EnableOpenTracing { unaryInterceptor = grpc_middleware.ChainUnaryClient( unaryInterceptor, grpc_opentracing.UnaryClientInterceptor(), @@ -187,23 +190,23 @@ func (a *connArray) Init(addr string, security config.Security) error { ) } - allowBatch := config.MaxBatchSize > 0 + allowBatch := a.conf.Batch.MaxBatchSize > 0 for i := range a.v { - ctx, cancel := context.WithTimeout(context.Background(), config.DialTimeout) + ctx, cancel := context.WithTimeout(context.Background(), a.conf.DialTimeout) conn, err := grpc.DialContext( ctx, addr, opt, - grpc.WithInitialWindowSize(int32(config.GrpcInitialWindowSize)), - grpc.WithInitialConnWindowSize(int32(config.GrpcInitialConnWindowSize)), + grpc.WithInitialWindowSize(int32(a.conf.GrpcInitialWindowSize)), + grpc.WithInitialConnWindowSize(int32(a.conf.GrpcInitialConnWindowSize)), grpc.WithUnaryInterceptor(unaryInterceptor), grpc.WithStreamInterceptor(streamInterceptor), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(config.MaxCallMsgSize)), - grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(config.MaxSendMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(a.conf.MaxCallMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(a.conf.MaxSendMsgSize)), grpc.WithBackoffMaxDelay(time.Second*3), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: config.GrpcKeepAliveTime, - Timeout: config.GrpcKeepAliveTimeout, + Time: a.conf.GrpcKeepAliveTime, + Timeout: a.conf.GrpcKeepAliveTimeout, PermitWithoutStream: true, }), ) @@ -224,6 +227,7 @@ func (a *connArray) Init(addr string, security config.Security) error { return errors.WithStack(err) } batchClient := &batchCommandsClient{ + conf: &a.conf.Batch, conn: conn, client: streamClient, batched: sync.Map{}, @@ -356,9 +360,11 @@ func (a *connArray) batchSendLoop() { } }() - entries := make([]*batchCommandsEntry, 0, config.MaxBatchSize) - requests := make([]*tikvpb.BatchCommandsRequest_Request, 0, config.MaxBatchSize) - requestIDs := make([]uint64, 0, config.MaxBatchSize) + conf := &a.conf.Batch + + entries := make([]*batchCommandsEntry, 0, conf.MaxBatchSize) + requests := make([]*tikvpb.BatchCommandsRequest_Request, 0, conf.MaxBatchSize) + requestIDs := make([]uint64, 0, conf.MaxBatchSize) for { // Choose a connection by round-robbin. @@ -370,15 +376,15 @@ func (a *connArray) batchSendLoop() { requestIDs = requestIDs[:0] metrics.PendingBatchRequests.Set(float64(len(a.batchCommandsCh))) - fetchAllPendingRequests(a.batchCommandsCh, int(config.MaxBatchSize), &entries, &requests) + fetchAllPendingRequests(a.batchCommandsCh, int(conf.MaxBatchSize), &entries, &requests) - if len(entries) < int(config.MaxBatchSize) && config.MaxBatchWaitTime > 0 { + if len(entries) < int(conf.MaxBatchSize) && conf.MaxWaitTime > 0 { transportLayerLoad := atomic.LoadUint64(batchCommandsClient.transportLayerLoad) // If the target TiKV is overload, wait a while to collect more requests. - if uint(transportLayerLoad) >= config.OverloadThreshold { + if uint(transportLayerLoad) >= conf.OverloadThreshold { fetchMorePendingRequests( - a.batchCommandsCh, int(config.MaxBatchSize), int(config.BatchWaitSize), - config.MaxBatchWaitTime, &entries, &requests, + a.batchCommandsCh, int(conf.MaxBatchSize), int(conf.MaxWaitSize), + conf.MaxWaitTime, &entries, &requests, ) } } @@ -420,14 +426,14 @@ type rpcClient struct { sync.RWMutex isClosed bool conns map[string]*connArray - security config.Security + conf *config.RPC } // NewRPCClient manages connections and rpc calls with tikv-servers. -func NewRPCClient(security config.Security) Client { +func NewRPCClient(conf *config.RPC) Client { return &rpcClient{ - conns: make(map[string]*connArray), - security: security, + conns: make(map[string]*connArray), + conf: conf, } } @@ -455,7 +461,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) { array, ok := c.conns[addr] if !ok { var err error - array, err = newConnArray(config.MaxConnectionCount, addr, c.security) + array, err = newConnArray(addr, c.conf) if err != nil { return nil, err } @@ -526,7 +532,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *Request, return nil, err } - if config.MaxBatchSize > 0 { + if c.conf.Batch.MaxBatchSize > 0 { if batchReq := req.ToBatchCommandsRequest(); batchReq != nil { return sendBatchRequest(ctx, addr, connArray, batchReq, timeout) } diff --git a/txnkv/client.go b/txnkv/client.go index 50fc37c983..84e037cf67 100644 --- a/txnkv/client.go +++ b/txnkv/client.go @@ -27,8 +27,8 @@ type Client struct { } // NewClient creates a client with PD addresses. -func NewClient(pdAddrs []string, security config.Security) (*Client, error) { - tikvStore, err := store.NewStore(pdAddrs, security) +func NewClient(pdAddrs []string, config config.Config) (*Client, error) { + tikvStore, err := store.NewStore(pdAddrs, config) if err != nil { return nil, err } diff --git a/txnkv/kv/buffer_store.go b/txnkv/kv/buffer_store.go index dd178140b6..d9043d3f73 100644 --- a/txnkv/kv/buffer_store.go +++ b/txnkv/kv/buffer_store.go @@ -31,13 +31,10 @@ type BufferStore struct { } // NewBufferStore creates a BufferStore using r for read. -func NewBufferStore(r Retriever, cap int) *BufferStore { - if cap <= 0 { - cap = config.DefaultTxnMembufCap - } +func NewBufferStore(r Retriever, conf *config.Txn) *BufferStore { return &BufferStore{ r: r, - MemBuffer: &lazyMemBuffer{cap: cap}, + MemBuffer: &lazyMemBuffer{conf: conf}, } } diff --git a/txnkv/kv/buffer_store_test.go b/txnkv/kv/buffer_store_test.go index fe683e1fcb..06f2aea7b1 100644 --- a/txnkv/kv/buffer_store_test.go +++ b/txnkv/kv/buffer_store_test.go @@ -32,7 +32,8 @@ type testBufferStoreSuite struct{} var _ = Suite(testBufferStoreSuite{}) func (s testBufferStoreSuite) TestGetSet(c *C) { - bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap) + conf := config.DefaultTxn() + bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(&conf, 0)}, &conf) key := key.Key("key") _, err := bs.Get(key) c.Check(err, NotNil) @@ -46,7 +47,8 @@ func (s testBufferStoreSuite) TestGetSet(c *C) { } func (s testBufferStoreSuite) TestSaveTo(c *C) { - bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap) + conf := config.DefaultTxn() + bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(&conf, 0)}, &conf) var buf bytes.Buffer for i := 0; i < 10; i++ { fmt.Fprint(&buf, i) @@ -56,7 +58,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) { } bs.Set(key.Key("novalue"), nil) - mutator := NewMemDbBuffer(config.DefaultTxnMembufCap) + mutator := NewMemDbBuffer(&conf, 0) err := bs.SaveTo(mutator) c.Check(err, IsNil) diff --git a/txnkv/kv/mem_buffer_test.go b/txnkv/kv/mem_buffer_test.go index 1896594766..4f4b3857ae 100644 --- a/txnkv/kv/mem_buffer_test.go +++ b/txnkv/kv/mem_buffer_test.go @@ -37,12 +37,14 @@ type testKVSuite struct { } func (s *testKVSuite) SetUpSuite(c *C) { + conf := config.DefaultTxn() s.bs = make([]MemBuffer, 1) - s.bs[0] = NewMemDbBuffer(config.DefaultTxnMembufCap) + s.bs[0] = NewMemDbBuffer(&conf, 0) } func (s *testKVSuite) ResetMembuffers() { - s.bs[0] = NewMemDbBuffer(config.DefaultTxnMembufCap) + conf := config.DefaultTxn() + s.bs[0] = NewMemDbBuffer(&conf, 0) } func insertData(c *C, buffer MemBuffer) { @@ -185,7 +187,8 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) { } func (s *testKVSuite) TestBufferLimit(c *C) { - buffer := NewMemDbBuffer(config.DefaultTxnMembufCap).(*memDbBuffer) + conf := config.DefaultTxn() + buffer := NewMemDbBuffer(&conf, 0).(*memDbBuffer) buffer.bufferSizeLimit = 1000 buffer.entrySizeLimit = 500 @@ -197,7 +200,7 @@ func (s *testKVSuite) TestBufferLimit(c *C) { err = buffer.Set([]byte("yz"), make([]byte, 499)) c.Assert(err, NotNil) // buffer size limit - buffer = NewMemDbBuffer(config.DefaultTxnMembufCap).(*memDbBuffer) + buffer = NewMemDbBuffer(&conf, 0).(*memDbBuffer) buffer.bufferLenLimit = 10 for i := 0; i < 10; i++ { err = buffer.Set([]byte{byte(i)}, []byte{byte(i)}) @@ -210,35 +213,39 @@ func (s *testKVSuite) TestBufferLimit(c *C) { var opCnt = 100000 func BenchmarkMemDbBufferSequential(b *testing.B) { + conf := config.DefaultTxn() data := make([][]byte, opCnt) for i := 0; i < opCnt; i++ { data[i] = encodeInt(i) } - buffer := NewMemDbBuffer(config.DefaultTxnMembufCap) + buffer := NewMemDbBuffer(&conf, 0) benchmarkSetGet(b, buffer, data) b.ReportAllocs() } func BenchmarkMemDbBufferRandom(b *testing.B) { + conf := config.DefaultTxn() data := make([][]byte, opCnt) for i := 0; i < opCnt; i++ { data[i] = encodeInt(i) } shuffle(data) - buffer := NewMemDbBuffer(config.DefaultTxnMembufCap) + buffer := NewMemDbBuffer(&conf, 0) benchmarkSetGet(b, buffer, data) b.ReportAllocs() } func BenchmarkMemDbIter(b *testing.B) { - buffer := NewMemDbBuffer(config.DefaultTxnMembufCap) + conf := config.DefaultTxn() + buffer := NewMemDbBuffer(&conf, 0) benchIterator(b, buffer) b.ReportAllocs() } func BenchmarkMemDbCreation(b *testing.B) { + conf := config.DefaultTxn() for i := 0; i < b.N; i++ { - NewMemDbBuffer(config.DefaultTxnMembufCap) + NewMemDbBuffer(&conf, 0) } b.ReportAllocs() } diff --git a/txnkv/kv/memdb_buffer.go b/txnkv/kv/memdb_buffer.go index e5116dff24..9c0adad179 100644 --- a/txnkv/kv/memdb_buffer.go +++ b/txnkv/kv/memdb_buffer.go @@ -17,14 +17,13 @@ package kv import ( "fmt" - "sync/atomic" - "github.com/pkg/errors" "github.com/pingcap/goleveldb/leveldb" "github.com/pingcap/goleveldb/leveldb/comparer" "github.com/pingcap/goleveldb/leveldb/iterator" "github.com/pingcap/goleveldb/leveldb/memdb" "github.com/pingcap/goleveldb/leveldb/util" + "github.com/pkg/errors" "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" ) @@ -33,7 +32,7 @@ import ( type memDbBuffer struct { db *memdb.DB entrySizeLimit int - bufferLenLimit uint64 + bufferLenLimit int bufferSizeLimit int } @@ -43,12 +42,15 @@ type memDbIter struct { } // NewMemDbBuffer creates a new memDbBuffer. -func NewMemDbBuffer(cap int) MemBuffer { +func NewMemDbBuffer(conf *config.Txn, cap int) MemBuffer { + if cap == 0 { + cap = conf.DefaultMembufCap + } return &memDbBuffer{ db: memdb.New(comparer.DefaultComparer, cap), - entrySizeLimit: config.TxnEntrySizeLimit, - bufferLenLimit: atomic.LoadUint64(&config.TxnEntryCountLimit), - bufferSizeLimit: config.TxnTotalSizeLimit, + entrySizeLimit: conf.EntrySizeLimit, + bufferLenLimit: conf.EntryCountLimit, + bufferSizeLimit: conf.TotalSizeLimit, } } diff --git a/txnkv/kv/union_store.go b/txnkv/kv/union_store.go index 244282f9cf..4045a42280 100644 --- a/txnkv/kv/union_store.go +++ b/txnkv/kv/union_store.go @@ -72,9 +72,9 @@ type unionStore struct { } // NewUnionStore builds a new UnionStore. -func NewUnionStore(snapshot Snapshot) UnionStore { +func NewUnionStore(conf *config.Txn, snapshot Snapshot) UnionStore { return &unionStore{ - BufferStore: NewBufferStore(snapshot, config.DefaultTxnMembufCap), + BufferStore: NewBufferStore(snapshot, conf), snapshot: snapshot, lazyConditionPairs: make(map[string]*conditionPair), opts: make(map[Option]interface{}), @@ -105,8 +105,9 @@ func (it invalidIterator) Close() {} // lazyMemBuffer wraps a MemBuffer which is to be initialized when it is modified. type lazyMemBuffer struct { - mb MemBuffer - cap int + mb MemBuffer + cap int + conf *config.Txn } func (lmb *lazyMemBuffer) Get(k key.Key) ([]byte, error) { @@ -119,7 +120,7 @@ func (lmb *lazyMemBuffer) Get(k key.Key) ([]byte, error) { func (lmb *lazyMemBuffer) Set(key key.Key, value []byte) error { if lmb.mb == nil { - lmb.mb = NewMemDbBuffer(lmb.cap) + lmb.mb = NewMemDbBuffer(lmb.conf, lmb.cap) } return lmb.mb.Set(key, value) @@ -127,7 +128,7 @@ func (lmb *lazyMemBuffer) Set(key key.Key, value []byte) error { func (lmb *lazyMemBuffer) Delete(k key.Key) error { if lmb.mb == nil { - lmb.mb = NewMemDbBuffer(lmb.cap) + lmb.mb = NewMemDbBuffer(lmb.conf, lmb.cap) } return lmb.mb.Delete(k) diff --git a/txnkv/kv/union_store_test.go b/txnkv/kv/union_store_test.go index 4eb03e1f51..1161a9810a 100644 --- a/txnkv/kv/union_store_test.go +++ b/txnkv/kv/union_store_test.go @@ -27,8 +27,9 @@ type testUnionStoreSuite struct { } func (s *testUnionStoreSuite) SetUpTest(c *C) { - s.store = NewMemDbBuffer(config.DefaultTxnMembufCap) - s.us = NewUnionStore(&mockSnapshot{s.store}) + conf := config.DefaultTxn() + s.store = NewMemDbBuffer(&conf, 0) + s.us = NewUnionStore(&conf, &mockSnapshot{s.store}) } func (s *testUnionStoreSuite) TestGetSet(c *C) { diff --git a/txnkv/latch/latch.go b/txnkv/latch/latch.go index ab021b2aea..d1d946d7bc 100644 --- a/txnkv/latch/latch.go +++ b/txnkv/latch/latch.go @@ -37,6 +37,7 @@ type node struct { // latch stores a key's waiting transactions information. type latch struct { + conf *config.Latch queue *node count int waiting []*Lock @@ -94,6 +95,7 @@ func (l *Lock) SetCommitTS(commitTS uint64) { // Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable, // but conceptually a latch is a queue, and a slot is an index to the queue type Latches struct { + conf *config.Latch slots []latch } @@ -113,10 +115,11 @@ func (s bytesSlice) Less(i, j int) bool { // NewLatches create a Latches with fixed length, // the size will be rounded up to the power of 2. -func NewLatches(size uint) *Latches { - powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1))) +func NewLatches(conf *config.Latch) *Latches { + powerOfTwoSize := 1 << uint32(bits.Len32(uint32(conf.Capacity-1))) slots := make([]latch, powerOfTwoSize) return &Latches{ + conf: conf, slots: slots, } } @@ -228,8 +231,8 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult { defer latch.Unlock() // Try to recycle to limit the memory usage. - if latch.count >= config.LatchListCount { - latch.recycle(lock.startTS) + if latch.count >= latches.conf.ListCount { + latch.recycle(lock.startTS, latches.conf.ExpireDuration) } find := findNode(latch.queue, key) @@ -264,12 +267,12 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult { } // recycle is not thread safe, the latch should acquire its lock before executing this function. -func (l *latch) recycle(currentTS uint64) int { +func (l *latch) recycle(currentTS uint64, expireDuration time.Duration) int { total := 0 fakeHead := node{next: l.queue} prev := &fakeHead for curr := prev.next; curr != nil; curr = curr.next { - if tsoSub(currentTS, curr.maxCommitTS) >= config.LatchExpireDuration && curr.value == nil { + if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil { l.count-- prev.next = curr.next total++ @@ -286,7 +289,7 @@ func (latches *Latches) recycle(currentTS uint64) { for i := 0; i < len(latches.slots); i++ { latch := &latches.slots[i] latch.Lock() - total += latch.recycle(currentTS) + total += latch.recycle(currentTS, latches.conf.ExpireDuration) latch.Unlock() } log.Debugf("recycle run at %v, recycle count = %d...\n", time.Now(), total) diff --git a/txnkv/latch/latch_test.go b/txnkv/latch/latch_test.go index 096db1ec8e..7ec7f7692f 100644 --- a/txnkv/latch/latch_test.go +++ b/txnkv/latch/latch_test.go @@ -36,7 +36,9 @@ type testLatchSuite struct { } func (s *testLatchSuite) SetUpTest(c *C) { - s.latches = NewLatches(256) + conf := config.DefaultLatch() + conf.Capacity = 256 + s.latches = NewLatches(&conf) } func (s *testLatchSuite) newLock(keys [][]byte) (startTS uint64, lock *Lock) { @@ -109,7 +111,9 @@ func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) { } func (s *testLatchSuite) TestRecycle(c *C) { - latches := NewLatches(8) + conf := config.DefaultLatch() + conf.Capacity = 8 + latches := NewLatches(&conf) now := time.Now() startTS := oracle.ComposeTS(oracle.GetPhysical(now), 0) lock := latches.genLock(startTS, [][]byte{ @@ -143,7 +147,7 @@ func (s *testLatchSuite) TestRecycle(c *C) { } c.Assert(allEmpty, IsFalse) - currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(config.LatchExpireDuration)), 3) + currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(conf.ExpireDuration)), 3) latches.recycle(currentTS) for i := 0; i < len(latches.slots); i++ { diff --git a/txnkv/latch/scheduler.go b/txnkv/latch/scheduler.go index 86134951a3..098665b431 100644 --- a/txnkv/latch/scheduler.go +++ b/txnkv/latch/scheduler.go @@ -23,6 +23,7 @@ import ( // LatchesScheduler is used to schedule latches for transactions. type LatchesScheduler struct { + conf *config.Latch latches *Latches unlockCh chan *Lock closed bool @@ -31,10 +32,11 @@ type LatchesScheduler struct { } // NewScheduler create the LatchesScheduler. -func NewScheduler(size uint) *LatchesScheduler { - latches := NewLatches(size) - unlockCh := make(chan *Lock, config.LatchLockChanSize) +func NewScheduler(conf *config.Latch) *LatchesScheduler { + latches := NewLatches(conf) + unlockCh := make(chan *Lock, conf.LockChanSize) scheduler := &LatchesScheduler{ + conf: conf, latches: latches, unlockCh: unlockCh, closed: false, @@ -55,7 +57,7 @@ func (scheduler *LatchesScheduler) run() { if lock.commitTS > lock.startTS { currentTS := lock.commitTS elapsed := tsoSub(currentTS, scheduler.lastRecycleTime) - if elapsed > config.LatchCheckInterval || counter > config.LatchCheckCounter { + if elapsed > scheduler.conf.CheckInterval || counter > scheduler.conf.CheckCounter { go scheduler.latches.recycle(lock.commitTS) scheduler.lastRecycleTime = currentTS counter = 0 diff --git a/txnkv/latch/scheduler_test.go b/txnkv/latch/scheduler_test.go index 30f8a0d21b..f8d70cfff3 100644 --- a/txnkv/latch/scheduler_test.go +++ b/txnkv/latch/scheduler_test.go @@ -20,6 +20,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/tikv/client-go/config" ) var _ = Suite(&testSchedulerSuite{}) @@ -31,7 +32,9 @@ func (s *testSchedulerSuite) SetUpTest(c *C) { } func (s *testSchedulerSuite) TestWithConcurrency(c *C) { - sched := NewScheduler(7) + conf := config.DefaultLatch() + conf.Capacity = 7 + sched := NewScheduler(&conf) defer sched.Close() rand.Seed(time.Now().Unix()) diff --git a/txnkv/oracle/oracles/pd.go b/txnkv/oracle/oracles/pd.go index 26f1bc1396..ad161d8d87 100644 --- a/txnkv/oracle/oracles/pd.go +++ b/txnkv/oracle/oracles/pd.go @@ -29,23 +29,25 @@ var _ oracle.Oracle = &pdOracle{} // pdOracle is an Oracle that uses a placement driver client as source. type pdOracle struct { + conf *config.Txn c pd.Client lastTS uint64 quit chan struct{} } -// NewPdOracle create an Oracle that uses a pd client source. -// Refer https://github.com/pingcap/pd/blob/master/client/client.go for more details. +// NewPdOracle create an Oracle that uses a pd client source. Refer +// https://github.com/pingcap/pd/blob/master/client/client.go for more details. // PdOracle mantains `lastTS` to store the last timestamp got from PD server. If -// `GetTimestamp()` is not called after `updateInterval`, it will be called by -// itself to keep up with the timestamp on PD server. -func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracle, error) { +// `GetTimestamp()` is not called after `conf.OracleUpdateInterval`, it will be +// called by itself to keep up with the timestamp on PD server. +func NewPdOracle(pdClient pd.Client, conf *config.Txn) (oracle.Oracle, error) { o := &pdOracle{ + conf: conf, c: pdClient, quit: make(chan struct{}), } ctx := context.TODO() - go o.updateTS(ctx, updateInterval) + go o.updateTS(ctx, conf.OracleUpdateInterval) // Initialize lastTS by Get. _, err := o.GetTimestamp(ctx) if err != nil { @@ -102,7 +104,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context) (uint64, error) { return 0, err } dist := time.Since(now) - if dist > config.TsoSlowThreshold { + if dist > o.conf.TsoSlowThreshold { log.Warnf("get timestamp too slow: %s", dist) } return oracle.ComposeTS(physical, logical), nil diff --git a/txnkv/store/delete_range.go b/txnkv/store/delete_range.go index 66e8be7780..d148919213 100644 --- a/txnkv/store/delete_range.go +++ b/txnkv/store/delete_range.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pkg/errors" - "github.com/tikv/client-go/config" "github.com/tikv/client-go/retry" "github.com/tikv/client-go/rpc" ) @@ -51,6 +50,8 @@ func NewDeleteRangeTask(ctx context.Context, store *TiKVStore, startKey []byte, // Execute performs the delete range operation. func (t *DeleteRangeTask) Execute() error { + conf := t.store.GetConfig() + startKey, rangeEndKey := t.startKey, t.endKey for { select { @@ -80,7 +81,7 @@ func (t *DeleteRangeTask) Execute() error { }, } - resp, err := t.store.SendReq(bo, req, loc.Region, config.ReadTimeoutMedium) + resp, err := t.store.SendReq(bo, req, loc.Region, conf.RPC.ReadTimeoutMedium) if err != nil { return err } diff --git a/txnkv/store/lock_resolver.go b/txnkv/store/lock_resolver.go index 698f7334ff..9ae8df94c2 100644 --- a/txnkv/store/lock_resolver.go +++ b/txnkv/store/lock_resolver.go @@ -32,6 +32,7 @@ import ( // LockResolver resolves locks and also caches resolved txn status. type LockResolver struct { store *TiKVStore + conf *config.Config mu struct { sync.RWMutex // resolved caches resolved txns (FIFO, txn id -> txnStatus). @@ -43,6 +44,7 @@ type LockResolver struct { func newLockResolver(store *TiKVStore) *LockResolver { r := &LockResolver{ store: store, + conf: store.GetConfig(), } r.mu.resolved = make(map[uint64]TxnStatus) r.mu.recentResolved = list.New() @@ -55,8 +57,8 @@ var _ = NewLockResolver // NewLockResolver creates a LockResolver. // It is exported for other pkg to use. For instance, binlog service needs // to determine a transaction's commit state. -func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolver, error) { - s, err := NewStore(etcdAddrs, security) +func NewLockResolver(etcdAddrs []string, conf config.Config) (*LockResolver, error) { + s, err := NewStore(etcdAddrs, conf) if err != nil { return nil, err } @@ -82,10 +84,10 @@ type Lock struct { } // NewLock creates a new *Lock. -func NewLock(l *kvrpcpb.LockInfo) *Lock { +func NewLock(l *kvrpcpb.LockInfo, defaultTTL uint64) *Lock { ttl := l.GetLockTtl() if ttl == 0 { - ttl = config.TxnDefaultLockTTL + ttl = defaultTTL } return &Lock{ Key: l.GetKey(), @@ -104,7 +106,7 @@ func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) { } lr.mu.resolved[txnID] = status lr.mu.recentResolved.PushBack(txnID) - if len(lr.mu.resolved) > config.TxnResolvedCacheSize { + if len(lr.mu.resolved) > lr.conf.Txn.ResolveCacheSize { front := lr.mu.recentResolved.Front() delete(lr.mu.resolved, front.Value.(uint64)) lr.mu.recentResolved.Remove(front) @@ -171,7 +173,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo }, } startTime = time.Now() - resp, err := lr.store.SendReq(bo, req, loc, config.ReadTimeoutShort) + resp, err := lr.store.SendReq(bo, req, loc, lr.conf.RPC.ReadTimeoutShort) if err != nil { return false, err } @@ -282,7 +284,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary if err != nil { return status, err } - resp, err := lr.store.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) + resp, err := lr.store.SendReq(bo, req, loc.Region, lr.conf.RPC.ReadTimeoutShort) if err != nil { return status, err } @@ -336,7 +338,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat if status.IsCommitted() { req.ResolveLock.CommitVersion = status.CommitTS() } - resp, err := lr.store.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) + resp, err := lr.store.SendReq(bo, req, loc.Region, lr.conf.RPC.ReadTimeoutShort) if err != nil { return err } diff --git a/txnkv/store/safepoint.go b/txnkv/store/safepoint.go index c4e7ba69ac..bf2be0fcda 100644 --- a/txnkv/store/safepoint.go +++ b/txnkv/store/safepoint.go @@ -24,7 +24,6 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/tikv/client-go/config" "google.golang.org/grpc" ) @@ -120,7 +119,7 @@ func (w *EtcdSafePointKV) Get(k string) (string, error) { func saveSafePoint(kv SafePointKV, key string, t uint64) error { s := strconv.FormatUint(t, 10) - err := kv.Put(config.GcSavedSafePoint, s) + err := kv.Put(key, s) if err != nil { log.Error("save safepoint failed:", err) return err @@ -129,7 +128,7 @@ func saveSafePoint(kv SafePointKV, key string, t uint64) error { } func loadSafePoint(kv SafePointKV, key string) (uint64, error) { - str, err := kv.Get(config.GcSavedSafePoint) + str, err := kv.Get(key) if err != nil { return 0, err diff --git a/txnkv/store/scan.go b/txnkv/store/scan.go index 45fbe8f167..5d2b8660ec 100644 --- a/txnkv/store/scan.go +++ b/txnkv/store/scan.go @@ -30,6 +30,7 @@ import ( // Scanner support tikv scan type Scanner struct { snapshot *TiKVSnapshot + conf *config.Config batchSize int valid bool cache []*pb.KvPair @@ -42,10 +43,11 @@ type Scanner struct { func newScanner(snapshot *TiKVSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) { // It must be > 1. Otherwise scanner won't skipFirst. if batchSize <= 1 { - batchSize = config.TxnScanBatchSize + batchSize = snapshot.conf.Txn.ScanBatchSize } scanner := &Scanner{ snapshot: snapshot, + conf: snapshot.conf, batchSize: batchSize, valid: true, nextStartKey: startKey, @@ -175,7 +177,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { NotFillCache: s.snapshot.NotFillCache, }, } - resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutMedium) + resp, err := sender.SendReq(bo, req, loc.Region, s.conf.RPC.ReadTimeoutMedium) if err != nil { return err } @@ -205,7 +207,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { // Check if kvPair contains error, it should be a Lock. for _, pair := range kvPairs { if keyErr := pair.GetError(); keyErr != nil { - lock, err := extractLockFromKeyErr(keyErr) + lock, err := extractLockFromKeyErr(keyErr, s.conf.Txn.DefaultLockTTL) if err != nil { return err } diff --git a/txnkv/store/snapshot.go b/txnkv/store/snapshot.go index 056aa9e295..f7e60d60fe 100644 --- a/txnkv/store/snapshot.go +++ b/txnkv/store/snapshot.go @@ -36,6 +36,7 @@ import ( type TiKVSnapshot struct { store *TiKVStore ts uint64 + conf *config.Config Priority pb.CommandPri NotFillCache bool @@ -47,6 +48,7 @@ func newTiKVSnapshot(store *TiKVStore, ts uint64) *TiKVSnapshot { return &TiKVSnapshot{ store: store, ts: ts, + conf: store.GetConfig(), Priority: pb.CommandPri_Normal, } } @@ -98,7 +100,7 @@ func (s *TiKVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, var batches []batchKeys for id, g := range groups { - batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, config.TxnBatchGetSize) + batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, s.conf.Txn.BatchGetSize) } if len(batches) == 0 { @@ -141,7 +143,7 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys NotFillCache: s.NotFillCache, }, } - resp, err := sender.SendReq(bo, req, batch.region, config.ReadTimeoutMedium) + resp, err := sender.SendReq(bo, req, batch.region, s.conf.RPC.ReadTimeoutMedium) if err != nil { return err } @@ -170,7 +172,7 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys collectF(pair.GetKey(), pair.GetValue()) continue } - lock, err := extractLockFromKeyErr(keyErr) + lock, err := extractLockFromKeyErr(keyErr, s.conf.Txn.DefaultLockTTL) if err != nil { return err } @@ -226,7 +228,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) { if err != nil { return nil, err } - resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, loc.Region, s.conf.RPC.ReadTimeoutShort) if err != nil { return nil, err } @@ -247,7 +249,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) { } val := cmdGetResp.GetValue() if keyErr := cmdGetResp.GetError(); keyErr != nil { - lock, err := extractLockFromKeyErr(keyErr) + lock, err := extractLockFromKeyErr(keyErr, s.conf.Txn.DefaultLockTTL) if err != nil { return nil, err } @@ -269,7 +271,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) { // Iter returns a list of key-value pair after `k`. func (s *TiKVSnapshot) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) { - scanner, err := newScanner(s, k, upperBound, config.TxnScanBatchSize) + scanner, err := newScanner(s, k, upperBound, s.conf.Txn.ScanBatchSize) return scanner, err } @@ -283,9 +285,9 @@ func (s *TiKVSnapshot) SetPriority(priority int) { s.Priority = pb.CommandPri(priority) } -func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) { +func extractLockFromKeyErr(keyErr *pb.KeyError, defaultTTL uint64) (*Lock, error) { if locked := keyErr.GetLocked(); locked != nil { - return NewLock(locked), nil + return NewLock(locked, defaultTTL), nil } if keyErr.Conflict != nil { err := errors.New(conflictToString(keyErr.Conflict)) diff --git a/txnkv/store/split_region.go b/txnkv/store/split_region.go index 45d2149335..5c8ae8f87e 100644 --- a/txnkv/store/split_region.go +++ b/txnkv/store/split_region.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/tikv/client-go/config" "github.com/tikv/client-go/key" "github.com/tikv/client-go/retry" "github.com/tikv/client-go/rpc" @@ -39,6 +38,7 @@ func SplitRegion(store *TiKVStore, splitKey key.Key) error { }, } req.Context.Priority = kvrpcpb.CommandPri_Normal + conf := store.GetConfig() for { loc, err := store.GetRegionCache().LocateKey(bo, splitKey) if err != nil { @@ -48,7 +48,7 @@ func SplitRegion(store *TiKVStore, splitKey key.Key) error { log.Infof("skip split_region region at %q", splitKey) return nil } - res, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort) + res, err := sender.SendReq(bo, req, loc.Region, conf.RPC.ReadTimeoutShort) if err != nil { return err } diff --git a/txnkv/store/store.go b/txnkv/store/store.go index 464ea24526..2f1721f139 100644 --- a/txnkv/store/store.go +++ b/txnkv/store/store.go @@ -35,6 +35,7 @@ import ( // TiKVStore contains methods to interact with a TiKV cluster. type TiKVStore struct { + conf *config.Config clusterID uint64 uuid string oracle oracle.Oracle @@ -54,22 +55,24 @@ type TiKVStore struct { } // NewStore creates a TiKVStore instance. -func NewStore(pdAddrs []string, security config.Security) (*TiKVStore, error) { +func NewStore(pdAddrs []string, conf config.Config) (*TiKVStore, error) { pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ - CAPath: security.SSLCA, - CertPath: security.SSLCert, - KeyPath: security.SSLKey, + CAPath: conf.RPC.Security.SSLCA, + CertPath: conf.RPC.Security.SSLCert, + KeyPath: conf.RPC.Security.SSLKey, }) if err != nil { return nil, err } - oracle, err := oracles.NewPdOracle(pdCli, time.Duration(config.OracleUpdateInterval)*time.Millisecond) + pdClient := &locate.CodecPDClient{Client: pdCli} + + oracle, err := oracles.NewPdOracle(pdCli, &conf.Txn) if err != nil { return nil, err } - tlsConfig, err := security.ToTLSConfig() + tlsConfig, err := conf.RPC.Security.ToTLSConfig() if err != nil { return nil, err } @@ -82,12 +85,13 @@ func NewStore(pdAddrs []string, security config.Security) (*TiKVStore, error) { clusterID := pdCli.GetClusterID(context.TODO()) store := &TiKVStore{ + conf: &conf, clusterID: clusterID, uuid: fmt.Sprintf("tikv-%d", clusterID), oracle: oracle, - client: rpc.NewRPCClient(security), - pdClient: &locate.CodecPDClient{Client: pdCli}, - regionCache: locate.NewRegionCache(pdCli), + client: rpc.NewRPCClient(&conf.RPC), + pdClient: pdClient, + regionCache: locate.NewRegionCache(pdClient, &conf.RegionCache), etcdAddrs: pdAddrs, tlsConfig: tlsConfig, spkv: spkv, @@ -97,14 +101,19 @@ func NewStore(pdAddrs []string, security config.Security) (*TiKVStore, error) { store.lockResolver = newLockResolver(store) - if config.EnableTxnLocalLatch { - store.txnLatches = latch.NewScheduler(config.TxnLocalLatchCapacity) + if conf.Txn.Latch.Enable { + store.txnLatches = latch.NewScheduler(&conf.Txn.Latch) } go store.runSafePointChecker() return store, nil } +// GetConfig returns the store's configurations. +func (s *TiKVStore) GetConfig() *config.Config { + return s.conf +} + // GetLockResolver returns the lock resolver instance. func (s *TiKVStore) GetLockResolver() *LockResolver { return s.lockResolver @@ -177,21 +186,21 @@ func (s *TiKVStore) GetTimestampWithRetry(bo *retry.Backoffer) (uint64, error) { } func (s *TiKVStore) runSafePointChecker() { - d := config.GcSafePointUpdateInterval + d := s.conf.Txn.GcSafePointUpdateInterval for { select { case spCachedTime := <-time.After(d): - cachedSafePoint, err := loadSafePoint(s.spkv, config.GcSavedSafePoint) + cachedSafePoint, err := loadSafePoint(s.spkv, s.conf.Txn.GcSavedSafePoint) if err == nil { metrics.LoadSafepointCounter.WithLabelValues("ok").Inc() s.spMutex.Lock() s.safePoint, s.spTime = cachedSafePoint, spCachedTime s.spMutex.Unlock() - d = config.GcSafePointUpdateInterval + d = s.conf.Txn.GcSafePointUpdateInterval } else { metrics.LoadSafepointCounter.WithLabelValues("fail").Inc() log.Errorf("fail to load safepoint from pd: %v", err) - d = config.GcSafePointQuickRepeatInterval + d = s.conf.Txn.GcSafePointQuickRepeatInterval } case <-s.Closed(): return @@ -208,7 +217,7 @@ func (s *TiKVStore) CheckVisibility(startTS uint64) error { s.spMutex.RUnlock() diff := time.Since(cachedTime) - if diff > (config.GcSafePointCacheInterval - config.GcCPUTimeInaccuracyBound) { + if diff > (s.conf.Txn.GcSafePointCacheInterval - s.conf.Txn.GcCPUTimeInaccuracyBound) { return errors.WithStack(ErrPDServerTimeout) } diff --git a/txnkv/store/txn_committer.go b/txnkv/store/txn_committer.go index fdf1da47bc..305c965e02 100644 --- a/txnkv/store/txn_committer.go +++ b/txnkv/store/txn_committer.go @@ -63,6 +63,7 @@ type TxnCommitter struct { ConnID uint64 // ConnID is used for log. store *TiKVStore + conf *config.Config startTS uint64 keys [][]byte mutations map[string]*pb.Mutation @@ -91,6 +92,7 @@ func NewTxnCommitter(store *TiKVStore, startTS uint64, startTime time.Time, muta lockCnt int ) + conf := store.GetConfig() for key, mut := range mutations { switch mut.Op { case pb.Op_Put, pb.Op_Insert: @@ -102,7 +104,7 @@ func NewTxnCommitter(store *TiKVStore, startTS uint64, startTime time.Time, muta } keys = append(keys, []byte(key)) entrySize := len(mut.Key) + len(mut.Value) - if entrySize > config.TxnEntrySizeLimit { + if entrySize > conf.Txn.EntrySizeLimit { return nil, kv.ErrEntryTooLarge } size += entrySize @@ -112,21 +114,22 @@ func NewTxnCommitter(store *TiKVStore, startTS uint64, startTime time.Time, muta return nil, nil } - if len(keys) > int(config.TxnEntryCountLimit) || size > config.TxnTotalSizeLimit { + if len(keys) > int(conf.Txn.EntryCountLimit) || size > conf.Txn.TotalSizeLimit { return nil, kv.ErrTxnTooLarge } // Convert from sec to ms - maxTxnTimeUse := uint64(config.MaxTxnTimeUse) * 1000 + maxTxnTimeUse := uint64(conf.Txn.MaxTimeUse) * 1000 metrics.TxnWriteKVCountHistogram.Observe(float64(len(keys))) metrics.TxnWriteSizeHistogram.Observe(float64(size)) return &TxnCommitter{ store: store, + conf: conf, startTS: startTS, keys: keys, mutations: mutations, - lockTTL: txnLockTTL(startTime, size), + lockTTL: txnLockTTL(conf, startTime, size), maxTxnTimeUse: maxTxnTimeUse, detail: CommitDetails{WriteSize: size, WriteKeys: len(keys)}, }, nil @@ -138,20 +141,20 @@ func (c *TxnCommitter) primary() []byte { const bytesPerMiB = 1024 * 1024 -func txnLockTTL(startTime time.Time, txnSize int) uint64 { +func txnLockTTL(conf *config.Config, startTime time.Time, txnSize int) uint64 { // Increase lockTTL for large transactions. // The formula is `ttl = ttlFactor * sqrt(sizeInMiB)`. // When writeSize is less than 256KB, the base ttl is defaultTTL (3s); // When writeSize is 1MiB, 100MiB, or 400MiB, ttl is 6s, 60s, 120s correspondingly; - lockTTL := config.TxnDefaultLockTTL - if txnSize >= config.TxnCommitBatchSize { + lockTTL := conf.Txn.DefaultLockTTL + if txnSize >= conf.Txn.CommitBatchSize { sizeMiB := float64(txnSize) / bytesPerMiB - lockTTL = uint64(float64(config.TxnTTLFactor) * math.Sqrt(sizeMiB)) - if lockTTL < config.TxnDefaultLockTTL { - lockTTL = config.TxnDefaultLockTTL + lockTTL = uint64(float64(conf.Txn.TTLFactor) * math.Sqrt(sizeMiB)) + if lockTTL < conf.Txn.DefaultLockTTL { + lockTTL = conf.Txn.DefaultLockTTL } - if lockTTL > config.TxnMaxLockTTL { - lockTTL = config.TxnMaxLockTTL + if lockTTL > conf.Txn.MaxLockTTL { + lockTTL = conf.Txn.MaxLockTTL } } @@ -183,10 +186,10 @@ func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction, atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups))) } // Make sure the group that contains primary key goes first. - batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, config.TxnCommitBatchSize) + batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, c.conf.Txn.CommitBatchSize) delete(groups, firstRegion) for id, g := range groups { - batches = appendBatchBySize(batches, id, g, sizeFunc, config.TxnCommitBatchSize) + batches = appendBatchBySize(batches, id, g, sizeFunc, c.conf.Txn.CommitBatchSize) } firstIsPrimary := bytes.Equal(keys[0], c.primary()) @@ -318,7 +321,7 @@ func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys) }, } for { - resp, err := c.store.SendReq(bo, req, batch.region, config.ReadTimeoutShort) + resp, err := c.store.SendReq(bo, req, batch.region, c.conf.RPC.ReadTimeoutShort) if err != nil { return err } @@ -349,7 +352,7 @@ func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys) } // Extract lock from key error - lock, err1 := extractLockFromKeyErr(keyErr) + lock, err1 := extractLockFromKeyErr(keyErr, c.conf.Txn.DefaultLockTTL) if err1 != nil { return err1 } @@ -399,7 +402,7 @@ func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) e req.Context.Priority = c.Priority sender := rpc.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetRPCClient()) - resp, err := sender.SendReq(bo, req, batch.region, config.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, batch.region, c.conf.RPC.ReadTimeoutShort) // If we fail to receive response for the request that commits primary key, it will be undetermined whether this // transaction has been successfully committed. @@ -470,7 +473,7 @@ func (c *TxnCommitter) cleanupSingleBatch(bo *retry.Backoffer, batch batchKeys) SyncLog: c.SyncLog, }, } - resp, err := c.store.SendReq(bo, req, batch.region, config.ReadTimeoutShort) + resp, err := c.store.SendReq(bo, req, batch.region, c.conf.RPC.ReadTimeoutShort) if err != nil { return err } diff --git a/txnkv/txn.go b/txnkv/txn.go index 10eb01541e..bd304a2ea9 100644 --- a/txnkv/txn.go +++ b/txnkv/txn.go @@ -44,7 +44,7 @@ type Transaction struct { func newTransaction(tikvStore *store.TiKVStore, ts uint64) *Transaction { snapshot := tikvStore.GetSnapshot(ts) - us := kv.NewUnionStore(snapshot) + us := kv.NewUnionStore(&tikvStore.GetConfig().Txn, snapshot) return &Transaction{ tikvStore: tikvStore, snapshot: snapshot, From bac0ef6c4c42793abf7fde24958b9dd8b9d19b6d Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 19 Apr 2019 14:48:44 +0800 Subject: [PATCH 2/4] fix review comments Signed-off-by: disksing --- config/raw.go | 2 +- config/regioncache.go | 2 +- config/rpc.go | 14 +++++++------- config/txn.go | 2 +- rpc/client.go | 8 ++++---- txnkv/kv/memdb_buffer.go | 2 +- txnkv/store/txn_committer.go | 5 +++-- 7 files changed, 18 insertions(+), 17 deletions(-) diff --git a/config/raw.go b/config/raw.go index 9b613db1e9..fe4ef64b20 100644 --- a/config/raw.go +++ b/config/raw.go @@ -1,4 +1,4 @@ -// Copyright 2018 PingCAP, Inc. +// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/config/regioncache.go b/config/regioncache.go index 81b60b06ae..7c8283fe6f 100644 --- a/config/regioncache.go +++ b/config/regioncache.go @@ -1,4 +1,4 @@ -// Copyright 2018 PingCAP, Inc. +// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/config/rpc.go b/config/rpc.go index 7b0846c969..e7f53e5941 100644 --- a/config/rpc.go +++ b/config/rpc.go @@ -1,4 +1,4 @@ -// Copyright 2018 PingCAP, Inc. +// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -37,13 +37,13 @@ type RPC struct { // is closed. GrpcKeepAliveTimeout time.Duration - // MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than + // GrpcMaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than // current value, an error will be reported from gRPC. - MaxSendMsgSize int + GrpcMaxSendMsgSize int - // MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than + // GrpcMaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than // current value, an error will be reported from gRPC. - MaxCallMsgSize int + GrpcMaxCallMsgSize int // The value for initial window size on a gRPC stream. GrpcInitialWindowSize int @@ -78,8 +78,8 @@ func DefaultRPC() RPC { MaxConnectionCount: 16, GrpcKeepAliveTime: 10 * time.Second, GrpcKeepAliveTimeout: 3 * time.Second, - MaxSendMsgSize: 1<<31 - 1, - MaxCallMsgSize: 1<<31 - 1, + GrpcMaxSendMsgSize: 1<<31 - 1, + GrpcMaxCallMsgSize: 1<<31 - 1, GrpcInitialWindowSize: 1 << 30, GrpcInitialConnWindowSize: 1 << 30, DialTimeout: 5 * time.Second, diff --git a/config/txn.go b/config/txn.go index fc7c01a3a0..26e59c8f87 100644 --- a/config/txn.go +++ b/config/txn.go @@ -1,4 +1,4 @@ -// Copyright 2018 PingCAP, Inc. +// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/rpc/client.go b/rpc/client.go index 444a398767..c120212133 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -155,10 +155,10 @@ func newConnArray(addr string, conf *config.RPC) (*connArray, error) { a := &connArray{ conf: conf, index: 0, - v: make([]*grpc.ClientConn, conf.MaxCallMsgSize), + v: make([]*grpc.ClientConn, conf.GrpcMaxCallMsgSize), streamTimeout: make(chan *Lease, 1024), batchCommandsCh: make(chan *batchCommandsEntry, conf.Batch.MaxBatchSize), - batchCommandsClients: make([]*batchCommandsClient, 0, conf.MaxCallMsgSize), + batchCommandsClients: make([]*batchCommandsClient, 0, conf.GrpcMaxCallMsgSize), transportLayerLoad: 0, } if err := a.Init(addr); err != nil { @@ -201,8 +201,8 @@ func (a *connArray) Init(addr string) error { grpc.WithInitialConnWindowSize(int32(a.conf.GrpcInitialConnWindowSize)), grpc.WithUnaryInterceptor(unaryInterceptor), grpc.WithStreamInterceptor(streamInterceptor), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(a.conf.MaxCallMsgSize)), - grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(a.conf.MaxSendMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(a.conf.GrpcMaxCallMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(a.conf.GrpcMaxSendMsgSize)), grpc.WithBackoffMaxDelay(time.Second*3), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: a.conf.GrpcKeepAliveTime, diff --git a/txnkv/kv/memdb_buffer.go b/txnkv/kv/memdb_buffer.go index 9c0adad179..c27cd0ccef 100644 --- a/txnkv/kv/memdb_buffer.go +++ b/txnkv/kv/memdb_buffer.go @@ -43,7 +43,7 @@ type memDbIter struct { // NewMemDbBuffer creates a new memDbBuffer. func NewMemDbBuffer(conf *config.Txn, cap int) MemBuffer { - if cap == 0 { + if cap <= 0 { cap = conf.DefaultMembufCap } return &memDbBuffer{ diff --git a/txnkv/store/txn_committer.go b/txnkv/store/txn_committer.go index 305c965e02..886629b271 100644 --- a/txnkv/store/txn_committer.go +++ b/txnkv/store/txn_committer.go @@ -186,10 +186,11 @@ func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction, atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups))) } // Make sure the group that contains primary key goes first. - batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, c.conf.Txn.CommitBatchSize) + commitBatchSize := c.conf.Txn.CommitBatchSize + batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, commitBatchSize) delete(groups, firstRegion) for id, g := range groups { - batches = appendBatchBySize(batches, id, g, sizeFunc, c.conf.Txn.CommitBatchSize) + batches = appendBatchBySize(batches, id, g, sizeFunc, commitBatchSize) } firstIsPrimary := bytes.Equal(keys[0], c.primary()) From 784b60e68b453fc52d4123f6a83682fdd0843640 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 19 Apr 2019 16:25:58 +0800 Subject: [PATCH 3/4] fix conflicts Signed-off-by: disksing --- proxy/httpproxy/rawkv.go | 2 +- proxy/httpproxy/txnkv.go | 2 +- proxy/rawkv.go | 4 ++-- proxy/txnkv.go | 4 ++-- rawkv/rawkv.go | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/proxy/httpproxy/rawkv.go b/proxy/httpproxy/rawkv.go index 3f21f151a3..809ef537e6 100644 --- a/proxy/httpproxy/rawkv.go +++ b/proxy/httpproxy/rawkv.go @@ -48,7 +48,7 @@ type RawResponse struct { } func (h rawkvHandler) New(vars map[string]string, r *RawRequest) (*RawResponse, int, error) { - id, err := h.p.New(r.PDAddrs, config.Security{}) + id, err := h.p.New(r.PDAddrs, config.Default()) if err != nil { return nil, http.StatusInternalServerError, err } diff --git a/proxy/httpproxy/txnkv.go b/proxy/httpproxy/txnkv.go index 55644e6609..31fe73f401 100644 --- a/proxy/httpproxy/txnkv.go +++ b/proxy/httpproxy/txnkv.go @@ -52,7 +52,7 @@ type TxnResponse struct { } func (h txnkvHandler) New(vars map[string]string, r *TxnRequest) (*TxnResponse, int, error) { - id, err := h.p.New(r.PDAddrs, config.Security{}) + id, err := h.p.New(r.PDAddrs, config.Default()) if err != nil { return nil, http.StatusInternalServerError, err } diff --git a/proxy/rawkv.go b/proxy/rawkv.go index dd5b43d6ec..7de0d658e9 100644 --- a/proxy/rawkv.go +++ b/proxy/rawkv.go @@ -35,8 +35,8 @@ func NewRaw() RawKVProxy { } // New creates a new client and returns the client's UUID. -func (p RawKVProxy) New(pdAddrs []string, security config.Security) (UUID, error) { - client, err := rawkv.NewClient(pdAddrs, security) +func (p RawKVProxy) New(pdAddrs []string, conf config.Config) (UUID, error) { + client, err := rawkv.NewClient(pdAddrs, conf) if err != nil { return "", err } diff --git a/proxy/txnkv.go b/proxy/txnkv.go index 617b96276c..f6e42930e1 100644 --- a/proxy/txnkv.go +++ b/proxy/txnkv.go @@ -43,8 +43,8 @@ func NewTxn() TxnKVProxy { } // New creates a new client and returns the client's UUID. -func (p TxnKVProxy) New(pdAddrs []string, security config.Security) (UUID, error) { - client, err := txnkv.NewClient(pdAddrs, security) +func (p TxnKVProxy) New(pdAddrs []string, conf config.Config) (UUID, error) { + client, err := txnkv.NewClient(pdAddrs, conf) if err != nil { return "", err } diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index b0f02a8d82..20845e011a 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -317,7 +317,7 @@ func (c *Client) ReverseScan(startKey, endKey []byte, limit int) (keys [][]byte, metrics.RawkvCmdHistogram.WithLabelValues("raw_reverse_scan").Observe(time.Since(start).Seconds()) }() - if limit > config.MaxRawKVScanLimit { + if limit > c.conf.Raw.MaxScanLimit { return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded) } From 4a37af122754e42b70af6f1438e806857bf80a41 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 19 Apr 2019 16:58:55 +0800 Subject: [PATCH 4/4] address comment Signed-off-by: disksing --- rpc/client.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index c120212133..a3dfb22091 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -49,7 +49,7 @@ type Client interface { type connArray struct { conf *config.RPC index uint32 - v []*grpc.ClientConn + conns []*grpc.ClientConn // Bind with a background goroutine to process coprocessor streaming timeout. streamTimeout chan *Lease @@ -155,7 +155,7 @@ func newConnArray(addr string, conf *config.RPC) (*connArray, error) { a := &connArray{ conf: conf, index: 0, - v: make([]*grpc.ClientConn, conf.GrpcMaxCallMsgSize), + conns: make([]*grpc.ClientConn, conf.GrpcMaxCallMsgSize), streamTimeout: make(chan *Lease, 1024), batchCommandsCh: make(chan *batchCommandsEntry, conf.Batch.MaxBatchSize), batchCommandsClients: make([]*batchCommandsClient, 0, conf.GrpcMaxCallMsgSize), @@ -191,7 +191,7 @@ func (a *connArray) Init(addr string) error { } allowBatch := a.conf.Batch.MaxBatchSize > 0 - for i := range a.v { + for i := range a.conns { ctx, cancel := context.WithTimeout(context.Background(), a.conf.DialTimeout) conn, err := grpc.DialContext( ctx, @@ -216,7 +216,7 @@ func (a *connArray) Init(addr string) error { a.Close() return errors.WithStack(err) } - a.v[i] = conn + a.conns[i] = conn if allowBatch { // Initialize batch streaming clients. @@ -248,8 +248,8 @@ func (a *connArray) Init(addr string) error { } func (a *connArray) Get() *grpc.ClientConn { - next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v)) - return a.v[next] + next := atomic.AddUint32(&a.index, 1) % uint32(len(a.conns)) + return a.conns[next] } func (a *connArray) Close() { @@ -259,10 +259,10 @@ func (a *connArray) Close() { atomic.StoreInt32(&c.closed, 1) } close(a.batchCommandsCh) - for i, c := range a.v { + for i, c := range a.conns { if c != nil { c.Close() - a.v[i] = nil + a.conns[i] = nil } } close(a.streamTimeout) @@ -368,7 +368,7 @@ func (a *connArray) batchSendLoop() { for { // Choose a connection by round-robbin. - next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v)) + next := atomic.AddUint32(&a.index, 1) % uint32(len(a.conns)) batchCommandsClient := a.batchCommandsClients[next] entries = entries[:0]