Skip to content

Commit

Permalink
[release-18.0] fix: reference table join merge (#16488) (#16495)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
Signed-off-by: Andres Taylor <[email protected]>
Co-authored-by: Harshit Gangal <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
  • Loading branch information
3 people authored Jul 31, 2024
1 parent 6c7b17e commit 4fcabc4
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 71 deletions.
69 changes: 9 additions & 60 deletions go/test/endtoend/vtgate/queries/reference/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reference

import (
"context"
_ "embed"
"flag"
"fmt"
"os"
Expand All @@ -39,68 +40,16 @@ var (
vtParams mysql.ConnParams

unshardedKeyspaceName = "uks"
unshardedSQLSchema = `
CREATE TABLE IF NOT EXISTS zip(
id BIGINT NOT NULL AUTO_INCREMENT,
code5 INT(5) NOT NULL,
PRIMARY KEY(id)
) ENGINE=InnoDB;
//go:embed uschema.sql
unshardedSQLSchema string
//go:embed uvschema.json
unshardedVSchema string

INSERT INTO zip(id, code5)
VALUES (1, 47107),
(2, 82845),
(3, 11237);
CREATE TABLE IF NOT EXISTS zip_detail(
id BIGINT NOT NULL AUTO_INCREMENT,
zip_id BIGINT NOT NULL,
discontinued_at DATE,
PRIMARY KEY(id)
) ENGINE=InnoDB;
`
unshardedVSchema = `
{
"sharded":false,
"tables": {
"zip": {},
"zip_detail": {}
}
}
`
shardedKeyspaceName = "sks"
shardedSQLSchema = `
CREATE TABLE IF NOT EXISTS delivery_failure (
id BIGINT NOT NULL,
zip_detail_id BIGINT NOT NULL,
reason VARCHAR(255),
PRIMARY KEY(id)
) ENGINE=InnoDB;
`
shardedVSchema = `
{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
"delivery_failure": {
"columnVindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"zip_detail": {
"type": "reference",
"source": "` + unshardedKeyspaceName + `.zip_detail"
}
}
}
`
//go:embed sschema.sql
shardedSQLSchema string
//go:embed svschema.json
shardedVSchema string
)

func TestMain(m *testing.M) {
Expand Down
32 changes: 24 additions & 8 deletions go/test/endtoend/vtgate/queries/reference/reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func TestReferenceRouting(t *testing.T) {
t,
conn,
`SELECT t.id FROM (
SELECT zd.id, zd.zip_id
FROM `+shardedKeyspaceName+`.zip_detail AS zd
WHERE zd.id IN (2)
ORDER BY zd.discontinued_at
LIMIT 1
) AS t
LEFT JOIN `+shardedKeyspaceName+`.zip_detail AS t0 ON t.zip_id = t0.zip_id
ORDER BY t.id`,
SELECT zd.id, zd.zip_id
FROM `+shardedKeyspaceName+`.zip_detail AS zd
WHERE zd.id IN (2)
ORDER BY zd.discontinued_at
LIMIT 1
) AS t
LEFT JOIN `+shardedKeyspaceName+`.zip_detail AS t0 ON t.zip_id = t0.zip_id
ORDER BY t.id`,
`[[INT64(2)]]`,
)
})
Expand Down Expand Up @@ -156,3 +156,19 @@ func TestReferenceRouting(t *testing.T) {
`[[INT64(2)]]`,
)
}

// TestMultiReferenceQuery runs a single query that self-joins the sharded
// delivery_failure table and then joins uks.zip_detail twice, so that the
// plan involves several reference-table routes at once.
//
// The test only checks that the query can be executed; the returned rows are
// not inspected.
func TestMultiReferenceQuery(t *testing.T) {
	// NOTE(review): skipped when the vtgate binary is below version 21 —
	// confirm this threshold is the intended one for the branch this lands in.
	utils.SkipIfBinaryIsBelowVersion(t, 21, "vtgate")

	conn, cleanup := start(t)
	defer cleanup()

	const query = `select 1
from delivery_failure df1
join delivery_failure df2 on df1.id = df2.id
join uks.zip_detail zd1 on df1.zip_detail_id = zd1.zip_id
join uks.zip_detail zd2 on zd1.zip_id = zd2.zip_id`

	utils.Exec(t, conn, query)
}
6 changes: 6 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/sschema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS delivery_failure (
id BIGINT NOT NULL,
zip_detail_id BIGINT NOT NULL,
reason VARCHAR(255),
PRIMARY KEY(id)
) ENGINE=InnoDB;
22 changes: 22 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/svschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
"delivery_failure": {
"columnVindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"zip_detail": {
"type": "reference",
"source": "uks.zip_detail"
}
}
}
17 changes: 17 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/uschema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS zip(
id BIGINT NOT NULL AUTO_INCREMENT,
code5 INT(5) NOT NULL,
PRIMARY KEY(id)
) ENGINE=InnoDB;

INSERT INTO zip(id, code5)
VALUES (1, 47107),
(2, 82845),
(3, 11237);

CREATE TABLE IF NOT EXISTS zip_detail(
id BIGINT NOT NULL AUTO_INCREMENT,
zip_id BIGINT NOT NULL,
discontinued_at DATE,
PRIMARY KEY(id)
) ENGINE=InnoDB;
6 changes: 6 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/uvschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"tables": {
"zip": {},
"zip_detail": {}
}
}
36 changes: 34 additions & 2 deletions go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

// mergeJoinInputs checks whether two operators can be merged into a single one.
// If they can be merged, a new operator with the merged routing is returned
// If they cannot be merged, nil is returned.
func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m merger) (*Route, error) {
func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m *joinMerger) (*Route, error) {
lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs)
if lhsRoute == nil {
return nil, nil
Expand All @@ -41,6 +42,14 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo
case b == dual:
return m.merge(ctx, lhsRoute, rhsRoute, routingA)

// Both sides are reference routes, so their alternates need to be merged as well.
case a == anyShard && b == anyShard && sameKeyspace:
newrouting, err := mergeAnyShardRoutings(ctx, routingA.(*AnyShardRouting), routingB.(*AnyShardRouting), joinPredicates, m.innerJoin)
if err != nil {
return nil, err
}
return m.merge(ctx, lhsRoute, rhsRoute, newrouting)

// an unsharded/reference route can be merged with anything going to that keyspace
case a == anyShard && sameKeyspace:
return m.merge(ctx, lhsRoute, rhsRoute, routingB)
Expand All @@ -66,6 +75,29 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo
}
}

// mergeAnyShardRoutings builds the routing for a join between two any-shard
// routes by combining their alternate routes keyspace by keyspace.
//
// For every keyspace that has an alternate on both sides, the two alternates
// are handed to mergeOrJoin together with the join predicates. The result is
// kept as the alternate for that keyspace only when it is a single *Route.
// Keyspaces that appear on one side only, and pairs whose result is not a
// *Route, are left out of the returned routing.
//
// The returned routing carries a's keyspace. The first error reported by
// mergeOrJoin aborts the merge and is returned unchanged.
func mergeAnyShardRoutings(ctx *plancontext.PlanningContext, a, b *AnyShardRouting, joinPredicates []sqlparser.Expr, innerJoin bool) (*AnyShardRouting, error) {
	alternates := make(map[*vindexes.Keyspace]*Route)
	for ks, lhsAlt := range a.Alternates {
		// Only alternates of the same keyspace can be merged. Both maps are
		// keyed by keyspace, so a direct lookup finds the only candidate
		// without scanning every entry of b.Alternates.
		rhsAlt, found := b.Alternates[ks]
		if !found {
			continue
		}
		op, _, err := mergeOrJoin(ctx, lhsAlt, rhsAlt, joinPredicates, innerJoin)
		if err != nil {
			return nil, err
		}
		// Keep the pair only if it collapsed into one route.
		if r, ok := op.(*Route); ok {
			alternates[ks] = r
		}
	}
	return &AnyShardRouting{
		keyspace:   a.keyspace,
		Alternates: alternates,
	}, nil
}

func prepareInputRoutes(lhs ops.Operator, rhs ops.Operator) (*Route, *Route, Routing, Routing, routingType, routingType, bool) {
lhsRoute, rhsRoute := operatorsToRoutes(lhs, rhs)
if lhsRoute == nil || rhsRoute == nil {
Expand Down Expand Up @@ -177,7 +209,7 @@ func getRoutingType(r Routing) routingType {
panic(fmt.Sprintf("switch should be exhaustive, got %T", r))
}

func newJoinMerge(predicates []sqlparser.Expr, innerJoin bool) merger {
func newJoinMerge(predicates []sqlparser.Expr, innerJoin bool) *joinMerger {
return &joinMerger{
predicates: predicates,
innerJoin: innerJoin,
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/planbuilder/operators/joins.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package operators

import (
"fmt"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
Expand Down Expand Up @@ -92,7 +95,7 @@ func AddPredicate(

return join, nil
}
return nil, nil
return nil, vterrors.VT13001(fmt.Sprintf("pushed wrong predicate to the join: %s", sqlparser.String(expr)))
}

// we are looking for predicates like `tbl.col = <>` or `<> = tbl.col`,
Expand Down
25 changes: 25 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/reference_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -922,5 +922,30 @@
"user.user"
]
}
},
{
"comment": "two sharded and two unsharded reference table join - all should be merged into one route",
"query": "select 1 from user u join user_extra ue on u.id = ue.user_id join main.source_of_ref sr on sr.foo = ue.foo join main.rerouted_ref rr on rr.bar = sr.bar",
"plan": {
"QueryType": "SELECT",
"Original": "select 1 from user u join user_extra ue on u.id = ue.user_id join main.source_of_ref sr on sr.foo = ue.foo join main.rerouted_ref rr on rr.bar = sr.bar",
"Instructions": {
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from `user` as u, user_extra as ue, ref_with_source as sr, ref as rr where 1 != 1",
"Query": "select 1 from `user` as u, user_extra as ue, ref_with_source as sr, ref as rr where rr.bar = sr.bar and u.id = ue.user_id and sr.foo = ue.foo",
"Table": "`user`, ref, ref_with_source, user_extra"
},
"TablesUsed": [
"user.ref",
"user.ref_with_source",
"user.user",
"user.user_extra"
]
}
}
]

0 comments on commit 4fcabc4

Please sign in to comment.