From 59dd4d5aa49102078263f3808227751b6c17917a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 14 Aug 2024 12:49:45 -0400 Subject: [PATCH 1/6] Improve replication plan builder errors Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 3bef997d0be..bbe315336c0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -88,7 +88,7 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent // select * construct was used. We need to use the field names. tplan, err := rp.buildFromFields(prelim.TargetName, prelim.Lastpk, fieldEvent.Fields) if err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to build replication plan for %s table", fieldEvent.TableName) } tplan.Fields = fieldEvent.Fields return tplan, nil From 0ddd45c07092972a56e4774327809412e5e70793 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 14 Aug 2024 13:15:14 -0400 Subject: [PATCH 2/6] Add table name to vplayer error as well as msg and include GTID Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 2b8b8130f89..1566047ca8a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -549,7 +549,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if err := vp.applyEvent(ctx, event, mustSave); err != nil { if err != io.EOF { vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1) - var table, tableLogMsg string + var table, tableLogMsg, gtidLogMsg string switch { case event.GetFieldEvent() != nil: table = event.GetFieldEvent().TableName @@ -559,7 +559,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if table != "" { tableLogMsg = fmt.Sprintf(" for table %s", table) } - log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error()) + if vp.unsavedEvent != nil && vp.unsavedEvent.Gtid != "" { + gtidLogMsg = fmt.Sprintf(" that were part of GTID %s", vp.unsavedEvent.Gtid) + } + log.Errorf("Error applying event%s%s: %s", tableLogMsg, gtidLogMsg, err.Error()) + err = vterrors.Wrapf(err, "error applying event%s%s", tableLogMsg, gtidLogMsg) } return err } From c7ee55acc7fb924a701117ffba8a4a99850b4d78 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 14 Aug 2024 14:48:37 -0400 Subject: [PATCH 3/6] Fix GTID log Signed-off-by: Matt Lord --- .../tabletmanager/vreplication/vplayer.go | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 1566047ca8a..0102454b7ba 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -559,8 +559,9 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if table != "" { tableLogMsg = fmt.Sprintf(" for table %s", table) } - if vp.unsavedEvent != nil && vp.unsavedEvent.Gtid != "" { - gtidLogMsg = fmt.Sprintf(" that were part of GTID %s", vp.unsavedEvent.Gtid) + pos := getNextPosition(items, i, j+1) + if pos != "" { + gtidLogMsg = fmt.Sprintf(" while processing position %s", pos) } log.Errorf("Error applying event%s%s: %s", tableLogMsg, gtidLogMsg, err.Error()) err = vterrors.Wrapf(err, "error applying event%s%s", tableLogMsg, gtidLogMsg) @@ -596,6 +597,30 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool { return false } +// getNextPosition returns the GTID set/position we would be at if the current +// transaction was committed. This is useful for error handling as we can then +// determine which GTID we're failing to process from the source and examine the +// binlog events for that GTID directly on the source to debug the issue. +// This is needed as it's not as simple as the user incrementing the current +// position in the stream by 1 as we may be skipping N intermediate GTIDs in the +// stream due to filtering. For GTIDs that we filter out we still replicate the +// GTID event itself, just without any internal events and a COMMIT event (see +// the unsavedEvent handling). +func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string { + for i < len(items) { + for j < len(items[i]) { + switch items[i][j].Type { + case binlogdatapb.VEventType_GTID: + return items[i][j].Gtid + } + j++ + } + j = 0 + i++ + } + return "" +} + func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error { stats := NewVrLogStats(event.Type.String()) switch event.Type { From 13cba66cb1a1296362680692f6d2973eff1a7c3e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 14 Aug 2024 15:01:01 -0400 Subject: [PATCH 4/6] Use decode to handle any uncompression if needed and eliminate flavor Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 0102454b7ba..31711068da3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -611,7 +611,11 @@ func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string { for j < len(items[i]) { switch items[i][j].Type { case binlogdatapb.VEventType_GTID: - return items[i][j].Gtid + pos, err := binlogplayer.DecodePosition(items[i][j].Gtid) + if err != nil { + return "" + } + return pos.String() } j++ } From 909c18f9e63510c8de1f8a877961099fd6449f08 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 14 Aug 2024 15:19:02 -0400 Subject: [PATCH 5/6] Adjust tests Signed-off-by: Matt Lord --- .../tabletmanager/vreplication/vplayer_flaky_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index b1925c3c64f..54d4a778509 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -610,7 +610,7 @@ func TestPlayerStatementModeWithFilter(t *testing.T) { output := qh.Expect( "begin", "rollback", - "/update _vt.vreplication set message='filter rules are not supported for SBR", + "/update _vt.vreplication set message='error applying event while processing position .* filter rules are not supported for SBR.*", ) execStatements(t, input) @@ -1758,8 +1758,8 @@ func TestPlayerDDL(t *testing.T) { execStatements(t, []string{"alter table t1 add column val2 varchar(128)"}) expectDBClientQueries(t, qh.Expect( "alter table t1 add column val2 varchar(128)", - "/update _vt.vreplication set message='Duplicate", - "/update _vt.vreplication set state='Error', message='Duplicate", + "/update _vt.vreplication set message='error applying event: Duplicate", + "/update _vt.vreplication set state='Error', message='error applying event: Duplicate", )) cancel() From 07b7e3a5dc8d3a3c5a444a9c4cf75c684e650fd2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 14 Aug 2024 16:53:02 -0400 Subject: [PATCH 6/6] Improve test Signed-off-by: Matt Lord --- .../vreplication/vplayer_flaky_test.go | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 54d4a778509..0641a111199 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -575,9 +575,20 @@ func TestPlayerForeignKeyCheck(t *testing.T) { cancel() } -func TestPlayerStatementModeWithFilter(t *testing.T) { +// TestPlayerStatementModeWithFilterAndErrorHandling confirms that we get the +// expected error when using a filter with statement mode. It also tests the +// general vplayer applyEvent error and log message handling. +func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) { defer deleteTablet(addTablet(100)) + // We want to check for the expected log message. + ole := log.Errorf + logger := logutil.NewMemoryLogger() + log.Errorf = logger.Errorf + defer func() { + log.Errorf = ole + }() + execStatements(t, []string{ "create table src1(id int, val varbinary(128), primary key(id))", }) @@ -600,21 +611,29 @@ func TestPlayerStatementModeWithFilter(t *testing.T) { cancel, _ := startVReplication(t, bls, "") defer cancel() + const gtid = "37f16b4c-5a74-11ef-87de-56bfd605e62e:100" input := []string{ "set @@session.binlog_format='STATEMENT'", + fmt.Sprintf("set @@session.gtid_next='%s'", gtid), "insert into src1 values(1, 'aaa')", + "set @@session.gtid_next='AUTOMATIC'", "set @@session.binlog_format='ROW'", } + expectedMsg := fmt.Sprintf("[Ee]rror applying event while processing position .*%s.* filter rules are not supported for SBR.*", gtid) + // It does not work when filter is enabled output := qh.Expect( "begin", "rollback", - "/update _vt.vreplication set message='error applying event while processing position .* filter rules are not supported for SBR.*", + fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg), ) execStatements(t, input) expectDBClientQueries(t, output) + + logs := logger.String() + require.Regexp(t, expectedMsg, logs) } func TestPlayerStatementMode(t *testing.T) {