Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add txnkv support and examples #6

Merged
merged 3 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,13 @@ var (
TxnEntryCountLimit uint64 = 300 * 1000
// TxnTotalSizeLimit is limit of the sum of all entry size.
TxnTotalSizeLimit = 100 * 1024 * 1024
// MaxTxnTimeUse is the max time a transaction can run.
MaxTxnTimeUse = 590
)

// Local latches for transactions. Enable it when
// there are lots of conflicts between transactions.
var (
EnableTxnLocalLatch = false
TxnLocalLatchCapacity uint = 2048000
)
62 changes: 62 additions & 0 deletions examples/rawkv/rawkv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"

"github.com/tikv/client-go/config"
"github.com/tikv/client-go/rawkv"
)

func main() {
cli, err := rawkv.NewRawKVClient([]string{"127.0.0.1:2379"}, config.Security{})
if err != nil {
panic(err)
}
defer cli.Close()

fmt.Printf("cluster ID: %d\n", cli.ClusterID())

key := []byte("Company")
val := []byte("PingCAP")

// put key into tikv
err = cli.Put(key, val)
if err != nil {
panic(err)
}
fmt.Printf("Successfully put %s:%s to tikv\n", key, val)

// get key from tikv
val, err = cli.Get(key)
if err != nil {
panic(err)
}
fmt.Printf("found val: %s for key: %s\n", val, key)

// delete key from tikv
err = cli.Delete(key)
if err != nil {
panic(err)
}
fmt.Printf("key: %s deleted\n", key)

// get key again from tikv
val, err = cli.Get(key)
if err != nil {
panic(err)
}
fmt.Printf("found val: %s for key: %s\n", val, key)
}
153 changes: 153 additions & 0 deletions examples/txnkv/txnkv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"
"fmt"
"os"

"github.com/pingcap/errors"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/txnkv"
)

type KV struct {
K, V []byte
}

func (kv KV) String() string {
return fmt.Sprintf("%s => %s (%v)", kv.K, kv.V, kv.V)
}

var (
client *txnkv.TxnClient
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
)

// Init initializes information.
func initStore() {
var err error
client, err = txnkv.NewTxnClient([]string{*pdAddr}, config.Security{})
if err != nil {
panic(err)
}
}

// key1 val1 key2 val2 ...
func puts(args ...[]byte) error {
tx, err := client.Begin()
if err != nil {
return errors.Trace(err)
}

for i := 0; i < len(args); i += 2 {
key, val := args[i], args[i+1]
err := tx.Set(key, val)
if err != nil {
return errors.Trace(err)
}
}
err = tx.Commit(context.Background())
if err != nil {
return errors.Trace(err)
}

return nil
}

func get(k []byte) (KV, error) {
tx, err := client.Begin()
if err != nil {
return KV{}, errors.Trace(err)
}
v, err := tx.Get(k)
if err != nil {
return KV{}, errors.Trace(err)
}
return KV{K: k, V: v}, nil
}

func dels(keys ...[]byte) error {
tx, err := client.Begin()
if err != nil {
return errors.Trace(err)
}
for _, key := range keys {
err := tx.Delete(key)
if err != nil {
return errors.Trace(err)
}
}
err = tx.Commit(context.Background())
if err != nil {
return errors.Trace(err)
}
return nil
}

func scan(keyPrefix []byte, limit int) ([]KV, error) {
tx, err := client.Begin()
if err != nil {
return nil, errors.Trace(err)
}
it, err := tx.Iter(key.Key(keyPrefix), nil)
if err != nil {
return nil, errors.Trace(err)
}
defer it.Close()
var ret []KV
for it.Valid() && limit > 0 {
ret = append(ret, KV{K: it.Key()[:], V: it.Value()[:]})
limit--
it.Next()
}
return ret, nil
}

