diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go index 5dce71cf0f5..6c9f92128ac 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go @@ -21,13 +21,14 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/sqlparser" - "github.com/stretchr/testify/assert" - - "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -573,16 +574,16 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "bad query", }}, }, - err: "syntax error at position 4 near 'bad'", + err: "syntax error at position 4 near 'bad' in query: bad query", }, { // not a select input: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "t1", - Filter: "update t1 set val=1", + Filter: "update t1 set val = 1", }}, }, - err: "unexpected: update t1 set val = 1", + err: "unsupported non-select statement in query: update t1 set val = 1", }, { // no distinct input: &binlogdatapb.Filter{ @@ -591,7 +592,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select distinct c1 from t1", }}, }, - err: "unexpected: select distinct c1 from t1", + err: "unsupported distinct clause in query: select distinct c1 from t1", }, { // no ',' join input: &binlogdatapb.Filter{ @@ -600,7 +601,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select * from t1, t2", }}, }, - err: "unexpected: select * from t1, t2", + err: "unsupported multi-table usage in query: select * from t1, t2", }, { // no join input: &binlogdatapb.Filter{ @@ -609,7 +610,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select * from t1 join t2", }}, }, - err: "unexpected: select * from t1 join t2", + err: "unsupported from expression (*sqlparser.JoinTableExpr) in query: select * from t1 join t2", }, { // no subqueries input: &binlogdatapb.Filter{ @@ -618,7 +619,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select * from (select * from t2) as a", }}, }, - err: "unexpected: select * from (select * from t2) as a", + err: "unsupported from source (*sqlparser.DerivedTable) in query: select * from (select * from t2) as a", }, { // cannot combine '*' with other input: &binlogdatapb.Filter{ @@ -627,7 +628,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select *, c1 from t1", }}, }, - err: "unexpected: select *, c1 from t1", + err: "unsupported mix of '*' and columns in query: select *, c1 from t1", }, { // cannot combine '*' with other (different code path) input: &binlogdatapb.Filter{ @@ -636,7 +637,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select c1, * from t1", }}, }, - err: "unexpected: *", + err: "invalid expression: * in query: select c1, * from t1", }, { // no distinct in func input: &binlogdatapb.Filter{ @@ -645,7 +646,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select hour(distinct c1) as a from t1", }}, }, - err: "syntax error at position 21 near 'distinct'", + err: "syntax error at position 21 near 'distinct' in query: select hour(distinct c1) as a from t1", }, { // funcs need alias input: &binlogdatapb.Filter{ @@ -654,7 +655,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select hour(c1) from t1", }}, }, - err: "expression needs an alias: hour(c1)", + err: "expression needs an alias: hour(c1) in query: select hour(c1) from t1", }, { // only count(*) input: &binlogdatapb.Filter{ @@ -663,7 +664,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select count(c1) as c from t1", }}, }, - err: "only count(*) is supported: count(c1)", + err: "only count(*) is supported: count(c1) in query: select count(c1) as c from t1", }, { // no sum(*) input: &binlogdatapb.Filter{ @@ -672,7 +673,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select sum(*) as c from t1", }}, }, - err: "syntax error at position 13", + err: "syntax error at position 13 in query: select sum(*) as c from t1", }, { // sum should have only one argument input: &binlogdatapb.Filter{ @@ -681,7 +682,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select sum(a, b) as c from t1", }}, }, - err: "syntax error at position 14", + err: "syntax error at position 14 in query: select sum(a, b) as c from t1", }, { // no complex expr in sum input: &binlogdatapb.Filter{ @@ -690,7 +691,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select sum(a + b) as c from t1", }}, }, - err: "unexpected: sum(a + b)", + err: "unsupported non-column name in sum clause: sum(a + b) in query: select sum(a + b) as c from t1", }, { // no complex expr in group by input: &binlogdatapb.Filter{ @@ -699,7 +700,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select a from t1 group by a + 1", }}, }, - err: "unexpected: a + 1", + err: "unsupported non-column name or alias in group by clause: a + 1 in query: select a from t1 group by a + 1", }, { // group by does not reference alias input: &binlogdatapb.Filter{ @@ -708,7 +709,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select a as b from t1 group by a", }}, }, - err: "group by expression does not reference an alias in the select list: a", + err: "group by expression does not reference an alias in the select list: a in query: select a as b from t1 group by a", }, { // cannot group by aggr input: &binlogdatapb.Filter{ @@ -717,7 +718,7 @@ func TestBuildPlayerPlan(t *testing.T) { Filter: "select count(*) as a from t1 group by a", }}, }, - err: "group by expression is not allowed to reference an aggregate expression: a", + err: "group by expression is not allowed to reference an aggregate expression: a in query: select count(*) as a from t1 group by a", }} PrimaryKeyInfos := map[string][]*ColumnInfo{ @@ -736,18 +737,14 @@ func TestBuildPlayerPlan(t *testing.T) { for _, tcase := range testcases { plan, err := buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, nil, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser()) - gotPlan, _ := json.Marshal(plan) - wantPlan, _ := json.Marshal(tcase.plan) - if string(gotPlan) != string(wantPlan) { - t.Errorf("Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan) - } gotErr := "" if err != nil { gotErr = err.Error() } - if gotErr != tcase.err { - t.Errorf("Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err) - } + require.Equal(t, tcase.err, gotErr, "Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err) + gotPlan, _ := json.Marshal(plan) + wantPlan, _ := json.Marshal(tcase.plan) + require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v):\n%s, want\n%s", tcase.input, gotPlan, wantPlan) plan, err = buildReplicatorPlan(getSource(tcase.input), PrimaryKeyInfos, copyState, binlogplayer.NewStats(), collations.MySQL8(), sqlparser.NewTestParser()) if err != nil { @@ -755,9 +752,7 @@ func TestBuildPlayerPlan(t *testing.T) { } gotPlan, _ = json.Marshal(plan) wantPlan, _ = json.Marshal(tcase.planpk) - if string(gotPlan) != string(wantPlan) { - t.Errorf("Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan) - } + require.Equal(t, string(wantPlan), string(gotPlan), "Filter(%v,copyState):\n%s, want\n%s", tcase.input, gotPlan, wantPlan) } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 0f94b6b13d2..2d94615b5cb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -202,6 +202,12 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats, source *binlogdatapb.BinlogSource, collationEnv *collations.Environment, parser *sqlparser.Parser) (*TablePlan, error) { + planError := func(err error, query string) error { + // Use the error string here to ensure things are uniform across + // vterrors (from parse) and errors (all others). + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%s in query: %s", err.Error(), query) + } + filter := rule.Filter query := filter // generate equivalent select statement if filter is empty or a keyrange. @@ -219,7 +225,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum } sel, fromTable, err := analyzeSelectFrom(query, parser) if err != nil { - return nil, err + return nil, planError(err, query) } sendRule := &binlogdatapb.Rule{ Match: fromTable, @@ -235,10 +241,10 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum // If it's a "select *", we return a partial plan, and complete // it when we get back field info from the stream. if len(sel.SelectExprs) != 1 { - return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + return nil, planError(fmt.Errorf("unsupported mix of '*' and columns"), sqlparser.String(sel)) } if !expr.TableName.IsEmpty() { - return nil, fmt.Errorf("unsupported qualifier for '*' expression: %v", sqlparser.String(expr)) + return nil, planError(fmt.Errorf("unsupported qualifier for '*' expression"), sqlparser.String(expr)) } sendRule.Filter = query tablePlan := &TablePlan{ @@ -269,7 +275,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum } if err := tpb.analyzeExprs(sel.SelectExprs); err != nil { - return nil, err + return nil, planError(err, sqlparser.String(sel)) } // It's possible that the target table does not materialize all // the primary keys of the source table. In such situations, @@ -284,7 +290,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfos []*Colum } } if err := tpb.analyzeGroupBy(sel.GroupBy); err != nil { - return nil, err + return nil, planError(err, sqlparser.String(sel)) } targetKeyColumnNames, err := textutil.SplitUnescape(rule.TargetUniqueKeyColumns, ",") if err != nil { @@ -388,21 +394,21 @@ func analyzeSelectFrom(query string, parser *sqlparser.Parser) (sel *sqlparser.S } sel, ok := statement.(*sqlparser.Select) if !ok { - return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(statement)) + return nil, "", fmt.Errorf("unsupported non-select statement") } if sel.Distinct { - return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + return nil, "", fmt.Errorf("unsupported distinct clause") } if len(sel.From) > 1 { - return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + return nil, "", fmt.Errorf("unsupported multi-table usage") } node, ok := sel.From[0].(*sqlparser.AliasedTableExpr) if !ok { - return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + return nil, "", fmt.Errorf("unsupported from expression (%T)", sel.From[0]) } fromTable := sqlparser.GetTableName(node.Expr) if fromTable.IsEmpty() { - return nil, "", fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + return nil, "", fmt.Errorf("unsupported from source (%T)", node.Expr) } return sel, fromTable.String(), nil } @@ -421,7 +427,7 @@ func (tpb *tablePlanBuilder) analyzeExprs(selExprs sqlparser.SelectExprs) error func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr, error) { aliased, ok := selExpr.(*sqlparser.AliasedExpr) if !ok { - return nil, fmt.Errorf("unexpected: %v", sqlparser.String(selExpr)) + return nil, fmt.Errorf("invalid expression: %v", sqlparser.String(selExpr)) } as := aliased.As if as.IsEmpty() { @@ -470,7 +476,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr switch fname := expr.Name.Lowered(); fname { case "keyspace_id": if len(expr.Exprs) != 0 { - return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + return nil, fmt.Errorf("unsupported multiple keyspace_id expressions: %v", sqlparser.String(expr)) } tpb.sendSelect.SelectExprs = append(tpb.sendSelect.SelectExprs, &sqlparser.AliasedExpr{Expr: aliased.Expr}) // The vstreamer responds with "keyspace_id" as the field name for this request. @@ -480,7 +486,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr } if expr, ok := aliased.Expr.(sqlparser.AggrFunc); ok { if sqlparser.IsDistinct(expr) { - return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + return nil, fmt.Errorf("unsupported distinct expression usage: %v", sqlparser.String(expr)) } switch fname := expr.AggrName(); fname { case "count": @@ -491,11 +497,11 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr return cexpr, nil case "sum": if len(expr.GetArgs()) != 1 { - return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + return nil, fmt.Errorf("unsupported multiple columns in sum clause: %v", sqlparser.String(expr)) } innerCol, ok := expr.GetArg().(*sqlparser.ColName) if !ok { - return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + return nil, fmt.Errorf("unsupported non-column name in sum clause: %v", sqlparser.String(expr)) } if !innerCol.Qualifier.IsEmpty() { return nil, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(innerCol)) @@ -518,7 +524,7 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr case *sqlparser.Subquery: return false, fmt.Errorf("unsupported subquery: %v", sqlparser.String(node)) case sqlparser.AggrFunc: - return false, fmt.Errorf("unexpected: %v", sqlparser.String(node)) + return false, fmt.Errorf("unsupported aggregation function: %v", sqlparser.String(node)) } return true, nil }, aliased.Expr) @@ -545,7 +551,7 @@ func (tpb *tablePlanBuilder) analyzeGroupBy(groupBy sqlparser.GroupBy) error { for _, expr := range groupBy { colname, ok := expr.(*sqlparser.ColName) if !ok { - return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + return fmt.Errorf("unsupported non-column name or alias in group by clause: %v", sqlparser.String(expr)) } cexpr := tpb.findCol(colname.Name) if cexpr == nil {