Skip to content

Commit

Permalink
Fix external file/pos e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 7, 2025
1 parent 11d0404 commit b559159
Showing 1 changed file with 6 additions and 10 deletions.
16 changes: 6 additions & 10 deletions go/test/endtoend/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi
streams from the same source. The main difference between an external source vs a vitess
source is that the source proto contains an "external_mysql" field instead of keyspace and shard.
That field is the key into the externalConnections section of the input yaml.
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter:<rules:<match:\"product\" > > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter:<rules:<match:\"customer\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter:<rules:<match:\"orders\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
*/
func TestMigration(t *testing.T) {
yamlFile := startCluster(t)
Expand All @@ -155,7 +151,7 @@ func TestMigration(t *testing.T) {
migrate(t, "customer", "commerce", []string{"customer"})
migrate(t, "customer", "commerce", []string{"orders"})
vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess
waitForVReplicationToCatchup(t, vttablet, 1*time.Second)
waitForVReplicationToCatchup(t, vttablet, 30*time.Second)

testcases := []struct {
query string
Expand Down Expand Up @@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) {
var sqlEscaped bytes.Buffer
val.EncodeSQL(&sqlEscaped)
query := fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplicationExec: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess
err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query)
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplication insert: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias
err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query)
require.NoError(t, err)
}

Expand Down

0 comments on commit b559159

Please sign in to comment.