Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: VTGate executor with a runnable context package #17305

Merged
merged 14 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 29 additions & 28 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ import (
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

var _ plancontext.VSchema = (*VSchemaWrapper)(nil)

// VSchemaWrapper is a wrapper around VSchema that implements the ContextVSchema interface.
// It is used in tests to provide a VSchema implementation.
type VSchemaWrapper struct {
Vcursor *econtext.VCursorImpl
V *vindexes.VSchema
Keyspace *vindexes.Keyspace
TabletType_ topodatapb.TabletType
Expand All @@ -53,6 +57,30 @@ type VSchemaWrapper struct {
Env *vtenv.Environment
}

func NewVschemaWrapper(
env *vtenv.Environment,
vschema *vindexes.VSchema,
builder func(string, plancontext.VSchema, string) (*engine.Plan, error),
) (*VSchemaWrapper, error) {
ss := econtext.NewAutocommitSession(&vtgatepb.Session{})
vcursor, err := econtext.NewVCursorImpl(ss, sqlparser.MarginComments{}, nil, nil, nil, vschema, nil, nil, nil, econtext.VCursorConfig{
Collation: env.CollationEnv().DefaultConnectionCharset(),
DefaultTabletType: topodatapb.TabletType_PRIMARY,
SetVarEnabled: true,
})
if err != nil {
return nil, err
}
return &VSchemaWrapper{
Env: env,
V: vschema,
Vcursor: vcursor,
TestBuilder: builder,
TabletType_: topodatapb.TabletType_PRIMARY,
SysVarEnabled: true,
}, nil
}

func (vw *VSchemaWrapper) GetPrepareData(stmtName string) *vtgatepb.PrepareData {
switch stmtName {
case "prep_one_param":
Expand Down Expand Up @@ -244,34 +272,7 @@ func (vw *VSchemaWrapper) FindView(tab sqlparser.TableName) sqlparser.SelectStat
}

func (vw *VSchemaWrapper) FindTableOrVindex(tab sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodatapb.TabletType, key.Destination, error) {
if tab.Qualifier.IsEmpty() && tab.Name.String() == "dual" {
ksName := vw.getActualKeyspace()
var ks *vindexes.Keyspace
if ksName == "" {
ks = vw.getfirstKeyspace()
ksName = ks.Name
} else {
ks = vw.V.Keyspaces[ksName].Keyspace
}
tbl := &vindexes.Table{
Name: sqlparser.NewIdentifierCS("dual"),
Keyspace: ks,
Type: vindexes.TypeReference,
}
return tbl, nil, ksName, topodatapb.TabletType_PRIMARY, nil, nil
}
destKeyspace, destTabletType, destTarget, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, nil, destKeyspace, destTabletType, destTarget, err
}
if destKeyspace == "" {
destKeyspace = vw.getActualKeyspace()
}
table, vindex, err := vw.V.FindTableOrVindex(destKeyspace, tab.Name.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, nil, destKeyspace, destTabletType, destTarget, err
}
return table, vindex, destKeyspace, destTabletType, destTarget, nil
return vw.Vcursor.FindTableOrVindex(tab)
}

func (vw *VSchemaWrapper) getfirstKeyspace() (ks *vindexes.Keyspace) {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate"
"vitess.io/vitess/go/vt/vtgate/engine"
econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/queryservice"
Expand Down Expand Up @@ -235,7 +236,7 @@ func (vte *VTExplain) vtgateExecute(sql string) ([]*engine.Plan, map[string]*Tab
// This will ensure that the commit/rollback order is predictable.
vte.sortShardSession()

_, err := vte.vtgateExecutor.Execute(context.Background(), nil, "VtexplainExecute", vtgate.NewSafeSession(vte.vtgateSession), sql, nil)
_, err := vte.vtgateExecutor.Execute(context.Background(), nil, "VtexplainExecute", econtext.NewSafeSession(vte.vtgateSession), sql, nil)
if err != nil {
for _, tc := range vte.explainTopo.TabletConns {
tc.tabletQueries = nil
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"
)

// This file contains tests for all the autocommit code paths
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestAutocommitTransactionStarted(t *testing.T) {

// single shard query - no savepoint needed
sql := "update `user` set a = 2 where id = 1"
_, err := executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err := executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)
require.Len(t, sbc1.Queries, 1)
require.Equal(t, sql, sbc1.Queries[0].Sql)
Expand All @@ -394,7 +394,7 @@ func TestAutocommitTransactionStarted(t *testing.T) {
// multi shard query - savepoint needed
sql = "update `user` set a = 2 where id in (1, 4)"
expectedSql := "update `user` set a = 2 where id in ::__vals"
_, err = executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err = executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)
require.Len(t, sbc1.Queries, 2)
require.Contains(t, sbc1.Queries[0].Sql, "savepoint")
Expand All @@ -413,7 +413,7 @@ func TestAutocommitDirectTarget(t *testing.T) {
}
sql := "insert into `simple`(val) values ('val')"

_, err := executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err := executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)

assertQueries(t, sbclookup, []*querypb.BoundQuery{{
Expand All @@ -434,7 +434,7 @@ func TestAutocommitDirectRangeTarget(t *testing.T) {
}
sql := "delete from sharded_user_msgs limit 1000"

_, err := executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err := executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)

assertQueries(t, sbc1, []*querypb.BoundQuery{{
Expand All @@ -451,5 +451,5 @@ func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) {
TransactionMode: vtgatepb.TransactionMode_MULTI,
}

return executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
return executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
}
Loading
Loading