From 57470a6cf572ea85a08976fa9209a146dfa361b3 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Tue, 12 Sep 2023 23:44:45 -0400 Subject: [PATCH] Cherry-pick a159f1871966f80901a4e7559bc592a1539b4209 with conflicts --- .../tabletmanager/rpc_vreplication_test.go | 779 ++++++++++++++++++ .../vreplication/external_connector_test.go | 2 + .../vreplication/journal_test.go | 4 + .../vreplication/vcopier_test.go | 139 ++++ .../vreplication/vplayer_flaky_test.go | 6 + .../tabletmanager/vreplication/vreplicator.go | 454 +++++++++- .../vreplication/vreplicator_test.go | 563 +++++++++++++ 7 files changed, 1946 insertions(+), 1 deletion(-) create mode 100644 go/vt/vttablet/tabletmanager/rpc_vreplication_test.go diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go new file mode 100644 index 00000000000..d4e72a86e23 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -0,0 +1,779 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletmanager + +import ( + "context" + "fmt" + "math" + "runtime/debug" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/textutil" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/workflow" + "vitess.io/vitess/go/vt/vtgate/vindexes" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/proto/vttime" +) + +const ( + insertVReplicationPrefix = "insert into _vt.vreplication (workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys)" + getWorkflow = "select id from _vt.vreplication where db_name='vt_%s' and workflow='%s'" + checkForWorkflow = "select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'" + checkForFrozenWorkflow = "select 1 from _vt.vreplication where db_name='vt_%s' and message='FROZEN' and workflow_sub_type != 1" + freezeWorkflow = "update _vt.vreplication set message = 'FROZEN' where db_name='vt_%s' and workflow='%s'" + checkForJournal = "/select val from _vt.resharding_journal where id=" + getWorkflowStatus = "select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'" + getWorkflowState = "select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type, defer_secondary_keys from _vt.vreplication where id=1" + getCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1" + getNumCopyStateTable = "select count(distinct table_name) from _vt.copy_state where vrepl_id=1" + getLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)" + getAutoIncrementStep = "select @@session.auto_increment_increment" + setSessionTZ = "set @@session.time_zone = '+00:00'" + setNames = "set names 'binary'" + getBinlogRowImage = "select @@binlog_row_image" + insertStreamsCreatedLog = "insert into _vt.vreplication_log(vrepl_id, type, state, message) values(1, 'Stream Created', '', '%s'" + getVReplicationRecord = "select * from _vt.vreplication where id = 1" + startWorkflow = "update _vt.vreplication set state='Running' where db_name='vt_%s' and workflow='%s'" + stopForCutover = "update _vt.vreplication set state='Stopped', message='stopped for cutover' where id=1" + getMaxValForSequence = "select max(`id`) as maxval from `vt_%s`.`%s`" + initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)" + deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'" + updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1` + getRowsCopied = "SELECT rows_copied FROM _vt.vreplication WHERE id=1" +) + +var ( + errShortCircuit = fmt.Errorf("short circuiting test") + defaultSchema = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "t1", + Columns: []string{"id", "c2"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id|c2", "int64|int64"), + }, + }, + } + position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition) +) + +// TestCreateVReplicationWorkflow tests the query generated +// from a VtctldServer MoveTablesCreate request to ensure +// that the VReplication stream(s) are created correctly. +func TestCreateVReplicationWorkflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sourceKs := "sourceks" + sourceTabletUID := 200 + targetKs := "targetks" + targetTabletUID := 300 + shard := "0" + wf := "testwf" + tenv := newTestEnv(t, ctx, sourceKs, []string{shard}) + defer tenv.close() + + sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, shard) + defer tenv.deleteTablet(sourceTablet.tablet) + targetTablet := tenv.addTablet(t, targetTabletUID, targetKs, shard) + defer tenv.deleteTablet(targetTablet.tablet) + + ws := workflow.NewServer(tenv.ts, tenv.tmc) + + tests := []struct { + name string + req *vtctldatapb.MoveTablesCreateRequest + schema *tabletmanagerdatapb.SchemaDefinition + query string + }{ + { + name: "defaults", + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + AllTables: true, + }, + query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"}}', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 0)`, + insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + }, + { + name: "all values", + schema: &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "t1", + Columns: []string{"id", "c2"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id|c2", "int64|int64"), + }, + { + Name: "wut", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + }, + }, + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + IncludeTables: []string{defaultSchema.TableDefinitions[0].Name}, + ExcludeTables: []string{"wut"}, + SourceTimeZone: "EDT", + OnDdl: binlogdatapb.OnDDLAction_EXEC.String(), + StopAfterCopy: true, + DropForeignKeys: true, + DeferSecondaryKeys: true, + AutoStart: true, + }, + query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"}} on_ddl:EXEC stop_after_copy:true source_time_zone:\"EDT\" target_time_zone:\"UTC\"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1)`, + insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + }, + } + + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", + targetKs, wf), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and message='FROZEN' and workflow_sub_type != 1", + targetKs), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, "select val from _vt.resharding_journal where id=7224776740563431192", &sqltypes.Result{}) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // This is needed because MockDBClient uses t.Fatal() + // which doesn't play well with subtests. + defer func() { + if err := recover(); err != nil { + t.Errorf("Recovered from panic: %v; Stack: %s", err, string(debug.Stack())) + } + }() + + require.NotNil(t, tt.req, "No MoveTablesCreate request provided") + require.NotEmpty(t, tt.query, "No expected query provided") + + if tt.schema == nil { + tt.schema = defaultSchema + } + tenv.tmc.SetSchema(tt.schema) + + tenv.tmc.tablets[targetTabletUID].vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + // This is our expected query, which will also short circuit + // the test with an error as at this point we've tested what + // we wanted to test. + tenv.tmc.tablets[targetTabletUID].vrdbClient.ExpectRequest(tt.query, nil, errShortCircuit) + _, err := ws.MoveTablesCreate(ctx, tt.req) + tenv.tmc.tablets[targetTabletUID].vrdbClient.Wait() + require.ErrorIs(t, err, errShortCircuit) + }) + } +} + +// TestMoveTables tests the query generated from a VtctldServer +// MoveTablesCreate request to ensure that the VReplication +// stream(s) are created correctly. Followed by ensuring that +// SwitchTraffic and ReverseTraffic work as expected. +func TestMoveTables(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sourceKs := "sourceks" + sourceTabletUID := 200 + targetKs := "targetks" + targetShards := make(map[string]*fakeTabletConn) + sourceShard := "0" + globalKs := "global" + globalShard := "0" + wf := "testwf" + tabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + + tenv := newTestEnv(t, ctx, sourceKs, []string{sourceShard}) + defer tenv.close() + + sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, sourceShard) + defer tenv.deleteTablet(sourceTablet.tablet) + + targetShards["-80"] = tenv.addTablet(t, 300, targetKs, "-80") + defer tenv.deleteTablet(targetShards["-80"].tablet) + targetShards["80-"] = tenv.addTablet(t, 310, targetKs, "80-") + defer tenv.deleteTablet(targetShards["80-"].tablet) + + globalTablet := tenv.addTablet(t, 500, globalKs, globalShard) + defer tenv.deleteTablet(globalTablet.tablet) + + tenv.ts.SaveVSchema(ctx, globalKs, &vschemapb.Keyspace{ + Sharded: false, + Tables: map[string]*vschemapb.Table{ + "t1_seq": { + Type: vindexes.TypeSequence, + }, + }, + }) + tenv.ts.SaveVSchema(ctx, targetKs, &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id", + Name: "hash", + }}, + AutoIncrement: &vschemapb.AutoIncrement{ + Column: "id", + Sequence: "t1_seq", + }, + }, + }, + }) + + ws := workflow.NewServer(tenv.ts, tenv.tmc) + + tenv.mysqld.Schema = defaultSchema + tenv.mysqld.Schema.DatabaseSchema = tenv.dbName + tenv.mysqld.FetchSuperQueryMap = make(map[string]*sqltypes.Result) + tenv.mysqld.FetchSuperQueryMap[`select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where .*`] = sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "character_set_name|collation_name|column_name|data_type|column_type|extra", + "varchar|varchar|varchar|varchar|varchar|varchar", + ), + "NULL|NULL|id|bigint|bigint|", + "NULL|NULL|c2|bigint|bigint|", + ) + + bls := fmt.Sprintf("keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"}}", sourceKs, sourceShard) + + tenv.tmc.SetSchema(defaultSchema) + + tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, checkForJournal, &sqltypes.Result{}) + + for _, ftc := range targetShards { + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + "1", + ), + ) + tenv.tmc.setVReplicationExecResults(ftc.tablet, getCopyState, &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getWorkflowStatus, wf, targetKs), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|workflow|source|pos|stop_pos|max_replication_log|state|db_name|time_updated|transaction_timestamp|message|tags|workflow_type|workflow_sub_type", + "int64|varchar|blob|varchar|varchar|int64|varchar|varchar|int64|int64|varchar|varchar|int64|int64", + ), + fmt.Sprintf("1|%s|%s|%s|NULL|0|running|vt_%s|1686577659|0|||1|0", wf, bls, position, targetKs), + ), + ) + tenv.tmc.setVReplicationExecResults(ftc.tablet, getLatestCopyState, &sqltypes.Result{}) + + ftc.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + insert := fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1 where in_keyrange(id, \'%s.hash\', \'%s\')\"}}', '', 0, 0, '%s', 'primary,replica,rdonly', now(), 0, 'Stopped', '%s', 1, 0, 0)`, + insertVReplicationPrefix, wf, sourceKs, sourceShard, targetKs, ftc.tablet.Shard, tenv.cells[0], tenv.dbName) + ftc.vrdbClient.ExpectRequest(insert, &sqltypes.Result{InsertID: 1}, nil) + ftc.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(getVReplicationRecord, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source", + "int64|varchar", + ), + fmt.Sprintf("1|%s", bls), + ), nil) + ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(getRowsCopied, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "rows_copied", + "int64", + ), + "0", + ), + nil, + ) + ftc.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), + ), nil) + ftc.vrdbClient.ExpectRequest(getNumCopyStateTable, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", + ), nil) + ftc.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), + ), nil) + ftc.vrdbClient.ExpectRequest(getNumCopyStateTable, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", + ), nil) + ftc.vrdbClient.ExpectRequest(getBinlogRowImage, sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "@@binlog_row_image", + "varchar", + ), + "FULL", + ), nil) + + ftc.vrdbClient.ExpectRequest(fmt.Sprintf(insertStreamsCreatedLog, bls), &sqltypes.Result{}, nil) + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + "1", + ), + ) + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(startWorkflow, targetKs, wf), &sqltypes.Result{}) + ftc.vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) + + tenv.tmc.setVReplicationExecResults(ftc.tablet, stopForCutover, &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(freezeWorkflow, targetKs, wf), &sqltypes.Result{}) + + tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getMaxValForSequence, targetKs, "t1"), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "maxval", + "int64", + ), + fmt.Sprintf("%d", ftc.tablet.Alias.Uid), // Use the tablet's UID as the max value + ), + ) + } + + // We use the tablet's UID in the mocked results for the max value used on each target shard. + nextSeqVal := int(math.Max(float64(targetShards["-80"].tablet.Alias.Uid), float64(targetShards["80-"].tablet.Alias.Uid))) + 1 + tenv.tmc.setVReplicationExecResults(globalTablet.tablet, + sqlparser.BuildParsedQuery(initSequenceTable, sqlescape.EscapeID(fmt.Sprintf("vt_%s", globalKs)), sqlescape.EscapeID("t1_seq"), nextSeqVal, nextSeqVal, nextSeqVal).Query, + &sqltypes.Result{RowsAffected: 0}, + ) + + _, err := ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + TabletTypes: tabletTypes, + Cells: tenv.cells, + AllTables: true, + AutoStart: true, + }) + require.NoError(t, err) + + _, err = ws.WorkflowSwitchTraffic(ctx, &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + MaxReplicationLagAllowed: &vttime.Duration{Seconds: 922337203}, + EnableReverseReplication: true, + InitializeTargetSequences: true, + Direction: int32(workflow.DirectionForward), + }) + require.NoError(t, err) + + tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, fmt.Sprintf(getWorkflowStatus, workflow.ReverseWorkflowName(wf), sourceKs), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|workflow|source|pos|stop_pos|max_replication_log|state|db_name|time_updated|transaction_timestamp|message|tags|workflow_type|workflow_sub_type", + "int64|varchar|blob|varchar|varchar|int64|varchar|varchar|int64|int64|varchar|varchar|int64|int64", + ), + fmt.Sprintf("1|%s|%s|%s|NULL|0|running|vt_%s|1686577659|0|||1|0", workflow.ReverseWorkflowName(wf), bls, position, sourceKs), + ), + ) + + _, err = ws.WorkflowSwitchTraffic(ctx, &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + MaxReplicationLagAllowed: &vttime.Duration{Seconds: 922337203}, + EnableReverseReplication: true, + Direction: int32(workflow.DirectionBackward), + }) + require.NoError(t, err) +} + +func TestUpdateVReplicationWorkflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cells := []string{"zone1"} + tabletTypes := []string{"replica"} + workflow := "testwf" + keyspace := "testks" + vreplID := 1 + tabletUID := 100 + + tenv := newTestEnv(t, ctx, keyspace, []string{shard}) + defer tenv.close() + + tablet := tenv.addTablet(t, tabletUID, keyspace, shard) + defer tenv.deleteTablet(tablet.tablet) + + parsed := sqlparser.BuildParsedQuery(sqlSelectVReplicationWorkflowConfig, sidecar.DefaultName, ":wf") + bindVars := map[string]*querypb.BindVariable{ + "wf": sqltypes.StringBindVariable(workflow), + } + selectQuery, err := parsed.GenerateQuery(bindVars, nil) + require.NoError(t, err) + blsStr := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"customer" filter:"select * from customer"} rules:{match:"corder" filter:"select * from corder"}}`, + keyspace, shard) + selectRes := sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source|cell|tablet_types", + "int64|varchar|varchar|varchar", + ), + fmt.Sprintf("%d|%s|%s|%s", vreplID, blsStr, cells[0], tabletTypes[0]), + ) + idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a", + sqltypes.Int64BindVariable(int64(vreplID))) + require.NoError(t, err) + idRes := sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + fmt.Sprintf("%d", vreplID), + ) + + tests := []struct { + name string + request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest + query string + }{ + { + name: "update cells", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + Cells: []string{"zone2"}, + // TabletTypes is an empty value, so the current value should be cleared + }, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`, + keyspace, shard, "zone2", vreplID), + }, + { + name: "update cells, NULL tablet_types", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + Cells: []string{"zone3"}, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica + }, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + keyspace, shard, "zone3", tabletTypes[0], vreplID), + }, + { + name: "update tablet_types", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + TabletSelectionPreference: tabletmanagerdatapb.TabletSelectionPreference_INORDER, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA}, + }, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`, + keyspace, shard, "in_order:rdonly,replica", vreplID), + }, + { + name: "update tablet_types, NULL cells", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1 + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, + }, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + keyspace, shard, cells[0], "rdonly", vreplID), + }, + { + name: "update on_ddl", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + OnDdl: binlogdatapb.OnDDLAction_EXEC, + }, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`, + keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID), + }, + { + name: "update cell,tablet_types,on_ddl", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + Cells: []string{"zone1", "zone2", "zone3"}, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY}, + OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE, + }, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`, + keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // This is needed because MockDBClient uses t.Fatal() + // which doesn't play well with subtests. + defer func() { + if err := recover(); err != nil { + t.Errorf("Recovered from panic: %v", err) + } + }() + + require.NotNil(t, tt.request, "No request provided") + require.NotEqual(t, "", tt.query, "No expected query provided") + + tt.request.State = binlogdatapb.VReplicationWorkflowState_Stopped + + // These are the same for each RPC call. + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil) + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil) + + // This is our expected query, which will also short circuit + // the test with an error as at this point we've tested what + // we wanted to test. + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(tt.query, &sqltypes.Result{RowsAffected: 1}, errShortCircuit) + _, err = tenv.tmc.tablets[tabletUID].tm.UpdateVReplicationWorkflow(ctx, tt.request) + tenv.tmc.tablets[tabletUID].vrdbClient.Wait() + require.ErrorIs(t, err, errShortCircuit) + }) + } +} + +// TestFailedMoveTablesCreateCleanup tests that the workflow +// and its artifacts are cleaned up when the workflow creation +// fails -- specifically after the point where we have created +// the workflow streams. +func TestFailedMoveTablesCreateCleanup(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sourceKs := "sourceks" + sourceTabletUID := 200 + shard := "0" + targetTabletUID := 300 + targetKs := "targetks" + wf := "testwf" + table := defaultSchema.TableDefinitions[0].Name + invalidTimeZone := "NOPE" + bls := fmt.Sprintf("keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}}", + sourceKs, shard, table, table) + tenv := newTestEnv(t, ctx, sourceKs, []string{shard}) + defer tenv.close() + ws := workflow.NewServer(tenv.ts, tenv.tmc) + + sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, shard) + defer tenv.deleteTablet(sourceTablet.tablet) + targetTablet := tenv.addTablet(t, targetTabletUID, targetKs, shard) + defer tenv.deleteTablet(targetTablet.tablet) + + tenv.mysqld.Schema = defaultSchema + tenv.mysqld.Schema.DatabaseSchema = tenv.dbName + tenv.mysqld.FetchSuperQueryMap = make(map[string]*sqltypes.Result) + tenv.mysqld.FetchSuperQueryMap[`select character_set_name, collation_name, column_name, data_type, column_type, extra from information_schema.columns where .*`] = sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "character_set_name|collation_name|column_name|data_type|column_type|extra", + "varchar|varchar|varchar|varchar|varchar|varchar", + ), + "NULL|NULL|id|bigint|bigint|", + "NULL|NULL|c2|bigint|bigint|", + ) + + // Let's be sure that the routing rules are empty to start. + err := topotools.SaveRoutingRules(ctx, tenv.ts, nil) + require.NoError(t, err, "failed to save routing rules") + + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{}) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + "1", + ), + ) + targetTablet.vrdbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest( + fmt.Sprintf("%s %s", + insertVReplicationPrefix, + fmt.Sprintf(`values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"%s\" filter:\"select * from %s\"}} source_time_zone:\"%s\" target_time_zone:\"UTC\"', '', 0, 0, '%s', 'primary', now(), 0, 'Stopped', '%s', 1, 0, 0)`, + wf, sourceKs, shard, table, table, invalidTimeZone, strings.Join(tenv.cells, ","), tenv.dbName), + ), + &sqltypes.Result{ + RowsAffected: 1, + InsertID: 1, + }, + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getAutoIncrementStep, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(getVReplicationRecord, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source", + "int64|varchar", + ), + fmt.Sprintf("1|%s", bls), + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), + &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(getRowsCopied, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "rows_copied", + "int64", + ), + "0", + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getWorkflowState, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getWorkflowState, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys", + "varchar|varchar|int64|int64|varchar|int64|varchar|int64|int64", + ), + fmt.Sprintf("||0|0|Stopped|1|%s|0|0", wf), + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getNumCopyStateTable, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "count(distinct table_name)", + "int64", + ), + "1", + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(getBinlogRowImage, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "@@binlog_row_image", + "varchar", + ), + "FULL", + ), + nil, + ) + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(insertStreamsCreatedLog, bls), &sqltypes.Result{}, nil) + + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, + fmt.Sprintf("select convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + fmt.Sprintf("convert_tz('2006-01-02 15:04:05', '%s', 'UTC')", invalidTimeZone), + "datetime", + ), + "NULL", + ), + ) + + // We expect the workflow creation to fail due to the invalid time + // zone and thus the workflow iteslf to be cleaned up. + tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, + fmt.Sprintf(deleteWorkflow, sourceKs, workflow.ReverseWorkflowName(wf)), + &sqltypes.Result{RowsAffected: 1}, + ) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, + fmt.Sprintf(deleteWorkflow, targetKs, wf), + &sqltypes.Result{RowsAffected: 1}, + ) + + // Save the current target vschema. + vs, err := tenv.ts.GetVSchema(ctx, targetKs) + require.NoError(t, err, "failed to get target vschema") + + _, err = ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ + Workflow: wf, + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Cells: tenv.cells, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_PRIMARY}, + IncludeTables: []string{table}, + SourceTimeZone: invalidTimeZone, + }) + require.ErrorContains(t, err, fmt.Sprintf("unable to perform time_zone conversions from %s to UTC", invalidTimeZone)) + + // Check that there are no orphaned routing rules. + rules, err := topotools.GetRoutingRules(ctx, tenv.ts) + require.NoError(t, err, "failed to get routing rules") + require.Equal(t, 0, len(rules), "expected no routing rules to be present") + + // Check that our vschema changes were also rolled back. + vs2, err := tenv.ts.GetVSchema(ctx, targetKs) + require.NoError(t, err, "failed to get target vschema") + require.Equal(t, vs, vs2, "expected vschema to be unchanged") +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go index 0d7214c08e4..2c1f77dc052 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go @@ -180,6 +180,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string { if pos == "" { return []string{ "/insert into _vt.vreplication", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "begin", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", @@ -189,6 +190,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string { } return []string{ "/insert into _vt.vreplication", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running'", } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go index b5c6e609374..2bea29463b2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go @@ -77,6 +77,7 @@ func TestJournalOneToOne(t *testing.T) { fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), "commit", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running', message='' where id.*", }) @@ -146,6 +147,8 @@ func TestJournalOneToMany(t *testing.T) { "commit", "/update _vt.vreplication set message='Picked source tablet.*", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running', message='' where id.*", "/update _vt.vreplication set state='Running', message='' where id.*", }) @@ -209,6 +212,7 @@ func TestJournalTablePresent(t *testing.T) { fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), "commit", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running', message='' where id.*", }) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 2167d71d64d..4ae898de251 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -119,6 +119,7 @@ func TestPlayerCopyCharPK(t *testing.T) { expectNontxQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", "insert into dst(idc,val) values ('a\\0',1)", @@ -220,6 +221,7 @@ func TestPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { expectNontxQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", "insert into dst(idc,val) values ('a',1)", @@ -324,6 +326,7 @@ func TestPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) { expectNontxQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", "insert into dst(id,idc,idc2,val) values (1,'a','a',1)", @@ -389,7 +392,12 @@ func TestPlayerCopyTablesWithFK(t *testing.T) { expectDBClientQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", +<<<<<<< HEAD "select @@foreign_key_checks;", +======= + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", + "select @@foreign_key_checks", +>>>>>>> a159f18719 (copy over existing vreplication rows copied to local counter if resuming from another tablet (#13949)) // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -496,6 +504,7 @@ func TestPlayerCopyTables(t *testing.T) { expectDBClientQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -632,6 +641,7 @@ func TestPlayerCopyBigTable(t *testing.T) { // Create the list of tables to copy and transition to Copying state. "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", // The first fast-forward has no starting point. So, it just saves the current position. "/update _vt.vreplication set state='Copying'", @@ -747,6 +757,7 @@ func TestPlayerCopyWildcardRule(t *testing.T) { // Create the list of tables to copy and transition to Copying state. "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", // The first fast-forward has no starting point. So, it just saves the current position. @@ -887,6 +898,7 @@ func TestPlayerCopyTableContinuation(t *testing.T) { expectNontxQueries(t, []string{ // Catchup "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "insert into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)", "insert into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)", "update dst1 set val='updated' where id=3 and (3,3) <= (6,6)", @@ -999,11 +1011,23 @@ func TestPlayerCopyWildcardTableContinuation(t *testing.T) { "/insert into _vt.vreplication", "/update _vt.vreplication set state = 'Copying'", "/update _vt.vreplication set message='Picked source tablet.*", +<<<<<<< HEAD "insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)", // Copy "insert into dst(id,val) values (3,'uncopied'), (4,'new')", `/update _vt.copy_state set lastpk.*`, "/delete from _vt.copy_state.*dst", +======= + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", + ).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer { + if !optimizeInsertsEnabled { + expect = expect.Then(qh.Immediately("insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)")) + } + return expect.Then(qh.Immediately("insert into dst(id,val) values (3,'uncopied'), (4,'new')")) + }).Then(qh.Immediately( + `/insert into _vt.copy_state .*`, + "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst", +>>>>>>> a159f18719 (copy over existing vreplication rows copied to local counter if resuming from another tablet (#13949)) "/update _vt.vreplication set state='Running'", }) expectData(t, "dst", [][]string{ @@ -1085,6 +1109,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) { "/insert into _vt.vreplication", "/update _vt.vreplication set state = 'Copying'", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Copy "insert into dst(id,val) values (3,'uncopied'), (4,'new')", `/update _vt.copy_state set lastpk.*`, @@ -1134,6 +1159,7 @@ func TestPlayerCopyTablesNone(t *testing.T) { expectDBClientQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "begin", "/update _vt.vreplication set state='Stopped'", "commit", @@ -1184,6 +1210,7 @@ func TestPlayerCopyTablesStopAfterCopy(t *testing.T) { expectDBClientQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -1206,6 +1233,114 @@ func TestPlayerCopyTablesStopAfterCopy(t *testing.T) { }) } +<<<<<<< HEAD +======= +// TestPlayerCopyTablesGIPK tests the flow when the source table has a generated invisible primary key, for when +// the target table also has a gipk and also when the gipk column is visible, for example, in a sharded keyspace. +// The test also confirms that the copy_state has the gipk. +func TestPlayerCopyTablesGIPK(t *testing.T) { + testVcopierTestCases(t, testPlayerCopyTablesGIPK, commonVcopierTestCases()) +} + +func testPlayerCopyTablesGIPK(t *testing.T) { + if !env.HasCapability(testenv.ServerCapabilityGeneratedInvisiblePrimaryKey) { + t.Skip("skipping test as server does not support generated invisible primary keys") + } + defer deleteTablet(addTablet(100)) + + execStatements(t, []string{ + "SET @@session.sql_generate_invisible_primary_key=ON;", + "create table src1(val varbinary(128))", + "insert into src1 values('aaa'), ('bbb')", + "create table src2(val varbinary(128))", + "insert into src2 values('aaa'), ('bbb')", + fmt.Sprintf("create table %s.dst1(val varbinary(128))", vrepldb), + "SET @@session.sql_generate_invisible_primary_key=OFF;", + fmt.Sprintf("create table %s.dst2(my_row_id int, val varbinary(128), primary key(my_row_id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table src1", + fmt.Sprintf("drop table %s.dst1", vrepldb), + "drop table src2", + fmt.Sprintf("drop table %s.dst2", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "dst1", + Filter: "select * from src1", + }, { + Match: "dst2", + Filter: "select * from src2", + }}, + } + + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + OnDdl: binlogdatapb.OnDDLAction_IGNORE, + StopAfterCopy: true, + } + query := binlogplayer.CreateVReplicationState("test", bls, "", binlogdatapb.VReplicationWorkflowState_Init, playerEngine.dbName, 0, 0) + qr, err := playerEngine.Exec(query) + if err != nil { + t.Fatal(err) + } + defer func() { + query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) + if _, err := playerEngine.Exec(query); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) + }() + + expectDBClientQueries(t, qh.Expect( + "/insert into _vt.vreplication", + "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", + // Create the list of tables to copy and transition to Copying state. + "begin", + "/insert into _vt.copy_state", + "/update _vt.vreplication set state='Copying'", + "commit", + // The first fast-forward has no starting point. So, it just saves the current position. + "/update _vt.vreplication set pos=", + ).Then(qh.Eventually( + "begin", + "insert into dst1(my_row_id,val) values (1,'aaa'), (2,'bbb')", + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"my_row_id\\" type:UINT64 charset:63 flags:49699} rows:{lengths:1 values:\\"2\\"}'.*`, + "commit", + )).Then(qh.Immediately( + // copy of dst1 is done: delete from copy_state. + "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst1", + )).Then(qh.Eventually( + "begin", + "/update _vt.vreplication set pos=", + "commit", + "begin", + "insert into dst2(my_row_id,val) values (1,'aaa'), (2,'bbb')", + `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"my_row_id\\" type:UINT64 charset:63 flags:49699} rows:{lengths:1 values:\\"2\\"}'.*`, + "commit", + )).Then(qh.Immediately( + // copy of dst2 is done: delete from copy_state. + "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*dst2", + // All tables copied. Stop vreplication because we requested it. + "/update _vt.vreplication set state='Stopped'", + ))) + + expectData(t, "dst1", [][]string{ + {"aaa"}, + {"bbb"}, + }) + expectData(t, "dst2", [][]string{ + {"1", "aaa"}, + {"2", "bbb"}, + }) +} + +>>>>>>> a159f18719 (copy over existing vreplication rows copied to local counter if resuming from another tablet (#13949)) func TestPlayerCopyTableCancel(t *testing.T) { defer deleteTablet(addTablet(100)) @@ -1261,6 +1396,7 @@ func TestPlayerCopyTableCancel(t *testing.T) { expectDBClientQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -1343,6 +1479,7 @@ func TestPlayerCopyTablesWithGeneratedColumn(t *testing.T) { // Create the list of tables to copy and transition to Copying state. "/insert into _vt.vreplication", "/update _vt.vreplication set message=", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state", // The first fast-forward has no starting point. So, it just saves the current position. @@ -1410,6 +1547,7 @@ func TestCopyTablesWithInvalidDates(t *testing.T) { expectDBClientQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -1501,6 +1639,7 @@ func TestCopyInvisibleColumns(t *testing.T) { // Create the list of tables to copy and transition to Copying state. "/insert into _vt.vreplication", "/update _vt.vreplication set message=", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state", // The first fast-forward has no starting point. So, it just saves the current position. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 5dbe260d68b..2a82bf54729 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1631,6 +1631,7 @@ func TestPlayerDDL(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "begin", fmt.Sprintf("/update.*'%s'", pos2), @@ -1794,6 +1795,7 @@ func TestPlayerStopPos(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "begin", "insert into yes(id,val) values (1,'aaa')", @@ -1819,6 +1821,7 @@ func TestPlayerStopPos(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "begin", // Since 'no' generates empty transactions that are skipped by @@ -1837,6 +1840,7 @@ func TestPlayerStopPos(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "/update.*'Stopped'.*already reached", }) @@ -2458,6 +2462,7 @@ func TestRestartOnVStreamEnd(t *testing.T) { }) expectDBClientQueries(t, []string{ "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running'", "begin", "insert into t1(id,val) values (2,'aaa')", @@ -2917,6 +2922,7 @@ func startVReplication(t *testing.T, bls *binlogdatapb.BinlogSource, pos string) expectDBClientQueries(t, []string{ "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running'", }) var once sync.Once diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 2fd917672be..e456cfe16ed 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -131,7 +131,7 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame log.Warningf("the supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d", vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval) } - return &vreplicator{ + vr := &vreplicator{ vre: vre, id: id, source: source, @@ -142,6 +142,8 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame throttleUpdatesRateLimiter: timer.NewRateLimiter(time.Second), } + vr.setExistingRowsCopied() + return vr } // Replicate starts a vreplication stream. It can be in one of three phases: @@ -550,3 +552,453 @@ func recalculatePKColsInfoByColumnNames(uniqueKeyColumnNames []string, colInfos } return pkColInfos } +<<<<<<< HEAD +======= + +// stashSecondaryKeys temporarily DROPs all secondary keys from the table schema +// and stashes an ALTER TABLE statement that will be used to recreate them at the +// end of the copy phase. +func (vr *vreplicator) stashSecondaryKeys(ctx context.Context, tableName string) error { + if !vr.supportsDeferredSecondaryKeys() { + return fmt.Errorf("deferring secondary key creation is not supported for %s workflows", + binlogdatapb.VReplicationWorkflowType_name[vr.WorkflowType]) + } + secondaryKeys, err := vr.getTableSecondaryKeys(ctx, tableName) + if err != nil { + return err + } + if len(secondaryKeys) > 0 { + alterDrop := &sqlparser.AlterTable{ + Table: sqlparser.TableName{ + Qualifier: sqlparser.NewIdentifierCS(vr.dbClient.DBName()), + Name: sqlparser.NewIdentifierCS(tableName), + }, + } + alterReAdd := &sqlparser.AlterTable{ + Table: sqlparser.TableName{ + Qualifier: sqlparser.NewIdentifierCS(vr.dbClient.DBName()), + Name: sqlparser.NewIdentifierCS(tableName), + }, + } + for _, secondaryKey := range secondaryKeys { + // Primary should never happen. Fulltext keys are + // not supported for deferral and retained during + // the copy phase as they have some unique + // behaviors and constraints: + // - Adding a fulltext key requires a full table + // rebuild to add the internal FTS_DOC_ID field + // to each record. + // - You can not add/remove multiple fulltext keys + // in a single ALTER statement. + if secondaryKey.Info.Primary || secondaryKey.Info.Fulltext { + continue + } + alterDrop.AlterOptions = append(alterDrop.AlterOptions, + &sqlparser.DropKey{ + Name: secondaryKey.Info.Name, + Type: sqlparser.NormalKeyType, + }, + ) + alterReAdd.AlterOptions = append(alterReAdd.AlterOptions, + &sqlparser.AddIndexDefinition{ + IndexDefinition: secondaryKey, + }, + ) + } + action, err := json.Marshal(PostCopyAction{ + Type: PostCopyActionSQL, + Task: sqlparser.String(alterReAdd), + }) + if err != nil { + return err + } + insert, err := sqlparser.ParseAndBind(sqlCreatePostCopyAction, sqltypes.Int32BindVariable(vr.id), + sqltypes.StringBindVariable(tableName), sqltypes.StringBindVariable(string(action))) + if err != nil { + return err + } + // Use a new DB client to avoid interfering with open transactions + // in the shared client as DDL includes an implied commit. + // We're also NOT using a DBA connection here because we want to + // be sure that the commit fails if the instance is somehow in + // READ-ONLY mode. + dbClient, err := vr.newClientConnection(ctx) + if err != nil { + log.Errorf("Unable to connect to the database when saving secondary keys for deferred creation on the %q table in the %q VReplication workflow: %v", + tableName, vr.WorkflowName, err) + return vterrors.Wrap(err, "unable to connect to the database when saving secondary keys for deferred creation") + } + defer dbClient.Close() + if _, err := dbClient.ExecuteFetch(insert, 1); err != nil { + return err + } + if _, err := dbClient.ExecuteFetch(sqlparser.String(alterDrop), 1); err != nil { + // If they've already been dropped, e.g. by another controller running on the tablet + // when doing a shard merge, then we can ignore the error. + if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Num == sqlerror.ERCantDropFieldOrKey { + secondaryKeys, err := vr.getTableSecondaryKeys(ctx, tableName) + if err == nil && len(secondaryKeys) == 0 { + return nil + } + } + return err + } + } + + return nil +} + +func (vr *vreplicator) getTableSecondaryKeys(ctx context.Context, tableName string) ([]*sqlparser.IndexDefinition, error) { + req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{tableName}} + schema, err := vr.mysqld.GetSchema(ctx, vr.dbClient.DBName(), req) + if err != nil { + return nil, err + } + // schema should never be nil, but check to be extra safe. + if schema == nil || len(schema.TableDefinitions) != 1 { + return nil, fmt.Errorf("unexpected number of table definitions returned from GetSchema call for table %q: %d", + tableName, len(schema.TableDefinitions)) + } + tableSchema := schema.TableDefinitions[0].Schema + var secondaryKeys []*sqlparser.IndexDefinition + parsedDDL, err := sqlparser.ParseStrictDDL(tableSchema) + if err != nil { + return secondaryKeys, err + } + createTable, ok := parsedDDL.(*sqlparser.CreateTable) + // createTable or createTable.TableSpec should never be nil + // if it was a valid cast, but check to be extra safe. + if !ok || createTable == nil || createTable.GetTableSpec() == nil { + return nil, fmt.Errorf("could not determine CREATE TABLE statement from table schema %q", tableSchema) + } + + for _, index := range createTable.GetTableSpec().Indexes { + if !index.Info.Primary { + secondaryKeys = append(secondaryKeys, index) + } + } + return secondaryKeys, err +} + +func (vr *vreplicator) execPostCopyActions(ctx context.Context, tableName string) error { + defer vr.stats.PhaseTimings.Record("postCopyActions", time.Now()) + + // Use a new DB client to avoid interfering with open transactions + // in the shared client as DDL includes an implied commit. + // We're also NOT using a DBA connection here because we want to be + // sure that the work fails if the instance is somehow in READ-ONLY + // mode. + dbClient, err := vr.newClientConnection(ctx) + if err != nil { + log.Errorf("Unable to connect to the database when executing post copy actions on the %q table in the %q VReplication workflow: %v", + tableName, vr.WorkflowName, err) + return vterrors.Wrap(err, "unable to connect to the database when executing post copy actions") + } + defer dbClient.Close() + + query, err := sqlparser.ParseAndBind(sqlGetPostCopyActions, sqltypes.Int32BindVariable(vr.id), + sqltypes.StringBindVariable(tableName)) + if err != nil { + return err + } + qr, err := dbClient.ExecuteFetch(query, -1) + if err != nil { + return err + } + // qr should never be nil, but check anyway to be extra safe. + if qr == nil || len(qr.Rows) == 0 { + return nil + } + + if err := vr.insertLog(LogCopyStart, fmt.Sprintf("Executing %d post copy action(s) for %s table", + len(qr.Rows), tableName)); err != nil { + return err + } + + // Save our connection ID so we can use it to easily KILL any + // running SQL action we may perform later if needed. + idqr, err := dbClient.ExecuteFetch("select connection_id()", 1) + if err != nil { + return err + } + // qr should never be nil, but check anyway to be extra safe. + if idqr == nil || len(idqr.Rows) != 1 { + return fmt.Errorf("unexpected number of rows returned (%d) from connection_id() query", len(idqr.Rows)) + } + connID, err := idqr.Rows[0][0].ToInt64() + if err != nil || connID == 0 { + return fmt.Errorf("unexpected result (%d) from connection_id() query, error: %v", connID, err) + } + + deleteAction := func(dbc *vdbClient, id int64, vid int32, tn string) error { + delq, err := sqlparser.ParseAndBind(sqlDeletePostCopyAction, sqltypes.Int32BindVariable(vid), + sqltypes.StringBindVariable(tn), sqltypes.Int64BindVariable(id)) + if err != nil { + return err + } + if _, err := dbc.ExecuteFetch(delq, 1); err != nil { + return fmt.Errorf("failed to delete post copy action for the %q table with id %d: %v", + tableName, id, err) + } + return nil + } + + // This could take hours so we start a monitoring goroutine to + // listen for context cancellation, which would indicate that + // the controller is stopping due to engine shutdown (tablet + // shutdown or transition). If that happens we attempt to KILL + // the running ALTER statement using a DBA connection. + // If we don't do this then we could e.g. cause a PRS to fail as + // the running ALTER will block setting [super_]read_only. + // A failed/killed ALTER will be tried again when the copy + // phase starts up again on the (new) PRIMARY. + var action PostCopyAction + done := make(chan struct{}) + defer close(done) + killAction := func(ak PostCopyAction) error { + // If we're executing an SQL query then KILL the + // connection being used to execute it. + if ak.Type == PostCopyActionSQL { + if connID < 1 { + return fmt.Errorf("invalid connection ID found (%d) when attempting to kill %q", connID, ak.Task) + } + killdbc := vr.vre.dbClientFactoryDba() + if err := killdbc.Connect(); err != nil { + return fmt.Errorf("unable to connect to the database when attempting to kill %q: %v", ak.Task, err) + } + defer killdbc.Close() + _, err = killdbc.ExecuteFetch(fmt.Sprintf("kill %d", connID), 1) + return err + } + // We may support non-SQL actions in the future. + return nil + } + go func() { + select { + // Only cancel an ongoing ALTER if the engine is closing. + case <-vr.vre.ctx.Done(): + log.Infof("Copy of the %q table stopped when performing the following post copy action in the %q VReplication workflow: %+v", + tableName, vr.WorkflowName, action) + if err := killAction(action); err != nil { + log.Errorf("Failed to kill post copy action on the %q table in the %q VReplication workflow: %v", + tableName, vr.WorkflowName, err) + } + return + case <-done: + // We're done, so no longer need to listen for cancellation. + return + } + }() + + for _, row := range qr.Named().Rows { + select { + // Stop any further actions if the vreplicator's context is + // cancelled -- most likely due to hitting the + // vreplication_copy_phase_duration + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: + } + id, err := row["id"].ToInt64() + if err != nil { + return err + } + action = PostCopyAction{} + actionBytes, err := row["action"].ToBytes() + if err != nil { + return err + } + if err := json.Unmarshal(actionBytes, &action); err != nil { + return err + } + + // There can be multiple vreplicators/controllers running on + // the same tablet for the same table e.g. when doing shard + // merges. Let's check for that and if there are still others + // that have not finished the copy phase for the table, with + // the same action, then we skip executing it as an individual + // action on a table should only be done by the last vreplicator + // to finish. We use a transaction because we select matching + // rows with FOR UPDATE in order to serialize the execution of + // the post copy actions for the same workflow and table. + // This ensures that the actions are only performed once after + // all streams have completed the copy phase for the table. + redundant := false + _, err = dbClient.ExecuteFetch("start transaction", 1) + if err != nil { + return err + } + vrsq, err := sqlparser.ParseAndBind(sqlGetAndLockPostCopyActionsForTable, sqltypes.StringBindVariable(tableName)) + if err != nil { + return err + } + vrsres, err := dbClient.ExecuteFetch(vrsq, -1) + if err != nil { + return fmt.Errorf("failed to get post copy actions for the %q table: %v", tableName, err) + } + if vrsres != nil && len(vrsres.Rows) > 1 { + // We have more than one planned post copy action on the table. + for _, row := range vrsres.Named().Rows { + vrid, err := row["vrepl_id"].ToInt32() + if err != nil { + return err + } + ctlaction := row["action"].ToString() + // Let's make sure that it's a different controller/vreplicator + // and that the action is the same. + if vrid != vr.id && strings.EqualFold(ctlaction, string(actionBytes)) { + // We know that there's another controller/vreplicator yet + // to finish its copy phase for the table and it will perform + // the same action on the same table when it completes, so we + // skip doing the action and simply delete our action record + // to mark this controller/vreplicator's post copy action work + // as being done for the table before it finishes the copy + // phase for the table. + if err := deleteAction(dbClient, id, vr.id, tableName); err != nil { + return err + } + redundant = true + break + } + } + } + _, err = dbClient.ExecuteFetch("commit", 1) + if err != nil { + return err + } + if redundant { + // Skip this action as it will be executed by a later vreplicator. + continue + } + + switch action.Type { + case PostCopyActionSQL: + log.Infof("Executing post copy SQL action on the %q table in the %q VReplication workflow: %s", + tableName, vr.WorkflowName, action.Task) + // This will return an io.EOF / MySQL CRServerLost (errno 2013) + // error if it is killed by the monitoring goroutine. + if _, err := dbClient.ExecuteFetch(action.Task, -1); err != nil { + failedAlterErr := err + // It's possible that we previously executed the ALTER but + // the subsequent DELETE of the post_copy_action record failed. + // For example, the context could be cancelled in-between. + // It's also possible that the user has modified the schema on + // the target side. + // If we get a duplicate key/index error then let's see if the + // index definitions that we would have added already exist in + // the table schema and if so move forward and delete the + // post_copy_action record. + if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERDupKeyName { + stmt, err := sqlparser.ParseStrictDDL(action.Task) + if err != nil { + return failedAlterErr + } + alterStmt, ok := stmt.(*sqlparser.AlterTable) + if !ok { + return failedAlterErr + } + currentSKs, err := vr.getTableSecondaryKeys(ctx, tableName) + if err != nil { + return failedAlterErr + } + if len(currentSKs) < len(alterStmt.AlterOptions) { + return failedAlterErr + } + for _, alterOption := range alterStmt.AlterOptions { + addKey, ok := alterOption.(*sqlparser.AddIndexDefinition) + if !ok { + return failedAlterErr + } + found := false + for _, currentSK := range currentSKs { + if sqlparser.Equals.RefOfIndexDefinition(addKey.IndexDefinition, currentSK) { + found = true + break + } + } + if !found { + return failedAlterErr + } + } + // All of the keys we wanted to add in the ALTER already + // exist in the live table schema. + } else { + return failedAlterErr + } + } + if err := deleteAction(dbClient, id, vr.id, tableName); err != nil { + return err + } + default: + return fmt.Errorf("unsupported post copy action type: %v", action.Type) + } + } + + if err := vr.insertLog(LogCopyStart, fmt.Sprintf("Completed all post copy actions for %s table", + tableName)); err != nil { + return err + } + + return nil +} + +// supportsDeferredSecondaryKeys tells you if related work should be done +// for the workflow. Deferring secondary index generation is only supported +// with MoveTables, Migrate, and Reshard. +func (vr *vreplicator) supportsDeferredSecondaryKeys() bool { + return vr.WorkflowType == int32(binlogdatapb.VReplicationWorkflowType_MoveTables) || + vr.WorkflowType == int32(binlogdatapb.VReplicationWorkflowType_Migrate) || + vr.WorkflowType == int32(binlogdatapb.VReplicationWorkflowType_Reshard) +} + +func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, error) { + dbc := vr.vre.dbClientFactoryFiltered() + if err := dbc.Connect(); err != nil { + return nil, vterrors.Wrap(err, "can't connect to database") + } + dbClient := newVDBClient(dbc, vr.stats) + if _, err := vr.setSQLMode(ctx, dbClient); err != nil { + return nil, vterrors.Wrap(err, "failed to set sql_mode") + } + if err := vr.clearFKCheck(dbClient); err != nil { + return nil, vterrors.Wrap(err, "failed to clear foreign key check") + } + return dbClient, nil +} + +// setExistingRowsCopied deals with the case where another tablet started +// the workflow and a reparent occurred, and now that we manage the +// workflow, we need to read the rows_copied that already exists and add +// them to our counter, otherwise it will look like the reparent wiped all the +// rows_copied. So in the event that our CopyRowCount counter is zero, and +// the existing rows_copied in the vreplication table is not, copy the value of +// vreplication.rows_copied into our CopyRowCount. +func (vr *vreplicator) setExistingRowsCopied() { + if vr.stats.CopyRowCount.Get() == 0 { + rowsCopiedExisting, err := vr.readExistingRowsCopied(vr.id) + if err != nil { + log.Warningf("Failed to read existing rows copied value for %s worfklow: %v", vr.WorkflowName, err) + } else if rowsCopiedExisting != 0 { + log.Infof("Resuming the %s vreplication workflow started on another tablet, setting rows copied counter to %v", vr.WorkflowName, rowsCopiedExisting) + vr.stats.CopyRowCount.Set(rowsCopiedExisting) + } + } +} + +func (vr *vreplicator) readExistingRowsCopied(id int32) (int64, error) { + query, err := sqlparser.ParseAndBind(`SELECT rows_copied FROM _vt.vreplication WHERE id=%a`, + sqltypes.Int32BindVariable(id), + ) + if err != nil { + return 0, err + } + r, err := vr.dbClient.Execute(query) + if err != nil { + return 0, err + } + if len(r.Rows) != 1 { + return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get expected single row value when getting rows_copied for workflow id: %d", id) + } + return r.Rows[0][0].ToInt64() +} +>>>>>>> a159f18719 (copy over existing vreplication rows copied to local counter if resuming from another tablet (#13949)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index e441f327436..8ab8e8f0b46 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -173,3 +173,566 @@ func TestPrimaryKeyEquivalentColumns(t *testing.T) { }) } } +<<<<<<< HEAD +======= + +// TestDeferSecondaryKeys confirms the behavior of the +// --defer-secondary-keys MoveTables/Migrate, and Reshard +// workflow/command flag. +// 1. We drop the secondary keys +// 2. We store the secondary key definitions for step 3 +// 3. We add the secondary keys back after the rows are copied +func TestDeferSecondaryKeys(t *testing.T) { + ctx := context.Background() + tablet := addTablet(100) + defer deleteTablet(tablet) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + } + id := int32(1) + vsclient := newTabletConnector(tablet) + stats := binlogplayer.NewStats() + defer stats.Stop() + dbClient := playerEngine.dbClientFactoryFiltered() + err := dbClient.Connect() + require.NoError(t, err) + defer dbClient.Close() + dbName := dbClient.DBName() + // Ensure there's a dummy vreplication workflow record + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s') on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s'", + id, dbName, dbName), 1) + require.NoError(t, err) + defer func() { + _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) + require.NoError(t, err) + }() + vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) + getActionsSQLf := "select action from _vt.post_copy_action where table_name='%s'" + getCurrentDDL := func(tableName string) string { + req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{tableName}} + sd, err := env.Mysqld.GetSchema(ctx, dbName, req) + require.NoError(t, err) + require.Equal(t, 1, len(sd.TableDefinitions)) + return removeVersionDifferences(sd.TableDefinitions[0].Schema) + } + _, err = dbClient.ExecuteFetch("use "+dbName, 1) + require.NoError(t, err) + diffHints := &schemadiff.DiffHints{ + StrictIndexOrdering: false, + } + + tests := []struct { + name string + tableName string + initialDDL string + strippedDDL string + intermediateDDL string + actionDDL string + WorkflowType int32 + wantStashErr string + wantExecErr string + expectFinalSchemaDiff bool + postStashHook func() error + }{ + { + name: "0SK", + tableName: "t1", + initialDDL: "create table t1 (id int not null, primary key (id))", + strippedDDL: "create table t1 (id int not null, primary key (id))", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + }, + { + name: "1SK:Materialize", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, primary key (id), key c1 (c1))", + strippedDDL: "create table t1 (id int not null, c1 int default null, primary key (id), key c1 (c1))", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_Materialize), + wantStashErr: "deferring secondary key creation is not supported for Materialize workflows", + }, + { + name: "1SK:OnlineDDL", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, primary key (id), key c1 (c1))", + strippedDDL: "create table t1 (id int not null, c1 int default null, primary key (id), key c1 (c1))", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_OnlineDDL), + wantStashErr: "deferring secondary key creation is not supported for OnlineDDL workflows", + }, + { + name: "1SK", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, primary key (id), key c1 (c1))", + strippedDDL: "create table t1 (id int not null, c1 int default null, primary key (id))", + actionDDL: "alter table %s.t1 add key c1 (c1)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_Reshard), + }, + { + name: "2SK", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id), key c1 (c1), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id))", + actionDDL: "alter table %s.t1 add key c1 (c1), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + }, + { + name: "2tSK", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 varchar(10) default null, c2 varchar(10) default null, primary key (id), key c1_c2 (c1,c2), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 varchar(10) default null, c2 varchar(10) default null, primary key (id))", + actionDDL: "alter table %s.t1 add key c1_c2 (c1, c2), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + }, + { + name: "2FPK2SK", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 varchar(10) not null, c2 varchar(10) default null, primary key (id,c1), key c1_c2 (c1,c2), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 varchar(10) not null, c2 varchar(10) default null, primary key (id,c1))", + actionDDL: "alter table %s.t1 add key c1_c2 (c1, c2), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + }, + { + name: "3FPK1SK", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 varchar(10) not null, c2 varchar(10) not null, primary key (id,c1,c2), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 varchar(10) not null, c2 varchar(10) not null, primary key (id,c1,c2))", + actionDDL: "alter table %s.t1 add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_Reshard), + }, + { + name: "3FPK1SK_ShardMerge", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 varchar(10) not null, c2 varchar(10) not null, primary key (id,c1,c2), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 varchar(10) not null, c2 varchar(10) not null, primary key (id,c1,c2))", + actionDDL: "alter table %s.t1 add key c2 (c2)", + postStashHook: func() error { + myid := id + 1000 + // Insert second vreplication record to simulate a second controller/vreplicator + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s')", + myid, dbName), 1) + if err != nil { + return err + } + myvr := newVReplicator(myid, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) + myvr.WorkflowType = int32(binlogdatapb.VReplicationWorkflowType_Reshard) + // Insert second post copy action record to simulate a shard merge where you + // have N controllers/replicators running for the same table on the tablet. + // This forces a second row, which would otherwise not get created beacause + // when this is called there's no secondary keys to stash anymore. + addlAction, err := json.Marshal(PostCopyAction{ + Type: PostCopyActionSQL, + Task: fmt.Sprintf("alter table %s.t1 add key c2 (c2)", dbName), + }) + if err != nil { + return err + } + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.post_copy_action (vrepl_id, table_name, action) values (%d, 't1', '%s')", + myid, string(addlAction)), 1) + if err != nil { + return err + } + err = myvr.execPostCopyActions(ctx, "t1") + if err != nil { + return err + } + return nil + }, + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_Reshard), + }, + { + name: "0FPK2tSK", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 varchar(10) default null, c2 varchar(10) default null, key c1_c2 (c1,c2), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 varchar(10) default null, c2 varchar(10) default null)", + actionDDL: "alter table %s.t1 add key c1_c2 (c1, c2), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + }, + { + name: "2SKRetryNoErr", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id), key c1 (c1), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id))", + intermediateDDL: "alter table %s.t1 add key c1 (c1), add key c2 (c2)", + actionDDL: "alter table %s.t1 add key c1 (c1), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + }, + { + name: "2SKRetryNoErr2", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id), key c1 (c1), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id))", + intermediateDDL: "alter table %s.t1 add key c2 (c2), add key c1 (c1)", + actionDDL: "alter table %s.t1 add key c1 (c1), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + }, + { + name: "SKSuperSetNoErr", // a superset of the original keys is allowed + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id), key c1 (c1), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id))", + intermediateDDL: "alter table %s.t1 add unique key c1_c2 (c1,c2), add key c2 (c2), add key c1 (c1)", + actionDDL: "alter table %s.t1 add key c1 (c1), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + expectFinalSchemaDiff: true, + }, + { + name: "2SKRetryErr", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id), key c1 (c1), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id))", + intermediateDDL: "alter table %s.t1 add key c2 (c2)", + actionDDL: "alter table %s.t1 add key c1 (c1), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + wantExecErr: "Duplicate key name 'c2' (errno 1061) (sqlstate 42000)", + }, + { + name: "2SKRetryErr2", + tableName: "t1", + initialDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id), key c1 (c1), key c2 (c2))", + strippedDDL: "create table t1 (id int not null, c1 int default null, c2 int default null, primary key (id))", + intermediateDDL: "alter table %s.t1 add key c1 (c1)", + actionDDL: "alter table %s.t1 add key c1 (c1), add key c2 (c2)", + WorkflowType: int32(binlogdatapb.VReplicationWorkflowType_MoveTables), + wantExecErr: "Duplicate key name 'c1' (errno 1061) (sqlstate 42000)", + }, + } + + for _, tcase := range tests { + t.Run(tcase.name, func(t *testing.T) { + // Deferred secondary indexes are only supported for + // MoveTables and Reshard workflows. + vr.WorkflowType = tcase.WorkflowType + + // Create the table. + _, err := dbClient.ExecuteFetch(tcase.initialDDL, 1) + require.NoError(t, err) + defer func() { + _, err = dbClient.ExecuteFetch(fmt.Sprintf("drop table %s.%s", dbName, tcase.tableName), 1) + require.NoError(t, err) + _, err = dbClient.ExecuteFetch("delete from _vt.post_copy_action", 1) + require.NoError(t, err) + }() + + confirmNoSecondaryKeys := func() { + // Confirm that the table now has no secondary keys. + tcase.strippedDDL = removeVersionDifferences(tcase.strippedDDL) + currentDDL := getCurrentDDL(tcase.tableName) + require.True(t, strings.EqualFold(stripCruft(tcase.strippedDDL), stripCruft(currentDDL)), + "Expected: %s\n Got: %s", forError(tcase.strippedDDL), forError(currentDDL)) + } + + // If the table has any secondary keys, drop them and + // store an ALTER TABLE statement to re-add them after + // the table is copied. + err = vr.stashSecondaryKeys(ctx, tcase.tableName) + if tcase.wantStashErr != "" { + require.EqualError(t, err, tcase.wantStashErr) + } else { + require.NoError(t, err) + } + confirmNoSecondaryKeys() + + if tcase.postStashHook != nil { + err = tcase.postStashHook() + require.NoError(t, err) + + // We should still NOT have any secondary keys because there's still + // a running controller/vreplicator in the copy phase. + confirmNoSecondaryKeys() + } + + // If we expect post-copy SQL actions, then ensure + // that the stored DDL matches what we expect. + if tcase.actionDDL != "" { + res, err := dbClient.ExecuteFetch(fmt.Sprintf(getActionsSQLf, tcase.tableName), 1) + require.Equal(t, 1, len(res.Rows)) + require.NoError(t, err) + val, err := res.Rows[0][0].ToBytes() + require.NoError(t, err) + alter, err := jsonparser.GetString(val, "task") + require.NoError(t, err) + require.True(t, strings.EqualFold(stripCruft(fmt.Sprintf(tcase.actionDDL, dbName)), stripCruft(alter)), + "Expected: %s\n Got: %s", forError(fmt.Sprintf(tcase.actionDDL, dbName)), forError(alter)) + } + + if tcase.intermediateDDL != "" { + _, err := dbClient.ExecuteFetch(fmt.Sprintf(tcase.intermediateDDL, dbName), 1) + require.NoError(t, err) + } + + err = vr.execPostCopyActions(ctx, tcase.tableName) + expectedPostCopyActionRecs := 0 + if tcase.wantExecErr != "" { + require.Contains(t, err.Error(), tcase.wantExecErr) + expectedPostCopyActionRecs = 1 + } else { + require.NoError(t, err) + // Confirm that the final DDL logically matches the initial DDL. + // We do not require that the index definitions are in the same + // order in the table schema. + if !tcase.expectFinalSchemaDiff { + currentDDL := getCurrentDDL(tcase.tableName) + sdiff, err := schemadiff.DiffCreateTablesQueries(currentDDL, tcase.initialDDL, diffHints) + require.NoError(t, err) + require.Nil(t, sdiff, "Expected no schema difference but got: %s", sdiff.CanonicalStatementString()) + } + } + + // Confirm that the post copy action record(s) are deleted when there's + // no exec error or conversely that it still exists when there was + // one. + res, err := dbClient.ExecuteFetch(fmt.Sprintf(getActionsSQLf, tcase.tableName), expectedPostCopyActionRecs) + require.NoError(t, err) + require.Equal(t, expectedPostCopyActionRecs, len(res.Rows), + "Expected %d post copy action records, got %d", expectedPostCopyActionRecs, len(res.Rows)) + }) + } +} + +// TestCancelledDeferSecondaryKeys tests that the ALTER +// TABLE statement used to re-add secondary keys (when +// the --defer-secondary-keys flag was used), after +// copying all rows, is properly killed when the context +// is cancelled -- e.g. due to the VReplication engine +// closing for a tablet transition during a PRS. +func TestCancelledDeferSecondaryKeys(t *testing.T) { + // Skip the test for MariaDB as it does not have + // performance_schema enabled by default. + version, err := mysqlctl.GetVersionString() + require.NoError(t, err) + flavor, _, err := mysqlctl.ParseVersionString(version) + require.NoError(t, err) + if flavor == mysqlctl.FlavorMariaDB { + t.Skipf("Skipping test as it's not supported with %s", flavor) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tablet := addTablet(100) + defer deleteTablet(tablet) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + } + // The test env uses the same factory for both dba and + // filtered connections. + dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba" + id := int32(1) + vsclient := newTabletConnector(tablet) + stats := binlogplayer.NewStats() + defer stats.Stop() + dbaconn := playerEngine.dbClientFactoryDba() + err = dbaconn.Connect() + require.NoError(t, err) + defer dbaconn.Close() + dbClient := playerEngine.dbClientFactoryFiltered() + err = dbClient.Connect() + require.NoError(t, err) + defer dbClient.Close() + dbName := dbClient.DBName() + // Ensure there's a dummy vreplication workflow record + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s') on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s'", + id, dbName, dbName), 1) + require.NoError(t, err) + defer func() { + _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) + require.NoError(t, err) + }() + vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) + vr.WorkflowType = int32(binlogdatapb.VReplicationWorkflowType_MoveTables) + getCurrentDDL := func(tableName string) string { + req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{tableName}} + sd, err := env.Mysqld.GetSchema(context.Background(), dbName, req) + require.NoError(t, err) + require.Equal(t, 1, len(sd.TableDefinitions)) + return removeVersionDifferences(sd.TableDefinitions[0].Schema) + } + getActionsSQLf := "select action from _vt.post_copy_action where vrepl_id=%d and table_name='%s'" + + tableName := "t1" + ddl := fmt.Sprintf("create table %s.t1 (id int not null, c1 int default null, c2 int default null, primary key(id), key c1 (c1), key c2 (c2))", dbName) + withoutPKs := "create table t1 (id int not null, c1 int default null, c2 int default null, primary key(id))" + alter := fmt.Sprintf("alter table %s.t1 add key c1 (c1), add key c2 (c2)", dbName) + + // Create the table. + _, err = dbClient.ExecuteFetch(ddl, 1) + require.NoError(t, err) + + // Setup the ALTER work. + err = vr.stashSecondaryKeys(ctx, tableName) + require.NoError(t, err) + + // Lock the table to block execution of the ALTER so + // that we can be sure that it runs and we can KILL it. + _, err = dbaconn.ExecuteFetch(fmt.Sprintf("lock table %s.%s write", dbName, tableName), 1) + require.NoError(t, err) + + // The ALTER should block on the table lock. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := vr.execPostCopyActions(ctx, tableName) + assert.True(t, strings.EqualFold(err.Error(), fmt.Sprintf("EOF (errno 2013) (sqlstate HY000) during query: %s", alter))) + }() + + // Confirm that the expected ALTER query is being attempted. + query := fmt.Sprintf("select count(*) from performance_schema.events_statements_current where sql_text = '%s'", alter) + waitForQueryResult(t, dbaconn, query, "1") + + // Cancel the context while the ALTER is running/blocked + // and wait for it to be KILLed off. + playerEngine.cancel() + wg.Wait() + + _, err = dbaconn.ExecuteFetch("unlock tables", 1) + assert.NoError(t, err) + + // Confirm that the ALTER to re-add the secondary keys + // did not succeed. + currentDDL := getCurrentDDL(tableName) + assert.True(t, strings.EqualFold(stripCruft(withoutPKs), stripCruft(currentDDL)), + "Expected: %s\n Got: %s", forError(withoutPKs), forError(currentDDL)) + + // Confirm that we successfully attempted to kill it. + query = "select count(*) from performance_schema.events_statements_history where digest_text = 'KILL ?' and errors = 0" + res, err := dbaconn.ExecuteFetch(query, 1) + assert.NoError(t, err) + assert.Equal(t, 1, len(res.Rows)) + // TODO: figure out why the KILL never shows up... + //require.Equal(t, "1", res.Rows[0][0].ToString()) + + // Confirm that the post copy action record still exists + // so it will later be retried. + res, err = dbClient.ExecuteFetch(fmt.Sprintf(getActionsSQLf, id, tableName), 1) + require.NoError(t, err) + require.Equal(t, 1, len(res.Rows)) +} + +// TestResumingFromPreviousWorkflowKeepingRowsCopied tests that when you +// resume a workflow started by another tablet (eg. a reparent occurred), +// the rows_copied does not reset to zero but continues along from where +// it left off. +func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) { + _, cancel := context.WithCancel(context.Background()) + defer cancel() + tablet := addTablet(100) + defer deleteTablet(tablet) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + } + // The test env uses the same factory for both dba and + // filtered connections. + dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba" + id := int32(1) + + vsclient := newTabletConnector(tablet) + stats := binlogplayer.NewStats() + defer stats.Stop() + + dbaconn := playerEngine.dbClientFactoryDba() + err := dbaconn.Connect() + require.NoError(t, err) + defer dbaconn.Close() + + dbClient := playerEngine.dbClientFactoryFiltered() + err = dbClient.Connect() + require.NoError(t, err) + defer dbClient.Close() + + dbName := dbClient.DBName() + rowsCopied := int64(500000) + // Ensure there's an existing vreplication workflow + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, rows_copied) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s', %v) on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s', rows_copied=%v", + id, dbName, rowsCopied, dbName, rowsCopied), 1) + require.NoError(t, err) + defer func() { + _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) + require.NoError(t, err) + }() + vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) + assert.Equal(t, rowsCopied, vr.stats.CopyRowCount.Get()) +} + +// stripCruft removes all whitespace unicode chars and backticks. +func stripCruft(in string) string { + out := strings.Builder{} + for _, r := range in { + if unicode.IsSpace(r) || r == '`' { + continue + } + out.WriteRune(r) + } + return out.String() +} + +// forError returns a string for humans to easily compare in +// in error messages. +func forError(in string) string { + mid := strings.ToLower(in) + // condense multiple spaces into one. + mid = regexp.MustCompile(`\s+`).ReplaceAllString(mid, " ") + sr := strings.NewReplacer( + "\t", "", + "\n", "", + "\r", "", + "`", "", + "( ", "(", + " )", ")", + ) + return sr.Replace(mid) +} + +// removeVersionDifferences removes portions of a CREATE TABLE statement +// that differ between versions: +// - 8.0 no longer includes display widths for integer or year types +// - MySQL and MariaDB versions differ in what table options they display +func removeVersionDifferences(in string) string { + out := in + var re *regexp.Regexp + for _, baseType := range []string{"int", "year"} { + re = regexp.MustCompile(fmt.Sprintf(`(?i)%s\(([0-9]*)?\)`, baseType)) + out = re.ReplaceAllString(out, baseType) + } + re = regexp.MustCompile(`(?i)engine[\s]*=[\s]*innodb.*$`) + out = re.ReplaceAllString(out, "") + return out +} + +func waitForQueryResult(t *testing.T, dbc binlogplayer.DBClient, query, val string) { + tmr := time.NewTimer(1 * time.Second) + defer tmr.Stop() + for { + res, err := dbc.ExecuteFetch(query, 1) + assert.NoError(t, err) + assert.Equal(t, 1, len(res.Rows)) + if res.Rows[0][0].ToString() == val { + return + } + select { + case <-tmr.C: + t.Fatalf("query %s did not return expected value of %s", query, val) + default: + time.Sleep(50 * time.Millisecond) + } + } +} +>>>>>>> a159f18719 (copy over existing vreplication rows copied to local counter if resuming from another tablet (#13949))