Skip to content

Commit

Permalink
test: Add/Fix unit tests for updating message with UpdateVReplication…
Browse files Browse the repository at this point in the history
…Workflow

Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Jan 9, 2025
1 parent f6f42ba commit f63d9f3
Showing 1 changed file with 59 additions and 17 deletions.
76 changes: 59 additions & 17 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const (
readWorkflowsLimited = "select workflow, id, source, pos, stop_pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, message, db_name, rows_copied, tags, time_heartbeat, workflow_type, time_throttled, component_throttled, workflow_sub_type, defer_secondary_keys, options from _vt.vreplication where db_name = '%s' and workflow in ('%s') group by workflow, id order by workflow, id"
readWorkflow = "select id, source, pos, stop_pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, message, db_name, rows_copied, tags, time_heartbeat, workflow_type, time_throttled, component_throttled, workflow_sub_type, defer_secondary_keys, options from _vt.vreplication where workflow = '%s' and db_name = '%s'"
readWorkflowConfig = "select id, source, cell, tablet_types, state, message from _vt.vreplication where workflow = '%s'"
updateWorkflow = "update _vt.vreplication set state = '%s', source = '%s', cell = '%s', tablet_types = '%s' where id in (%d)"
updateWorkflow = "update _vt.vreplication set state = '%s', source = '%s', cell = '%s', tablet_types = '%s', message = '%s' where id in (%d)"
getNonEmptyTableQuery = "select 1 from `%s` limit 1"
)

Expand Down Expand Up @@ -439,7 +439,7 @@ func TestMoveTablesUnsharded(t *testing.T) {
fmt.Sprintf("%d|%s|||Stopped|", vreplID, bls),
), nil)
ftc.vrdbClient.ExpectRequest(idQuery, idRes, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updateWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String(), bls, "", "", vreplID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updateWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String(), bls, "", "", "", vreplID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(getVReplicationRecord, vreplID),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down Expand Up @@ -702,7 +702,7 @@ func TestMoveTablesSharded(t *testing.T) {
fmt.Sprintf("%d|%s|||Stopped|", vreplID, bls),
), nil)
ftc.vrdbClient.ExpectRequest(idQuery, idRes, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updateWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String(), bls, "", "", vreplID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updateWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String(), bls, "", "", "", vreplID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(getVReplicationRecord, vreplID),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down Expand Up @@ -892,6 +892,13 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
),
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]),
)
selectResNonEmptyMessage := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|cell|tablet_types|state|message",
"int64|varchar|varchar|varchar|varchar|varbinary",
),
fmt.Sprintf("%d|%s|%s|%s|Running|initial test message", vreplID, blsStr, cells[0], tabletTypes[0]),
)

idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a",
sqltypes.Int64BindVariable(int64(vreplID)))
Expand All @@ -913,10 +920,11 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
copying := sqltypes.MakeTestResult(copyStatusFields, "1")

tests := []struct {
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
isCopying bool
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
isCopying bool
initiallyNonEmptyMessage bool
}{
{
name: "update cells",
Expand All @@ -925,7 +933,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
Cells: []string{"zone2"},
// TabletTypes is an empty value, so the current value should be cleared
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '', message = '' where id in (%d)`,
keyspace, shard, "zone2", vreplID),
},
{
Expand All @@ -935,7 +943,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
Cells: []string{"zone3"},
TabletTypes: textutil.SimulatedNullTabletTypeSlice, // So keep the current value of replica
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s', message = '' where id in (%d)`,
keyspace, shard, "zone3", tabletTypes[0], vreplID),
},
{
Expand All @@ -945,17 +953,28 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
TabletSelectionPreference: &inOrder,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '', tablet_types = '%s', message = '' where id in (%d)`,
keyspace, shard, "in_order:rdonly,replica", vreplID),
},
{
name: "update tablet_types, initially non-empty message",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
TabletSelectionPreference: &inOrder,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA},
},
initiallyNonEmptyMessage: true,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '', tablet_types = '%s', message = '%s' where id in (%d)`,
keyspace, shard, "in_order:rdonly,replica", "initial test message", vreplID),
},
{
name: "update tablet_types, NULL cells",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s', message = '' where id in (%d)`,
keyspace, shard, cells[0], "rdonly", vreplID),
},
{
Expand All @@ -964,7 +983,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
Workflow: workflow,
OnDdl: &exec,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}} on_ddl:%s', cell = '', tablet_types = '', message = '' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
},
{
Expand All @@ -975,7 +994,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY},
OnDdl: &execIgnore,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}} on_ddl:%s', cell = '%s', tablet_types = '%s', message = '' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
},
{
Expand All @@ -986,9 +1005,28 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
},
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s', message = '' where id in (%d)`,
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
{
name: "update message",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
Message: ptr.Of("test message"),
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '', tablet_types = '', message = '%s' where id in (%d)`,
keyspace, shard, "test message", vreplID),
},
{
name: "update message, initially non-empty message",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
Message: ptr.Of("test message"),
},
initiallyNonEmptyMessage: true,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '', tablet_types = '', message = '%s' where id in (%d)`,
keyspace, shard, "test message", vreplID),
},
{
name: "update to running while copying",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Expand All @@ -998,7 +1036,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
},
isCopying: true,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '%s', message = '' where id in (%d)`,
keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
{
Expand All @@ -1011,7 +1049,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
"password": "secret",
},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '', options = json_set(options, '$.config', json_object(), '$.config."password"', 'secret', '$.config."user"', 'admin') where id in (%d)`,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}', cell = '%s', tablet_types = '', message = '', options = json_set(options, '$.config', json_object(), '$.config."password"', 'secret', '$.config."user"', 'admin') where id in (%d)`,
keyspace, shard, "zone2", vreplID),
},
}
Expand All @@ -1033,7 +1071,11 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {

// These are the same for each RPC call.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
if tt.initiallyNonEmptyMessage {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectResNonEmptyMessage, nil)
} else {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
}
if tt.request.State == nil || *tt.request.State == binlogdatapb.VReplicationWorkflowState_Running {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
if tt.isCopying {
Expand Down

0 comments on commit f63d9f3

Please sign in to comment.