Skip to content

Commit

Permalink
VReplication: Improve replication plan error messages (#14752)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Dec 20, 2023
1 parent cb142a9 commit 4200295
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 49 deletions.
59 changes: 27 additions & 32 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/sqlparser"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

Expand Down Expand Up @@ -573,16 +574,16 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "bad query",
}},
},
err: "syntax error at position 4 near 'bad'",
err: "syntax error at position 4 near 'bad' in query: bad query",
}, {
// not a select
input: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "update t1 set val=1",
Filter: "update t1 set val = 1",
}},
},
err: "unexpected: update t1 set val = 1",
err: "unsupported non-select statement in query: update t1 set val = 1",
}, {
// no distinct
input: &binlogdatapb.Filter{
Expand All @@ -591,7 +592,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select distinct c1 from t1",
}},
},
err: "unexpected: select distinct c1 from t1",
err: "unsupported distinct clause in query: select distinct c1 from t1",
}, {
// no ',' join
input: &binlogdatapb.Filter{
Expand All @@ -600,7 +601,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select * from t1, t2",
}},
},
err: "unexpected: select * from t1, t2",
err: "unsupported multi-table usage in query: select * from t1, t2",
}, {
// no join
input: &binlogdatapb.Filter{
Expand All @@ -609,7 +610,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select * from t1 join t2",
}},
},
err: "unexpected: select * from t1 join t2",
err: "unsupported from expression (*sqlparser.JoinTableExpr) in query: select * from t1 join t2",
}, {
// no subqueries
input: &binlogdatapb.Filter{
Expand All @@ -618,7 +619,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select * from (select * from t2) as a",
}},
},
err: "unexpected: select * from (select * from t2) as a",
err: "unsupported from source (*sqlparser.DerivedTable) in query: select * from (select * from t2) as a",
}, {
// cannot combine '*' with other
input: &binlogdatapb.Filter{
Expand All @@ -627,7 +628,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select *, c1 from t1",
}},
},
err: "unexpected: select *, c1 from t1",
err: "unsupported mix of '*' and columns in query: select *, c1 from t1",
}, {
// cannot combine '*' with other (different code path)
input: &binlogdatapb.Filter{
Expand All @@ -636,7 +637,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select c1, * from t1",
}},
},
err: "unexpected: *",
err: "invalid expression: * in query: select c1, * from t1",
}, {
// no distinct in func
input: &binlogdatapb.Filter{
Expand All @@ -645,7 +646,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select hour(distinct c1) as a from t1",
}},
},
err: "syntax error at position 21 near 'distinct'",
err: "syntax error at position 21 near 'distinct' in query: select hour(distinct c1) as a from t1",
}, {
// funcs need alias
input: &binlogdatapb.Filter{
Expand All @@ -654,7 +655,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select hour(c1) from t1",
}},
},
err: "expression needs an alias: hour(c1)",
err: "expression needs an alias: hour(c1) in query: select hour(c1) from t1",
}, {
// only count(*)
input: &binlogdatapb.Filter{
Expand All @@ -663,7 +664,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select count(c1) as c from t1",
}},
},
err: "only count(*) is supported: count(c1)",
err: "only count(*) is supported: count(c1) in query: select count(c1) as c from t1",
}, {
// no sum(*)
input: &binlogdatapb.Filter{
Expand All @@ -672,7 +673,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select sum(*) as c from t1",
}},
},
err: "syntax error at position 13",
err: "syntax error at position 13 in query: select sum(*) as c from t1",
}, {
// sum should have only one argument
input: &binlogdatapb.Filter{
Expand All @@ -681,7 +682,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select sum(a, b) as c from t1",
}},
},
err: "syntax error at position 14",
err: "syntax error at position 14 in query: select sum(a, b) as c from t1",
}, {
// no complex expr in sum
input: &binlogdatapb.Filter{
Expand All @@ -690,7 +691,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select sum(a + b) as c from t1",
}},
},
err: "unexpected: sum(a + b)",
err: "unsupported non-column name in sum clause: sum(a + b) in query: select sum(a + b) as c from t1",
}, {
// no complex expr in group by
input: &binlogdatapb.Filter{
Expand All @@ -699,7 +700,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select a from t1 group by a + 1",
}},
},
err: "unexpected: a + 1",
err: "unsupported non-column name or alias in group by clause: a + 1 in query: select a from t1 group by a + 1",
}, {
// group by does not reference alias
input: &binlogdatapb.Filter{
Expand All @@ -708,7 +709,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select a as b from t1 group by a",
}},
},
err: "group by expression does not reference an alias in the select list: a",
err: "group by expression does not reference an alias in the select list: a in query: select a as b from t1 group by a",
}, {
// cannot group by aggr
input: &binlogdatapb.Filter{
Expand All @@ -717,7 +718,7 @@ func TestBuildPlayerPlan(t *testing.T) {
Filter: "select count(*) as a from t1 group by a",
}},
},
err: "group by expression is not allowed to reference an aggregate expression: a",
err: "group by expression is not allowed to reference an aggregate expression: a in query: select count(*) as a from t1 group by a",
}}

