Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve replication plan error messages #14752

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 27 additions & 32 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/sqlparser"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

Expand Down Expand Up @@ -573,16 +574,16 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "bad query",
}},
},
err: "syntax error at position 4 near 'bad'",
err: "syntax error at position 4 near 'bad' in query: bad query",
}, {
// not a select
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "update t1 set val=1",
Filter: "update t1 set val = 1",
}},
},
err: "unexpected: update t1 set val = 1",
err: "unsupported non-select statement in query: update t1 set val = 1",
}, {
// no distinct
input: &binlogdatapb.Filter{
Expand All @@ -591,7 +592,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select distinct c1 from t1",
}},
},
err: "unexpected: select distinct c1 from t1",
err: "unsupported distinct clause in query: select distinct c1 from t1",
}, {
// no ',' join
input: &binlogdatapb.Filter{
Expand All @@ -600,7 +601,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select * from t1, t2",
}},
},
err: "unexpected: select * from t1, t2",
err: "unsupported multi-table usage in query: select * from t1, t2",
}, {
// no join
input: &binlogdatapb.Filter{
Expand All @@ -609,7 +610,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select * from t1 join t2",
}},
},
err: "unexpected: select * from t1 join t2",
err: "unsupported from expression (*sqlparser.JoinTableExpr) in query: select * from t1 join t2",
}, {
// no subqueries
input: &binlogdatapb.Filter{
Expand All @@ -618,7 +619,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select * from (select * from t2) as a",
}},
},
err: "unexpected: select * from (select * from t2) as a",
err: "unsupported from source (*sqlparser.DerivedTable) in query: select * from (select * from t2) as a",
}, {
// cannot combine '*' with other
input: &binlogdatapb.Filter{
Expand All @@ -627,7 +628,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select *, c1 from t1",
}},
},
err: "unexpected: select *, c1 from t1",
err: "unsupported mix of '*' and columns in query: select *, c1 from t1",
}, {
// cannot combine '*' with other (different code path)
input: &binlogdatapb.Filter{
Expand All @@ -636,7 +637,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select c1, * from t1",
}},
},
err: "unexpected: *",
err: "invalid expression: * in query: select c1, * from t1",
}, {
// no distinct in func
input: &binlogdatapb.Filter{
Expand All @@ -645,7 +646,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select hour(distinct c1) as a from t1",
}},
},
err: "syntax error at position 21 near 'distinct'",
err: "syntax error at position 21 near 'distinct' in query: select hour(distinct c1) as a from t1",
}, {
// funcs need alias
input: &binlogdatapb.Filter{
Expand All @@ -654,7 +655,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select hour(c1) from t1",
}},
},
err: "expression needs an alias: hour(c1)",
err: "expression needs an alias: hour(c1) in query: select hour(c1) from t1",
}, {
// only count(*)
input: &binlogdatapb.Filter{
Expand All @@ -663,7 +664,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select count(c1) as c from t1",
}},
},
err: "only count(*) is supported: count(c1)",
err: "only count(*) is supported: count(c1) in query: select count(c1) as c from t1",
}, {
// no sum(*)
input: &binlogdatapb.Filter{
Expand All @@ -672,7 +673,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select sum(*) as c from t1",
}},
},
err: "syntax error at position 13",
err: "syntax error at position 13 in query: select sum(*) as c from t1",
}, {
// sum should have only one argument
input: &binlogdatapb.Filter{
Expand All @@ -681,7 +682,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select sum(a, b) as c from t1",
}},
},
err: "syntax error at position 14",
err: "syntax error at position 14 in query: select sum(a, b) as c from t1",
}, {
// no complex expr in sum
input: &binlogdatapb.Filter{
Expand All @@ -690,7 +691,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select sum(a + b) as c from t1",
}},
},
err: "unexpected: sum(a + b)",
err: "unsupported non-column name in sum clause: sum(a + b) in query: select sum(a + b) as c from t1",
}, {
// no complex expr in group by
input: &binlogdatapb.Filter{
Expand All @@ -699,7 +700,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select a from t1 group by a + 1",
}},
},
err: "unexpected: a + 1",
err: "unsupported non-column name or alias in group by clause: a + 1 in query: select a from t1 group by a + 1",
}, {
// group by does not reference alias
input: &binlogdatapb.Filter{
Expand All @@ -708,7 +709,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select a as b from t1 group by a",
}},
},
err: "group by expression does not reference an alias in the select list: a",
err: "group by expression does not reference an alias in the select list: a in query: select a as b from t1 group by a",
}, {
// cannot group by aggr
input: &binlogdatapb.Filter{
Expand All @@ -717,7 +718,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select count(*) as a from t1 group by a",
}},
},
err: "group by expression is not allowed to reference an aggregate expression: a",
err: "group by expression is not allowed to reference an aggregate expression: a in query: select count(*) as a from t1 group by a",
}}