func main() {
pdAddr := os.Getenv("PD_ADDR")
if pdAddr != "" {
os.Args = append(os.Args, "-pd", pdAddr)
}
flag.Parse()
initStore()

// set
err := puts([]byte("key1"), []byte("value1"), []byte("key2"), []byte("value2"))
if err != nil {
panic(err)
}

// get
kv, err := get([]byte("key1"))
if err != nil {
panic(err)
}
fmt.Println(kv)

// scan
ret, err := scan([]byte("key"), 10)
for _, kv := range ret {
fmt.Println(kv)
}

// delete
err = dels([]byte("key1"), []byte("key2"))
if err != nil {
panic(err)
}
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
module github.com/tikv/client-go

require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/coreos/bbolt v1.3.2 // indirect
github.com/coreos/etcd v3.3.12+incompatible // indirect
github.com/coreos/etcd v3.3.12+incompatible
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
Expand Down Expand Up @@ -32,6 +34,7 @@ require (
github.com/pingcap/pd v2.1.5+incompatible
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 // indirect
github.com/sirupsen/logrus v1.3.0
github.com/soheilhy/cmux v0.1.4 // indirect
Expand All @@ -46,5 +49,6 @@ require (
go.uber.org/zap v1.9.1 // indirect
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
google.golang.org/grpc v1.19.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)
8 changes: 7 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
Expand Down Expand Up @@ -46,7 +50,7 @@ github.com/gorilla/mux v1.7.0 h1:tOSd0UKHQd6urX6ApfOn4XdBMY6Sh1MfxV3kmaazO+U=
github.com/gorilla/mux v1.7.0/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
Expand Down Expand Up @@ -159,6 +163,8 @@ google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoA
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
10 changes: 10 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ var (
Help: "Bucketed histogram of seconds cost for waiting timestamp future.",
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 18), // 5us ~ 128 ms
})

LocalLatchWaitTimeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "local_latch_wait_seconds",
Help: "Wait time of a get local latch.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
})
)

// RetLabel returns "ok" when err == nil and "err" when err != nil.
Expand Down Expand Up @@ -254,4 +263,5 @@ func init() {
prometheus.MustRegister(PendingBatchRequests)
prometheus.MustRegister(BatchWaitDuration)
prometheus.MustRegister(TSFutureWaitDuration)
prometheus.MustRegister(LocalLatchWaitTimeHistogram)
}
25 changes: 15 additions & 10 deletions retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ func (t BackoffType) String() string {

// Maximum total sleep time(in ms) for kv/cop commands.
const (
copBuildTaskMaxBackoff = 5000
tsoMaxBackoff = 15000
scannerNextMaxBackoff = 20000
batchGetMaxBackoff = 20000
copNextMaxBackoff = 20000
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
CopBuildTaskMaxBackoff = 5000
TsoMaxBackoff = 15000
ScannerNextMaxBackoff = 20000
BatchGetMaxBackoff = 20000
CopNextMaxBackoff = 20000
GetMaxBackoff = 20000
PrewriteMaxBackoff = 20000
CleanupMaxBackoff = 20000
GcOneRegionMaxBackoff = 20000
GcResolveLockMaxBackoff = 100000
deleteRangeOneRegionMaxBackoff = 100000
DeleteRangeOneRegionMaxBackoff = 100000
RawkvMaxBackoff = 20000
splitRegionBackoff = 20000
SplitRegionBackoff = 20000
)

// CommitMaxBackoff is max sleep time of the 'commit' command
Expand Down Expand Up @@ -252,3 +252,8 @@ func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) {
func (b *Backoffer) GetContext() context.Context {
return b.ctx
}

// TotalSleep returns the total sleep time of the backoffer.
func (b *Backoffer) TotalSleep() time.Duration {
return time.Duration(b.totalSleep) * time.Millisecond
}
8 changes: 8 additions & 0 deletions rpc/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"google.golang.org/grpc/status"
)

// ErrBodyMissing response body is missing error
var ErrBodyMissing = errors.New("response body is missing")

// RegionRequestSender sends KV/Cop requests to tikv server. It handles network
// errors and some region errors internally.
//
Expand Down Expand Up @@ -57,6 +60,11 @@ func NewRegionRequestSender(regionCache *locate.RegionCache, client Client) *Reg
}
}

// RPCError returns an error if an RPC error is encountered during request.
func (s *RegionRequestSender) RPCError() error {
return s.rpcError
}

// SendReq sends a request to tikv server.
func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionID locate.RegionVerID, timeout time.Duration) (*Response, error) {

Expand Down
Loading