Skip to content

Commit

Permalink
Move first VReplication usage
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Nov 2, 2023
1 parent 52daf18 commit e9b94d8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 39 deletions.
17 changes: 14 additions & 3 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
Expand All @@ -39,6 +40,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)
Expand Down Expand Up @@ -565,9 +567,18 @@ func (mz *materializer) startStreams(ctx context.Context) error {
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
}
query := fmt.Sprintf("update _vt.vreplication set state='Running' where db_name=%s and workflow=%s", encodeString(targetPrimary.DbName()), encodeString(mz.ms.Workflow))
if _, err := mz.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query)
req := &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: mz.ms.Workflow,
State: binlogdatapb.VReplicationWorkflowState_Running,
// Don't change anything else, so pass simulated NULLs.
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{
topodatapb.TabletType(textutil.SimulatedNullInt),
},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
}
if _, err := mz.tmc.UpdateVReplicationWorkflow(ctx, targetPrimary.Tablet, req); err != nil {
return vterrors.Wrap(err, "failed to update workflow")
}
return nil
})
Expand Down
67 changes: 31 additions & 36 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,19 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
where,
)

/*
readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: req.Workflow,
}
callback := func(ctx context.Context, tablet *topo.TabletInfo) (*querypb.QueryResult, error) {
res, err := s.tmc.ReadVReplicationWorkflow(ctx, tablet.Tablet, readReq)
if err != nil {
return nil, err
}
return res, err
}
*/

vx := vexec.NewVExec(req.Keyspace, "", s.ts, s.tmc)
results, err := vx.QueryContext(ctx, query)
if err != nil {
Expand Down Expand Up @@ -1167,42 +1180,29 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
if err != nil {
return err
}
p3qr, err := s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, fmt.Sprintf("select id, state, message, source from _vt.vreplication where workflow=%s and db_name=%s", encodeString(req.Name), encodeString(targetPrimary.DbName())))
res, err := s.tmc.ReadVReplicationWorkflow(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: req.Name,
})
if err != nil {
return err
}
qr := sqltypes.Proto3ToResult(p3qr)
if qr == nil || len(qr.Rows) == 0 {
if res == nil {
return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", req.Name, targetPrimary.Alias)
}
for _, row := range qr.Rows {
id, err := row[0].ToCastInt64()
if err != nil {
return err
}
state := binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[row[1].ToString()])
message := row[2].ToString()
var bls binlogdatapb.BinlogSource
sourceBytes, err := row[3].ToBytes()
if err != nil {
return err
}
if err := prototext.Unmarshal(sourceBytes, &bls); err != nil {
return err
}
if bls.Filter == nil || len(bls.Filter.Rules) != 1 {
for _, stream := range res.Streams {
if stream.Bls.Filter == nil || len(stream.Bls.Filter.Rules) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid binlog source")
}
if vindex.Owner == "" || !bls.StopAfterCopy {
if vindex.Owner == "" || !stream.Bls.StopAfterCopy {
// If there's no owner or we've requested that the workflow NOT be stopped
// after the copy phase completes, then all streams need to be running.
if state != binlogdatapb.VReplicationWorkflowState_Running {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Running state: %v", id, targetShard.Keyspace(), targetShard.ShardName(), state)
if stream.State != binlogdatapb.VReplicationWorkflowState_Running {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Running state: %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State)
}
} else {
// If there is an owner, all streams need to be stopped after copy.
if state != binlogdatapb.VReplicationWorkflowState_Stopped || !strings.Contains(message, "Stopped after copy") {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Stopped after copy state: %v, %v", id, targetShard.Keyspace(), targetShard.ShardName(), state, message)
if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped || !strings.Contains(stream.Message, "Stopped after copy") {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Stopped after copy state: %v, %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State, stream.Message)
}
}
}
Expand Down Expand Up @@ -2225,25 +2225,20 @@ func (s *Server) collectTargetStreams(ctx context.Context, mz *materializer) ([]
var shardTablets []string
var mu sync.Mutex
err := mz.forAllTargets(func(target *topo.ShardInfo) error {
var qrproto *querypb.QueryResult
var id int64
var err error
targetPrimary, err := s.ts.GetTablet(ctx, target.PrimaryAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
}
query := fmt.Sprintf("select id from _vt.vreplication where db_name=%s and workflow=%s", encodeString(targetPrimary.DbName()), encodeString(mz.ms.Workflow))
if qrproto, err = s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query)
res, err := s.tmc.ReadVReplicationWorkflow(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: mz.ms.Workflow,
})
if err != nil {
return vterrors.Wrapf(err, "failed to read vreplication workflow on %+v", targetPrimary.Tablet)
}
qr := sqltypes.Proto3ToResult(qrproto)
for i := 0; i < len(qr.Rows); i++ {
id, err = qr.Rows[i][0].ToCastInt64()
if err != nil {
return err
}
for _, stream := range res.Streams {
mu.Lock()
shardTablets = append(shardTablets, fmt.Sprintf("%s:%d", target.ShardName(), id))
shardTablets = append(shardTablets, fmt.Sprintf("%s:%d", target.ShardName(), stream.Id))
mu.Unlock()
}
return nil
Expand Down

0 comments on commit e9b94d8

Please sign in to comment.