Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-18.0] VReplication: Fix vtctldclient SwitchReads related bugs and move the TestBasicV2Workflows e2e test to vtctldclient (#15579) #15583

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error {
req := &vtctldatapb.WorkflowSwitchTrafficRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
Cells: SwitchTrafficOptions.Cells,
TabletTypes: SwitchTrafficOptions.TabletTypes,
MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed),
Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout),
Expand Down
8 changes: 4 additions & 4 deletions go/streamlog/streamlog_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestFile(t *testing.T) {
logger.Send(&logMessage{"test 2"})

// Allow time for propagation
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, I assume 10 ms was too slow? These sleeps in CI have always been a bit of a headache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I had seen it flake.


want := "test 1\ntest 2\n"
contents, _ := os.ReadFile(logPath)
Expand All @@ -227,7 +227,7 @@ func TestFile(t *testing.T) {
os.Rename(logPath, rotatedPath)

logger.Send(&logMessage{"test 3"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand All @@ -241,10 +241,10 @@ func TestFile(t *testing.T) {
if err := syscall.Kill(syscall.Getpid(), syscall.SIGUSR2); err != nil {
t.Logf("failed to send streamlog rotate signal: %v", err)
}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

logger.Send(&logMessage{"test 4"})
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

want = "test 1\ntest 2\ntest 3\n"
contents, _ = os.ReadFile(rotatedPath)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspac
Binary: "vttablet",
FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
Cell: cell,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this is a pretty big miss.

TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID),
ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream,grpc-throttler",
LogDir: tmpDirectory,
Expand Down
13 changes: 12 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
Expand Down Expand Up @@ -318,6 +320,9 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
}
}
}
if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" {
done = false
}
} else {
done = false
}
Expand Down Expand Up @@ -351,7 +356,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
// as a CSV have secondary keys. This is useful when testing the
// --defer-secondary-keys flag to confirm that the secondary keys
// were re-added by the time the workflow hits the running phase.
// For a Reshard workflow, where no tables are specififed, pass
// For a Reshard workflow, where no tables are specified, pass
// an empty string for the tables and all tables in the target
// keyspace will be checked.
func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) {
Expand All @@ -371,6 +376,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro
}
}
for _, tablet := range tablets {
// Be sure that the schema is up to date.
err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodatapb.TabletAlias{
Cell: tablet.Cell,
Uid: uint32(tablet.TabletUID),
}))
require.NoError(t, err)
for _, table := range tableArr {
if schema.IsInternalOperationTableName(table) {
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

func TestMoveTablesBuffering(t *testing.T) {
Expand All @@ -17,7 +16,7 @@ func TestMoveTablesBuffering(t *testing.T) {
defer vtgateConn.Close()
defer vc.TearDown(t)

currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
setupMinimalCustomerKeyspace(t)
tables := "loadtest"
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand Down Expand Up @@ -236,12 +235,12 @@ func (wf *workflow) create() {
cell := wf.tc.defaultCellName
switch typ {
case "movetables":
currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
sourceShards := strings.Join(wf.options.sourceShards, ",")
err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace,
strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", false)
case "reshard":
currentWorkflowType = wrangler.ReshardWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard
sourceShards := strings.Join(wf.options.sourceShards, ",")
targetShards := strings.Join(wf.options.targetShards, ",")
if targetShards == "" {
Expand Down Expand Up @@ -395,7 +394,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {

// Switch all traffic for the shard
wf80Dash.switchTraffic()
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
currentCustomerCount = getCustomerCount(t, "")
Expand Down Expand Up @@ -455,7 +454,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) {
wfDash80.create()
wfDash80.switchTraffic()

expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)

Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/tidwall/gjson"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)

// testCancel() starts and cancels a partial MoveTables for one of the shards which will be actually moved later on.
Expand Down Expand Up @@ -103,6 +102,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {
defer vtgateConn.Close()
defer vc.TearDown(t)
setupMinimalCustomerKeyspace(t)
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables

// Move customer table from unsharded product keyspace to
// sharded customer keyspace.
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

testCancel(t)

currentWorkflowType = wrangler.MoveTablesWorkflow
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
wfName := "partial80Dash"
sourceKs := "customer"
targetKs := "customer2"
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// Switch all traffic for the shard
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false))
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
targetKs, wfName, shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)

Expand Down Expand Up @@ -293,7 +293,7 @@ func TestPartialMoveTablesBasic(t *testing.T) {

// Switch all traffic for the shard
require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false))
expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n",
targetKs, wfName)
require.Equal(t, expectedSwitchOutput, lastOutput)

Expand Down
Loading
Loading