Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve replication plan builder and event application errors #16596

Merged
merged 6 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent
// select * construct was used. We need to use the field names.
tplan, err := rp.buildFromFields(prelim.TargetName, prelim.Lastpk, fieldEvent.Fields)
if err != nil {
return nil, err
return nil, vterrors.Wrapf(err, "failed to build replication plan for %s table", fieldEvent.TableName)
}
tplan.Fields = fieldEvent.Fields
return tplan, nil
Expand Down
37 changes: 35 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if err := vp.applyEvent(ctx, event, mustSave); err != nil {
if err != io.EOF {
vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
var table, tableLogMsg string
var table, tableLogMsg, gtidLogMsg string
switch {
case event.GetFieldEvent() != nil:
table = event.GetFieldEvent().TableName
Expand All @@ -559,7 +559,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if table != "" {
tableLogMsg = fmt.Sprintf(" for table %s", table)
}
log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error())
pos := getNextPosition(items, i, j+1)
if pos != "" {
gtidLogMsg = fmt.Sprintf(" while processing position %s", pos)
}
log.Errorf("Error applying event%s%s: %s", tableLogMsg, gtidLogMsg, err.Error())
err = vterrors.Wrapf(err, "error applying event%s%s", tableLogMsg, gtidLogMsg)
Comment on lines +566 to +567
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unsure, but shouldn't there be spaces between "event" and the two "%s"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, as those are optional portions — each starting with a space. If we had a hardcoded space after "event" then in some cases we'd have trailing whitespace.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay makes sense!

}
return err
}
Expand Down Expand Up @@ -592,6 +597,34 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool {
return false
}

// getNextPosition returns the GTID set/position we would be at if the current
// transaction was committed. This is useful for error handling as we can then
// determine which GTID we're failing to process from the source and examine the
// binlog events for that GTID directly on the source to debug the issue.
// This is needed as it's not as simple as the user incrementing the current
// position in the stream by 1 as we may be skipping N intermediate GTIDs in the
// stream due to filtering. For GTIDs that we filter out we still replicate the
// GTID event itself, just without any internal events and a COMMIT event (see
// the unsavedEvent handling).
func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string {
for i < len(items) {
for j < len(items[i]) {
switch items[i][j].Type {
case binlogdatapb.VEventType_GTID:
pos, err := binlogplayer.DecodePosition(items[i][j].Gtid)
if err != nil {
return ""
}
return pos.String()
}
j++
}
j = 0
i++
}
return ""
}

func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error {
stats := NewVrLogStats(event.Type.String())
switch event.Type {
Expand Down
27 changes: 23 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,20 @@ func TestPlayerForeignKeyCheck(t *testing.T) {
cancel()
}

func TestPlayerStatementModeWithFilter(t *testing.T) {
// TestPlayerStatementModeWithFilterAndErrorHandling confirms that we get the
// expected error when using a filter with statement mode. It also tests the
// general vplayer applyEvent error and log message handling.
func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {
defer deleteTablet(addTablet(100))

// We want to check for the expected log message.
ole := log.Errorf
logger := logutil.NewMemoryLogger()
log.Errorf = logger.Errorf
defer func() {
log.Errorf = ole
}()

execStatements(t, []string{
"create table src1(id int, val varbinary(128), primary key(id))",
})
Expand All @@ -600,21 +611,29 @@ func TestPlayerStatementModeWithFilter(t *testing.T) {
cancel, _ := startVReplication(t, bls, "")
defer cancel()

const gtid = "37f16b4c-5a74-11ef-87de-56bfd605e62e:100"
input := []string{
"set @@session.binlog_format='STATEMENT'",
fmt.Sprintf("set @@session.gtid_next='%s'", gtid),
"insert into src1 values(1, 'aaa')",
"set @@session.gtid_next='AUTOMATIC'",
"set @@session.binlog_format='ROW'",
}

expectedMsg := fmt.Sprintf("[Ee]rror applying event while processing position .*%s.* filter rules are not supported for SBR.*", gtid)

// It does not work when filter is enabled
output := qh.Expect(
"begin",
"rollback",
"/update _vt.vreplication set message='filter rules are not supported for SBR",
fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
)

execStatements(t, input)
expectDBClientQueries(t, output)

logs := logger.String()
require.Regexp(t, expectedMsg, logs)
}

func TestPlayerStatementMode(t *testing.T) {
Expand Down Expand Up @@ -1758,8 +1777,8 @@ func TestPlayerDDL(t *testing.T) {
execStatements(t, []string{"alter table t1 add column val2 varchar(128)"})
expectDBClientQueries(t, qh.Expect(
"alter table t1 add column val2 varchar(128)",
"/update _vt.vreplication set message='Duplicate",
"/update _vt.vreplication set state='Error', message='Duplicate",
"/update _vt.vreplication set message='error applying event: Duplicate",
"/update _vt.vreplication set state='Error', message='error applying event: Duplicate",
))
cancel()

Expand Down
Loading