Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move config variables to config package #9

Merged
merged 2 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,107 @@ var (
EnableTxnLocalLatch = false
TxnLocalLatchCapacity uint = 2048000
)

// RegionCache configurations.
var (
RegionCacheBTreeDegree = 32
RegionCacheTTL = time.Minute * 10
)

// RawKV configurations.
var (
	// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
	MaxRawKVScanLimit = 10240
	// RawBatchPutSize is the maximum size limit for rawkv each batch put request.
	RawBatchPutSize = 16 * 1024
	// RawBatchPairCount is the maximum limit for rawkv each batch get/delete request.
	RawBatchPairCount = 512
)

// RPC configurations.
var (
// MaxConnectionCount is the max gRPC connections that will be established with
// each tikv-server.
MaxConnectionCount uint = 16

// GrpcKeepAliveTime is the duration of time after which if the client doesn't see
// any activity it pings the server to see if the transport is still alive.
GrpcKeepAliveTime = time.Duration(10) * time.Second

// GrpcKeepAliveTimeout is the duration of time for which the client waits after having
// pinged for keepalive check and if no activity is seen even after that the connection
// is closed.
GrpcKeepAliveTimeout = time.Duration(3) * time.Second

// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than
// current value, an error will be reported from gRPC.
MaxSendMsgSize = 1<<31 - 1

// MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than
// current value, an error will be reported from gRPC.
MaxCallMsgSize = 1<<31 - 1

DialTimeout = 5 * time.Second
ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute
UnsafeDestroyRangeTimeout = 5 * time.Minute

GrpcInitialWindowSize = 1 << 30
GrpcInitialConnWindowSize = 1 << 30
)

// KV configurations.
var (
	// DefaultTxnMembufCap is the default transaction membuf capability
	// (initial capacity of a transaction's in-memory write buffer); it is
	// used by NewBufferStore whenever the caller passes a non-positive cap.
	DefaultTxnMembufCap = 4 * 1024
)

// Latch configurations.
var (
LatchExpireDuration = 2 * time.Minute
LatchCheckInterval = 1 * time.Minute
LatchCheckCounter = 50000
LatchListCount = 5
LatchLockChanSize = 100
)

// Oracle configurations.
var (
	// TsoSlowThreshold is the duration above which fetching a timestamp from
	// the oracle is considered slow — presumably used for logging/metrics;
	// confirm against the oracle package.
	TsoSlowThreshold = 30 * time.Millisecond
	// OracleUpdateInterval is the interval, in milliseconds (note: plain int,
	// not time.Duration), at which the oracle's lastTS is refreshed:
	// update oracle's lastTS every 2000ms.
	OracleUpdateInterval = 2000
)

// Txn configurations.
var (
	// TxnCommitBatchSize is the commit batch size in bytes.
	// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's
	// Key+Value size below 16KB.
	TxnCommitBatchSize = 16 * 1024

	// TxnDefaultLockTTL is the initial lock TTL in milliseconds.
	// By default, locks after 3000ms is considered unusual (the client created the
	// lock might be dead). Other client may cleanup this kind of lock.
	// For locks created recently, we will do backoff and retry.
	TxnDefaultLockTTL uint64 = 3000
	// TxnMaxLockTTL is the upper bound of a lock's TTL, in milliseconds.
	// TODO: Consider if it's appropriate.
	TxnMaxLockTTL uint64 = 120000
	// TxnTTLFactor scales the lock TTL by write size:
	// ttl = ttlFactor * sqrt(writeSizeInMiB)
	TxnTTLFactor = 6000
	// TxnResolvedCacheSize is max number of cached txn status.
	TxnResolvedCacheSize = 2048

	// GcSavedSafePoint is the PD key under which the GC safe point is saved.
	// This is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb',
	// save this to pd instead of tikv, because we can't use interface of table
	// if the safepoint on tidb is expired.
	GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point"

	// GcSafePointCacheInterval is how long a fetched safe point is cached
	// before re-reading it — confirm against the GC worker usage.
	GcSafePointCacheInterval = time.Second * 100
	// GcCPUTimeInaccuracyBound is a slack bound for CPU-time inaccuracy in
	// safe-point calculations — confirm against the GC worker usage.
	GcCPUTimeInaccuracyBound = time.Second
	// GcSafePointUpdateInterval is the regular interval between safe-point updates.
	GcSafePointUpdateInterval = time.Second * 10
	// GcSafePointQuickRepeatInterval is the short retry interval for
	// safe-point updates — confirm against the GC worker usage.
	GcSafePointQuickRepeatInterval = time.Second

	// TxnScanBatchSize is the number of key-values fetched per transactional
	// Scan request batch.
	TxnScanBatchSize = 256
	// TxnBatchGetSize is the max number of keys per transactional BatchGet
	// request batch.
	TxnBatchGetSize = 5120
)
10 changes: 3 additions & 7 deletions locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,11 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/codec"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
)

