Skip to content

Commit

Permalink
Also apply vplayer events for lookup tables
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Aug 21, 2024
1 parent db1ffcb commit 98c83c7
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/materialize_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ const mvSchema = `
price int NOT NULL,
name varchar(128) NOT NULL,
description varbinary(128) NOT NULL,
PRIMARY KEY (oid)
PRIMARY KEY (oid),
KEY idx_cid (cid),
KEY idx_pid (pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestMain(m *testing.M) {

return 0

runNoBlobTest = true
runNoBlobTest = true //nolint:govet
if err := utils.SetBinlogRowImageMode("noblob", tempDir); err != nil {
panic(err)
}
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func buildReplicatorPlanForJoin(source *binlogdatapb.BinlogSource, colInfoMap ma
if len(joinTables) == 0 {
return buildReplicatorPlan(source, colInfoMap, copyState, stats, collationEnv, parser)
}
log.Infof("In buildReplicatorPlanForJoin")
log.Infof("In buildReplicatorPlanForJoin with join tables %v", joinTables)
filter := source.Filter
joinPlan := &ReplicatorJoinPlan{
Tables: joinTables,
Expand Down Expand Up @@ -293,11 +293,11 @@ func buildReplicatorPlanForJoin(source *binlogdatapb.BinlogSource, colInfoMap ma
if !copy {
plan.VStreamFilter.Rules[0].Match = fmt.Sprintf("/\\b(%s)\\b", strings.Join(plan.joinPlan.Tables, "|"))
} else {
plan.VStreamFilter.Rules[0].Filter = fmt.Sprintf("/*vt+ view=% */ %s", view, plan.VStreamFilter.Rules[0].Filter)
plan.VStreamFilter.Rules[0].Filter = fmt.Sprintf("/*vt+ view=%s */ %s", view, plan.VStreamFilter.Rules[0].Filter)
}
plan.TablePlans[view] = tablePlan
plan.TargetTables[view] = tablePlan
log.Infof("Added table plan for view %s", view)
log.Infof("Added table plan for view %s: %v, insert %v, updates %+v, deletes %+v", view, tablePlan, insert, updates, deletes)
for _, tableName := range joinTables {
rule := &binlogdatapb.Rule{
Match: tableName,
Expand Down Expand Up @@ -653,6 +653,8 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
switch node := node.(type) {
case *sqlparser.ColName:
if !node.Qualifier.IsEmpty() {
_ = 1
// FIXME
// return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node))
}
colName = node.Name
Expand Down Expand Up @@ -717,6 +719,8 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr
switch node := node.(type) {
case *sqlparser.ColName:
if !node.Qualifier.IsEmpty() {
_ = 1
// FIXME
// return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node))
}
tpb.addCol(node.Name)
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"math"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -378,11 +379,13 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
if rowEvent.TableName == vp.replicatorPlan.joinPlan.MainTableName {
log.Infof("Join plan detected for table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName)
return vp.applyRowEventForJoin(ctx, rowEvent)
} else {
log.Infof("Ignoring row event for table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName)

return nil
}
if slices.Contains(vp.replicatorPlan.joinPlan.Tables, rowEvent.TableName) {
log.Infof("Join plan detected for lookup table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName)
return vp.applyRowEventForJoin(ctx, rowEvent)
}
log.Infof("Ignoring row event for table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName)
return nil
}

if err := vp.updateFKCheck(ctx, rowEvent.Flags); err != nil {
Expand Down

0 comments on commit 98c83c7

Please sign in to comment.