Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander committed Feb 4, 2024
1 parent af4b77d commit 8d21447
Show file tree
Hide file tree
Showing 22 changed files with 144 additions and 399 deletions.
2 changes: 1 addition & 1 deletion go/cmd/vtctldclient/command/mirror_rules.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2021 The Vitess Authors.
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
18 changes: 3 additions & 15 deletions go/cmd/vtctldclient/command/vreplication/common/mirrortraffic.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 The Vitess Authors.
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -24,30 +24,18 @@ import (

"vitess.io/vitess/go/cmd/vtctldclient/cli"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

func GetMirrorTrafficCommand(opts *SubCommandsOpts) *cobra.Command {
cmd := &cobra.Command{
Use: "mirrortraffic",
Short: fmt.Sprintf("Mirror traffic for a %s VReplication workflow.", opts.SubCommand),
Example: fmt.Sprintf(`vtctldclient --server localhost:15999 %s --workflow %s --target-keyspace customer mirrortraffic --percent 50.0 --tablet-types "replica,rdonly"`, opts.SubCommand, opts.Workflow),
Example: fmt.Sprintf(`vtctldclient --server localhost:15999 %s --workflow %s --target-keyspace customer mirrortraffic --percent 50.0`, opts.SubCommand, opts.Workflow),
DisableFlagsInUseLine: true,
Aliases: []string{"MirrorTraffic"},
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {
if !cmd.Flags().Lookup("tablet-types").Changed {
// We mirror traffic for all tablet types if none are provided.
MirrorTrafficOptions.TabletTypes = []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
}
return nil
},
RunE: commandMirrorTraffic,
RunE: commandMirrorTraffic,
}
return cmd
}
Expand Down
3 changes: 1 addition & 2 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ func AddCommonCreateFlags(cmd *cobra.Command) {
}

var MirrorTrafficOptions = struct {
Percent float32
TabletTypes []topodatapb.TabletType
Percent float32
}{}

var SwitchTrafficOptions = struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/vt/topo/topoproto"
)