PrimaryKeyInfos := map[string][]*ColumnInfo{
Expand All @@ -736,28 +737,22 @@ func TestBuildPlayerPlan(t *testing.T) {

for _, tcase := range testcases {
plan, err := buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, nil, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
gotPlan, _ := json.Marshal(plan)
wantPlan, _ := json.Marshal(tcase.plan)
if string(gotPlan) != string(wantPlan) {
t.Errorf("Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
}
gotErr := ""
if err != nil {
gotErr = err.Error()
}
if gotErr != tcase.err {
t.Errorf("Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err)
}
require.Equal(t, tcase.err, gotErr, "Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err)
gotPlan, _ := json.Marshal(plan)
wantPlan, _ := json.Marshal(tcase.plan)
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)

plan, err = buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, copyState, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser())
if err != nil {
continue
}
gotPlan, _ = json.Marshal(plan)
wantPlan, _ = json.Marshal(tcase.planpk)
if string(gotPlan) != string(wantPlan) {
t.Errorf("Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
}
require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan)
}
}

Expand Down
40 changes: 23 additions & 17 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru
func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*ColumnInfo, lastpk *sqltypes.Result,
stats *binlogplayer.Stats, source *binlogdatapb.BinlogSource, collationEnv *collations.Environment, parser *sqlparser.Parser) (*TablePlan, error) {

planError := func(err error, query string) error {
// Use the error string here to ensure things are uniform across
// vterrors (from parse) and errors (all others).
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s in query: %s", err.Error(), query)
}

filter := rule.Filter
query := filter
// generate equivalent select statement if filter is empty or a keyrange.
Expand All @@ -219,7 +225,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
}
sel, fromTable, err := analyzeSelectFrom(query, parser)
if err != nil {
return nil, err
return nil, planError(err, query)
}
sendRule := &binlogdatapb.Rule{
Match: fromTable,
Expand All @@ -235,10 +241,10 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
// If it's a "select *", we return a partial plan, and complete
// it when we get back field info from the stream.
if len(sel.SelectExprs) != 1 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, planError(fmt.Errorf("unsupported mix of '*' and columns"), sqlparser.String(sel))
}
if !expr.TableName.IsEmpty() {
return nil, fmt.Errorf("unsupported qualifier for '*' expression: %v", sqlparser.String(expr))
return nil, planError(fmt.Errorf("unsupported qualifier for '*' expression"), sqlparser.String(expr))
}
sendRule.Filter = query
tablePlan := &TablePlan{
Expand Down Expand Up @@ -269,7 +275,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
}

if err := tpb.analyzeExprs(sel.SelectExprs); err != nil {
return nil, err
return nil, planError(err, sqlparser.String(sel))
}
// It's possible that the target table does not materialize all
// the primary keys of the source table. In such situations,
Expand All @@ -284,7 +290,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum
}
}
if err := tpb.analyzeGroupBy(sel.GroupBy); err != nil {
return nil, err
return nil, planError(err, sqlparser.String(sel))
}
targetKeyColumnNames, err := textutil.SplitUnescape(rule.TargetUniqueKeyColumns, ",")
if err != nil {
Expand Down Expand Up @@ -388,21 +394,21 @@ func analyzeSelectFrom(query string, parser *sqlparser.Parser) (sel *sqlparser.S
}
sel, ok := statement.(*sqlparser.Select)
if !ok {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(statement))
return nil, "", fmt.Errorf("unsupported non-select statement")
}
if sel.Distinct {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported distinct clause")
}
if len(sel.From) > 1 {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported multi-table usage")
}
node, ok := sel.From[0].(*sqlparser.AliasedTableExpr)
if !ok {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported from expression (%T)", sel.From[0])
}
fromTable := sqlparser.GetTableName(node.Expr)
if fromTable.IsEmpty() {
return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, "", fmt.Errorf("unsupported from source (%T)", node.Expr)
}
return sel, fromTable.String(), nil
}
Expand All @@ -421,7 +427,7 @@ func (tpb *tablePlanBuilder) analyzeExprs(selExprs sqlparser.SelectExprs) error
func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr, error) {
aliased, ok := selExpr.(*sqlparser.AliasedExpr)
if !ok {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(selExpr))
return nil, fmt.Errorf("invalid expression: %v", sqlparser.String(selExpr))
}
as := aliased.As
if as.IsEmpty() {
Expand Down Expand Up @@ -470,7 +476,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
switch fname := expr.Name.Lowered(); fname {
case "keyspace_id":
if len(expr.Exprs) != 0 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported multiple keyspace_id expressions: %v", sqlparser.String(expr))
}
tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: aliased.Expr})
// The vstreamer responds with "keyspace_id" as the field name for this request.
Expand All @@ -480,7 +486,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
}
if expr, ok := aliased.Expr.(sqlparser.AggrFunc); ok {
if sqlparser.IsDistinct(expr) {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported distinct expression usage: %v", sqlparser.String(expr))
}
switch fname := expr.AggrName(); fname {
case "count":
Expand All @@ -491,11 +497,11 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
return cexpr, nil
case "sum":
if len(expr.GetArgs()) != 1 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported multiple columns in sum clause: %v", sqlparser.String(expr))
}
innerCol, ok := expr.GetArg().(*sqlparser.ColName)
if !ok {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return nil, fmt.Errorf("unsupported non-column name in sum clause: %v", sqlparser.String(expr))
}
if !innerCol.Qualifier.IsEmpty() {
return nil, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(innerCol))
Expand All @@ -518,7 +524,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
case *sqlparser.Subquery:
return false, fmt.Errorf("unsupported subquery: %v", sqlparser.String(node))
case sqlparser.AggrFunc:
return false, fmt.Errorf("unexpected: %v", sqlparser.String(node))
return false, fmt.Errorf("unsupported aggregation function: %v", sqlparser.String(node))
}
return true, nil
}, aliased.Expr)
Expand All @@ -545,7 +551,7 @@ func (tpb *tablePlanBuilder) analyzeGroupBy(groupBy sqlparser.GroupBy) error {
for _, expr := range groupBy {
colname, ok := expr.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
return fmt.Errorf("unsupported non-column name or alias in group by clause: %v", sqlparser.String(expr))
}
cexpr := tpb.findCol(colname.Name)
if cexpr == nil {
Expand Down

0 comments on commit 4200295

Please sign in to comment.