Skip to content

Commit

Permalink
fix: vcursor test
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Dec 3, 2024
1 parent 231b8cb commit ce72f62
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 17 deletions.
8 changes: 5 additions & 3 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package executorcontext
import (
"context"
"fmt"
"github.com/google/uuid"
"golang.org/x/exp/maps"
"io"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/google/uuid"
"golang.org/x/exp/maps"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/config"
"vitess.io/vitess/go/mysql/sqlerror"
Expand Down Expand Up @@ -203,6 +204,7 @@ func NewVCursorImpl(
// connCollation = executor.env.CollationEnv().DefaultConnectionCharset()
// }
return &VCursorImpl{
config: cfg,
SafeSession: safeSession,
keyspace: keyspace,
tabletType: tabletType,
Expand Down Expand Up @@ -362,7 +364,7 @@ func (vc *VCursorImpl) RecordWarning(warning *querypb.QueryWarning) {

// IsShardRoutingEnabled implements the VCursor interface.
func (vc *VCursorImpl) IsShardRoutingEnabled() bool {
return vc.IsShardRoutingEnabled()
return vc.config.EnableShardRouting
}

func (vc *VCursorImpl) ReadTransaction(ctx context.Context, transactionID string) (*querypb.TransactionMetadata, error) {
Expand Down
140 changes: 126 additions & 14 deletions go/vt/vtgate/executorcontext/vcursor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package executorcontext

import (
"context"
"encoding/hex"
"errors"
"fmt"
"strconv"
Expand All @@ -28,6 +27,13 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -160,11 +166,12 @@ func TestDestinationKeyspace(t *testing.T) {
expectedError: ErrNoKeyspace.Error(),
}}

r, _, _, _, _ := createExecutorEnv(t)
for i, tc := range tests {
t.Run(strconv.Itoa(i)+tc.targetString, func(t *testing.T) {
session := NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString})
impl, _ := NewVCursorImpl(session, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4)
impl, _ := NewVCursorImpl(session, sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{
DefaultTabletType: topodatapb.TabletType_PRIMARY,
})
impl.vschema = tc.vschema
dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier)
if tc.expectedError == "" {
Expand Down Expand Up @@ -224,10 +231,9 @@ func TestSetTarget(t *testing.T) {
expectedError: "can't execute the given command because you have an active transaction",
}}

// r, _, _, _, _ := createExecutorEnv(t)
for i, tc := range tests {
t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) {
cfg := VCursorConfig{}
cfg := VCursorConfig{DefaultTabletType: topodatapb.TabletType_PRIMARY}
vc, _ := NewVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, cfg)
vc.vschema = tc.vschema
err := vc.SetTarget(tc.targetString)
Expand Down Expand Up @@ -278,7 +284,11 @@ func TestKeyForPlan(t *testing.T) {
t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) {
ss := NewSafeSession(&vtgatepb.Session{InTransaction: false})
ss.SetTargetString(tc.targetString)
vc, err := NewVCursorImpl(ss, sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{})
cfg := VCursorConfig{
Collation: collations.CollationUtf8mb4ID,
DefaultTabletType: topodatapb.TabletType_PRIMARY,
}
vc, err := NewVCursorImpl(ss, sqlparser.MarginComments{}, &fakeExecutor{}, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&FakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, cfg)
require.NoError(t, err)
vc.vschema = tc.vschema

Expand Down Expand Up @@ -311,13 +321,13 @@ func TestFirstSortedKeyspace(t *testing.T) {
// TestSetExecQueryTimeout tests the SetExecQueryTimeout method.
// Validates the timeout value is set based on override rule.
func TestSetExecQueryTimeout(t *testing.T) {
executor, _, _, _, _ := createExecutorEnv(t)
safeSession := NewSafeSession(nil)
vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, executor, nil, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4)
vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, nil, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{
// flag timeout
QueryTimeout: 20,
})
require.NoError(t, err)

// flag timeout
queryTimeout = 20
vc.SetExecQueryTimeout(nil)
require.Equal(t, 20*time.Millisecond, vc.queryTimeout)
require.NotNil(t, safeSession.Options.Timeout)
Expand All @@ -344,8 +354,8 @@ func TestSetExecQueryTimeout(t *testing.T) {
require.NotNil(t, safeSession.Options.Timeout)
require.EqualValues(t, 0, safeSession.Options.GetAuthoritativeTimeout())

// reset
queryTimeout = 0
// reset flag timeout
vc.config.QueryTimeout = 0
safeSession.SetQueryTimeout(0)
vc.SetExecQueryTimeout(nil)
require.Equal(t, 0*time.Millisecond, vc.queryTimeout)
Expand All @@ -354,10 +364,9 @@ func TestSetExecQueryTimeout(t *testing.T) {
}

func TestRecordMirrorStats(t *testing.T) {
executor, _, _, _, _ := createExecutorEnv(t)
safeSession := NewSafeSession(nil)
logStats := logstats.NewLogStats(context.Background(), t.Name(), "select 1", "", nil)
vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, executor, logStats, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4)
vc, err := NewVCursorImpl(safeSession, sqlparser.MarginComments{}, nil, logStats, nil, &vindexes.VSchema{}, nil, nil, false, querypb.ExecuteOptions_Gen4, VCursorConfig{})
require.NoError(t, err)

require.Zero(t, logStats.MirrorSourceExecuteTime)
Expand All @@ -370,3 +379,106 @@ func TestRecordMirrorStats(t *testing.T) {
require.Equal(t, 20*time.Millisecond, logStats.MirrorTargetExecuteTime)
require.ErrorContains(t, logStats.MirrorTargetError, "test error")
}

type fakeExecutor struct{}

func (f fakeExecutor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver ResultsObserver) (qr *sqltypes.Result, errs []error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, observer ResultsObserver) []error {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedShard, query *querypb.BoundQuery, session *SafeSession, lockFuncType sqlparser.LockingFuncType) (*sqltypes.Result, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) Commit(ctx context.Context, safeSession *SafeSession) error {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ExecuteMessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ExecuteVStream(ctx context.Context, rss []*srvtopo.ResolvedShard, filter *binlogdatapb.Filter, gtid string, callback func(evs []*binlogdatapb.VEvent) error) error {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ReleaseLock(ctx context.Context, session *SafeSession) error {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ShowVitessReplicationStatus(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ShowShards(ctx context.Context, filter *sqlparser.ShowFilter, destTabletType topodatapb.TabletType) (*sqltypes.Result, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ShowTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ShowVitessMetadata(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) SetVitessMetadata(ctx context.Context, name, value string) error {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) VSchema() *vindexes.VSchema {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) PlanPrepareStmt(ctx context.Context, vcursor *VCursorImpl, query string) (*engine.Plan, sqlparser.Statement, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) Environment() *vtenv.Environment {
return vtenv.NewTestEnv()
}

func (f fakeExecutor) ReadTransaction(ctx context.Context, transactionID string) (*querypb.TransactionMetadata, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) UnresolvedTransactions(ctx context.Context, targets []*querypb.Target) ([]*querypb.TransactionMetadata, error) {
// TODO implement me
panic("implement me")
}

func (f fakeExecutor) AddWarningCount(name string, value int64) {
// TODO implement me
panic("implement me")
}

var _ iExecute = (*fakeExecutor)(nil)

0 comments on commit ce72f62

Please sign in to comment.