Skip to content

Commit

Permalink
Cherry-pick 61b12e6 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Jan 10, 2024
1 parent 01d85ac commit 989700c
Show file tree
Hide file tree
Showing 9 changed files with 16,835 additions and 2 deletions.
344 changes: 343 additions & 1 deletion go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

882 changes: 882 additions & 0 deletions go/vt/proto/binlogdata/binlogdata_vtproto.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (
"context"
"errors"
"fmt"
<<<<<<< HEAD
"math"
=======
"net/url"
>>>>>>> 61b12e635a (VReplication: send unique key name to `rowstreamer`, which can then use with `FORCE INDEX` (#14916))
"strconv"
"strings"

Expand Down Expand Up @@ -534,6 +538,7 @@ func (v *VRepl) analyzeBinlogSource(ctx context.Context) {
SourceUniqueKeyColumns: encodeColumns(&v.chosenSourceUniqueKey.Columns),
TargetUniqueKeyColumns: encodeColumns(&v.chosenTargetUniqueKey.Columns),
SourceUniqueKeyTargetColumns: encodeColumns(v.chosenSourceUniqueKey.Columns.MappedNamesColumnList(v.sharedColumnsMap)),
ForceUniqueKey: url.QueryEscape(v.chosenSourceUniqueKey.Name),
}
if len(v.convertCharset) > 0 {
rule.ConvertCharset = v.convertCharset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
if rule.SourceUniqueKeyColumns != "" {
commentsList = append(commentsList, fmt.Sprintf(`ukColumns="%s"`, rule.SourceUniqueKeyColumns))
}
if rule.ForceUniqueKey != "" {
commentsList = append(commentsList, fmt.Sprintf(`ukForce="%s"`, rule.ForceUniqueKey))
}
if len(commentsList) > 0 {
comments := sqlparser.Comments{
fmt.Sprintf(`/*vt+ %s */`, strings.Join(commentsList, " ")),
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vstreamer
import (
"context"
"fmt"
"net/url"
"sync"
"time"

Expand Down Expand Up @@ -178,7 +179,12 @@ func (rs *rowStreamer) buildPlan() error {
return err
}
}

if s, found := directives.GetString("ukForce", ""); found {
st.PKIndexName, err = url.QueryUnescape(s)
if err != nil {
return err
}
}
rs.pkColumns, err = rs.buildPKColumns(st)
if err != nil {
return err
Expand Down
52 changes: 52 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,58 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// TestRowStreamerQuery validates that the correct force index hint and order by is added to the rowstreamer query.
func TestRowStreamerQuery(t *testing.T) {
execStatements(t, []string{
"create table t1(id int, uk1 int, val varbinary(128), primary key(id), unique key uk2 (uk1))",
})
defer execStatements(t, []string{
"drop table t1",
})
engine.se.Reload(context.Background())
// We need to StreamRows, to get an initialized RowStreamer.
// Note that the query passed into StreamRows is overwritten while running the test.
err := engine.StreamRows(context.Background(), "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error {
type testCase struct {
directives string
sendQuerySuffix string
}
queryTemplate := "select %s id, uk1, val from t1"
getQuery := func(directives string) string {
return fmt.Sprintf(queryTemplate, directives)
}
sendQueryPrefix := "select /*+ MAX_EXECUTION_TIME(3600000) */ id, uk1, val from t1"
testCases := []testCase{
{"", "force index (`PRIMARY`) order by id"},
{"/*vt+ ukColumns=\"uk1\" ukForce=\"uk2\" */", "force index (`uk2`) order by uk1"},
{"/*vt+ ukForce=\"uk2\" */", "force index (`uk2`) order by uk1"},
{"/*vt+ ukColumns=\"uk1\" */", "order by uk1"},
}

for _, tc := range testCases {
t.Run(tc.directives, func(t *testing.T) {
var err error
var rs *rowStreamer
// Depending on the order of the test cases, the index of the engine.rowStreamers slice may change.
for _, rs2 := range engine.rowStreamers {
if rs2 != nil {
rs = rs2
break
}
}
require.NotNil(t, rs)
rs.query = getQuery(tc.directives)
err = rs.buildPlan()
require.NoError(t, err)
want := fmt.Sprintf("%s %s", sendQueryPrefix, tc.sendQuerySuffix)
require.Equal(t, want, rs.sendQuery)
})
}
return nil
})
require.NoError(t, err)
}

func TestStreamRowsScan(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down
3 changes: 3 additions & 0 deletions proto/binlogdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ message Rule {
// such columns need to have special transofrmation of the data, from an integral format into a
// string format. e.g. the value 0 needs to be converted to '0'.
map<string, bool> convert_int_to_enum = 8;

// ForceUniqueKey gives vtreamer a hint for `FORCE INDEX (...)` usage.
string force_unique_key = 9;
}

// Filter represents a list of ordered rules. The first
Expand Down
Loading

0 comments on commit 989700c

Please sign in to comment.