var (
Expand Down Expand Up @@ -67,7 +66,6 @@ func registerCommands(root *cobra.Command) {

mirrorTrafficCommand := common.GetMirrorTrafficCommand(opts)
mirrorTrafficCommand.Flags().Float32Var(&common.MirrorTrafficOptions.Percent, "percent", 1.0, "Percentage of traffic to mirror.")
mirrorTrafficCommand.Flags().Var((*topoproto.TabletTypeListFlag)(&common.MirrorTrafficOptions.TabletTypes), "tablet-types", "Source tablet types to mirror traffic from (e.g. PRIMARY,REPLICA,RDONLY).")
base.AddCommand(mirrorTrafficCommand)

switchTrafficCommand := common.GetSwitchTrafficCommand(opts)
Expand Down
277 changes: 77 additions & 200 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go/vt/topotools/mirror_rules.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2021 The Vitess Authors.
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/mirror.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 The Vitess Authors.
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/mirror_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 The Vitess Authors.
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type (
// CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas
CloneForReplicaWarming(ctx context.Context) VCursor

// CloneForReplicaWarming clones the VCursor for re-use in mirroring queries to other keyspaces
// CloneForMirroring clones the VCursor for re-use in mirroring queries to other keyspaces
CloneForMirroring(ctx context.Context) VCursor
}

Expand Down
16 changes: 7 additions & 9 deletions go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

// mergeJoinRoutes checks whether two operators can be merged into a single one.
// mergeJoinInputs checks whether two operators can be merged into a single one.
// If they can be merged, a new operator with the merged routing is returned
// If they cannot be merged, nil is returned.
func mergeJoinRoutes(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr, m merger) *Route {
func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr, m merger) *Route {
lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs)
if lhsRoute == nil {
return nil
Expand Down Expand Up @@ -139,14 +139,14 @@ func getRoutesOrAlternates(lhsRoute, rhsRoute *Route) (*Route, *Route, Routing,
return lhsRoute, rhsRoute, routingA, routingB, sameKeyspace
}

if refA, ok := routingA.(*ReferenceRouting); ok {
if altARoute := refA.ReferenceRoute(routingB.Keyspace()); altARoute != nil {
if refA, ok := routingA.(*AnyShardRouting); ok {
if altARoute := refA.AlternateInKeyspace(routingB.Keyspace()); altARoute != nil {
return altARoute, rhsRoute, altARoute.Routing, routingB, true
}
}

if refB, ok := routingB.(*ReferenceRouting); ok {
if altBRoute := refB.ReferenceRoute(routingA.Keyspace()); altBRoute != nil {
if refB, ok := routingB.(*AnyShardRouting); ok {
if altBRoute := refB.AlternateInKeyspace(routingA.Keyspace()); altBRoute != nil {
return lhsRoute, altBRoute, routingA, altBRoute.Routing, true
}
}
Expand All @@ -159,7 +159,7 @@ func getTypeName(myvar interface{}) string {
}

func getRoutingType(r Routing) routingType {
switch t := r.(type) {
switch r.(type) {
case *InfoSchemaRouting:
return infoSchema
case *AnyShardRouting:
Expand All @@ -172,8 +172,6 @@ func getRoutingType(r Routing) routingType {
return none
case *TargetedRouting:
return targeted
case *ReferenceRouting:
return getRoutingType(t.InnerRouting())
}
panic(fmt.Sprintf("switch should be exhaustive, got %T", r))
}
Expand Down
71 changes: 16 additions & 55 deletions go/vt/vtgate/planbuilder/operators/misc_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type (
// AnyShardRouting is used for routing logic where any shard in the keyspace can be used.
// Shared by unsharded and reference routing
AnyShardRouting struct {
keyspace *vindexes.Keyspace
keyspace *vindexes.Keyspace
Alternates map[*vindexes.Keyspace]*Route
}

// DualRouting represents the dual-table.
Expand All @@ -53,11 +54,6 @@ type (
SequenceRouting struct {
keyspace *vindexes.Keyspace
}

ReferenceRouting struct {
innerRouting Routing
referenceRoutes map[*vindexes.Keyspace]*Route
}
)

var (
Expand All @@ -66,7 +62,6 @@ var (
_ Routing = (*AnyShardRouting)(nil)
_ Routing = (*DualRouting)(nil)
_ Routing = (*SequenceRouting)(nil)
_ Routing = (*ReferenceRouting)(nil)
)

func (tr *TargetedRouting) UpdateRoutingParams(_ *plancontext.PlanningContext, rp *engine.RoutingParameters) {
Expand Down Expand Up @@ -125,7 +120,8 @@ func (rr *AnyShardRouting) UpdateRoutingParams(_ *plancontext.PlanningContext, r

func (rr *AnyShardRouting) Clone() Routing {
return &AnyShardRouting{
keyspace: rr.keyspace,
keyspace: rr.keyspace,
Alternates: rr.Alternates,
}
}

Expand All @@ -148,6 +144,18 @@ func (rr *AnyShardRouting) Keyspace() *vindexes.Keyspace {
return rr.keyspace
}

func (rr *AnyShardRouting) AlternateInKeyspace(keyspace *vindexes.Keyspace) *Route {
if keyspace.Name == rr.keyspace.Name {
return nil
}

if route, ok := rr.Alternates[keyspace]; ok {
return route
}

return nil
}

func (dr *DualRouting) UpdateRoutingParams(*plancontext.PlanningContext, *engine.RoutingParameters) {}

func (dr *DualRouting) Clone() Routing {
Expand Down Expand Up @@ -194,50 +202,3 @@ func (sr *SequenceRouting) OpCode() engine.Opcode {
func (sr *SequenceRouting) Keyspace() *vindexes.Keyspace {
return nil
}

func (rr *ReferenceRouting) UpdateRoutingParams(_ *plancontext.PlanningContext, rp *engine.RoutingParameters) {
rp.Keyspace = rr.Keyspace()
}

func (rr *ReferenceRouting) Clone() Routing {
return &ReferenceRouting{
innerRouting: rr.innerRouting.Clone(),
referenceRoutes: rr.referenceRoutes,
}
}

func (rr *ReferenceRouting) updateRoutingLogic(ctx *plancontext.PlanningContext, expr sqlparser.Expr) Routing {
return rr.innerRouting.updateRoutingLogic(ctx, expr)
}

func (rr *ReferenceRouting) Cost() int {
return rr.innerRouting.Cost() + len(rr.referenceRoutes)
}

func (rr *ReferenceRouting) OpCode() engine.Opcode {
return rr.innerRouting.OpCode()
}

func (rr *ReferenceRouting) Keyspace() *vindexes.Keyspace {
return rr.innerRouting.Keyspace()
}

func (rr *ReferenceRouting) ReferenceRoute(keyspace *vindexes.Keyspace) *Route {
if keyspace.Name == rr.Keyspace().Name {
return nil
}

if route, ok := rr.referenceRoutes[keyspace]; ok {
return route
}

return nil
}

func (rr *ReferenceRouting) SetReferenceRoutes(referenceRoutes map[*vindexes.Keyspace]*Route) {
rr.referenceRoutes = referenceRoutes
}

func (rr *ReferenceRouting) InnerRouting() Routing {
return rr.innerRouting
}
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato

rb, isRoute := in.src().(*Route)
if isRoute && rb.IsSingleShard() {
return Swap(in, rb, "push horizon into single-shard route")
return Swap(in, rb, "push horizon into route")
}

sel, isSel := in.selectStatement().(*sqlparser.Select)
Expand All @@ -245,7 +245,7 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato
in.selectStatement().GetLimit() == nil

if canPush {
return Swap(in, rb, "push horizon into simple select route")
return Swap(in, rb, "push horizon into route")
}

return expandHorizon(ctx, in)
Expand Down
Loading

0 comments on commit 8d21447

Please sign in to comment.