Skip to content

Commit

Permalink
pass in vstream flag to executor and manager
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Sep 7, 2023
1 parent 8cc0ecd commit 27bf537
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false)

return nil
}
Expand Down
31 changes: 18 additions & 13 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ type Executor struct {

// allowScatter will fail planning if set to false and a plan contains any scatter queries
allowScatter bool
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool
}

var executorOnce sync.Once
Expand All @@ -127,20 +130,22 @@ func NewExecutor(
schemaTracker SchemaInfo,
noScatter bool,
pv plancontext.PlannerVersion,
noVstreamCopy bool,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
pv: pv,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
allowScatter: !noScatter,
allowVstreamCopy: !noVstreamCopy,
pv: pv,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1318,7 +1323,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar
return err
}

vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell)
vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell, e.allowVstreamCopy)
vs := &vstream{
vgtid: vgtid,
tabletType: topodatapb.TabletType_PRIMARY,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
// create a new session each time so that ShardSessions don't get re-used across tests
Expand All @@ -493,7 +493,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down Expand Up @@ -522,7 +522,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
// create a new session each time so that ShardSessions don't get re-used across tests
primarySession = &vtgatepb.Session{
TargetString: "@primary",
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func TestStreamSelectIN(t *testing.T) {
}

func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
}

func TestSelectScatter(t *testing.T) {
Expand Down Expand Up @@ -2981,7 +2981,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
count++
}

executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)
before := runtime.NumGoroutine()

query := "select id, col from user order by id limit 2"
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
Expand Down
14 changes: 9 additions & 5 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string
// allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream.
// This is temporary until RDONLYs are properly supported for bootstrapping.
allowVstreamCopy bool

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
Expand Down Expand Up @@ -120,12 +123,13 @@ type journalEvent struct {
done chan struct{}
}

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")
return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
resolver: resolver,
toposerv: serv,
cell: cell,
allowVstreamCopy: allowVstreamCopy,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
Expand Down Expand Up @@ -542,7 +546,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
var vstreamCreatedOnce sync.Once

if !*allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) {
if !vs.vsm.allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) {
// We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping)
return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported")
}
Expand Down
43 changes: 22 additions & 21 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestVStreamSkew(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}}
want := int64(0)
var sbc0, sbc1 *sandboxconn.SandboxConn
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestVStreamEvents(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})

vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -213,7 +213,7 @@ func TestVStreamChunks(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestVStreamManagerGetCells(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
ts, _ := st.GetTopoServer()

for _, tcase := range tcases {
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestVStreamMulti(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestVStreamRetry(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -561,7 +561,7 @@ func TestVStreamJournalOneToMany(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestVStreamJournalManyToOne(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestVStreamJournalNoMatch(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())

Expand Down Expand Up @@ -920,7 +920,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"})
vsm := newTestVStreamManager(hc, st, "aa")
vsm := newTestVStreamManager(hc, st, "aa", true)
sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet())
sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func TestResolveVStreamParams(t *testing.T) {
name := "TestVStream"
_ = createSandbox(name)
hc := discovery.NewFakeHealthCheck(nil)
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa")
vsm := newTestVStreamManager(hc, new(sandboxTopo), "aa", true)
testcases := []struct {
input *binlogdatapb.VGtid
output *binlogdatapb.VGtid
Expand Down Expand Up @@ -1146,7 +1146,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, cell)
vsm := newTestVStreamManager(hc, st, cell, true)
sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
vgtid := &binlogdatapb.VGtid{
Expand Down Expand Up @@ -1196,7 +1196,6 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
}

func TestVstreamCopy(t *testing.T) {
*allowVstreamCopy = false
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -1206,7 +1205,6 @@ func TestVstreamCopy(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)

st := getSandboxTopo(ctx, cell, ks, []string{"-20"})
vsm := newTestVStreamManager(hc, st, "aa")
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
commit := []*binlogdatapb.VEvent{
Expand All @@ -1228,26 +1226,29 @@ func TestVstreamCopy(t *testing.T) {
Gtid: "",
}},
}

// allowVstreamCopy = false
vsm := newTestVStreamManager(hc, st, "aa", false)
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Error(t, err)
require.Equal(t, err.Error(), "vstream copy is not currently supported")

// force set vtgate flag to allow vstream copy
*allowVstreamCopy = true
err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
// allowVstreamCopy = true
vsm2 := newTestVStreamManager(hc, st, "aa", true)
err = vsm2.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
count.Add(1)
return nil
})
require.Equal(t, err.Error(), "final error")
}

func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager {
gw := NewTabletGateway(context.Background(), hc, serv, cell)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
return newVStreamManager(srvResolver, serv, cell)
return newVStreamManager(srvResolver, serv, cell, allowVstreamCopy)
}

func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, flags *vtgatepb.VStreamFlags) <-chan *binlogdatapb.VStreamResponse {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable")
dbDDLPlugin = flag.String("dbddl_plugin", "fail", "controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service")
noScatter = flag.Bool("no_scatter", false, "when set to true, the planner will fail instead of producing a plan that includes scatter queries")
allowVstreamCopy = flag.Bool("allow_vstream_copy", true, "when set to false, vstream copy will not be allowed - temporary until we can properly support RDONLY for this")
noVstreamCopy = flag.Bool("no_vstream_copy", false, "when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this")

// TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed

Expand Down Expand Up @@ -211,7 +211,7 @@ func Init(
sc := NewScatterConn("VttabletCall", tc, gw)
srvResolver := srvtopo.NewResolver(serv, gw, cell)
resolver := NewResolver(srvResolver, serv, cell, sc)
vsm := newVStreamManager(srvResolver, serv, cell)
vsm := newVStreamManager(srvResolver, serv, cell, !*noVstreamCopy)

var si SchemaInfo // default nil
var st *vtschema.Tracker
Expand Down Expand Up @@ -239,6 +239,7 @@ func Init(
si,
*noScatter,
pv,
*noVstreamCopy,
)

// connect the schema tracker with the vschema manager
Expand Down

0 comments on commit 27bf537

Please sign in to comment.