Skip to content

Commit

Permalink
VReplication VPlayer: support statement and transaction batching (#14502
Browse files Browse the repository at this point in the history
)

Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
mattlord and rohit-nayak-ps authored Dec 7, 2023
1 parent b73b5b2 commit c6a6bfe
Show file tree
Hide file tree
Showing 21 changed files with 843 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
)

type WriteMetrics struct {
Expand Down Expand Up @@ -184,6 +184,9 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
)

type testcase struct {
Expand Down Expand Up @@ -436,6 +436,9 @@ func TestMain(m *testing.M) {
"--migration_check_interval", "5s",
"--vstream_packet_size", "4096", // Keep this value small and below 10k to ensure multilple vstream iterations
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
10 changes: 8 additions & 2 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vttablet"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand All @@ -38,8 +39,13 @@ import (
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
func TestFKWorkflow(t *testing.T) {
// ensure that there are multiple copy phase cycles per table
extraVTTabletArgs = []string{"--vstream_packet_size=256"}
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
defer func() { extraVTTabletArgs = nil }()

cellName := "zone"
Expand Down
38 changes: 28 additions & 10 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ import (
"testing"
"time"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"github.com/buger/jsonparser"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
)

var (
Expand Down Expand Up @@ -280,6 +280,11 @@ func TestVreplicationCopyThrottling(t *testing.T) {
}

func TestBasicVreplicationWorkflow(t *testing.T) {
ogflags := extraVTTabletArgs
defer func() { extraVTTabletArgs = ogflags }()
// Test VPlayer batching mode.
extraVTTabletArgs = append(extraVTTabletArgs, fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching))
sourceKsOpts["DBTypeVersion"] = "mysql-8.0"
targetKsOpts["DBTypeVersion"] = "mysql-8.0"
testBasicVreplicationWorkflow(t, "noblob")
Expand Down Expand Up @@ -622,8 +627,15 @@ func testVStreamCellFlag(t *testing.T) {
func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
mainClusterConfig.vreplicationCompressGTID = true
oldVTTabletExtraArgs := extraVTTabletArgs
extraVTTabletArgs = append(extraVTTabletArgs,
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
)
defer func() {
mainClusterConfig.vreplicationCompressGTID = false
extraVTTabletArgs = oldVTTabletExtraArgs
}()
vc = NewVitessCluster(t, "TestCellAliasVreplicationWorkflow", cells, mainClusterConfig)
require.NotNil(t, vc)
Expand Down Expand Up @@ -777,6 +789,12 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
}
require.Equal(t, true, dec80Replicated)

// Insert multiple rows in the loadtest table and immediately delete them to confirm that bulk delete
// works the same way with the vplayer optimization enabled and disabled. Currently this optimization
// is disabled by default, but enabled in TestCellAliasVreplicationWorkflow.
execVtgateQuery(t, vtgateConn, sourceKs, "insert into loadtest(id, name) values(10001, 'tempCustomer'), (10002, 'tempCustomer2'), (10003, 'tempCustomer3'), (10004, 'tempCustomer4')")
execVtgateQuery(t, vtgateConn, sourceKs, "delete from loadtest where id > 10000")

// Confirm that all partial query metrics get updated when we are testing the noblob mode.
t.Run("validate partial query counts", func(t *testing.T) {
if !isBinlogRowImageNoBlob(t, productTab) {
Expand Down
22 changes: 15 additions & 7 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ var (

// BlplQuery is the key for the stats map.
BlplQuery = "Query"
// BlplMultiQuery is the key for the stats map.
BlplMultiQuery = "MultiQuery"
// BlplTransaction is the key for the stats map.
BlplTransaction = "Transaction"
// BlplBatchTransaction is the key for the stats map.
BlplBatchTransaction = "BatchTransaction"
)

// Stats is the internal stats of a player. It is a different
Expand All @@ -84,13 +88,15 @@ type Stats struct {

State atomic.Value

PhaseTimings *stats.Timings
QueryTimings *stats.Timings
QueryCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
ErrorCounts *stats.CountersWithMultiLabels
NoopQueryCount *stats.CountersWithSingleLabel
PhaseTimings *stats.Timings
QueryTimings *stats.Timings
QueryCount *stats.CountersWithSingleLabel
BulkQueryCount *stats.CountersWithSingleLabel
TrxQueryBatchCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
ErrorCounts *stats.CountersWithMultiLabels
NoopQueryCount *stats.CountersWithSingleLabel

VReplicationLags *stats.Timings
VReplicationLagRates *stats.Rates
Expand Down Expand Up @@ -157,6 +163,8 @@ func NewStats() *Stats {
bps.PhaseTimings = stats.NewTimings("", "", "Phase")
bps.QueryTimings = stats.NewTimings("", "", "Phase")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")
bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"})
Expand Down
40 changes: 40 additions & 0 deletions go/vt/binlog/binlogplayer/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package binlogplayer
import (
"context"
"fmt"
"strings"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql"
Expand All @@ -38,6 +39,7 @@ type DBClient interface {
Rollback() error
Close()
ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error)
}

// dbClientImpl is a real DBClient backed by a mysql connection.
Expand Down Expand Up @@ -140,6 +142,25 @@ func (dc *dbClientImpl) ExecuteFetch(query string, maxrows int) (*sqltypes.Resul
return mqr, nil
}

func (dc *dbClientImpl) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
results := make([]*sqltypes.Result, 0)
mqr, more, err := dc.dbConn.ExecuteFetchMulti(query, maxrows, true)
if err != nil {
dc.handleError(err)
return nil, err
}
results = append(results, mqr)
for more {
mqr, more, _, err = dc.dbConn.ReadQueryResult(maxrows, false)
if err != nil {
dc.handleError(err)
return nil, err
}
results = append(results, mqr)
}
return results, nil
}

func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
// Replace any provided sidecar database qualifiers with the correct one.
uq, err := sqlparser.ReplaceTableQualifiers(query, sidecar.DefaultName, sidecar.GetName())
Expand All @@ -148,3 +169,22 @@ func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetch(query string, maxr
}
return dcr.dbClientImpl.ExecuteFetch(uq, maxrows)
}

func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
// Replace any provided sidecar database qualifiers with the correct one.
qps, err := sqlparser.SplitStatementToPieces(query)
if err != nil {
return nil, err
}
for i, qp := range qps {
uq, err := sqlparser.ReplaceTableQualifiers(qp, sidecar.DefaultName, sidecar.GetName())
if err != nil {
return nil, err
}
qps[i] = uq
}
if err != nil {
return nil, err
}
return dcr.dbClientImpl.ExecuteFetchMulti(strings.Join(qps, ";"), maxrows)
}
4 changes: 4 additions & 0 deletions go/vt/binlog/binlogplayer/fake_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
}
return nil, fmt.Errorf("unexpected: %v", query)
}

func (dc *fakeDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
return make([]*sqltypes.Result, 0), nil
}
17 changes: 17 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
)

const mockClientUNameFiltered = "Filtered"
Expand Down Expand Up @@ -224,3 +225,19 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
}
return result.result, result.err
}

func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
queries, err := sqlparser.SplitStatementToPieces(query)
if err != nil {
return nil, err
}
results := make([]*sqltypes.Result, 0, len(queries))
for _, query := range queries {
qr, err := dc.ExecuteFetch(query, maxrows)
if err != nil {
return nil, err
}
results = append(results, qr)
}
return results, nil
}
3 changes: 3 additions & 0 deletions go/vt/vttablet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import (
)

const (
// VReplicationExperimentalFlags is a bitmask of experimental features in vreplication.
VReplicationExperimentalFlagOptimizeInserts = int64(1)
VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2)
VReplicationExperimentalFlagVPlayerBatching = int64(4)
)

var (
// Default flags.
VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
VReplicationNetReadTimeout = 300
VReplicationNetWriteTimeout = 600
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,22 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu
return qr, err
}

func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
queries, err := sqlparser.SplitStatementToPieces(query)
if err != nil {
return nil, err
}
results := make([]*sqltypes.Result, 0, len(queries))
for _, query := range queries {
qr, err := dbc.ExecuteFetch(query, maxrows)
if err != nil {
return nil, err
}
results = append(results, qr)
}
return results, nil
}

//----------------------------------------------
// fakeTMClient

Expand Down
Loading

0 comments on commit c6a6bfe

Please sign in to comment.