Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: disable foreign_key_checks for bulk data cleanup #15261

Merged
merged 6 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions go/test/endtoend/vreplication/fk_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@ var (
create table parent(id int, name varchar(128), primary key(id)) engine=innodb;
create table child(id int, parent_id int, name varchar(128), primary key(id), foreign key(parent_id) references parent(id) on delete cascade) engine=innodb;
create view vparent as select * from parent;
create table t1(id int, name varchar(128), primary key(id)) engine=innodb;
create table t2(id int, t1id int, name varchar(128), primary key(id), foreign key(t1id) references t1(id) on delete cascade) engine=innodb;
`
initialFKData = `
insert into parent values(1, 'parent1'), (2, 'parent2');
insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');`
insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');
insert into t1 values(1, 't11'), (2, 't12');
insert into t2 values(1, 1, 't21'), (2, 1, 't22'), (3, 2, 't23');
`

initialFKSourceVSchema = `
{
"tables": {
"parent": {},
"child": {}
"child": {},
"t1": {},
"t2": {}
}
}
`
Expand Down Expand Up @@ -59,6 +66,22 @@ insert into child values(1, 1, 'child11'), (2, 1, 'child21'), (3, 2, 'child32');
"name": "reverse_bits"
}
]
},
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"t2": {
"column_vindexes": [
{
"column": "t1id",
"name": "reverse_bits"
}
]
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

const testWorkflowFlavor = workflowFlavorVtctl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be changed to workflowFlavorRandom and also tested locally with workflowFlavorVtctld

Copy link
Contributor Author

@mattlord mattlord Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested locally with workflowFlavorVtctld and it passes:

❯ git diff | cat
diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go
index b7a88aee65..b15856501a 100644
--- a/go/test/endtoend/vreplication/fk_test.go
+++ b/go/test/endtoend/vreplication/fk_test.go
@@ -34,7 +34,7 @@ import (
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
 )

-const testWorkflowFlavor = workflowFlavorVtctl
+const testWorkflowFlavor = workflowFlavorVtctld

 // TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints.
 // It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,

 ❯ make build
Thu Feb 15 15:12:32 EST 2024: Building source tree

❯ go test -v -timeout 15m -run ^TestFKWorkflow$ vitess.io/vitess/go/test/endtoend/vreplication
VTDATAROOT is /opt/vtdataroot/vreple2e_687716
=== RUN   TestFKWorkflow
E0215 15:13:19.528356   41743 vtorc_process.go:103] configuration - {
	"Debug": true,
	"ListenAddress": ":0",
	"MySQLTopologyUser": "orc_client_user",
	"MySQLTopologyPassword": "orc_client_user_password",
	"MySQLReplicaUser": "vt_repl",
	"MySQLReplicaPassword": "",
	"RecoveryPeriodBlockSeconds": 1
}
=== RUN   TestFKWorkflow/insertInitialFKData
=== RUN   TestFKWorkflow/vtctlclient_vdiff_fktarget.fk
=== RUN   TestFKWorkflow/vtctlclient_vdiff_fktarget.fk#01
--- PASS: TestFKWorkflow (52.09s)
    --- PASS: TestFKWorkflow/insertInitialFKData (0.01s)
    --- PASS: TestFKWorkflow/vtctlclient_vdiff_fktarget.fk (5.06s)
    --- PASS: TestFKWorkflow/vtctlclient_vdiff_fktarget.fk#01 (5.06s)
PASS
ok  	vitess.io/vitess/go/test/endtoend/vreplication	53.235s

Will change it to random now and push that change.


// TestFKWorkflow runs a MoveTables workflow with atomic copy for a db with foreign key constraints.
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
Expand Down Expand Up @@ -77,10 +79,13 @@ func TestFKWorkflow(t *testing.T) {
}()
go ls.simulateLoad()
}

targetKeyspace := "fktarget"
targetTabletId := 200
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialFKTargetVSchema, "", 0, 0, targetTabletId, sourceKsOpts)

testFKCancel(t, vc)

workflowName := "fk"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)

Expand All @@ -92,7 +97,7 @@ func TestFKWorkflow(t *testing.T) {
},
sourceKeyspace: sourceKeyspace,
atomicCopy: true,
}, workflowFlavorRandom)
}, testWorkflowFlavor)
mt.Create()

waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
Expand Down Expand Up @@ -122,6 +127,7 @@ func TestFKWorkflow(t *testing.T) {
cancel()
<-ch
}
mt.Complete()
}

func insertInitialFKData(t *testing.T) {
Expand All @@ -136,6 +142,9 @@ func insertInitialFKData(t *testing.T) {
log.Infof("Done inserting initial FK data")
waitForRowCount(t, vtgateConn, db, "parent", 2)
waitForRowCount(t, vtgateConn, db, "child", 3)
waitForRowCount(t, vtgateConn, db, "t1", 2)
waitForRowCount(t, vtgateConn, db, "t2", 3)

})
}

Expand Down Expand Up @@ -269,3 +278,25 @@ func (ls *fkLoadSimulator) exec(query string) *sqltypes.Result {
require.NotNil(t, qr)
return qr
}

// testFKCancel confirms that a MoveTables with tables with foreign key constraints, such that the parent table is
// lexico-graphically before the child table, can be cancelled. In such cases the child tables (which come later in
// the sorted order) need to be dropped first.
func testFKCancel(t *testing.T, vc *VitessCluster) {
var targetKeyspace = "fktarget"
var sourceKeyspace = "fksource"
var workflowName = "wf2"
var ksWorkflow = fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
mt := newMoveTables(vc, &moveTablesWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: workflowName,
targetKeyspace: targetKeyspace,
},
sourceKeyspace: sourceKeyspace,
atomicCopy: true,
}, testWorkflowFlavor)
mt.Create()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
mt.Cancel()
}
1,006 changes: 509 additions & 497 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

42 changes: 38 additions & 4 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,10 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T
query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName)
}
_, err = ts.ws.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err)
Expand Down Expand Up @@ -1067,7 +1068,6 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont
}

func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
log.Flush()
err := ts.ForAllTargets(func(target *MigrationTarget) error {
log.Infof("ForAllTargets: %+v", target)
for _, tableName := range ts.Tables() {
Expand All @@ -1083,12 +1083,12 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
ts.Logger().Infof("%s: Dropping table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
log.Infof("Removed target table with result: %+v", res)
log.Flush()
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v",
target.GetPrimary().String(), tableName, err)
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,12 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.
}

response, err := c.ExecuteFetchAsDba(ctx, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: req.Query,
DbName: topoproto.TabletDbName(tablet),
MaxRows: req.MaxRows,
DisableBinlogs: req.DisableBinlogs,
ReloadSchema: req.DisableBinlogs,
Query: req.Query,
DbName: topoproto.TabletDbName(tablet),
MaxRows: req.MaxRows,
DisableBinlogs: req.DisableBinlogs,
ReloadSchema: req.DisableBinlogs,
DisableForeignKeyChecks: req.DisableForeignKeyChecks,
})
if err != nil {
return nil, err
Expand Down
24 changes: 21 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,29 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
// get a connection
// Get a connection.
conn, err := tm.MysqlDaemon.GetDbaConnection(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

// disable binlogs if necessary
// Disable binlogs if necessary.
if req.DisableBinlogs {
_, err := conn.ExecuteFetch("SET sql_log_bin = OFF", 0, false)
if err != nil {
return nil, err
}
}

// Disable FK checks if requested.
if req.DisableForeignKeyChecks {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = OFF", 0, false)
if err != nil {
return nil, err
}
}

if req.DbName != "" {
// This execute might fail if db does not exist.
// Error is ignored because given query might create this database.
Expand Down Expand Up @@ -130,7 +138,17 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
}
}

// re-enable binlogs if necessary
// Re-enable FK checks if necessary.
if req.DisableForeignKeyChecks && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = ON", 0, false)
if err != nil {
// If we can't reset the FK checks flag,
// let's just close the connection.
conn.Close()
}
}

// Re-enable binlogs if necessary.
if req.DisableBinlogs && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET sql_log_bin = ON", 0, false)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -1790,7 +1791,12 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType w
source.GetPrimary().String(), source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName)
query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName)
}
_, err = ts.wr.ExecuteFetchAsDba(ctx, source.GetPrimary().Alias, query, 1, false, true)
_, err = ts.wr.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err)
return err
Expand Down Expand Up @@ -1896,7 +1902,12 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName)
ts.Logger().Infof("%s: Dropping table %s.%s\n",
target.GetPrimary().String(), target.GetPrimary().DbName(), tableName)
_, err = ts.wr.ExecuteFetchAsDba(ctx, target.GetPrimary().Alias, query, 1, false, true)
_, err = ts.wr.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(query),
MaxRows: 1,
ReloadSchema: true,
DisableForeignKeyChecks: true,
})
if err != nil {
ts.Logger().Errorf("%s: Error removing table %s: %v",
target.GetPrimary().String(), tableName, err)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,8 +949,10 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) {
tme.dbTargetClients[0].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", &sqltypes.Result{}, nil)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = OFF", &sqltypes.Result{})
tme.tmeDB.AddQuery(fmt.Sprintf("rename table `vt_ks1`.`t1` TO `vt_ks1`.`%s`", getRenameFileName("t1")), &sqltypes.Result{})
tme.tmeDB.AddQuery(fmt.Sprintf("rename table `vt_ks1`.`t2` TO `vt_ks1`.`%s`", getRenameFileName("t2")), &sqltypes.Result{})
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = ON", &sqltypes.Result{})
tme.dbTargetClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil) //
tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks2' and workflow = 'test'", &sqltypes.Result{}, nil)
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,12 +871,14 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv, params *VReplic
tme.dbSourceClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=1", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = OFF", &sqltypes.Result{})
tme.tmeDB.AddQuery("USE `vt_ks1`", noResult)
tme.tmeDB.AddQuery("USE `vt_ks2`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks1`.`t1`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks1`.`t2`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks2`.`t1`", noResult)
tme.tmeDB.AddQuery("drop table `vt_ks2`.`t2`", noResult)
tme.tmeDB.AddQuery("SET SESSION foreign_key_checks = ON", &sqltypes.Result{})
tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult)
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", noResult)
Expand Down
1 change: 1 addition & 0 deletions proto/tabletmanagerdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ message ExecuteFetchAsDbaRequest {
uint64 max_rows = 3;
bool disable_binlogs = 4;
bool reload_schema = 5;
bool disable_foreign_key_checks = 6;
}

message ExecuteFetchAsDbaResponse {
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading