Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: disable foreign_key_checks for bulk data cleanup #15261

Merged
merged 6 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions go/test/endtoend/vreplication/fk_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@ var (
create table parent(id int, name varchar(128), primary key(id)) engine=innodb;
create table child(id int, parent_id int, name varchar(128), primary key(id), foreign key(parent_id) references parent(id) on delete cascade) engine=innodb;
create view vparent as select * from parent;
create table t1(id int, name varchar(128), primary key(id)) engine=innodb;
create table t2(id int, t1id int, name varchar(128), primary key(id), foreign key(t1id) references t1(id) on delete cascade) engine=innodb;
`
initialFKData = `
insert into parent values(1, 'parent1'), (2, 'parent2');
insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');`
insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');
insert into t1 values(1, 't11'), (2, 't12');
insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23');
`

initialFKSourceVSchema = `
{
"tables": {
"parent": {},
"child": {}
"child": {},
"t1": {},
"t2": {}
}
}
`
Expand Down Expand Up @@ -59,6 +66,22 @@ insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');
"name": "reverse_bits"
}
]
},
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"t2": {
"column_vindexes": [
{
"column": "t1id",
"name": "reverse_bits"
}
]
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const testWorkflowFlavor = workflowFlavorRandom

// TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints.
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
Expand Down Expand Up @@ -77,10 +79,13 @@ func TestFKWorkflow(t *testing.T) {
}()
go ls.simulateLoad()
}

targetKeyspace := "fktarget"
targetTabletId := 200
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialFKTargetVSchema, "", 0, 0, targetTabletId, sourceKsOpts)

testFKCancel(t, vc)

workflowName := "fk"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)

Expand All @@ -92,7 +97,7 @@ func TestFKWorkflow(t *testing.T) {
},
sourceKeyspace: sourceKeyspace,
atomicCopy: true,
}, workflowFlavorRandom)
}, testWorkflowFlavor)
mt.Create()

waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
Expand Down Expand Up @@ -122,6 +127,7 @@ func TestFKWorkflow(t *testing.T) {
cancel()
<-ch
}
mt.Complete()
}

func insertInitialFKData(t *testing.T) {
Expand All @@ -136,6 +142,9 @@ func insertInitialFKData(t *testing.T) {
log.Infof("Done inserting initial FK data")
waitForRowCount(t, vtgateConn, db, "parent", 2)
waitForRowCount(t, vtgateConn, db, "child", 3)
waitForRowCount(t, vtgateConn, db, "t1", 2)
waitForRowCount(t, vtgateConn, db, "t2", 3)

})
}

Expand Down Expand Up @@ -269,3 +278,25 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result {
require.NotNil(t, qr)
return qr
}

// testFKCancel confirms that a MoveTables workflow which includes tables with foreign key
// constraints, where the parent table is lexicographically sorted before the child table and
// thus may be dropped first, can be successfully cancelled.
func testFKCancel(t *testing.T, vc *VitessCluster) {
var targetKeyspace = "fktarget"
var sourceKeyspace = "fksource"
var workflowName = "wf2"
var ksWorkflow = fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
mt := newMoveTables(vc, &moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: workflowName,
targetKeyspace: targetKeyspace,
},
sourceKeyspace: sourceKeyspace,
atomicCopy: true,
}, testWorkflowFlavor)
mt.Create()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
mt.Cancel()
}
1,006 changes: 509 additions & 497 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

42 changes: 38 additions & 4 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,10 @@
query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName)
}
_, err = ts.ws.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,

Check warning on line 516 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L513-L516

Added lines #L513 - L516 were not covered by tests
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err)
Expand Down Expand Up @@ -1067,7 +1068,6 @@
}

func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
log.Flush()
err := ts.ForAllTargets(func(target *MigrationTarget) error {
log.Infof("ForAllTargets: %+v", target)
for _, tableName := range ts.Tables() {
Expand All @@ -1083,12 +1083,12 @@
ts.Logger().Infof("%s: Dropping table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,

Check warning on line 1089 in go/vt/vtctl/workflow/traffic_switcher.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/traffic_switcher.go#L1086-L1089

Added lines #L1086 - L1089 were not covered by tests
})
log.Infof("Removed target table with result: %+v", res)
log.Flush()
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v",
target.GetPrimary().String(), tableName, err)
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,12 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.
}

response, err := c.ExecuteFetchAsDba(ctx, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: req.Query,
DbName: topoproto.TabletDbName(tablet),
MaxRows: req.MaxRows,
DisableBinlogs: req.DisableBinlogs,
ReloadSchema: req.DisableBinlogs,
Query: req.Query,
DbName: topoproto.TabletDbName(tablet),
MaxRows: req.MaxRows,
DisableBinlogs: req.DisableBinlogs,
ReloadSchema: req.DisableBinlogs,
DisableForeignKeyChecks: req.DisableForeignKeyChecks,
})
if err != nil {
return nil, err
Expand Down
24 changes: 21 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,29 @@
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
// get a connection
// Get a connection.
conn, err := tm.MysqlDaemon.GetDbaConnection(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

// disable binlogs if necessary
// Disable binlogs if necessary.
if req.DisableBinlogs {
_, err := conn.ExecuteFetch("SET sql_log_bin = OFF", 0, false)
if err != nil {
return nil, err
}
}

// Disable FK checks if requested.
if req.DisableForeignKeyChecks {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = OFF", 0, false)
if err != nil {
return nil, err

Check warning on line 95 in go/vt/vttablet/tabletmanager/rpc_query.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/rpc_query.go#L95

Added line #L95 was not covered by tests
}
}

if req.DbName != "" {
// This execute might fail if db does not exist.
// Error is ignored because given query might create this database.
Expand Down Expand Up @@ -130,7 +138,17 @@
}
}

// re-enable binlogs if necessary
// Re-enable FK checks if necessary.
if req.DisableForeignKeyChecks && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = ON", 0, false)
if err != nil {
// If we can't reset the FK checks flag,
// let's just close the connection.
conn.Close()

Check warning on line 147 in go/vt/vttablet/tabletmanager/rpc_query.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/rpc_query.go#L147

Added line #L147 was not covered by tests
}
}

// Re-enable binlogs if necessary.
if req.DisableBinlogs && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET sql_log_bin = ON", 0, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestVDiffStats(t *testing.T) {
defer testStats.controllers[id].TableDiffPhaseTimings.Record(phase, time.Now())
time.Sleep(sleepTime)
}
want := int64(1.2 * float64(sleepTime)) // Allow 20% overhead for recording timing
want := 10 * sleepTime // Allow 10x overhead for recording timing on flaky test hosts
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This addresses unrelated test flakes seen.

record(string(initializing))
require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(initializing)].Total())
record(string(pickingTablets))
Expand Down
15 changes: 13 additions & 2 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -1790,7 +1791,12 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType w
source.GetPrimary().String(), source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName)
query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName)
}
_, err = ts.wr.ExecuteFetchAsDba(ctx, source.GetPrimary().Alias, query, 1, false, true)
_, err = ts.wr.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err)
return err
Expand Down Expand Up @@ -1896,7 +1902,12 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
_, err = ts.wr.ExecuteFetchAsDba(ctx, target.GetPrimary().Alias, query, 1, false, true)
_, err = ts.wr.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v",
target.GetPrimary().String(), tableName, err)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,8 +949,10 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) {
tme.dbTargetClients[0].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", &sqltypes.Result{}, nil)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = OFF", &sqltypes.Result{})
tme.tmeDB.AddQuery(fmt.Sprintf("rename table `vt_ks1`.`t1` TO `vt_ks1`.`%s`", getRenameFileName("t1")), &sqltypes.Result{})
tme.tmeDB.AddQuery(fmt.Sprintf("rename table `vt_ks1`.`t2` TO `vt_ks1`.`%s`", getRenameFileName("t2")), &sqltypes.Result{})
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = ON", &sqltypes.Result{})
tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil) //
tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil)
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,12 +871,14 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv, params *VReplic
tme.dbSourceClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=1", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = OFF", &sqltypes.Result{})
tme.tmeDB.AddQuery("USE `vt_ks1`", noResult)
tme.tmeDB.AddQuery("USE `vt_ks2`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks1`.`t1`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks1`.`t2`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks2`.`t1`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks2`.`t2`", noResult)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = ON", &sqltypes.Result{})
tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult)
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", noResult)
Expand Down
1 change: 1 addition & 0 deletions proto/tabletmanagerdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ message ExecuteFetchAsDbaRequest {
uint64 max_rows = 3;
bool disable_binlogs = 4;
bool reload_schema = 5;
bool disable_foreign_key_checks = 6;
}

message ExecuteFetchAsDbaResponse {
Expand Down
Loading
Loading