From 98c83c79c9babee01581f0ffba052c53c82538af Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 20 Aug 2024 15:38:29 +0200 Subject: [PATCH] Also apply vplayer events for lookup tables Signed-off-by: Rohit Nayak --- .../endtoend/vreplication/materialize_view_test.go | 4 +++- .../tabletmanager/vreplication/framework_test.go | 2 +- .../tabletmanager/vreplication/table_plan_builder.go | 10 +++++++--- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 11 +++++++---- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/vreplication/materialize_view_test.go b/go/test/endtoend/vreplication/materialize_view_test.go index 22065fb01d7..df8f077f274 100644 --- a/go/test/endtoend/vreplication/materialize_view_test.go +++ b/go/test/endtoend/vreplication/materialize_view_test.go @@ -26,7 +26,9 @@ const mvSchema = ` price int NOT NULL, name varchar(128) NOT NULL, description varbinary(128) NOT NULL, - PRIMARY KEY (oid) + PRIMARY KEY (oid), + KEY idx_cid (cid), + KEY idx_pid (pid) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; ` diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 17352c97d36..f0e92cca2ef 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -194,7 +194,7 @@ func TestMain(m *testing.M) { return 0 - runNoBlobTest = true + runNoBlobTest = true //nolint:govet if err := utils.SetBinlogRowImageMode("noblob", tempDir); err != nil { panic(err) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index acc2b4c6cfa..085c612cfc7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -181,7 +181,7 @@ func buildReplicatorPlanForJoin(source *binlogdatapb.BinlogSource, colInfoMap ma if len(joinTables) == 0 { return buildReplicatorPlan(source, colInfoMap, copyState, stats, collationEnv, parser) } - log.Infof("In buildReplicatorPlanForJoin") + log.Infof("In buildReplicatorPlanForJoin with join tables %v", joinTables) filter := source.Filter joinPlan := &ReplicatorJoinPlan{ Tables: joinTables, @@ -293,11 +293,11 @@ func buildReplicatorPlanForJoin(source *binlogdatapb.BinlogSource, colInfoMap ma if !copy { plan.VStreamFilter.Rules[0].Match = fmt.Sprintf("/\\b(%s)\\b", strings.Join(plan.joinPlan.Tables, "|")) } else { - plan.VStreamFilter.Rules[0].Filter = fmt.Sprintf("/*vt+ view=% */ %s", view, plan.VStreamFilter.Rules[0].Filter) + plan.VStreamFilter.Rules[0].Filter = fmt.Sprintf("/*vt+ view=%s */ %s", view, plan.VStreamFilter.Rules[0].Filter) } plan.TablePlans[view] = tablePlan plan.TargetTables[view] = tablePlan - log.Infof("Added table plan for view %s", view) + log.Infof("Added table plan for view %s: %v, insert %v, updates %+v, deletes %+v", view, tablePlan, insert, updates, deletes) for _, tableName := range joinTables { rule := &binlogdatapb.Rule{ Match: tableName, @@ -653,6 +653,8 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr switch node := node.(type) { case *sqlparser.ColName: if !node.Qualifier.IsEmpty() { + _ = 1 + // FIXME // return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node)) } colName = node.Name @@ -717,6 +719,8 @@ func (tpb *tablePlanBuilder) analyzeExpr(selExpr sqlparser.SelectExpr) (*colExpr switch node := node.(type) { case *sqlparser.ColName: if !node.Qualifier.IsEmpty() { + _ = 1 + // FIXME // return false, fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(node)) } tpb.addCol(node.Name) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 70d42bf1c0a..479622a9b4e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "math" + "slices" "strconv" "strings" "time" @@ -378,11 +379,13 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row if rowEvent.TableName == vp.replicatorPlan.joinPlan.MainTableName { log.Infof("Join plan detected for table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName) return vp.applyRowEventForJoin(ctx, rowEvent) - } else { - log.Infof("Ignoring row event for table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName) - - return nil } + if slices.Contains(vp.replicatorPlan.joinPlan.Tables, rowEvent.TableName) { + log.Infof("Join plan detected for lookup table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName) + return vp.applyRowEventForJoin(ctx, rowEvent) + } + log.Infof("Ignoring row event for table %s, main table %s", rowEvent.TableName, vp.replicatorPlan.joinPlan.MainTableName) + return nil } if err := vp.updateFKCheck(ctx, rowEvent.Flags); err != nil {