Skip to content

Commit

Permalink
refactor: VTGate executor with a runnable context package (vitessio#1…
Browse files Browse the repository at this point in the history
…7305)

Signed-off-by: Harshit Gangal <[email protected]>
Signed-off-by: Andres Taylor <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
  • Loading branch information
harshit-gangal and systay authored Dec 4, 2024
1 parent 2da3893 commit ab7b516
Show file tree
Hide file tree
Showing 34 changed files with 1,610 additions and 1,278 deletions.
57 changes: 29 additions & 28 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ import (
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

var _ plancontext.VSchema = (*VSchemaWrapper)(nil)

// VSchemaWrapper is a wrapper around VSchema that implements the ContextVSchema interface.
// It is used in tests to provide a VSchema implementation.
type VSchemaWrapper struct {
Vcursor *econtext.VCursorImpl
V *vindexes.VSchema
Keyspace *vindexes.Keyspace
TabletType_ topodatapb.TabletType
Expand All @@ -53,6 +57,30 @@ type VSchemaWrapper struct {
Env *vtenv.Environment
}

func NewVschemaWrapper(
env *vtenv.Environment,
vschema *vindexes.VSchema,
builder func(string, plancontext.VSchema, string) (*engine.Plan, error),
) (*VSchemaWrapper, error) {
ss := econtext.NewAutocommitSession(&vtgatepb.Session{})
vcursor, err := econtext.NewVCursorImpl(ss, sqlparser.MarginComments{}, nil, nil, nil, vschema, nil, nil, nil, econtext.VCursorConfig{
Collation: env.CollationEnv().DefaultConnectionCharset(),
DefaultTabletType: topodatapb.TabletType_PRIMARY,
SetVarEnabled: true,
})
if err != nil {
return nil, err
}
return &VSchemaWrapper{
Env: env,
V: vschema,
Vcursor: vcursor,
TestBuilder: builder,
TabletType_: topodatapb.TabletType_PRIMARY,
SysVarEnabled: true,
}, nil
}

func (vw *VSchemaWrapper) GetPrepareData(stmtName string) *vtgatepb.PrepareData {
switch stmtName {
case "prep_one_param":
Expand Down Expand Up @@ -244,34 +272,7 @@ func (vw *VSchemaWrapper) FindView(tab sqlparser.TableName) sqlparser.SelectStat
}

func (vw *VSchemaWrapper) FindTableOrVindex(tab sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodatapb.TabletType, key.Destination, error) {
if tab.Qualifier.IsEmpty() && tab.Name.String() == "dual" {
ksName := vw.getActualKeyspace()
var ks *vindexes.Keyspace
if ksName == "" {
ks = vw.getfirstKeyspace()
ksName = ks.Name
} else {
ks = vw.V.Keyspaces[ksName].Keyspace
}
tbl := &vindexes.Table{
Name: sqlparser.NewIdentifierCS("dual"),
Keyspace: ks,
Type: vindexes.TypeReference,
}
return tbl, nil, ksName, topodatapb.TabletType_PRIMARY, nil, nil
}
destKeyspace, destTabletType, destTarget, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, nil, destKeyspace, destTabletType, destTarget, err
}
if destKeyspace == "" {
destKeyspace = vw.getActualKeyspace()
}
table, vindex, err := vw.V.FindTableOrVindex(destKeyspace, tab.Name.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, nil, destKeyspace, destTabletType, destTarget, err
}
return table, vindex, destKeyspace, destTabletType, destTarget, nil
return vw.Vcursor.FindTableOrVindex(tab)
}

func (vw *VSchemaWrapper) getfirstKeyspace() (ks *vindexes.Keyspace) {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate"
"vitess.io/vitess/go/vt/vtgate/engine"
econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/queryservice"
Expand Down Expand Up @@ -235,7 +236,7 @@ func (vte *VTExplain) vtgateExecute(sql string) ([]*engine.Plan, map[string]*Tab
// This will ensure that the commit/rollback order is predictable.
vte.sortShardSession()

_, err := vte.vtgateExecutor.Execute(context.Background(), nil, "VtexplainExecute", vtgate.NewSafeSession(vte.vtgateSession), sql, nil)
_, err := vte.vtgateExecutor.Execute(context.Background(), nil, "VtexplainExecute", econtext.NewSafeSession(vte.vtgateSession), sql, nil)
if err != nil {
for _, tc := range vte.explainTopo.TabletConns {
tc.tabletQueries = nil
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
econtext "vitess.io/vitess/go/vt/vtgate/executorcontext"
)

// This file contains tests for all the autocommit code paths
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestAutocommitTransactionStarted(t *testing.T) {

// single shard query - no savepoint needed
sql := "update `user` set a = 2 where id = 1"
_, err := executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err := executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)
require.Len(t, sbc1.Queries, 1)
require.Equal(t, sql, sbc1.Queries[0].Sql)
Expand All @@ -394,7 +394,7 @@ func TestAutocommitTransactionStarted(t *testing.T) {
// multi shard query - savepoint needed
sql = "update `user` set a = 2 where id in (1, 4)"
expectedSql := "update `user` set a = 2 where id in ::__vals"
_, err = executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err = executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)
require.Len(t, sbc1.Queries, 2)
require.Contains(t, sbc1.Queries[0].Sql, "savepoint")
Expand All @@ -413,7 +413,7 @@ func TestAutocommitDirectTarget(t *testing.T) {
}
sql := "insert into `simple`(val) values ('val')"

_, err := executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err := executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)

assertQueries(t, sbclookup, []*querypb.BoundQuery{{
Expand All @@ -434,7 +434,7 @@ func TestAutocommitDirectRangeTarget(t *testing.T) {
}
sql := "delete from sharded_user_msgs limit 1000"

_, err := executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
_, err := executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)

assertQueries(t, sbc1, []*querypb.BoundQuery{{
Expand All @@ -451,5 +451,5 @@ func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) {
TransactionMode: vtgatepb.TransactionMode_MULTI,
}

return executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
return executor.Execute(context.Background(), nil, "TestExecute", econtext.NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
}
Loading

0 comments on commit ab7b516

Please sign in to comment.