Skip to content

Commit

Permalink
add vtgate flag that explicitly allows vstream copy
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Sep 7, 2023
1 parent e87c86a commit f3d3ae0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 2 deletions.
7 changes: 7 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"

Expand Down Expand Up @@ -540,6 +541,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
log.Infof("Starting to vstream from %s", tablet.Alias.String())
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
var vstreamCreatedOnce sync.Once

if !*allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) {
// We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping)
return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported")
}

err = tabletConn.VStream(ctx, target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
Expand Down
48 changes: 48 additions & 0 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,54 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
}
}

func TestVstreamCopy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cell := "aa"
ks := "TestVStreamCopy"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_COMMIT},
}
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa"))
sbc0.AddVStreamEvents(commit, nil)
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc"))
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error"))
var count sync2.AtomicInt32
count.Set(0)
// empty gtid id means no start position = bootstrapping/vstream copy
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "",
}},
}
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Error(t, err)
require.Equal(t, err.Error(), "vstream copy is not currently supported")

// force set vtgate flag to allow vstream copy
*allowVstreamCopy = true
err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Equal(t, err.Error(), "final error")
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
gw := NewTabletGateway(context.Background(), hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var (
defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable")
dbDDLPlugin = flag.String("dbddl_plugin", "fail", "controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service")
noScatter = flag.Bool("no_scatter", false, "when set to true, the planner will fail instead of producing a plan that includes scatter queries")
allowVstreamCopy = flag.Bool("allow_vstream_copy", false, "when set to false, vstream copy will not be allowed - temporary until we can properly support RDONLY for this")

// TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed

Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se
// buildTablePlan identifies the tables for the copy phase and creates the plans which consist of the lastPK seen
// for a table and its Rule (for filtering purposes by the vstreamer engine)
// it can be called
// the first time, with just the filter and an empty pos
// during a restart, with both the filter and list of TableLastPK from the vgtid
//
// the first time, with just the filter and an empty pos
// during a restart, with both the filter and list of TableLastPK from the vgtid
func (uvs *uvstreamer) buildTablePlan() error {
uvs.plans = make(map[string]*tablePlan)
tableLastPKs := make(map[string]*binlogdatapb.TableLastPK)
Expand Down

0 comments on commit f3d3ae0

Please sign in to comment.