const (
btreeDegree = 32
rcDefaultRegionCacheTTL = time.Minute * 10
)

// CachedRegion encapsulates {Region, TTL}
type CachedRegion struct {
region *Region
Expand All @@ -45,7 +41,7 @@ type CachedRegion struct {
func (c *CachedRegion) isValid() bool {
lastAccess := atomic.LoadInt64(&c.lastAccess)
lastAccessTime := time.Unix(lastAccess, 0)
return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL
return time.Since(lastAccessTime) < config.RegionCacheTTL
}

// RegionCache caches Regions loaded from PD.
Expand All @@ -69,7 +65,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
pdClient: pdClient,
}
c.mu.regions = make(map[RegionVerID]*CachedRegion)
c.mu.sorted = btree.New(btreeDegree)
c.mu.sorted = btree.New(config.RegionCacheBTreeDegree)
c.storeMu.stores = make(map[uint64]*Store)
return c
}
Expand Down
23 changes: 7 additions & 16 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,10 @@ import (
)

var (
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
MaxRawKVScanLimit = 10240
// ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large.
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
)

const (
// rawBatchPutSize is the maximum size limit for rawkv each batch put request.
rawBatchPutSize = 16 * 1024
// rawBatchPairCount is the maximum limit for rawkv each batch get/delete request.
rawBatchPairCount = 512
)

// RawKVClient is a client of TiKV server which is used as a key-value storage,
// only GET/PUT/DELETE commands are supported.
type RawKVClient struct {
Expand Down Expand Up @@ -278,7 +269,7 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v
start := time.Now()
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()

if limit > MaxRawKVScanLimit {
if limit > config.MaxRawKVScanLimit {
return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
}

Expand Down Expand Up @@ -319,7 +310,7 @@ func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *loc
if err != nil {
return nil, nil, err
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return nil, nil, err
}
Expand All @@ -346,7 +337,7 @@ func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType r

var batches []batch
for regionID, groupKeys := range groups {
batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
batches = appendKeyBatches(batches, regionID, groupKeys, config.RawBatchPairCount)
}
bo, cancel := bo.Fork()
ches := make(chan singleBatchResp, len(batches))
Expand Down Expand Up @@ -405,7 +396,7 @@ func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.C
}

sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort)

batchResp := singleBatchResp{}
if err != nil {
Expand Down Expand Up @@ -473,7 +464,7 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.R
},
}

resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -504,7 +495,7 @@ func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) e
var batches []batch
// split the keys by size and RegionVerID
for regionID, groupKeys := range groups {
batches = appendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize)
batches = appendBatches(batches, regionID, groupKeys, keyToValue, config.RawBatchPutSize)
}
bo, cancel := bo.Fork()
ch := make(chan error, len(batches))
Expand Down Expand Up @@ -583,7 +574,7 @@ func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error {
}

sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion rawkv/rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/locate"
"github.com/tikv/client-go/mockstore/mocktikv"
"github.com/tikv/client-go/retry"
Expand Down Expand Up @@ -178,7 +179,7 @@ func (s *testRawKVSuite) TestRawBatch(c *C) {
size := 0
var testKeys [][]byte
var testValues [][]byte
for i := 0; size/rawBatchPutSize < 4; i++ {
for i := 0; size/config.RawBatchPutSize < 4; i++ {
key := fmt.Sprint("key", i)
size += len(key)
testKeys = append(testKeys, []byte(key))
Expand Down
50 changes: 8 additions & 42 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,40 +37,6 @@ import (
gstatus "google.golang.org/grpc/status"
)

// MaxConnectionCount is the max gRPC connections that will be established with
// each tikv-server.
var MaxConnectionCount uint = 16

// GrpcKeepAliveTime is the duration of time after which if the client doesn't see
// any activity it pings the server to see if the transport is still alive.
var GrpcKeepAliveTime = time.Duration(10) * time.Second

// GrpcKeepAliveTimeout is the duration of time for which the client waits after having
// pinged for keepalive check and if no activity is seen even after that the connection
// is closed.
var GrpcKeepAliveTimeout = time.Duration(3) * time.Second

// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than
// current value, an error will be reported from gRPC.
var MaxSendMsgSize = 1<<31 - 1

// MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than
// current value, an error will be reported from gRPC.
var MaxCallMsgSize = 1<<31 - 1

// Timeout durations.
const (
dialTimeout = 5 * time.Second
ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute
UnsafeDestroyRangeTimeout = 5 * time.Minute

grpcInitialWindowSize = 1 << 30
grpcInitialConnWindowSize = 1 << 30
)

// Client is a client that sends RPC.
// It should not be used after calling Close().
type Client interface {
Expand Down Expand Up @@ -223,21 +189,21 @@ func (a *connArray) Init(addr string, security config.Security) error {

allowBatch := config.MaxBatchSize > 0
for i := range a.v {
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
ctx, cancel := context.WithTimeout(context.Background(), config.DialTimeout)
conn, err := grpc.DialContext(
ctx,
addr,
opt,
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithInitialWindowSize(int32(config.GrpcInitialWindowSize)),
grpc.WithInitialConnWindowSize(int32(config.GrpcInitialConnWindowSize)),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(config.MaxCallMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(config.MaxSendMsgSize)),
grpc.WithBackoffMaxDelay(time.Second*3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: GrpcKeepAliveTime,
Timeout: GrpcKeepAliveTimeout,
Time: config.GrpcKeepAliveTime,
Timeout: config.GrpcKeepAliveTimeout,
PermitWithoutStream: true,
}),
)
Expand Down Expand Up @@ -489,7 +455,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
array, ok := c.conns[addr]
if !ok {
var err error
array, err = newConnArray(MaxConnectionCount, addr, c.security)
array, err = newConnArray(config.MaxConnectionCount, addr, c.security)
if err != nil {
return nil, err
}
Expand Down
12 changes: 2 additions & 10 deletions txnkv/kv/buffer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,10 @@
package kv

import (
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
)

var (
// DefaultTxnMembufCap is the default transaction membuf capability.
DefaultTxnMembufCap = 4 * 1024
// ImportingTxnMembufCap is the capability of tidb importing data situation.
ImportingTxnMembufCap = 32 * 1024
// TempTxnMemBufCap is the capability of temporary membuf.
TempTxnMemBufCap = 64
)

// BufferStore wraps a Retriever for read and a MemBuffer for buffered write.
// Common usage pattern:
// bs := NewBufferStore(r) // use BufferStore to wrap a Retriever
Expand All @@ -41,7 +33,7 @@ type BufferStore struct {
// NewBufferStore creates a BufferStore using r for read.
func NewBufferStore(r Retriever, cap int) *BufferStore {
if cap <= 0 {
cap = DefaultTxnMembufCap
cap = config.DefaultTxnMembufCap
}
return &BufferStore{
r: r,
Expand Down
7 changes: 4 additions & 3 deletions txnkv/kv/buffer_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
)

Expand All @@ -31,7 +32,7 @@ type testBufferStoreSuite struct{}
var _ = Suite(testBufferStoreSuite{})

func (s testBufferStoreSuite) TestGetSet(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap)
key := key.Key("key")
_, err := bs.Get(key)
c.Check(err, NotNil)
Expand All @@ -45,7 +46,7 @@ func (s testBufferStoreSuite) TestGetSet(c *C) {
}

func (s testBufferStoreSuite) TestSaveTo(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap)
var buf bytes.Buffer
for i := 0; i < 10; i++ {
fmt.Fprint(&buf, i)
Expand All @@ -55,7 +56,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
}
bs.Set(key.Key("novalue"), nil)

mutator := NewMemDbBuffer(DefaultTxnMembufCap)
mutator := NewMemDbBuffer(config.DefaultTxnMembufCap)
err := bs.SaveTo(mutator)
c.Check(err, IsNil)

Expand Down
Loading