From c830723e4c880104e6504acd144afe5c6577f4e9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 21 Mar 2024 11:34:09 -0400 Subject: [PATCH] VReplication: Fix workflow filtering in GetWorkflows RPC (#15524) Signed-off-by: Matt Lord --- .../vreplication_vtctldclient_cli_test.go | 49 +++++++++++++++++++ go/vt/vtctl/workflow/server.go | 5 +- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 30d9e2b2eb5..bca51512a3c 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -17,6 +17,7 @@ limitations under the License. package vreplication import ( + "encoding/json" "fmt" "slices" "strings" @@ -27,6 +28,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" "vitess.io/vitess/go/test/endtoend/cluster" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -59,6 +61,9 @@ func TestVtctldclientCLI(t *testing.T) { workflowName := "wf1" targetTabs := setupMinimalCustomerKeyspace(t) + t.Run("WorkflowList", func(t *testing.T) { + testWorkflowList(t, sourceKeyspaceName, targetKeyspaceName) + }) t.Run("MoveTablesCreateFlags1", func(t *testing.T) { testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs) }) @@ -175,6 +180,32 @@ func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, t require.False(t, checkTablesExist(t, "zone1-100", []string{"customer2"})) } +// Create two workflows in order to confirm that listing all workflows works. +func testWorkflowList(t *testing.T, sourceKeyspace, targetKeyspace string) { + createFlags := []string{"--auto-start=false", "--tablet-types", + "primary,rdonly", "--tablet-types-in-preference-order=true", "--all-cells", + } + wfNames := []string{"list1", "list2"} + tables := []string{"customer", "customer2"} + for i := range wfNames { + mt := createMoveTables(t, sourceKeyspace, targetKeyspace, wfNames[i], tables[i], createFlags, nil, nil) + defer mt.Cancel() + } + slices.Sort(wfNames) + + workflowNames := workflowList(targetKeyspace) + slices.Sort(workflowNames) + require.EqualValues(t, wfNames, workflowNames) + + workflows := getWorkflows(targetKeyspace) + workflowNames = make([]string, len(workflows.Workflows)) + for i := range workflows.Workflows { + workflowNames[i] = workflows.Workflows[i].Name + } + slices.Sort(workflowNames) + require.EqualValues(t, wfNames, workflowNames) +} + func createMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName, tables string, createFlags, completeFlags, switchFlags []string) iMoveTables { mt := newMoveTables(vc, &moveTablesWorkflow{ @@ -322,6 +353,24 @@ func getWorkflow(targetKeyspace, workflow string) *vtctldatapb.GetWorkflowsRespo return workflowResponse.CloneVT() } +func getWorkflows(targetKeyspace string) *vtctldatapb.GetWorkflowsResponse { + getWorkflowsOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("GetWorkflows", targetKeyspace, "--show-all", "--compact", "--include-logs=false") + require.NoError(vc.t, err) + var getWorkflowsResponse vtctldatapb.GetWorkflowsResponse + err = protojson.Unmarshal([]byte(getWorkflowsOutput), &getWorkflowsResponse) + require.NoError(vc.t, err) + return getWorkflowsResponse.CloneVT() +} + +func workflowList(targetKeyspace string) []string { + workflowListOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKeyspace, "list") + require.NoError(vc.t, err) + var workflowList []string + err = json.Unmarshal([]byte(workflowListOutput), &workflowList) + require.NoError(vc.t, err) + return workflowList +} + func checkTablesExist(t *testing.T, tabletAlias string, tables []string) bool { tablesResponse, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", tabletAlias, "--tables", strings.Join(tables, ","), "--table-names-only") require.NoError(t, err) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 33d2a9869ab..f63fb69dea7 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -383,8 +383,9 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows span.Annotate("include_logs", req.IncludeLogs) span.Annotate("shards", req.Shards) - readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{ - IncludeWorkflows: []string{req.Workflow}, + readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{} + if req.Workflow != "" { + readReq.IncludeWorkflows = []string{req.Workflow} } if req.ActiveOnly { readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped}