diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 7f560a24f9e..9f442a39c76 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -29,16 +29,16 @@ import ( "testing" "time" - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/schema" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" "vitess.io/vitess/go/test/endtoend/throttler" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/vttablet" ) type WriteMetrics struct { @@ -184,6 +184,9 @@ func TestMain(m *testing.M) { "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", "--watch_replication_stream", + // Test VPlayer batching mode. + fmt.Sprintf("--vreplication_experimental_flags=%d", + vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching), } clusterInstance.VtGateExtraArgs = []string{ "--ddl_strategy", "online", diff --git a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go index bac59241cf2..2d9caaa6703 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress_suite/onlineddl_vrepl_stress_suite_test.go @@ -40,18 +40,18 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/timer" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/schema" - "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" "vitess.io/vitess/go/test/endtoend/throttler" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "vitess.io/vitess/go/timer" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/vttablet" ) type testcase struct { @@ -436,6 +436,9 @@ func TestMain(m *testing.M) { "--migration_check_interval", "5s", "--vstream_packet_size", "4096", // Keep this value small and below 10k to ensure multilple vstream iterations "--watch_replication_stream", + // Test VPlayer batching mode. 
+ fmt.Sprintf("--vreplication_experimental_flags=%d", + vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching), } clusterInstance.VtGateExtraArgs = []string{ "--ddl_strategy", "online", diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 4798edfb975..7d5f01c13db 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vttablet" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -38,8 +39,13 @@ import ( // It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without, // i.e. with foreign_key_checks=0. func TestFKWorkflow(t *testing.T) { - // ensure that there are multiple copy phase cycles per table - extraVTTabletArgs = []string{"--vstream_packet_size=256"} + extraVTTabletArgs = []string{ + // Ensure that there are multiple copy phase cycles per table. + "--vstream_packet_size=256", + // Test VPlayer batching mode. + fmt.Sprintf("--vreplication_experimental_flags=%d", + vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching), + } defer func() { extraVTTabletArgs = nil }() cellName := "zone" diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 62d174df067..2bc4df760ee 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -27,27 +27,27 @@ import ( "testing" "time" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vtgate/vtgateconn" - + "github.com/buger/jsonparser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" - "github.com/buger/jsonparser" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" - throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" ) var ( @@ -280,6 +280,11 @@ func TestVreplicationCopyThrottling(t *testing.T) { } func TestBasicVreplicationWorkflow(t *testing.T) { + ogflags := extraVTTabletArgs + defer func() { extraVTTabletArgs = ogflags }() + // Test VPlayer batching mode. 
+ extraVTTabletArgs = append(extraVTTabletArgs, fmt.Sprintf("--vreplication_experimental_flags=%d", + vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching)) sourceKsOpts["DBTypeVersion"] = "mysql-8.0" targetKsOpts["DBTypeVersion"] = "mysql-8.0" testBasicVreplicationWorkflow(t, "noblob") @@ -622,8 +627,15 @@ func testVStreamCellFlag(t *testing.T) { func TestCellAliasVreplicationWorkflow(t *testing.T) { cells := []string{"zone1", "zone2"} mainClusterConfig.vreplicationCompressGTID = true + oldVTTabletExtraArgs := extraVTTabletArgs + extraVTTabletArgs = append(extraVTTabletArgs, + // Test VPlayer batching mode. + fmt.Sprintf("--vreplication_experimental_flags=%d", + vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching), + ) defer func() { mainClusterConfig.vreplicationCompressGTID = false + extraVTTabletArgs = oldVTTabletExtraArgs }() vc = NewVitessCluster(t, "TestCellAliasVreplicationWorkflow", cells, mainClusterConfig) require.NotNil(t, vc) @@ -777,6 +789,12 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl } require.Equal(t, true, dec80Replicated) + // Insert multiple rows in the loadtest table and immediately delete them to confirm that bulk delete + // works the same way with the vplayer optimization enabled and disabled. Currently this optimization + // is disabled by default, but enabled in TestCellAliasVreplicationWorkflow. + execVtgateQuery(t, vtgateConn, sourceKs, "insert into loadtest(id, name) values(10001, 'tempCustomer'), (10002, 'tempCustomer2'), (10003, 'tempCustomer3'), (10004, 'tempCustomer4')") + execVtgateQuery(t, vtgateConn, sourceKs, "delete from loadtest where id > 10000") + // Confirm that all partial query metrics get updated when we are testing the noblob mode. t.Run("validate partial query counts", func(t *testing.T) { if !isBinlogRowImageNoBlob(t, productTab) { diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 6d689bc5436..5b9d2e40e1e 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -60,8 +60,12 @@ var ( // BlplQuery is the key for the stats map. BlplQuery = "Query" + // BlplMultiQuery is the key for the stats map. + BlplMultiQuery = "MultiQuery" // BlplTransaction is the key for the stats map. BlplTransaction = "Transaction" + // BlplBatchTransaction is the key for the stats map. + BlplBatchTransaction = "BatchTransaction" ) // Stats is the internal stats of a player. 
It is a different @@ -84,13 +88,15 @@ type Stats struct { State atomic.Value - PhaseTimings *stats.Timings - QueryTimings *stats.Timings - QueryCount *stats.CountersWithSingleLabel - CopyRowCount *stats.Counter - CopyLoopCount *stats.Counter - ErrorCounts *stats.CountersWithMultiLabels - NoopQueryCount *stats.CountersWithSingleLabel + PhaseTimings *stats.Timings + QueryTimings *stats.Timings + QueryCount *stats.CountersWithSingleLabel + BulkQueryCount *stats.CountersWithSingleLabel + TrxQueryBatchCount *stats.CountersWithSingleLabel + CopyRowCount *stats.Counter + CopyLoopCount *stats.Counter + ErrorCounts *stats.CountersWithMultiLabels + NoopQueryCount *stats.CountersWithSingleLabel VReplicationLags *stats.Timings VReplicationLagRates *stats.Rates @@ -157,6 +163,8 @@ func NewStats() *Stats { bps.PhaseTimings = stats.NewTimings("", "", "Phase") bps.QueryTimings = stats.NewTimings("", "", "Phase") bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "") + bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") bps.CopyRowCount = stats.NewCounter("", "") bps.CopyLoopCount = stats.NewCounter("", "") bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"}) diff --git a/go/vt/binlog/binlogplayer/dbclient.go b/go/vt/binlog/binlogplayer/dbclient.go index f9cd03691a5..ce2ccaccb17 100644 --- a/go/vt/binlog/binlogplayer/dbclient.go +++ b/go/vt/binlog/binlogplayer/dbclient.go @@ -19,6 +19,7 @@ package binlogplayer import ( "context" "fmt" + "strings" "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/mysql" @@ -38,6 +39,7 @@ type DBClient interface { Rollback() error Close() ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) + ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error) } // dbClientImpl is a real DBClient backed by a mysql connection. @@ -140,6 +142,25 @@ func (dc *dbClientImpl) ExecuteFetch(query string, maxrows int) (*sqltypes.Resul return mqr, nil } +func (dc *dbClientImpl) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) { + results := make([]*sqltypes.Result, 0) + mqr, more, err := dc.dbConn.ExecuteFetchMulti(query, maxrows, true) + if err != nil { + dc.handleError(err) + return nil, err + } + results = append(results, mqr) + for more { + mqr, more, _, err = dc.dbConn.ReadQueryResult(maxrows, false) + if err != nil { + dc.handleError(err) + return nil, err + } + results = append(results, mqr) + } + return results, nil +} + func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { // Replace any provided sidecar database qualifiers with the correct one. uq, err := sqlparser.ReplaceTableQualifiers(query, sidecar.DefaultName, sidecar.GetName()) @@ -148,3 +169,22 @@ func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetch(query string, maxr } return dcr.dbClientImpl.ExecuteFetch(uq, maxrows) } + +func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) { + // Replace any provided sidecar database qualifiers with the correct one. 
+	qps, err := sqlparser.SplitStatementToPieces(query)
+	if err != nil {
+		return nil, err
+	}
+	for i, qp := range qps {
+		uq, err := sqlparser.ReplaceTableQualifiers(qp, sidecar.DefaultName, sidecar.GetName())
+		if err != nil {
+			return nil, err
+		}
+		qps[i] = uq
+	}
+	return dcr.dbClientImpl.ExecuteFetchMulti(strings.Join(qps, ";"), maxrows)
+}
diff --git a/go/vt/binlog/binlogplayer/fake_dbclient.go b/go/vt/binlog/binlogplayer/fake_dbclient.go
index 186722cf12f..750f35b3fe3 100644
--- a/go/vt/binlog/binlogplayer/fake_dbclient.go
+++ b/go/vt/binlog/binlogplayer/fake_dbclient.go
@@ -80,3 +80,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
 	}
 	return nil, fmt.Errorf("unexpected: %v", query)
 }
+
+func (dc *fakeDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
+	return make([]*sqltypes.Result, 0), nil
+}
diff --git a/go/vt/binlog/binlogplayer/mock_dbclient.go b/go/vt/binlog/binlogplayer/mock_dbclient.go
index d64c4d40146..ce07fbe9179 100644
--- a/go/vt/binlog/binlogplayer/mock_dbclient.go
+++ b/go/vt/binlog/binlogplayer/mock_dbclient.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"vitess.io/vitess/go/sqltypes"
+	"vitess.io/vitess/go/vt/sqlparser"
 )
 
 const mockClientUNameFiltered = "Filtered"
@@ -224,3 +225,19 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
 	}
 	return result.result, result.err
 }
+
+func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
+	queries, err := sqlparser.SplitStatementToPieces(query)
+	if err != nil {
+		return nil, err
+	}
+	results := make([]*sqltypes.Result, 0, len(queries))
+	for _, query := range queries {
+		qr, err := dc.ExecuteFetch(query, maxrows)
+		if err != nil {
+			return nil, err
+		}
+		results = append(results, qr)
+	}
+	return results, nil
+}
diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go
index 3ce2cd3b378..994080b95a5 100644
--- a/go/vt/vttablet/flags.go
+++ b/go/vt/vttablet/flags.go
@@ -25,11 +25,14 @@ import (
 )
 
 const (
+	// VReplicationExperimentalFlags is a bitmask of experimental features in vreplication.
 	VReplicationExperimentalFlagOptimizeInserts           = int64(1)
 	VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2)
+	VReplicationExperimentalFlagVPlayerBatching           = int64(4)
 )
 
 var (
+	// Default flags.
VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage VReplicationNetReadTimeout = 300 VReplicationNetWriteTimeout = 600 diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index d5e8c134814..d0b81179f0f 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -396,6 +396,22 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu return qr, err } +func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) { + queries, err := sqlparser.SplitStatementToPieces(query) + if err != nil { + return nil, err + } + results := make([]*sqltypes.Result, 0, len(queries)) + for _, query := range queries { + qr, err := dbc.ExecuteFetch(query, maxrows) + if err != nil { + return nil, err + } + results = append(results, qr) + } + return results, nil +} + //---------------------------------------------- // fakeTMClient diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index ee1a1dbc06c..64a924f28d3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -28,26 +28,25 @@ import ( "testing" "time" - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/vt/dbconnpool" - "vitess.io/vitess/go/vt/vttablet" - - "vitess.io/vitess/go/test/utils" - "vitess.io/vitess/go/vt/dbconfigs" - "github.com/spf13/pflag" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" _flag "vitess.io/vitess/go/internal/flag" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sidecardb" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconn" @@ -70,6 +69,7 @@ var ( globalFBC = &fakeBinlogClient{} vrepldb = "vrepl" globalDBQueries = make(chan string, 1000) + lastMultiExecQuery = "" testForeignKeyQueries = false testSetForeignKeyQueries = false doNotLogDBQueries = false @@ -494,6 +494,23 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu return qr, err } +func (dc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) { + queries, err := sqlparser.SplitStatementToPieces(query) + if err != nil { + return nil, err + } + results := make([]*sqltypes.Result, 0, len(queries)) + for _, query := range queries { + qr, err := dc.ExecuteFetch(query, maxrows) + if err != nil { + return nil, err + } + results = append(results, qr) + } + lastMultiExecQuery = query + return results, nil +} + func expectDeleteQueries(t *testing.T) { t.Helper() if doNotLogDBQueries { diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 39ffdef04ae..9ecf8669d6d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ 
b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -29,13 +29,14 @@ import ( vjson "vitess.io/vitess/go/mysql/json" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) // ReplicatorPlan is the execution plan for the replicator. It contains @@ -195,6 +196,7 @@ type TablePlan struct { Insert *sqlparser.ParsedQuery Update *sqlparser.ParsedQuery Delete *sqlparser.ParsedQuery + MultiDelete *sqlparser.ParsedQuery Fields []*querypb.Field EnumValuesMap map[string](map[string]string) ConvertIntToEnum map[string]bool @@ -444,6 +446,126 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun return nil, nil } +// applyBulkDeleteChanges applies a bulk DELETE statement from the row changes +// to the target table -- which resulted from a DELETE statement executed on the +// source that deleted N rows -- using an IN clause with the primary key values +// of the rows to be deleted. This currently only supports tables with single +// column primary keys. This limitation is in place for now as we know that case +// will still be efficient. When using large multi-column IN or OR group clauses +// in DELETES we could end up doing large (table) scans that actually make things +// slower. +// TODO: Add support for multi-column primary keys. +func (tp *TablePlan) applyBulkDeleteChanges(rowDeletes []*binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error), maxQuerySize int64) (*sqltypes.Result, error) { + if len(rowDeletes) == 0 { + return &sqltypes.Result{}, nil + } + if (len(tp.TablePlanBuilder.pkCols) + len(tp.TablePlanBuilder.extraSourcePkCols)) != 1 { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "bulk delete is only supported for tables with a single primary key column") + } + if tp.MultiDelete == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "plan has no bulk delete query") + } + + baseQuerySize := int64(len(tp.MultiDelete.Query)) + querySize := baseQuerySize + + execQuery := func(pkVals *[]sqltypes.Value) (*sqltypes.Result, error) { + pksBV, err := sqltypes.BuildBindVariable(*pkVals) + if err != nil { + return nil, err + } + query, err := tp.MultiDelete.GenerateQuery(map[string]*querypb.BindVariable{"bulk_pks": pksBV}, nil) + if err != nil { + return nil, err + } + tp.TablePlanBuilder.stats.BulkQueryCount.Add("delete", 1) + return executor(query) + } + + pkIndex := -1 + pkVals := make([]sqltypes.Value, 0, len(rowDeletes)) + for _, rowDelete := range rowDeletes { + vals := sqltypes.MakeRowTrusted(tp.Fields, rowDelete.Before) + if pkIndex == -1 { + for i := range vals { + if tp.PKIndices[i] { + pkIndex = i + break + } + } + } + addedSize := int64(len(vals[pkIndex].Raw()) + 2) // Plus 2 for the comma and space + if querySize+addedSize > maxQuerySize { + if _, err := execQuery(&pkVals); err != nil { + return nil, err + } + pkVals = nil + querySize = baseQuerySize + } + pkVals = append(pkVals, vals[pkIndex]) + querySize += addedSize + } + + return execQuery(&pkVals) +} + +// applyBulkInsertChanges generates a multi-row INSERT statement from the 
row +// changes generated from a multi-row INSERT statement executed on the source. +func (tp *TablePlan) applyBulkInsertChanges(rowInserts []*binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error), maxQuerySize int64) (*sqltypes.Result, error) { + if len(rowInserts) == 0 { + return &sqltypes.Result{}, nil + } + if tp.BulkInsertFront == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "plan has no bulk insert query") + } + + prefix := &strings.Builder{} + prefix.WriteString(tp.BulkInsertFront.Query) + prefix.WriteString(" values ") + insertPrefix := prefix.String() + maxQuerySize -= int64(len(insertPrefix)) + values := &strings.Builder{} + + execQuery := func(vals *strings.Builder) (*sqltypes.Result, error) { + if tp.BulkInsertOnDup != nil { + vals.WriteString(tp.BulkInsertOnDup.Query) + } + tp.TablePlanBuilder.stats.BulkQueryCount.Add("insert", 1) + return executor(insertPrefix + vals.String()) + } + + newStmt := true + for _, rowInsert := range rowInserts { + rowValues := &strings.Builder{} + bindvars := make(map[string]*querypb.BindVariable, len(tp.Fields)) + vals := sqltypes.MakeRowTrusted(tp.Fields, rowInsert.After) + for n, field := range tp.Fields { + bindVar, err := tp.bindFieldVal(field, &vals[n]) + if err != nil { + return nil, err + } + bindvars["a_"+field.Name] = bindVar + } + if err := tp.BulkInsertValues.Append(rowValues, bindvars, nil); err != nil { + return nil, err + } + if int64(values.Len()+2+rowValues.Len()) > maxQuerySize { // Plus 2 for the comma and space + if _, err := execQuery(values); err != nil { + return nil, err + } + values.Reset() + newStmt = true + } + if !newStmt { + values.WriteString(", ") + } + values.WriteString(rowValues.String()) + newStmt = false + } + + return execQuery(values) +} + func getQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.BindVariable) (string, error) { sql, err := pq.GenerateQuery(bindvars, nil) if err != nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats.go b/go/vt/vttablet/tabletmanager/vreplication/stats.go index 6379a9ba04f..892247efee0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats.go @@ -254,6 +254,39 @@ func (st *vrStats) register() { return result }) + stats.NewGaugesFuncWithMultiLabels( + "VReplicationBulkQueryCount", + "vreplication vplayer queries with consolidated row events counts per DML type per stream", + []string{"source_keyspace", "source_shard", "workflow", "counts", "dml_type"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for label, count := range ct.blpStats.BulkQueryCount.Counts() { + if label == "" { + continue + } + result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)+"."+label] = count + } + } + return result + }) + stats.NewCounterFunc( + "VReplicationBulkQueryCountTotal", + "vreplication vplayer queries with consolidated row events counts aggregated across all streams", + func() int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := int64(0) + for _, ct := range st.controllers { + for _, count := range ct.blpStats.BulkQueryCount.Counts() { + result += count + } + } + return result + }) + stats.NewGaugesFuncWithMultiLabels( "VReplicationNoopQueryCount", "vreplication noop query counts per stream", @@ -287,6 +320,41 @@ func (st *vrStats) register() { } return result }) + + stats.NewGaugesFuncWithMultiLabels( + 
"VReplicationTrxQueryBatchCount", + "vreplication vplayer transaction query batch counts per type per stream", + []string{"source_keyspace", "source_shard", "workflow", "counts", "commit_or_not"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for label, count := range ct.blpStats.TrxQueryBatchCount.Counts() { + if label == "" { + continue + } + result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)+"."+label] = count + } + } + return result + }) + + stats.NewCounterFunc( + "VReplicationTrxQueryBatchCountTotal", + "vreplication vplayer transaction query batch counts aggregated across all streams", + func() int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := int64(0) + for _, ct := range st.controllers { + for _, count := range ct.blpStats.TrxQueryBatchCount.Counts() { + result += count + } + } + return result + }) + stats.NewGaugesFuncWithMultiLabels( "VReplicationCopyRowCount", "vreplication rows copied in copy phase per stream", @@ -476,6 +544,8 @@ func (st *vrStats) status() *EngineStatus { SourceTablet: ct.sourceTablet.Load().(*topodatapb.TabletAlias), Messages: ct.blpStats.MessageHistory(), QueryCounts: ct.blpStats.QueryCount.Counts(), + BulkQueryCounts: ct.blpStats.BulkQueryCount.Counts(), + TrxQueryBatchCounts: ct.blpStats.TrxQueryBatchCount.Counts(), PhaseTimings: ct.blpStats.PhaseTimings.Counts(), CopyRowCount: ct.blpStats.CopyRowCount.Get(), CopyLoopCount: ct.blpStats.CopyLoopCount.Get(), @@ -514,6 +584,8 @@ type ControllerStatus struct { SourceTablet *topodatapb.TabletAlias Messages []string QueryCounts map[string]int64 + BulkQueryCounts map[string]int64 + TrxQueryBatchCounts map[string]int64 PhaseTimings map[string]int64 CopyRowCount int64 CopyLoopCount int64 diff --git a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go index d5b5eacbdf2..79149d34d6d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/stats_test.go @@ -169,6 +169,16 @@ func TestVReplicationStats(t *testing.T) { require.Equal(t, int64(11), testStats.status().Controllers[0].QueryCounts["replicate"]) require.Equal(t, int64(23), testStats.status().Controllers[0].QueryCounts["fastforward"]) + blpStats.BulkQueryCount.Add("insert", 101) + blpStats.BulkQueryCount.Add("delete", 203) + require.Equal(t, int64(101), testStats.status().Controllers[0].BulkQueryCounts["insert"]) + require.Equal(t, int64(203), testStats.status().Controllers[0].BulkQueryCounts["delete"]) + + blpStats.TrxQueryBatchCount.Add("without_commit", 10) + blpStats.TrxQueryBatchCount.Add("with_commit", 2193) + require.Equal(t, int64(10), testStats.status().Controllers[0].TrxQueryBatchCounts["without_commit"]) + require.Equal(t, int64(2193), testStats.status().Controllers[0].TrxQueryBatchCounts["with_commit"]) + blpStats.CopyLoopCount.Add(100) blpStats.CopyRowCount.Add(200) require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount) diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index d94d0640529..715d87186a6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" 
"vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -361,6 +362,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan { Insert: tpb.generateInsertStatement(), Update: tpb.generateUpdateStatement(), Delete: tpb.generateDeleteStatement(), + MultiDelete: tpb.generateMultiDeleteStatement(), PKReferences: pkrefs, PKIndices: tpb.pkIndices, Stats: tpb.stats, @@ -870,6 +872,18 @@ func (tpb *tablePlanBuilder) generateDeleteStatement() *sqlparser.ParsedQuery { return buf.ParsedQuery() } +func (tpb *tablePlanBuilder) generateMultiDeleteStatement() *sqlparser.ParsedQuery { + if vttablet.VReplicationExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching == 0 || + (len(tpb.pkCols)+len(tpb.extraSourcePkCols)) != 1 { + return nil + } + return sqlparser.BuildParsedQuery("delete from %s where %s in %a", + sqlparser.String(tpb.name), + sqlparser.String(tpb.pkCols[0].colName), + "::bulk_pks", + ) +} + func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) { buf.WriteString(" where ") bvf.mode = bvBefore diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index f88c900f2db..196ee6aac86 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -27,20 +27,20 @@ import ( "google.golang.org/protobuf/encoding/prototext" - "vitess.io/vitess/go/vt/vttablet" - "vitess.io/vitess/go/bytes2" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/pools" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type vcopier struct { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index c3941b0f1bb..39a8229efc6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -19,12 +19,15 @@ package vreplication import ( "context" "io" + "strings" "time" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) // vdbClient is a wrapper on binlogplayer.DBClient. @@ -35,6 +38,9 @@ type vdbClient struct { InTransaction bool startTime time.Time queries []string + queriesPos int64 + batchSize int64 + maxBatchSize int64 } func newVDBClient(dbclient binlogplayer.DBClient, stats *binlogplayer.Stats) *vdbClient { @@ -51,6 +57,13 @@ func (vc *vdbClient) Begin() error { if err := vc.DBClient.Begin(); err != nil { return err } + + // If we're batching, we only batch the contents of the + // transaction, which starts with the begin and ends with + // the commit. 
+	vc.queriesPos = int64(len(vc.queries))
+	vc.batchSize = 6 // begin and semicolon
+
 	vc.queries = append(vc.queries, "begin")
 	vc.InTransaction = true
 	vc.startTime = time.Now()
@@ -63,10 +76,30 @@ func (vc *vdbClient) Commit() error {
 	}
 	vc.InTransaction = false
 	vc.queries = nil
+	vc.batchSize = 0
 	vc.stats.Timings.Record(binlogplayer.BlplTransaction, vc.startTime)
 	return nil
 }
 
+// CommitTrxQueryBatch sends the current transaction's query batch -- which
+// is often the full contents of the transaction, unless we've crossed
+// the maxBatchSize one or more times -- down the wire to the database,
+// including the final commit.
+func (vc *vdbClient) CommitTrxQueryBatch() error {
+	vc.queries = append(vc.queries, "commit")
+	queries := strings.Join(vc.queries[vc.queriesPos:], ";")
+	if _, err := vc.DBClient.ExecuteFetchMulti(queries, -1); err != nil {
+		return err
+	}
+	vc.InTransaction = false
+	vc.queries = nil
+	vc.queriesPos = 0
+	vc.batchSize = 0
+	vc.stats.TrxQueryBatchCount.Add("with_commit", 1)
+	vc.stats.Timings.Record(binlogplayer.BlplBatchTransaction, vc.startTime)
+	return nil
+}
+
 func (vc *vdbClient) Rollback() error {
 	if !vc.InTransaction {
 		return nil
@@ -90,6 +123,43 @@ func (vc *vdbClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result,
 	return vc.DBClient.ExecuteFetch(query, maxrows)
 }
 
+// AddQueryToTrxBatch adds the query to the current transaction's query
+// batch. If this new query would cause the current batch to exceed
+// the maxBatchSize, then the current unsent batch is sent down the
+// wire and this query will be included in the next batch.
+func (vc *vdbClient) AddQueryToTrxBatch(query string) error {
+	if !vc.InTransaction {
+		return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "cannot batch query outside of a transaction: %s", query)
+	}
+
+	addedSize := int64(len(query)) + 1 // Plus 1 for the semicolon
+	if vc.batchSize+addedSize > vc.maxBatchSize {
+		if _, err := vc.ExecuteTrxQueryBatch(); err != nil {
+			return err
+		}
+	}
+	vc.queries = append(vc.queries, query)
+	vc.batchSize += addedSize
+
+	return nil
+}
+
+// ExecuteTrxQueryBatch sends the transaction's current batch of queries
+// down the wire to the database.
+func (vc *vdbClient) ExecuteTrxQueryBatch() ([]*sqltypes.Result, error) {
+	defer vc.stats.Timings.Record(binlogplayer.BlplMultiQuery, time.Now())
+
+	qrs, err := vc.DBClient.ExecuteFetchMulti(strings.Join(vc.queries[vc.queriesPos:], ";"), -1)
+	if err != nil {
+		return nil, err
+	}
+	vc.stats.TrxQueryBatchCount.Add("without_commit", 1)
+	vc.queriesPos += int64(len(vc.queries[vc.queriesPos:]))
+	vc.batchSize = 0
+
+	return qrs, nil
+}
+
 // Execute is ExecuteFetch without the maxrows.
 func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
 	// Number of rows should never exceed relayLogMaxItems.
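Taken together, the vdbClient methods above form a small batching contract: Begin marks where the transaction's statements start in the queries slice, AddQueryToTrxBatch accumulates statements and flushes early when a statement would push the pending batch past maxBatchSize, ExecuteTrxQueryBatch sends the pending statements as one multi-statement request without the commit, and CommitTrxQueryBatch appends the commit and sends whatever is still unsent. The standalone Go sketch below mimics that flow outside of Vitess; it is illustrative only, and the batcher type and its in-memory wire slice are invented stand-ins for the real DBClient connection.

package main

import (
	"fmt"
	"strings"
)

// batcher mirrors the batching state the patch adds to vdbClient:
// queries, queriesPos, batchSize, and maxBatchSize.
type batcher struct {
	queries      []string
	queriesPos   int      // start of the unsent portion of the current trx
	batchSize    int      // bytes accumulated since the last send
	maxBatchSize int      // stands in for max_allowed_packet minus headroom
	wire         []string // each entry represents one multi-statement send
}

func (b *batcher) begin() {
	b.queriesPos = len(b.queries)
	b.batchSize = len("begin") + 1 // begin and semicolon
	b.queries = append(b.queries, "begin")
}

// addQuery mirrors AddQueryToTrxBatch: flush first if this query would
// push the pending batch past maxBatchSize, then accumulate it.
func (b *batcher) addQuery(q string) {
	added := len(q) + 1 // Plus 1 for the semicolon
	if b.batchSize+added > b.maxBatchSize {
		b.flush()
	}
	b.queries = append(b.queries, q)
	b.batchSize += added
}

// flush mirrors ExecuteTrxQueryBatch: send the pending statements as one
// multi-statement request, without the commit.
func (b *batcher) flush() {
	b.wire = append(b.wire, strings.Join(b.queries[b.queriesPos:], ";"))
	b.queriesPos = len(b.queries)
	b.batchSize = 0
}

// commit mirrors CommitTrxQueryBatch: append the commit and send whatever
// part of the transaction is still unsent.
func (b *batcher) commit() {
	b.queries = append(b.queries, "commit")
	b.wire = append(b.wire, strings.Join(b.queries[b.queriesPos:], ";"))
	b.queries, b.queriesPos, b.batchSize = nil, 0, 0
}

func main() {
	b := &batcher{maxBatchSize: 64}
	b.begin()
	b.addQuery("insert into t1(id) values (1)")
	b.addQuery("insert into t1(id) values (2)") // would exceed 64 bytes: earlier batch flushes first
	b.commit()
	for i, msg := range b.wire {
		fmt.Printf("batch %d: %s\n", i+1, msg)
	}
}

With a 64-byte limit the sketch emits two multi-statement sends, the first without a commit and the second ending in one, which is the same split that the new TrxQueryBatchCount stat records under its "without_commit" and "with_commit" labels.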
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index be8876f26d2..c222bc11781 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -28,9 +28,9 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vttablet" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -47,6 +47,14 @@ type vplayer struct { replicatorPlan *ReplicatorPlan tablePlans map[string]*TablePlan + // These are set when creating the VPlayer based on whether the VPlayer + // is in batch (stmt and trx) execution mode or not. + query func(ctx context.Context, sql string) (*sqltypes.Result, error) + commit func() error + // If the VPlayer is in batch mode, we accumulate each transaction's statements + // that are then sent as a single multi-statement protocol request to the database. + batchMode bool + pos replication.Position // unsavedEvent is set any time we skip an event without // saving, which is on an empty commit. @@ -104,6 +112,47 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map settings.StopPos = pausePos saveStop = false } + + queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { + return vr.dbClient.ExecuteWithRetry(ctx, sql) + } + commitFunc := func() error { + return vr.dbClient.Commit() + } + batchMode := false + if vttablet.VReplicationExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { + batchMode = true + } + if batchMode { + // relayLogMaxSize is effectively the limit used when not batching. + maxAllowedPacket := int64(relayLogMaxSize) + // We explicitly do NOT want to batch this, we want to send it down the wire + // immediately so we use ExecuteFetch directly. + res, err := vr.dbClient.ExecuteFetch("select @@session.max_allowed_packet as max_allowed_packet", 1) + if err != nil { + log.Errorf("Error getting max_allowed_packet, will use the relay_log_max_size value of %d bytes: %v", relayLogMaxSize, err) + } else { + if maxAllowedPacket, err = res.Rows[0][0].ToInt64(); err != nil { + log.Errorf("Error getting max_allowed_packet, will use the relay_log_max_size value of %d bytes: %v", relayLogMaxSize, err) + } + } + // Leave 64 bytes of room for the commit to be sure that we have a more than + // ample buffer left. The default value of max_allowed_packet is 4MiB in 5.7 + // and 64MiB in 8.0 -- and the default for max_relay_log_size is 250000 + // bytes -- so we have plenty of room. 
+			maxAllowedPacket -= 64
+			queryFunc = func(ctx context.Context, sql string) (*sqltypes.Result, error) {
+				if !vr.dbClient.InTransaction { // Should be sent down the wire immediately
+					return vr.dbClient.Execute(sql)
+				}
+				return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch
+			}
+			commitFunc = func() error {
+				return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch
+			}
+			vr.dbClient.maxBatchSize = maxAllowedPacket
+		}
+
 	return &vplayer{
 		vr:       vr,
 		startPos: settings.StartPos,
@@ -115,6 +164,9 @@
 		tablePlans:       make(map[string]*TablePlan),
 		phase:            phase,
 		throttlerAppName: throttlerapp.VCopierName.ConcatenateString(vr.throttlerAppName()),
+		query:            queryFunc,
+		commit:           commitFunc,
+		batchMode:        batchMode,
 	}
 }
@@ -177,7 +229,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
 		return nil
 	}
 	log.Infof("Setting this session's foreign_key_checks to %s", strconv.FormatBool(dbForeignKeyChecksEnabled))
-	if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, "set @@session.foreign_key_checks="+strconv.FormatBool(dbForeignKeyChecksEnabled)); err != nil {
+	if _, err := vp.query(ctx, "set @@session.foreign_key_checks="+strconv.FormatBool(dbForeignKeyChecksEnabled)); err != nil {
 		return fmt.Errorf("failed to set session foreign_key_checks: %w", err)
 	}
 	vp.foreignKeyChecksEnabled = dbForeignKeyChecksEnabled
@@ -263,7 +315,7 @@ func (vp *vplayer) applyStmtEvent(ctx context.Context, event *binlogdatapb.VEven
 	}
 	if event.Type == binlogdatapb.VEventType_SAVEPOINT || vp.canAcceptStmtEvents {
 		start := time.Now()
-		_, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql)
+		_, err := vp.query(ctx, sql)
 		vp.vr.stats.QueryTimings.Record(vp.phase, start)
 		vp.vr.stats.QueryCount.Add(vp.phase, 1)
 		return err
@@ -279,27 +331,46 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
 	if tplan == nil {
 		return fmt.Errorf("unexpected event on table %s", rowEvent.TableName)
 	}
+	applyFunc := func(sql string) (*sqltypes.Result, error) {
+		stats := NewVrLogStats("ROWCHANGE")
+		start := time.Now()
+		qr, err := vp.query(ctx, sql)
+		vp.vr.stats.QueryCount.Add(vp.phase, 1)
+		vp.vr.stats.QueryTimings.Record(vp.phase, start)
+		stats.Send(sql)
+		return qr, err
+	}
+
+	if vp.batchMode && len(rowEvent.RowChanges) > 1 {
+		// If we have multiple delete row events for a table with a single PK column
+		// then we can perform a simple bulk DELETE using an IN clause.
+		if (rowEvent.RowChanges[0].Before != nil && rowEvent.RowChanges[0].After == nil) &&
+			tplan.MultiDelete != nil {
+			_, err := tplan.applyBulkDeleteChanges(rowEvent.RowChanges, applyFunc, vp.vr.dbClient.maxBatchSize)
+			return err
+		}
+		// If we're done with the copy phase then we will be replicating all INSERTS
+		// regardless of the PK value and can use a single INSERT statement with
+		// multiple VALUES clauses.
+ if len(vp.copyState) == 0 && (rowEvent.RowChanges[0].Before == nil && rowEvent.RowChanges[0].After != nil) { + _, err := tplan.applyBulkInsertChanges(rowEvent.RowChanges, applyFunc, vp.vr.dbClient.maxBatchSize) + return err + } + } + for _, change := range rowEvent.RowChanges { - _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { - stats := NewVrLogStats("ROWCHANGE") - start := time.Now() - qr, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql) - vp.vr.stats.QueryCount.Add(vp.phase, 1) - vp.vr.stats.QueryTimings.Record(vp.phase, start) - stats.Send(sql) - return qr, err - }) - if err != nil { + if _, err := tplan.applyChange(change, applyFunc); err != nil { return err } } + return nil } -func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { +func (vp *vplayer) updatePos(ctx context.Context, ts int64) (posReached bool, err error) { vp.numAccumulatedHeartbeats = 0 update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), vreplicationStoreCompressedGTID) - if _, err := vp.vr.dbClient.Execute(update); err != nil { + if _, err := vp.query(ctx, update); err != nil { return false, fmt.Errorf("error %v updating position", err) } vp.unsavedEvent = nil @@ -393,7 +464,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if ctx.Err() != nil { return ctx.Err() } - // check throttler. + // Check throttler. if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)) { _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName) continue @@ -417,7 +488,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // In both cases, now > timeLastSaved. If so, the GTID of the last unsavedEvent // must be saved. if time.Since(vp.timeLastSaved) >= idleTimeout && vp.unsavedEvent != nil { - posReached, err := vp.updatePos(vp.unsavedEvent.Timestamp) + posReached, err := vp.updatePos(ctx, vp.unsavedEvent.Timestamp) if err != nil { return err } @@ -516,11 +587,11 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m vp.unsavedEvent = event return nil } - posReached, err := vp.updatePos(event.Timestamp) + posReached, err := vp.updatePos(ctx, event.Timestamp) if err != nil { return err } - if err := vp.vr.dbClient.Commit(); err != nil { + if err := vp.commit(); err != nil { return err } if posReached { @@ -573,7 +644,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return fmt.Errorf("internal error: vplayer is in a transaction on event: %v", event) } // Just update the position. - posReached, err := vp.updatePos(event.Timestamp) + posReached, err := vp.updatePos(ctx, event.Timestamp) if err != nil { return err } @@ -589,7 +660,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m switch vp.vr.source.OnDdl { case binlogdatapb.OnDDLAction_IGNORE: // We still have to update the position. 
- posReached, err := vp.updatePos(event.Timestamp) + posReached, err := vp.updatePos(ctx, event.Timestamp) if err != nil { return err } @@ -600,13 +671,13 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m if err := vp.vr.dbClient.Begin(); err != nil { return err } - if _, err := vp.updatePos(event.Timestamp); err != nil { + if _, err := vp.updatePos(ctx, event.Timestamp); err != nil { return err } if err := vp.vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, fmt.Sprintf("Stopped at DDL %s", event.Statement)); err != nil { return err } - if err := vp.vr.dbClient.Commit(); err != nil { + if err := vp.commit(); err != nil { return err } return io.EOF @@ -615,11 +686,11 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m // So, we apply the DDL first, and then save the position. // Manual intervention may be needed if there is a partial // failure here. - if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil { + if _, err := vp.query(ctx, event.Statement); err != nil { return err } stats.Send(fmt.Sprintf("%v", event.Statement)) - posReached, err := vp.updatePos(event.Timestamp) + posReached, err := vp.updatePos(ctx, event.Timestamp) if err != nil { return err } @@ -627,11 +698,11 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF } case binlogdatapb.OnDDLAction_EXEC_IGNORE: - if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil { + if _, err := vp.query(ctx, event.Statement); err != nil { log.Infof("Ignoring error: %v for DDL: %s", err, event.Statement) } stats.Send(fmt.Sprintf("%v", event.Statement)) - posReached, err := vp.updatePos(event.Timestamp) + posReached, err := vp.updatePos(ctx, event.Timestamp) if err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 3b215d03791..dc11ac7bd9c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "os" + "regexp" "strconv" "strings" "sync" @@ -2335,7 +2336,7 @@ func TestPlayerCancelOnLock(t *testing.T) { } } -func TestPlayerBatching(t *testing.T) { +func TestPlayerTransactions(t *testing.T) { defer deleteTablet(addTablet(100)) execStatements(t, []string{ @@ -3107,6 +3108,256 @@ func TestPlayerNoBlob(t *testing.T) { require.Equal(t, int64(4), stats.PartialQueryCount.Counts()["update"]) } +func TestPlayerBatchMode(t *testing.T) { + // To test trx batch splitting at 1024-64 bytes. 
+ maxAllowedPacket := 1024 + oldVreplicationExperimentalFlags := vttablet.VReplicationExperimentalFlags + vttablet.VReplicationExperimentalFlags = vttablet.VReplicationExperimentalFlagVPlayerBatching + defer func() { + vttablet.VReplicationExperimentalFlags = oldVreplicationExperimentalFlags + }() + + defer deleteTablet(addTablet(100)) + execStatements(t, []string{ + fmt.Sprintf("set @@global.max_allowed_packet=%d", maxAllowedPacket), + "create table t1(id bigint, val1 varchar(1000), primary key(id))", + fmt.Sprintf("create table %s.t1(id bigint, val1 varchar(1000), primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t1", + fmt.Sprintf("drop table %s.t1", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + } + cancel, vrID := startVReplication(t, bls, "") + defer cancel() + + maxBatchSize := maxAllowedPacket - 64 // VPlayer leaves 64 bytes of room + // When the trx will be in a single batch. + trxFullBatchExpectRE := `^begin;(set @@session\.foreign_key_checks=.*;)?%s;update _vt\.vreplication set pos=.*;commit$` + // If the trx batch is split, then we only expect the end part. + trxLastBatchExpectRE := `%s;update _vt\.vreplication set pos=.*;commit$` + // The vreplication position update statement will look like this: + // update _vt.vreplication set pos='MySQL56/b213e4de-937a-11ee-b184-668979c675f4:1-38', time_updated=1701786574, transaction_timestamp=1701786574, rows_copied=0, message='' where id=1; + // So it will use 182 bytes in the batch. + // This long value can be used to test the handling of bulk statements + // which bump up against the max batch size, as well as testing the trx + // batch splitting into multiple wire messages when hitting the max size. + longStr := strings.Repeat("a", maxBatchSize-70) + + testcases := []struct { + input string + output []string + expectedNonCommitBatches int64 + expectedInLastBatch string // Should only be set if we expect 1+ non-commit batches + expectedBulkInserts int64 + expectedBulkDeletes int64 + table string + data [][]string + }{ + { + input: "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (4, 'ddd'), (5, 'eee')", + output: []string{"insert into t1(id,val1) values (1,'aaa'), (2,'bbb'), (3,'ccc'), (4,'ddd'), (5,'eee')"}, + expectedBulkInserts: 1, + table: "t1", + data: [][]string{ + {"1", "aaa"}, + {"2", "bbb"}, + {"3", "ccc"}, + {"4", "ddd"}, + {"5", "eee"}, + }, + }, + { + input: "delete from t1 where id = 1", + output: []string{"delete from t1 where id=1"}, + table: "t1", + data: [][]string{ + {"2", "bbb"}, + {"3", "ccc"}, + {"4", "ddd"}, + {"5", "eee"}, + }, + }, + { + input: "delete from t1 where id > 3", + output: []string{"delete from t1 where id in (4, 5)"}, + expectedBulkDeletes: 1, + table: "t1", + data: [][]string{ + {"2", "bbb"}, + {"3", "ccc"}, + }, + }, + { + input: fmt.Sprintf("insert into t1(id, val1) values (1, '%s'), (2, 'bbb'), (3, 'ccc') on duplicate key update id = id+100", longStr), + output: []string{ + fmt.Sprintf("insert into t1(id,val1) values (1,'%s')", longStr), + "delete from t1 where id=2", + "insert into t1(id,val1) values (102,'bbb')", + "delete from t1 where id=3", + // This will be in the second/last batch, along with the vrepl pos update. 
+ "insert into t1(id,val1) values (103,'ccc')", + }, + expectedInLastBatch: "insert into t1(id,val1) values (103,'ccc')", + expectedNonCommitBatches: 1, + table: "t1", + data: [][]string{ + {"1", longStr}, + {"102", "bbb"}, + {"103", "ccc"}, + }, + }, + { + input: "insert into t1(id, val1) values (1, 'aaa'), (2, 'bbb'), (3, 'ccc') on duplicate key update id = id+500, val1 = values(val1)", + output: []string{ + "delete from t1 where id=1", + "insert into t1(id,val1) values (501,'aaa')", + "insert into t1(id,val1) values (2,'bbb'), (3,'ccc')", + }, + expectedBulkInserts: 1, + table: "t1", + data: [][]string{ + {"2", "bbb"}, + {"3", "ccc"}, + {"102", "bbb"}, + {"103", "ccc"}, + {"501", "aaa"}, + }, + }, + { + input: "delete from t1", + output: []string{"delete from t1 where id in (2, 3, 102, 103, 501)"}, + expectedBulkDeletes: 1, + table: "t1", + }, + { + input: fmt.Sprintf("insert into t1(id, val1) values (1, '%s'), (2, 'bbb'), (3, 'ccc'), (4, 'ddd'), (5, 'eee')", longStr), + output: []string{ + // This bulk insert is long enough that the BEGIN gets sent down by itself. + // The bulk query then gets split into two queries. It also causes the trx + // to get split into three batches (BEGIN, INSERT, INSERT). + fmt.Sprintf("insert into t1(id,val1) values (1,'%s'), (2,'bbb'), (3,'ccc'), (4,'ddd')", longStr), + // This will be in the second/last batch, along with the vrepl pos update. + "insert into t1(id,val1) values (5,'eee')", + }, + expectedBulkInserts: 2, + // The BEGIN, then the INSERT. + expectedNonCommitBatches: 2, // The last one includes the commit + expectedInLastBatch: "insert into t1(id,val1) values (5,'eee')", + table: "t1", + data: [][]string{ + {"1", longStr}, + {"2", "bbb"}, + {"3", "ccc"}, + {"4", "ddd"}, + {"5", "eee"}, + }, + }, + { + input: "insert into t1(id, val1) values (1000000000000, 'x'), (1000000000001, 'x'), (1000000000002, 'x'), (1000000000003, 'x'), (1000000000004, 'x'), (1000000000005, 'x'), (1000000000006, 'x'), (1000000000007, 'x'), (1000000000008, 'x'), (1000000000009, 'x'), (1000000000010, 'x'), (1000000000011, 'x'), (1000000000012, 'x'), (1000000000013, 'x'), (1000000000014, 'x'), (1000000000015, 'x'), (1000000000016, 'x'), (1000000000017, 'x'), (1000000000018, 'x'), (1000000000019, 'x'), (1000000000020, 'x'), (1000000000021, 'x'), (1000000000022, 'x'), (1000000000023, 'x'), (1000000000024, 'x'), (1000000000025, 'x'), (1000000000026, 'x'), (1000000000027, 'x'), (1000000000028, 'x'), (1000000000029, 'x'), (1000000000030, 'x'), (1000000000031, 'x'), (1000000000032, 'x'), (1000000000033, 'x'), (1000000000034, 'x'), (1000000000035, 'x'), (1000000000036, 'x'), (1000000000037, 'x'), (1000000000038, 'x'), (1000000000039, 'x'), (1000000000040, 'x'), (1000000000041, 'x'), (1000000000042, 'x'), (1000000000043, 'x'), (1000000000044, 'x'), (1000000000045, 'x'), (1000000000046, 'x'), (1000000000047, 'x'), (1000000000048, 'x'), (1000000000049, 'x'), (1000000000050, 'x'), (1000000000051, 'x'), (1000000000052, 'x'), (1000000000053, 'x'), (1000000000054, 'x'), (1000000000055, 'x'), (1000000000056, 'x'), (1000000000057, 'x'), (1000000000058, 'x'), (1000000000059, 'x'), (1000000000060, 'x'), (1000000000061, 'x'), (1000000000062, 'x'), (1000000000063, 'x'), (1000000000064, 'x'), (1000000000065, 'x'), (1000000000066, 'x'), (1000000000067, 'x'), (1000000000068, 'x'), (1000000000069, 'x'), (1000000000070, 'x'), (1000000000071, 'x'), (1000000000072, 'x'), (1000000000073, 'x'), (1000000000074, 'x'), (1000000000075, 'x'), (1000000000076, 'x'), (1000000000077, 'x'), (1000000000078, 
'x'), (1000000000079, 'x'), (1000000000080, 'x'), (1000000000081, 'x'), (1000000000082, 'x'), (1000000000083, 'x'), (1000000000084, 'x'), (1000000000085, 'x'), (1000000000086, 'x'), (1000000000087, 'x'), (1000000000088, 'x'), (1000000000089, 'x'), (1000000000090, 'x'), (1000000000091, 'x'), (1000000000092, 'x'), (1000000000093, 'x'), (1000000000094, 'x'), (1000000000095, 'x'), (1000000000096, 'x'), (1000000000097, 'x'), (1000000000098, 'x'), (1000000000099, 'x'), (1000000000100, 'x'), (1000000000101, 'x'), (1000000000102, 'x'), (1000000000103, 'x'), (1000000000104, 'x'), (1000000000105, 'x'), (1000000000106, 'x'), (1000000000107, 'x'), (1000000000108, 'x'), (1000000000109, 'x'), (1000000000110, 'x'), (1000000000111, 'x'), (1000000000112, 'x'), (1000000000113, 'x'), (1000000000114, 'x'), (1000000000115, 'x'), (1000000000116, 'x'), (1000000000117, 'x'), (1000000000118, 'x'), (1000000000119, 'x'), (1000000000120, 'x'), (1000000000121, 'x'), (1000000000122, 'x'), (1000000000123, 'x'), (1000000000124, 'x'), (1000000000125, 'x'), (1000000000126, 'x'), (1000000000127, 'x'), (1000000000128, 'x'), (1000000000129, 'x'), (1000000000130, 'x'), (1000000000131, 'x'), (1000000000132, 'x'), (1000000000133, 'x'), (1000000000134, 'x'), (1000000000135, 'x'), (1000000000136, 'x'), (1000000000137, 'x'), (1000000000138, 'x'), (1000000000139, 'x'), (1000000000140, 'x'), (1000000000141, 'x'), (1000000000142, 'x'), (1000000000143, 'x'), (1000000000144, 'x'), (1000000000145, 'x'), (1000000000146, 'x'), (1000000000147, 'x'), (1000000000148, 'x'), (1000000000149, 'x'), (1000000000150, 'x')", + output: []string{ + "insert into t1(id,val1) values (1000000000000,'x'), (1000000000001,'x'), (1000000000002,'x'), (1000000000003,'x'), (1000000000004,'x'), (1000000000005,'x'), (1000000000006,'x'), (1000000000007,'x'), (1000000000008,'x'), (1000000000009,'x'), (1000000000010,'x'), (1000000000011,'x'), (1000000000012,'x'), (1000000000013,'x'), (1000000000014,'x'), (1000000000015,'x'), (1000000000016,'x'), (1000000000017,'x'), (1000000000018,'x'), (1000000000019,'x'), (1000000000020,'x'), (1000000000021,'x'), (1000000000022,'x'), (1000000000023,'x'), (1000000000024,'x'), (1000000000025,'x'), (1000000000026,'x'), (1000000000027,'x'), (1000000000028,'x'), (1000000000029,'x'), (1000000000030,'x'), (1000000000031,'x'), (1000000000032,'x'), (1000000000033,'x'), (1000000000034,'x'), (1000000000035,'x'), (1000000000036,'x'), (1000000000037,'x'), (1000000000038,'x'), (1000000000039,'x'), (1000000000040,'x'), (1000000000041,'x'), (1000000000042,'x'), (1000000000043,'x')", + "insert into t1(id,val1) values (1000000000044,'x'), (1000000000045,'x'), (1000000000046,'x'), (1000000000047,'x'), (1000000000048,'x'), (1000000000049,'x'), (1000000000050,'x'), (1000000000051,'x'), (1000000000052,'x'), (1000000000053,'x'), (1000000000054,'x'), (1000000000055,'x'), (1000000000056,'x'), (1000000000057,'x'), (1000000000058,'x'), (1000000000059,'x'), (1000000000060,'x'), (1000000000061,'x'), (1000000000062,'x'), (1000000000063,'x'), (1000000000064,'x'), (1000000000065,'x'), (1000000000066,'x'), (1000000000067,'x'), (1000000000068,'x'), (1000000000069,'x'), (1000000000070,'x'), (1000000000071,'x'), (1000000000072,'x'), (1000000000073,'x'), (1000000000074,'x'), (1000000000075,'x'), (1000000000076,'x'), (1000000000077,'x'), (1000000000078,'x'), (1000000000079,'x'), (1000000000080,'x'), (1000000000081,'x'), (1000000000082,'x'), (1000000000083,'x'), (1000000000084,'x'), (1000000000085,'x'), (1000000000086,'x'), (1000000000087,'x')", + "insert into 
t1(id,val1) values (1000000000088,'x'), (1000000000089,'x'), (1000000000090,'x'), (1000000000091,'x'), (1000000000092,'x'), (1000000000093,'x'), (1000000000094,'x'), (1000000000095,'x'), (1000000000096,'x'), (1000000000097,'x'), (1000000000098,'x'), (1000000000099,'x'), (1000000000100,'x'), (1000000000101,'x'), (1000000000102,'x'), (1000000000103,'x'), (1000000000104,'x'), (1000000000105,'x'), (1000000000106,'x'), (1000000000107,'x'), (1000000000108,'x'), (1000000000109,'x'), (1000000000110,'x'), (1000000000111,'x'), (1000000000112,'x'), (1000000000113,'x'), (1000000000114,'x'), (1000000000115,'x'), (1000000000116,'x'), (1000000000117,'x'), (1000000000118,'x'), (1000000000119,'x'), (1000000000120,'x'), (1000000000121,'x'), (1000000000122,'x'), (1000000000123,'x'), (1000000000124,'x'), (1000000000125,'x'), (1000000000126,'x'), (1000000000127,'x'), (1000000000128,'x'), (1000000000129,'x'), (1000000000130,'x'), (1000000000131,'x')",
+				// This will be in the last batch, along with the vrepl pos update.
+				"insert into t1(id,val1) values (1000000000132,'x'), (1000000000133,'x'), (1000000000134,'x'), (1000000000135,'x'), (1000000000136,'x'), (1000000000137,'x'), (1000000000138,'x'), (1000000000139,'x'), (1000000000140,'x'), (1000000000141,'x'), (1000000000142,'x'), (1000000000143,'x'), (1000000000144,'x'), (1000000000145,'x'), (1000000000146,'x'), (1000000000147,'x'), (1000000000148,'x'), (1000000000149,'x'), (1000000000150,'x')",
+			},
+			expectedBulkInserts: 4,
+			expectedNonCommitBatches: 3, // The last one includes the commit
+			expectedInLastBatch: "insert into t1(id,val1) values (1000000000132,'x'), (1000000000133,'x'), (1000000000134,'x'), (1000000000135,'x'), (1000000000136,'x'), (1000000000137,'x'), (1000000000138,'x'), (1000000000139,'x'), (1000000000140,'x'), (1000000000141,'x'), (1000000000142,'x'), (1000000000143,'x'), (1000000000144,'x'), (1000000000145,'x'), (1000000000146,'x'), (1000000000147,'x'), (1000000000148,'x'), (1000000000149,'x'), (1000000000150,'x')",
+			table: "t1",
+			data: [][]string{
+				{"1", longStr},
+				{"2", "bbb"},
+				{"3", "ccc"},
+				{"4", "ddd"},
+				{"5", "eee"},
+				{"1000000000000", "x"}, {"1000000000001", "x"}, {"1000000000002", "x"}, {"1000000000003", "x"}, {"1000000000004", "x"}, {"1000000000005", "x"}, {"1000000000006", "x"}, {"1000000000007", "x"}, {"1000000000008", "x"}, {"1000000000009", "x"}, {"1000000000010", "x"}, {"1000000000011", "x"}, {"1000000000012", "x"}, {"1000000000013", "x"}, {"1000000000014", "x"}, {"1000000000015", "x"}, {"1000000000016", "x"}, {"1000000000017", "x"}, {"1000000000018", "x"}, {"1000000000019", "x"}, {"1000000000020", "x"}, {"1000000000021", "x"}, {"1000000000022", "x"}, {"1000000000023", "x"}, {"1000000000024", "x"}, {"1000000000025", "x"}, {"1000000000026", "x"}, {"1000000000027", "x"}, {"1000000000028", "x"}, {"1000000000029", "x"}, {"1000000000030", "x"}, {"1000000000031", "x"}, {"1000000000032", "x"}, {"1000000000033", "x"}, {"1000000000034", "x"}, {"1000000000035", "x"}, {"1000000000036", "x"}, {"1000000000037", "x"}, {"1000000000038", "x"}, {"1000000000039", "x"}, {"1000000000040", "x"}, {"1000000000041", "x"}, {"1000000000042", "x"}, {"1000000000043", "x"}, {"1000000000044", "x"}, {"1000000000045", "x"}, {"1000000000046", "x"}, {"1000000000047", "x"}, {"1000000000048", "x"}, {"1000000000049", "x"}, {"1000000000050", "x"}, {"1000000000051", "x"}, {"1000000000052", "x"}, {"1000000000053", "x"}, {"1000000000054", "x"}, {"1000000000055", "x"}, {"1000000000056", "x"}, {"1000000000057", "x"}, {"1000000000058", "x"}, {"1000000000059", "x"}, {"1000000000060", "x"}, {"1000000000061", "x"}, {"1000000000062", "x"}, {"1000000000063", "x"}, {"1000000000064", "x"}, {"1000000000065", "x"}, {"1000000000066", "x"}, {"1000000000067", "x"}, {"1000000000068", "x"}, {"1000000000069", "x"}, {"1000000000070", "x"}, {"1000000000071", "x"}, {"1000000000072", "x"}, {"1000000000073", "x"}, {"1000000000074", "x"}, {"1000000000075", "x"}, {"1000000000076", "x"}, {"1000000000077", "x"}, {"1000000000078", "x"}, {"1000000000079", "x"}, {"1000000000080", "x"}, {"1000000000081", "x"}, {"1000000000082", "x"}, {"1000000000083", "x"}, {"1000000000084", "x"}, {"1000000000085", "x"}, {"1000000000086", "x"}, {"1000000000087", "x"}, {"1000000000088", "x"}, {"1000000000089", "x"}, {"1000000000090", "x"}, {"1000000000091", "x"}, {"1000000000092", "x"}, {"1000000000093", "x"}, {"1000000000094", "x"}, {"1000000000095", "x"}, {"1000000000096", "x"}, {"1000000000097", "x"}, {"1000000000098", "x"}, {"1000000000099", "x"}, {"1000000000100", "x"}, {"1000000000101", "x"}, {"1000000000102", "x"}, {"1000000000103", "x"}, {"1000000000104", "x"}, {"1000000000105", "x"}, {"1000000000106", "x"}, {"1000000000107", "x"}, {"1000000000108", "x"}, {"1000000000109", "x"}, {"1000000000110", "x"}, {"1000000000111", "x"}, {"1000000000112", "x"}, {"1000000000113", "x"}, {"1000000000114", "x"}, {"1000000000115", "x"}, {"1000000000116", "x"}, {"1000000000117", "x"}, {"1000000000118", "x"}, {"1000000000119", "x"}, {"1000000000120", "x"}, {"1000000000121", "x"}, {"1000000000122", "x"}, {"1000000000123", "x"}, {"1000000000124", "x"}, {"1000000000125", "x"}, {"1000000000126", "x"}, {"1000000000127", "x"}, {"1000000000128", "x"}, {"1000000000129", "x"}, {"1000000000130", "x"}, {"1000000000131", "x"}, {"1000000000132", "x"}, {"1000000000133", "x"}, {"1000000000134", "x"}, {"1000000000135", "x"}, {"1000000000136", "x"}, {"1000000000137", "x"}, {"1000000000138", "x"}, {"1000000000139", "x"}, {"1000000000140", "x"}, {"1000000000141", "x"}, {"1000000000142", "x"}, {"1000000000143", "x"}, {"1000000000144", "x"}, {"1000000000145", "x"}, {"1000000000146", "x"}, {"1000000000147", "x"}, {"1000000000148", "x"}, {"1000000000149", "x"}, {"1000000000150", "x"},
+			},
+		},
+		{ // Now we have enough long IDs to cause the bulk delete to also be split along with the trx batch.
+ input: "delete from t1 where id > 1 and id <= 1000000000149", + output: []string{ + "delete from t1 where id in (2, 3, 4, 5, 1000000000000, 1000000000001, 1000000000002, 1000000000003, 1000000000004, 1000000000005, 1000000000006, 1000000000007, 1000000000008, 1000000000009, 1000000000010, 1000000000011, 1000000000012, 1000000000013, 1000000000014, 1000000000015, 1000000000016, 1000000000017, 1000000000018, 1000000000019, 1000000000020, 1000000000021, 1000000000022, 1000000000023, 1000000000024, 1000000000025, 1000000000026, 1000000000027, 1000000000028, 1000000000029, 1000000000030, 1000000000031, 1000000000032, 1000000000033, 1000000000034, 1000000000035, 1000000000036, 1000000000037, 1000000000038, 1000000000039, 1000000000040, 1000000000041, 1000000000042, 1000000000043, 1000000000044, 1000000000045, 1000000000046, 1000000000047, 1000000000048, 1000000000049, 1000000000050, 1000000000051, 1000000000052, 1000000000053, 1000000000054, 1000000000055, 1000000000056, 1000000000057, 1000000000058, 1000000000059)", + "delete from t1 where id in (1000000000060, 1000000000061, 1000000000062, 1000000000063, 1000000000064, 1000000000065, 1000000000066, 1000000000067, 1000000000068, 1000000000069, 1000000000070, 1000000000071, 1000000000072, 1000000000073, 1000000000074, 1000000000075, 1000000000076, 1000000000077, 1000000000078, 1000000000079, 1000000000080, 1000000000081, 1000000000082, 1000000000083, 1000000000084, 1000000000085, 1000000000086, 1000000000087, 1000000000088, 1000000000089, 1000000000090, 1000000000091, 1000000000092, 1000000000093, 1000000000094, 1000000000095, 1000000000096, 1000000000097, 1000000000098, 1000000000099, 1000000000100, 1000000000101, 1000000000102, 1000000000103, 1000000000104, 1000000000105, 1000000000106, 1000000000107, 1000000000108, 1000000000109, 1000000000110, 1000000000111, 1000000000112, 1000000000113, 1000000000114, 1000000000115, 1000000000116, 1000000000117, 1000000000118, 1000000000119, 1000000000120)", + // This will be in the last batch, along with the vrepl pos update. 
+ "delete from t1 where id in (1000000000121, 1000000000122, 1000000000123, 1000000000124, 1000000000125, 1000000000126, 1000000000127, 1000000000128, 1000000000129, 1000000000130, 1000000000131, 1000000000132, 1000000000133, 1000000000134, 1000000000135, 1000000000136, 1000000000137, 1000000000138, 1000000000139, 1000000000140, 1000000000141, 1000000000142, 1000000000143, 1000000000144, 1000000000145, 1000000000146, 1000000000147, 1000000000148, 1000000000149)", + }, + expectedBulkDeletes: 3, + expectedNonCommitBatches: 2, // The last one includes the commit + expectedInLastBatch: "delete from t1 where id in (1000000000121, 1000000000122, 1000000000123, 1000000000124, 1000000000125, 1000000000126, 1000000000127, 1000000000128, 1000000000129, 1000000000130, 1000000000131, 1000000000132, 1000000000133, 1000000000134, 1000000000135, 1000000000136, 1000000000137, 1000000000138, 1000000000139, 1000000000140, 1000000000141, 1000000000142, 1000000000143, 1000000000144, 1000000000145, 1000000000146, 1000000000147, 1000000000148, 1000000000149)", + table: "t1", + data: [][]string{ + {"1", longStr}, + {"1000000000150", "x"}, + }, + }, + { + input: "delete from t1 where id = 1 or id > 1000000000149", + output: []string{"delete from t1 where id in (1, 1000000000150)"}, + expectedBulkDeletes: 1, + table: "t1", + }, + } + + expectedBulkInserts, expectedBulkDeletes, expectedTrxBatchExecs, expectedTrxBatchCommits := int64(0), int64(0), int64(0), int64(0) + stats := globalStats.controllers[int32(vrID)].blpStats + + for _, tcase := range testcases { + t.Run(fmt.Sprintf("%.50s", tcase.input), func(t *testing.T) { + execStatements(t, []string{tcase.input}) + var output qh.ExpectationSequencer + switch len(tcase.output) { + case 0: + require.FailNow(t, "no expected output provided for test case") + case 1: + output = qh.Expect(tcase.output[0]) + default: + output = qh.Expect(tcase.output[0], tcase.output[1:]...) + } + for _, stmt := range tcase.output { + require.LessOrEqual(t, len(stmt), maxBatchSize, "expected output statement is longer than the max batch size (%d): %s", maxBatchSize, stmt) + } + expectNontxQueries(t, output) + time.Sleep(1 * time.Second) + log.Flush() + if tcase.table != "" { + expectData(t, tcase.table, tcase.data) + } + + // Confirm that the row events generated the expected multi-row + // statements and the statements were sent in multi-statement + // protocol message(s) as expected. 
+			expectedBulkDeletes += tcase.expectedBulkDeletes
+			expectedBulkInserts += tcase.expectedBulkInserts
+			expectedTrxBatchCommits++ // Should only ever be 1 per test case
+			expectedTrxBatchExecs += tcase.expectedNonCommitBatches
+			if tcase.expectedInLastBatch != "" { // We expect the trx to be split
+				require.Regexpf(t, regexp.MustCompile(fmt.Sprintf(trxLastBatchExpectRE, regexp.QuoteMeta(tcase.expectedInLastBatch))), lastMultiExecQuery, "Unexpected batch statement: %s", lastMultiExecQuery)
+			} else {
+				require.Regexpf(t, regexp.MustCompile(fmt.Sprintf(trxFullBatchExpectRE, regexp.QuoteMeta(strings.Join(tcase.output, ";")))), lastMultiExecQuery, "Unexpected batch statement: %s", lastMultiExecQuery)
+			}
+			require.Equal(t, expectedBulkInserts, stats.BulkQueryCount.Counts()["insert"], "expected %d bulk inserts but got %d", expectedBulkInserts, stats.BulkQueryCount.Counts()["insert"])
+			require.Equal(t, expectedBulkDeletes, stats.BulkQueryCount.Counts()["delete"], "expected %d bulk deletes but got %d", expectedBulkDeletes, stats.BulkQueryCount.Counts()["delete"])
+			require.Equal(t, expectedTrxBatchExecs, stats.TrxQueryBatchCount.Counts()["without_commit"], "expected %d trx batch execs but got %d", expectedTrxBatchExecs, stats.TrxQueryBatchCount.Counts()["without_commit"])
+			require.Equal(t, expectedTrxBatchCommits, stats.TrxQueryBatchCount.Counts()["with_commit"], "expected %d trx batch commits but got %d", expectedTrxBatchCommits, stats.TrxQueryBatchCount.Counts()["with_commit"])
+		})
+	}
+}
+
 func expectJSON(t *testing.T, table string, values [][]string, id int, exec func(ctx context.Context, query string) (*sqltypes.Result, error)) {
 	t.Helper()
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
index 2a362645708..9c065866c15 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
@@ -28,22 +28,20 @@ import (
 	"vitess.io/vitess/go/mysql/replication"
 	"vitess.io/vitess/go/mysql/sqlerror"
+	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/timer"
+	"vitess.io/vitess/go/vt/binlog/binlogplayer"
+	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/mysqlctl"
 	"vitess.io/vitess/go/vt/schema"
 	"vitess.io/vitess/go/vt/sqlparser"
 	"vitess.io/vitess/go/vt/vterrors"
 	"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
 
-	querypb "vitess.io/vitess/go/vt/proto/query"
-	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
-
-	"vitess.io/vitess/go/sqltypes"
-	"vitess.io/vitess/go/vt/binlog/binlogplayer"
-	"vitess.io/vitess/go/vt/log"
-	"vitess.io/vitess/go/vt/mysqlctl"
-
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	querypb "vitess.io/vitess/go/vt/proto/query"
 	tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
+	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
 )
 
 var (
diff --git a/go/vt/wrangler/fake_dbclient_test.go b/go/vt/wrangler/fake_dbclient_test.go
index 7bcc5f5bcf2..03fad81d7b8 100644
--- a/go/vt/wrangler/fake_dbclient_test.go
+++ b/go/vt/wrangler/fake_dbclient_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/sqlparser"
 
 	"vitess.io/vitess/go/sqltypes"
 )
@@ -160,6 +161,23 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resul
 	return qr, err
 }
 
+// ExecuteFetchMulti is part of the DBClient interface
+func (dc *fakeDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
+	queries, err := sqlparser.SplitStatementToPieces(query)
+	if err != nil {
+		return nil, err
+	}
+	results := make([]*sqltypes.Result, 0, len(queries))
+	for _, query := range queries {
+		qr, err := dc.executeFetch(query, maxrows)
+		if err != nil {
+			return nil, err
+		}
+		results = append(results, qr)
+	}
+	return results, nil
+}
+
 // ExecuteFetch is part of the DBClient interface
 func (dc *fakeDBClient) executeFetch(query string, maxrows int) (*sqltypes.Result, error) {
 	if dbrs := dc.queries[query]; dbrs != nil {