PrimaryKeyInfos := map[string][]*ColumnInfo{
Expand All @@ -736,28 +737,22 @@ func TestBuildPlayerPlan(t *testing.T) {

for _, tcase := range testcases {
plan, err := buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, nil, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
gotPlan, _ := json.Marshal(plan)
wantPlan, _ := json.Marshal(tcase.plan)
if string(gotPlan) != string(wantPlan) {
t.Errorf("Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
}
gotErr := ""
if err != nil {
gotErr = err.Error()
}
if gotErr != tcase.err {
t.Errorf("Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err)
}
require.Equal(t, tcase.err, gotErr, "Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err)
gotPlan, _ := json.Marshal(plan)
wantPlan, _ := json.Marshal(tcase.plan)
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)

plan, err = buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, copyState, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
if err != nil {
continue
}
gotPlan, _ = json.Marshal(plan)
wantPlan, _ = json.Marshal(tcase.planpk)
if string(gotPlan) != string(wantPlan) {
t.Errorf("Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
}
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
}
}

Expand Down
40 changes: 23 additions & 17 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru
func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*ColumnInfo, lastpk *sqltypes.Result,
stats *binlogplayer.Stats, source *binlogdatapb.BinlogSource, collationEnv *collations.Environment, parser *sqlparser.Parser) (*TablePlan, error) {

planError := func(err error, query string) error {
// Use the error string here to ensure things are uniform across
// vterrors (from parse) and errors (all others).
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s in query: %s", err.Error(), query)
}

filter := rule.Filter
query := filter
// generate equivalent select statement if filter is empty or a keyrange.
Expand All @@ -219,7 +225,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
}
sel, fromTable, err := analyzeSelectFrom(query, parser)
if err != nil {
return nil, err
return nil, planError(err, query)
}
sendRule := &binlogdatapb.Rule{
Match: fromTable,
Expand All @@ -235,10 +241,10 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
// If it's a "select *", we return a partial plan, and complete
// it when we get back field info from the stream.
if len(sel.SelectExprs) != 1 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, planError(fmt.Errorf("unsupported mix of '*' and columns"), sqlparser.String(sel))
}
if !expr.TableName.IsEmpty() {
return nil, fmt.Errorf("unsupported qualifier for '*' expression: %v", sqlparser.String(expr))
return nil, planError(fmt.Errorf("unsupported qualifier for '*' expression"), sqlparser.String(expr))
}
sendRule.Filter = query
tablePlan := &TablePlan{
Expand Down Expand Up @@ -269,7 +275,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
}

if err := tpb.analyzeExprs(sel.SelectExprs); err != nil {
return nil, err
return nil, planError(err, sqlparser.String(sel))
}
// It's possible that the target table does not materialize all
// the primary keys of the source table. In such situations,
Expand All @@ -284,7 +290,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
}
}
if err := tpb.analyzeGroupBy(sel.GroupBy); err != nil {
return nil, err
return nil, planError(err, sqlparser.String(sel))
}
targetKeyColumnNames, err := textutil.SplitUnescape(rule.TargetUniqueKeyColumns, ",")
if err != nil {
Expand Down Expand Up @@ -388,21 +394,21 @@ func analyzeSelectFrom(query string, parser *sqlparser.Parser) (sel *sqlparser.S
}
sel, ok := statement.(*sqlparser.Select)
if !ok {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(statement))
return nil, "", fmt.Errorf("unsupported non-select statement")
}
if sel.Distinct {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported distinct clause")
}
if len(sel.From) > 1 {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported multi-table usage")
}
node, ok := sel.From[0].(*sqlparser.AliasedTableExpr)
if !ok {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported from expression (%T)", sel.From[0])
}
fromTable := sqlparser.GetTableName(node.Expr)
if fromTable.IsEmpty() {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported from source (%T)", node.Expr)
}
return sel, fromTable.String(), nil
}
Expand All @@ -421,7 +427,7 @@ func (tpb *tablePlanBuilder) analyzeExprs(selExprs sqlparser.SelectExprs) error
func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr, error) {
aliased, ok := selExpr.(*sqlparser.AliasedExpr)
if !ok {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(selExpr))
return nil, fmt.Errorf("invalid expression: %v", sqlparser.String(selExpr))
}
as := aliased.As
if as.IsEmpty() {
Expand Down Expand Up @@ -470,7 +476,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
switch fname := expr.Name.Lowered(); fname {
case "keyspace_id":
if len(expr.Exprs) != 0 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported multiple keyspace_id expressions: %v", sqlparser.String(expr))
}
tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: aliased.Expr})
// The vstreamer responds with "keyspace_id" as the field name for this request.
Expand All @@ -480,7 +486,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
}
if expr, ok := aliased.Expr.(sqlparser.AggrFunc); ok {
if sqlparser.IsDistinct(expr) {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported distinct expression usage: %v", sqlparser.String(expr))
}
switch fname := expr.AggrName(); fname {
case "count":
Expand All @@ -491,11 +497,11 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
return cexpr, nil
case "sum":
if len(expr.GetArgs()) != 1 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported multiple columns in sum clause: %v", sqlparser.String(expr))
}
innerCol, ok := expr.GetArg().(*sqlparser.ColName)
if !ok {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported non-column name in sum clause: %v", sqlparser.String(expr))
}
if !innerCol.Qualifier.IsEmpty() {
return nil, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(innerCol))
Expand All @@ -518,7 +524,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
case *sqlparser.Subquery:
return false, fmt.Errorf("unsupported subquery: %v", sqlparser.String(node))
case sqlparser.AggrFunc:
return false, fmt.Errorf("unexpected: %v", sqlparser.String(node))
return false, fmt.Errorf("unsupported aggregation function: %v", sqlparser.String(node))
}
return true, nil
}, aliased.Expr)
Expand All @@ -545,7 +551,7 @@ func (tpb *tablePlanBuilder) analyzeGroupBy(groupBy sqlparser.GroupBy) error {
for _, expr := range groupBy {
colname, ok := expr.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return fmt.Errorf("unsupported non-column name or alias in group by clause: %v", sqlparser.String(expr))
}
cexpr := tpb.findCol(colname.Name)
if cexpr == nil {
Expand Down
Loading