Skip to content

Commit

Permalink
evalengine: remove slow tiny weights
Browse files Browse the repository at this point in the history
Signed-off-by: Vicent Marti <[email protected]>
  • Loading branch information
vmg committed Nov 6, 2023
1 parent af65e3a commit 946eb31
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 32 deletions.
14 changes: 1 addition & 13 deletions go/vt/vtgate/engine/memory_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,6 @@ func (ms *MemorySort) SetTruncateColumnCount(count int) {
ms.TruncateColumnCount = count
}

func PanicHandler(err *error) {
if r := recover(); r != nil {
badness, ok := r.(error)
if !ok {
panic(r)
}

*err = badness
}
}

// TryExecute satisfies the Primitive interface.
func (ms *MemorySort) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
count, err := ms.fetchCount(ctx, vcursor, bindVars)
Expand All @@ -97,7 +86,7 @@ func (ms *MemorySort) TryExecute(ctx context.Context, vcursor VCursor, bindVars

// TryStreamExecute satisfies the Primitive interface.
func (ms *MemorySort) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) (err error) {
defer PanicHandler(&err)
defer evalengine.PanicHandler(&err)

count, err := ms.fetchCount(ctx, vcursor, bindVars)
if err != nil {
Expand All @@ -117,7 +106,6 @@ func (ms *MemorySort) TryStreamExecute(ctx context.Context, vcursor VCursor, bin
if err := cb(&sqltypes.Result{Fields: qr.Fields}); err != nil {
return err
}
sorter.SetFields(qr.Fields)
}
for _, row := range qr.Rows {
sorter.Push(row)
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtgate/engine/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ms *MergeSort) GetFields(ctx context.Context, vcursor VCursor, bindVars ma

// TryStreamExecute performs a streaming exec.
func (ms *MergeSort) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) (err error) {
defer PanicHandler(&err)
defer evalengine.PanicHandler(&err)

var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
Expand Down Expand Up @@ -104,7 +104,6 @@ func (ms *MergeSort) TryStreamExecute(ctx context.Context, vcursor VCursor, bind
if err := callback(&sqltypes.Result{Fields: fields}); err != nil {
return err
}
merge.SetFields(fields)
}

var errs []error
Expand Down
21 changes: 4 additions & 17 deletions go/vt/vtgate/evalengine/api_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,23 +311,15 @@ type Sorter struct {
Compare Comparison
Limit int

weights []tinyWeighter
rows []sqltypes.Row
heap bool
rows []sqltypes.Row
heap bool
}

func (s *Sorter) Len() int {
return len(s.rows)
}

func (s *Sorter) SetFields(f []*querypb.Field) {
s.weights = s.Compare.tinyWeighters(f)
}

func (s *Sorter) Push(row sqltypes.Row) {
for _, w := range s.weights {
w.apply(&row[w.col])
}
if len(s.rows) < s.Limit {
s.rows = append(s.rows, row)
return
Expand Down Expand Up @@ -367,9 +359,8 @@ type mergeRow struct {
type Merger struct {
Compare Comparison

weights []tinyWeighter
rows []mergeRow
less func(a, b mergeRow) bool
rows []mergeRow
less func(a, b mergeRow) bool
}

func (m *Merger) Len() int {
Expand All @@ -383,10 +374,6 @@ func (m *Merger) Init() {
heapify(m.rows, m.less)
}

func (m *Merger) SetFields(f []*querypb.Field) {
m.weights = m.Compare.tinyWeighters(f)
}

func (m *Merger) Push(row sqltypes.Row, source int) {
m.rows = append(m.rows, mergeRow{row, source})
if m.less != nil {
Expand Down

0 comments on commit 946eb31

Please sign in to comment.