From e9b94d8d96421ea852f5ea67da34340212eae317 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 1 Nov 2023 23:23:34 -0400 Subject: [PATCH] Move first VReplication usage Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 17 +++++-- go/vt/vtctl/workflow/server.go | 67 +++++++++++++--------------- 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index f441871b711..d8b43b4b66c 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -25,6 +25,7 @@ import ( "time" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" @@ -39,6 +40,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -565,9 +567,18 @@ func (mz *materializer) startStreams(ctx context.Context) error { if err != nil { return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias) } - query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow=%s", encodeString(targetPrimary.DbName()), encodeString(mz.ms.Workflow)) - if _, err := mz.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil { - return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query) + req := &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: mz.ms.Workflow, + State: binlogdatapb.VReplicationWorkflowState_Running, + // Don't change anything else, so pass simulated NULLs. + Cells: textutil.SimulatedNullStringSlice, + TabletTypes: []topodatapb.TabletType{ + topodatapb.TabletType(textutil.SimulatedNullInt), + }, + OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt), + } + if _, err := mz.tmc.UpdateVReplicationWorkflow(ctx, targetPrimary.Tablet, req); err != nil { + return vterrors.Wrap(err, "failed to update workflow") } return nil }) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c8e33f29523..bc27a859345 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -408,6 +408,19 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows where, ) + /* + readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{ + Workflow: req.Workflow, + } + callback := func(ctx context.Context, tablet *topo.TabletInfo) (*querypb.QueryResult, error) { + res, err := s.tmc.ReadVReplicationWorkflow(ctx, tablet.Tablet, readReq) + if err != nil { + return nil, err + } + return res, err + } + */ + vx := vexec.NewVExec(req.Keyspace, "", s.ts, s.tmc) results, err := vx.QueryContext(ctx, query) if err != nil { @@ -1167,42 +1180,29 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L if err != nil { return err } - p3qr, err := s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, fmt.Sprintf("select id, state, message, source from _vt.vreplication where workflow=%s and db_name=%s", encodeString(req.Name), encodeString(targetPrimary.DbName()))) + res, err := s.tmc.ReadVReplicationWorkflow(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{ + Workflow: req.Name, + }) if err != nil { return err } - qr := sqltypes.Proto3ToResult(p3qr) - if qr == nil || len(qr.Rows) == 0 { + if res == nil { return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", req.Name, targetPrimary.Alias) } - for _, row := range qr.Rows { - id, err := row[0].ToCastInt64() - if err != nil { - return err - } - state := binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[row[1].ToString()]) - message := row[2].ToString() - var bls binlogdatapb.BinlogSource - sourceBytes, err := row[3].ToBytes() - if err != nil { - return err - } - if err := prototext.Unmarshal(sourceBytes, &bls); err != nil { - return err - } - if bls.Filter == nil || len(bls.Filter.Rules) != 1 { + for _, stream := range res.Streams { + if stream.Bls.Filter == nil || len(stream.Bls.Filter.Rules) != 1 { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid binlog source") } - if vindex.Owner == "" || !bls.StopAfterCopy { + if vindex.Owner == "" || !stream.Bls.StopAfterCopy { // If there's no owner or we've requested that the workflow NOT be stopped // after the copy phase completes, then all streams need to be running. - if state != binlogdatapb.VReplicationWorkflowState_Running { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Running state: %v", id, targetShard.Keyspace(), targetShard.ShardName(), state) + if stream.State != binlogdatapb.VReplicationWorkflowState_Running { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Running state: %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State) } } else { // If there is an owner, all streams need to be stopped after copy. - if state != binlogdatapb.VReplicationWorkflowState_Stopped || !strings.Contains(message, "Stopped after copy") { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Stopped after copy state: %v, %v", id, targetShard.Keyspace(), targetShard.ShardName(), state, message) + if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped || !strings.Contains(stream.Message, "Stopped after copy") { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Stopped after copy state: %v, %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State, stream.Message) } } } @@ -2225,25 +2225,20 @@ func (s *Server) collectTargetStreams(ctx context.Context, mz *materializer) ([] var shardTablets []string var mu sync.Mutex err := mz.forAllTargets(func(target *topo.ShardInfo) error { - var qrproto *querypb.QueryResult - var id int64 var err error targetPrimary, err := s.ts.GetTablet(ctx, target.PrimaryAlias) if err != nil { return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias) } - query := fmt.Sprintf("select id from _vt.vreplication where db_name=%s and workflow=%s", encodeString(targetPrimary.DbName()), encodeString(mz.ms.Workflow)) - if qrproto, err = s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil { - return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query) + res, err := s.tmc.ReadVReplicationWorkflow(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{ + Workflow: mz.ms.Workflow, + }) + if err != nil { + return vterrors.Wrapf(err, "failed to read vreplication workflow on %+v", targetPrimary.Tablet) } - qr := sqltypes.Proto3ToResult(qrproto) - for i := 0; i < len(qr.Rows); i++ { - id, err = qr.Rows[i][0].ToCastInt64() - if err != nil { - return err - } + for _, stream := range res.Streams { mu.Lock() - shardTablets = append(shardTablets, fmt.Sprintf("%s:%d", target.ShardName(), id)) + shardTablets = append(shardTablets, fmt.Sprintf("%s:%d", target.ShardName(), stream.Id)) mu.Unlock() } return nil