From 4a9d8b2e10e11b194b419f6df43c222cbbb6ee7b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 10 Oct 2024 22:13:09 +0200 Subject: [PATCH 01/15] Add e2e test to repro unique key causing duplicate key error in reverse workflow Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/config_test.go | 12 ++++- .../vreplication/unsharded_init_data.sql | 3 ++ .../workflow_duplicate_key_backoff_test.go | 48 +++++++++++++++++++ go/vt/vtctl/workflow/server.go | 9 ++++ 4 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 4b4bcfecc35..12163cf2cc5 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -69,6 +69,7 @@ create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob, create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); create table loadtest (id int, name varchar(256), primary key(id), key(name)); create table nopk (name varchar(128), age int unsigned); + create table admins(team_id int, email varchar(128), primary key(team_id), unique key(email)); `, strings.Join(customerTypes, ",")) // These should always be ignored in vreplication internalSchema = ` @@ -85,6 +86,7 @@ create table nopk (name varchar(128), age int unsigned); "tables": { "product": {}, "merchant": {}, + "admins": {}, "orders": {}, "loadtest": {}, "customer": {}, @@ -158,8 +160,16 @@ create table nopk (name varchar(128), age int unsigned); } ] }, - "enterprise_customer": { + "admins": { "column_vindexes": [ + { + "column": "team_id", + "name": "reverse_bits" + } + ] + }, + "enterprise_customer": { + "column_vindexes": [ { "column": "cid", "name": "xxhash" diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 3e795cadcfd..124d4bff99c 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -50,3 +50,6 @@ insert into reftable (id, val1) values (2, 'b') insert into reftable (id, val1) values (3, 'c') insert into reftable (id, val1) values (4, 'd') insert into reftable (id, val1) values (5, 'e') + +insert into admins(team_id, email) values(1, 'a@example.com') +insert into admins(team_id, email) values(2, 'b@example.com') diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go new file mode 100644 index 00000000000..6f1c58f9207 --- /dev/null +++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go @@ -0,0 +1,48 @@ +package vreplication + +import ( + "testing" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vttablet "vitess.io/vitess/go/vt/vttablet/common" +) + +func TestWorkflowDuplicateKeyBackoff(t *testing.T) { + setSidecarDBName("_vt") + origDefaultRdonly := defaultRdonly + origDefailtReplica := defaultReplicas + defer func() { + defaultRdonly = origDefaultRdonly + defaultReplicas = origDefailtReplica + }() + defaultRdonly = 0 + defaultReplicas = 0 + + setupMinimalCluster(t) + vttablet.InitVReplicationConfigDefaults() + defer vc.TearDown() + + sourceKeyspaceName := "product" + targetKeyspaceName := "customer" + workflowName := "wf1" + targetTabs := setupMinimalCustomerKeyspace(t) + _ = targetTabs + tables := "customer,admins" + + mt := createMoveTables(t, sourceKeyspaceName, targetKeyspaceName, workflowName, tables, nil, nil, nil) + waitForWorkflowState(t, vc, "customer.wf1", binlogdatapb.VReplicationWorkflowState_Running.String()) + mt.SwitchReadsAndWrites() + vtgateConn, cancel := getVTGateConn() + defer cancel() + queries := []string{ + "update admins set email = null where team_id = 2", + "update admins set email = 'b@example.com' where team_id = 1", + "update admins set email = 'a@example.com' where team_id = 2", + } + + vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 2") + for _, query := range queries { + execVtgateQuery(t, vtgateConn, targetKeyspaceName, query) + } + +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 7c49de58c9b..5a87d8708b8 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -369,6 +369,15 @@ func (s *Server) GetCellsWithTableReadsSwitched( return cellsSwitched, cellsNotSwitched, nil } +type workflowUtil struct { + targetKeyspace string + workflowName string +} + +func (wu *workflowUtil) String() string { + return fmt.Sprintf("%s/%s", wu.targetKeyspace, wu.workflowName) +} + func (s *Server) GetWorkflow(ctx context.Context, keyspace, workflow string, includeLogs bool, shards []string) (*vtctldatapb.Workflow, error) { res, err := s.GetWorkflows(ctx, &vtctldatapb.GetWorkflowsRequest{ Keyspace: keyspace, From eab19a9c3cafd16e0bb747ecfd32c852a33159f4 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 25 Oct 2024 17:09:13 +0200 Subject: [PATCH 02/15] Add additional test column to admins for debugging if required. Misc testing changes/comments. ExecWithRetry enhanced with a timeout instead of the backoff function for now. Need to see if this can stay or needs to be changed Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/config_test.go | 2 +- go/test/endtoend/vreplication/helper_test.go | 20 ++++++++++ .../vreplication/unsharded_init_data.sql | 4 +- .../workflow_duplicate_key_backoff_test.go | 34 +++++++++++++--- .../tabletmanager/vreplication/vdbclient.go | 39 ++++++++++++++++++- .../tabletmanager/vreplication/vplayer.go | 2 +- test/config.json | 9 +++++ 7 files changed, 99 insertions(+), 11 deletions(-) diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index 12163cf2cc5..d091ada182b 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -69,7 +69,7 @@ create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob, create table reftable (id int, val1 varchar(20), primary key(id), key(val1)); create table loadtest (id int, name varchar(256), primary key(id), key(name)); create table nopk (name varchar(128), age int unsigned); - create table admins(team_id int, email varchar(128), primary key(team_id), unique key(email)); + create table admins(team_id int, email varchar(128), val varchar(256), primary key(team_id), unique key(email)); `, strings.Join(customerTypes, ",")) // These should always be ignored in vreplication internalSchema = ` diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 3795b6f52d5..749f4d81401 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -288,6 +288,26 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da } } +func waitForResult(t *testing.T, vttablet *cluster.VttabletProcess, database string, query string, want string, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + qr, err := vttablet.QueryTablet(query, database, true) + require.NoError(t, err) + require.NotNil(t, qr) + if want == fmt.Sprintf("%v", qr.Rows) { + return + } + select { + case <-timer.C: + require.FailNow(t, fmt.Sprintf("query %q did not reach the expected result (%s) on tablet %q before the timeout of %s; last seen result: %s", + query, want, vttablet.Name, timeout, qr.Rows)) + default: + time.Sleep(defaultTick) + } + } +} + // waitForSequenceValue queries the provided sequence name in the // provided database using the provided vtgate connection until // we get a next value from it. This allows us to move forward diff --git a/go/test/endtoend/vreplication/unsharded_init_data.sql b/go/test/endtoend/vreplication/unsharded_init_data.sql index 124d4bff99c..6f609e5c5c9 100644 --- a/go/test/endtoend/vreplication/unsharded_init_data.sql +++ b/go/test/endtoend/vreplication/unsharded_init_data.sql @@ -51,5 +51,5 @@ insert into reftable (id, val1) values (3, 'c') insert into reftable (id, val1) values (4, 'd') insert into reftable (id, val1) values (5, 'e') -insert into admins(team_id, email) values(1, 'a@example.com') -insert into admins(team_id, email) values(2, 'b@example.com') +insert into admins(team_id, email, val) values(1, 'a@example.com', 'ibis-1') +insert into admins(team_id, email, val) values(2, 'b@example.com', 'ibis-2') diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go index 6f1c58f9207..89f20e68a74 100644 --- a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go +++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go @@ -2,8 +2,14 @@ package vreplication import ( "testing" + "time" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/endtoend/throttler" + "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" vttablet "vitess.io/vitess/go/vt/vttablet/common" ) @@ -29,20 +35,38 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) { _ = targetTabs tables := "customer,admins" + req := &vtctldatapb.UpdateThrottlerConfigRequest{ + Enable: false, + } + res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "customer", req, nil, nil) + require.NoError(t, err, res) + res, err = throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "product", req, nil, nil) + require.NoError(t, err, res) + mt := createMoveTables(t, sourceKeyspaceName, targetKeyspaceName, workflowName, tables, nil, nil, nil) waitForWorkflowState(t, vc, "customer.wf1", binlogdatapb.VReplicationWorkflowState_Running.String()) mt.SwitchReadsAndWrites() vtgateConn, cancel := getVTGateConn() defer cancel() + + // team_id 1 => 80-, team_id 2 => -80 queries := []string{ - "update admins set email = null where team_id = 2", - "update admins set email = 'b@example.com' where team_id = 1", - "update admins set email = 'a@example.com' where team_id = 2", + "update admins set email = null, val = 'ibis-3' where team_id = 2", // -80 + "update admins set email = 'b@example.com', val = 'ibis-4' where team_id = 1", // 80- + "update admins set email = 'a@example.com', val = 'ibis-5' where team_id = 2", // -80 } - vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 2") + vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Stopped' where id = 1") //-80 for _, query := range queries { execVtgateQuery(t, vtgateConn, targetKeyspaceName, query) } - + // Since -80 is stopped the "update admins set email = 'b@example.com' where team_id = 1" will fail with duplicate key + // since it is already set for team_id = 2 + // The vplayer stream for -80 should backoff with the new logic and retry should be successful once the -80 stream is restarted + time.Sleep(5 * time.Second) + vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Running' where id = 1") + productTab := vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet + waitForResult(t, productTab, "product", "select * from admins order by team_id", + "[[INT32(1) VARCHAR(\"b@example.com\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"a@example.com\") VARCHAR(\"ibis-5\")]]", 20*time.Second) + log.Infof("TestWorkflowDuplicateKeyBackoff passed") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index b8339cdf874..842c71a13d5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -168,10 +168,45 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) { return vc.ExecuteFetch(query, vc.relayLogMaxItems) } +func (vc *vdbClient) IsRetryable(err error) bool { + if sqlErr, ok := err.(*sqlerror.SQLError); ok { + return sqlErr.Number() == sqlerror.ERDupEntry + } + return false +} + +func (vc *vdbClient) ExecuteWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) { + timeout := 1 * time.Minute + shortCtx, cancel := context.WithDeadline(ctx, time.Now().Add(timeout)) + defer cancel() + attempts := 0 + backoffSeconds := 1 + for { + qr, err := vc.ExecuteWithRetry(ctx, query) + if err == nil { + return qr, nil + } + if !vc.IsRetryable(err) { + return nil, err + } + attempts++ + log.Infof("Backing off for %v seconds before retrying query: %v, got err %v", backoffSeconds, query, err) + select { + case <-shortCtx.Done(): + return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "backoff timeout exceeded while retrying query: %v", query) + case <-time.After(time.Duration(backoffSeconds) * time.Second): + } + backoffSeconds = (1 << attempts) >> 1 + } +} + func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) { + ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() qr, err := vc.Execute(query) for err != nil { - if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout { + if sqlErr, ok := err.(*sqlerror.SQLError); ok && + sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout || sqlErr.Number() == sqlerror.ERDupEntry { log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay) if err := vc.Rollback(); err != nil { return nil, err @@ -179,7 +214,7 @@ func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqlty time.Sleep(dbLockRetryDelay) // Check context here. Otherwise this can become an infinite loop. select { - case <-ctx.Done(): + case <-ctx2.Done(): return nil, io.EOF default: } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index db2f3f341ac..06991338af9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -134,7 +134,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map } batchMode := false if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { - batchMode = true + batchMode = false //true // FIXME } if batchMode { // relayLogMaxSize is effectively the limit used when not batching. diff --git a/test/config.json b/test/config.json index 7419f658385..525d8ae0c3c 100644 --- a/test/config.json +++ b/test/config.json @@ -1094,6 +1094,15 @@ "RetryMax": 0, "Tags": [] }, + "vreplication_workflow_dup": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestWorkflowDuplicateKeyBackoff"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cellalias", + "RetryMax": 0, + "Tags": [] + }, "vreplication_multi_tenant": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication","-run", "MultiTenant"], From 8c92dd8d50426c808a1b3d5279db35fd28a0e6f2 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 25 Oct 2024 17:47:09 +0200 Subject: [PATCH 03/15] Self review Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/helper_test.go | 1 + go/vt/vtctl/workflow/server.go | 9 --------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 749f4d81401..ef960b11f47 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -288,6 +288,7 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da } } +// Wait for the data fetched by the query from the specified tablet and database to match the expected result. func waitForResult(t *testing.T, vttablet *cluster.VttabletProcess, database string, query string, want string, timeout time.Duration) { timer := time.NewTimer(timeout) defer timer.Stop() diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 5a87d8708b8..7c49de58c9b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -369,15 +369,6 @@ func (s *Server) GetCellsWithTableReadsSwitched( return cellsSwitched, cellsNotSwitched, nil } -type workflowUtil struct { - targetKeyspace string - workflowName string -} - -func (wu *workflowUtil) String() string { - return fmt.Sprintf("%s/%s", wu.targetKeyspace, wu.workflowName) -} - func (s *Server) GetWorkflow(ctx context.Context, keyspace, workflow string, includeLogs bool, shards []string) (*vtctldatapb.Workflow, error) { res, err := s.GetWorkflows(ctx, &vtctldatapb.GetWorkflowsRequest{ Keyspace: keyspace, From d0fca1f1436d9daa04aef7294ed7b271a01f63ce Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 25 Oct 2024 19:01:58 +0200 Subject: [PATCH 04/15] Remove hardcoded ignoring of the vplayer batching functionality Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vdbclient.go | 29 ++----------------- .../tabletmanager/vreplication/vplayer.go | 2 +- 2 files changed, 4 insertions(+), 27 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index 842c71a13d5..af52df459de 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -18,6 +18,7 @@ package vreplication import ( "context" + "errors" "io" "strings" "time" @@ -175,37 +176,13 @@ func (vc *vdbClient) IsRetryable(err error) bool { return false } -func (vc *vdbClient) ExecuteWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) { - timeout := 1 * time.Minute - shortCtx, cancel := context.WithDeadline(ctx, time.Now().Add(timeout)) - defer cancel() - attempts := 0 - backoffSeconds := 1 - for { - qr, err := vc.ExecuteWithRetry(ctx, query) - if err == nil { - return qr, nil - } - if !vc.IsRetryable(err) { - return nil, err - } - attempts++ - log.Infof("Backing off for %v seconds before retrying query: %v, got err %v", backoffSeconds, query, err) - select { - case <-shortCtx.Done(): - return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "backoff timeout exceeded while retrying query: %v", query) - case <-time.After(time.Duration(backoffSeconds) * time.Second): - } - backoffSeconds = (1 << attempts) >> 1 - } -} - func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) { ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() qr, err := vc.Execute(query) for err != nil { - if sqlErr, ok := err.(*sqlerror.SQLError); ok && + var sqlErr *sqlerror.SQLError + if errors.As(err, &sqlErr) && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout || sqlErr.Number() == sqlerror.ERDupEntry { log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay) if err := vc.Rollback(); err != nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 06991338af9..db2f3f341ac 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -134,7 +134,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map } batchMode := false if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { - batchMode = false //true // FIXME + batchMode = true } if batchMode { // relayLogMaxSize is effectively the limit used when not batching. From adc21ebd34c29b24f44e6b4f466d5d37e64984c9 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 26 Oct 2024 00:15:44 +0200 Subject: [PATCH 05/15] Add logic to set flag when commit is skipped and to reset it Signed-off-by: Rohit Nayak --- .../workflow_duplicate_key_backoff_test.go | 1 + .../vttablet/tabletmanager/vreplication/vplayer.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go index 89f20e68a74..59fe3f1bce7 100644 --- a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go +++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go @@ -23,6 +23,7 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) { }() defaultRdonly = 0 defaultReplicas = 0 + //setAllVTTabletExperimentalFlags() setupMinimalCluster(t) vttablet.InitVReplicationConfigDefaults() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index db2f3f341ac..ba447db4ebb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -99,6 +99,8 @@ type vplayer struct { // foreignKeyChecksStateInitialized is set to true once we have initialized the foreignKeyChecksEnabled. // The initialization is done on the first row event that this vplayer sees. foreignKeyChecksStateInitialized bool + + hasSkippedCommit bool } // NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event. @@ -166,7 +168,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map vr.dbClient.maxBatchSize = maxAllowedPacket } - return &vplayer{ + vp := &vplayer{ vr: vr, startPos: settings.StartPos, pos: settings.StartPos, @@ -181,6 +183,13 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map commit: commitFunc, batchMode: batchMode, } + + wrappedCommitFunc := func() error { + vp.hasSkippedCommit = false + return commitFunc() + } + vp.commit = wrappedCommitFunc + return vp } // play is the entry point for playing binlogs. @@ -548,6 +557,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // mustSave flag. if !vp.stopPos.IsZero() && vp.pos.AtLeast(vp.stopPos) { mustSave = true + vp.hasSkippedCommit = false break } // In order to group multiple commits into a single one, we look ahead for @@ -556,6 +566,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // also handles the case where the last transaction is partial. In that case, // we only group the transactions with commits we've seen so far. if hasAnotherCommit(items, i, j+1) { + vp.hasSkippedCommit = true continue } } From c4d05660696d534859bc7bf4440e8e9654f0ec13 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 31 Oct 2024 21:34:06 +0100 Subject: [PATCH 06/15] Added rollback to fix deadlock in test. Still doesn't work with Batching enabled Signed-off-by: Rohit Nayak --- go/mysql/query.go | 8 ++ .../workflow_duplicate_key_backoff_test.go | 7 +- .../tabletmanager/vreplication/vdbclient.go | 2 +- .../tabletmanager/vreplication/vplayer.go | 96 +++++++++++++++++-- 4 files changed, 101 insertions(+), 12 deletions(-) diff --git a/go/mysql/query.go b/go/mysql/query.go index 26582cecbd9..ff712bdc1dc 100644 --- a/go/mysql/query.go +++ b/go/mysql/query.go @@ -23,6 +23,8 @@ import ( "strconv" "strings" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" @@ -348,6 +350,12 @@ func (c *Conn) drainMoreResults(more bool, err error) error { // It returns an additional 'more' flag. If it is set, you must fetch the additional // results using ReadQueryResult. func (c *Conn) ExecuteFetchMulti(query string, maxrows int, wantfields bool) (result *sqltypes.Result, more bool, err error) { + if strings.Contains(query, "rollback") || strings.Contains(query, "commit") || + strings.Contains(query, "begin") || + strings.Contains(query, "admins") && strings.Contains(query, "update") { + + log.Infof("$$$$$$$$$$$$$$ ExecuteFetch: %s", query) + } defer func() { if err != nil { if sqlerr, ok := err.(*sqlerror.SQLError); ok { diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go index 59fe3f1bce7..0105106a7a4 100644 --- a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go +++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go @@ -23,7 +23,7 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) { }() defaultRdonly = 0 defaultReplicas = 0 - //setAllVTTabletExperimentalFlags() + setAllVTTabletExperimentalFlags() setupMinimalCluster(t) vttablet.InitVReplicationConfigDefaults() @@ -64,10 +64,11 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) { // Since -80 is stopped the "update admins set email = 'b@example.com' where team_id = 1" will fail with duplicate key // since it is already set for team_id = 2 // The vplayer stream for -80 should backoff with the new logic and retry should be successful once the -80 stream is restarted - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) // fixme: add check that the table has the expected data after the inserts vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", "zone1-100", "update _vt.vreplication set state = 'Running' where id = 1") + //time.Sleep(5 * time.Second) productTab := vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets["zone1-100"].Vttablet waitForResult(t, productTab, "product", "select * from admins order by team_id", - "[[INT32(1) VARCHAR(\"b@example.com\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"a@example.com\") VARCHAR(\"ibis-5\")]]", 20*time.Second) + "[[INT32(1) VARCHAR(\"b@example.com\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"a@example.com\") VARCHAR(\"ibis-5\")]]", 30*time.Second) log.Infof("TestWorkflowDuplicateKeyBackoff passed") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index af52df459de..b9cac5f4688 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -183,7 +183,7 @@ func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqlty for err != nil { var sqlErr *sqlerror.SQLError if errors.As(err, &sqlErr) && - sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout || sqlErr.Number() == sqlerror.ERDupEntry { + sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout { log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay) if err := vc.Rollback(); err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index ba447db4ebb..48ce1b28229 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -26,6 +26,9 @@ import ( "strings" "time" + "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -101,6 +104,8 @@ type vplayer struct { foreignKeyChecksStateInitialized bool hasSkippedCommit bool + isMergeWorkflow bool + dontSkipCommits bool } // NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event. @@ -132,6 +137,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return vr.dbClient.ExecuteWithRetry(ctx, sql) } commitFunc := func() error { + log.Infof("Commit func: %v", vr.dbClient.InTransaction) return vr.dbClient.Commit() } batchMode := false @@ -163,6 +169,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch } commitFunc = func() error { + log.Infof("Batch Commit func: %v", vr.dbClient.InTransaction) return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch } vr.dbClient.maxBatchSize = maxAllowedPacket @@ -179,9 +186,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map tablePlans: make(map[string]*TablePlan), phase: phase, throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()), - query: queryFunc, - commit: commitFunc, batchMode: batchMode, + isMergeWorkflow: true, } wrappedCommitFunc := func() error { @@ -189,9 +195,82 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return commitFunc() } vp.commit = wrappedCommitFunc + + wrappedQueryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { + result, err := queryFunc(ctx, sql) + var sqlErr *sqlerror.SQLError + if err != nil && errors.As(err, &sqlErr) && + sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow { + return vp.backoffAndRetry(ctx, sql, err) + } + return result, err + } + vp.query = wrappedQueryFunc + return vp } +func (vp *vplayer) isRetryable(err error) bool { + if sqlErr, ok := err.(*sqlerror.SQLError); ok { + return sqlErr.Number() == sqlerror.ERDupEntry + } + return false +} + +func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) { + // We will retry the query if it fails with a duplicate entry error. Since this will be a non-recoverable error + // we should wait for a longer time than we would usually do. The backoff is intended to let the other streams catch up + // especially if global query ordering is important. It is possible there is a replica lag skew between shards because + // of one shard being slower or has more write traffic than the others. + origQuery := query + i := 0 + timeout := 1 * time.Minute + shortCtx, cancel := context.WithDeadline(ctx, time.Now().Add(timeout)) + defer cancel() + attempts := 0 + backoffSeconds := 10 + for { + i++ + query = fmt.Sprintf("%s /* backoff:: %d */", origQuery, i) + qr, err := vp.vr.dbClient.ExecuteWithRetry(ctx, query) + log.Flush() + if err == nil { + vp.vr.dbClient.Commit() + return qr, nil + } + if err := vp.vr.dbClient.Rollback(); err != nil { + return nil, err + } + if !vp.isRetryable(err) { + return nil, err + } + attempts++ + log.Infof("Backing off for %v seconds before retrying query: %v, got err %v", backoffSeconds, query, err) + select { + case <-shortCtx.Done(): + return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "backoff timeout exceeded while retrying query: %v", query) + case <-time.After(time.Duration(backoffSeconds) * time.Second): + } + // Exponential backoff with a maximum of "timeout" seconds. + backoffSeconds = (1 << attempts) >> 1 + } +} + +// backoffAndRetry retries the query after a backoff period. +func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) (*sqltypes.Result, error) { + vp.dontSkipCommits = true + //FIXME : set dontSkipCommits to false after some time? + if vp.hasSkippedCommit { + log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql) + vp.commit() // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit() + if err := vp.vr.dbClient.Begin(); err != nil { + return nil, err + } + } + return vp.executeWithRetryAndBackoff(ctx, sql) + +} + // play is the entry point for playing binlogs. func (vp *vplayer) play(ctx context.Context) error { if !vp.stopPos.IsZero() && vp.startPos.AtLeast(vp.stopPos) { @@ -506,11 +585,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { return ctx.Err() } // Check throttler. - if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok { - _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary()) - estimateLag() - continue - } + //if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok { + // _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary()) + // estimateLag() + // continue + //} items, err := relay.Fetch() if err != nil { @@ -565,7 +644,8 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // applying the next set of events as part of the current transaction. This approach // also handles the case where the last transaction is partial. In that case, // we only group the transactions with commits we've seen so far. - if hasAnotherCommit(items, i, j+1) { + if /*vp.dontSkipCommits && */ hasAnotherCommit(items, i, j+1) { + log.Infof(">>>>>>>> skipping commit") vp.hasSkippedCommit = true continue } From 0aecdce67540a413af49b038f31dc293e04e17bb Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 6 Nov 2024 22:29:09 +0100 Subject: [PATCH 07/15] Working test with no Batching enabled Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vdbclient.go | 11 ++++ .../tabletmanager/vreplication/vplayer.go | 62 ++++++++++++------- 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index b9cac5f4688..5a0f7255ace 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -147,6 +147,17 @@ func (vc *vdbClient) AddQueryToTrxBatch(query string) error { return nil } +func (vc *vdbClient) PopLastQueryFromBatch() error { + if !vc.InTransaction { + return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "cannot pop query outside of a transaction") + } + if vc.batchSize > 0 { + vc.batchSize -= 1 + vc.queries = vc.queries[:len(vc.queries)-1] + } + return nil +} + // ExecuteQueryBatch sends the transaction's current batch of queries // down the wire to the database. func (vc *vdbClient) ExecuteTrxQueryBatch() ([]*sqltypes.Result, error) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 48ce1b28229..b813b6d006a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -106,6 +106,7 @@ type vplayer struct { hasSkippedCommit bool isMergeWorkflow bool dontSkipCommits bool + inBackoff bool } // NoForeignKeyCheckFlagBitmask is the bitmask for the 2nd bit (least significant) of the flags in a binlog row event. @@ -132,6 +133,25 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map settings.StopPos = pausePos saveStop = false } + batchMode := false + if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { + batchMode = true + } + + vp := &vplayer{ + vr: vr, + startPos: settings.StartPos, + pos: settings.StartPos, + stopPos: settings.StopPos, + saveStop: saveStop, + copyState: copyState, + timeLastSaved: time.Now(), + tablePlans: make(map[string]*TablePlan), + phase: phase, + throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()), + batchMode: batchMode, + isMergeWorkflow: true, + } queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { return vr.dbClient.ExecuteWithRetry(ctx, sql) @@ -140,10 +160,6 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map log.Infof("Commit func: %v", vr.dbClient.InTransaction) return vr.dbClient.Commit() } - batchMode := false - if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { - batchMode = true - } if batchMode { // relayLogMaxSize is effectively the limit used when not batching. maxAllowedPacket := int64(vr.workflowConfig.RelayLogMaxSize) @@ -169,29 +185,19 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch } commitFunc = func() error { - log.Infof("Batch Commit func: %v", vr.dbClient.InTransaction) + log.Infof("Batch Commit func: In Transaction? %v", vr.dbClient.InTransaction) + if vp.inBackoff { + // We get into backoff when there is a ERDupQuery error. + vr.dbClient.PopLastQueryFromBatch() + } return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch } vr.dbClient.maxBatchSize = maxAllowedPacket } - vp := &vplayer{ - vr: vr, - startPos: settings.StartPos, - pos: settings.StartPos, - stopPos: settings.StopPos, - saveStop: saveStop, - copyState: copyState, - timeLastSaved: time.Now(), - tablePlans: make(map[string]*TablePlan), - phase: phase, - throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()), - batchMode: batchMode, - isMergeWorkflow: true, - } - wrappedCommitFunc := func() error { vp.hasSkippedCommit = false + return commitFunc() } vp.commit = wrappedCommitFunc @@ -199,7 +205,11 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map wrappedQueryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { result, err := queryFunc(ctx, sql) var sqlErr *sqlerror.SQLError - if err != nil && errors.As(err, &sqlErr) && + isSqlErr := errors.As(err, &sqlErr) + if err != nil { + log.Errorf(">>>>>>>>>>>> Error executing query: %v, isSqlErr %t ,err: %v", sql, isSqlErr, err) + } + if err != nil && isSqlErr && sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow { return vp.backoffAndRetry(ctx, sql, err) } @@ -259,10 +269,18 @@ func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) // backoffAndRetry retries the query after a backoff period. func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) (*sqltypes.Result, error) { vp.dontSkipCommits = true + log.Infof("Setting inBackoff to true for query: %v", sql) + vp.inBackoff = true + defer func() { + log.Infof("Setting inBackoff to false after query: %v", sql) + vp.inBackoff = false + }() //FIXME : set dontSkipCommits to false after some time? if vp.hasSkippedCommit { log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql) - vp.commit() // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit() + if err := vp.commit(); err != nil { + return nil, err + } // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit() if err := vp.vr.dbClient.Begin(); err != nil { return nil, err } From c2efb9f8250d0084b783f709496ca7293200141e Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 7 Nov 2024 16:55:00 +0100 Subject: [PATCH 08/15] Working e2e tests with and without transaction batching. Leaving comments added for debugging. Code needs better organization Signed-off-by: Rohit Nayak --- go/mysql/query.go | 8 -- go/test/endtoend/vreplication/cluster_test.go | 6 +- .../workflow_duplicate_key_backoff_test.go | 14 ++- .../tabletmanager/vreplication/vdbclient.go | 4 + .../tabletmanager/vreplication/vplayer.go | 99 ++++++++++++++----- 5 files changed, 98 insertions(+), 33 deletions(-) diff --git a/go/mysql/query.go b/go/mysql/query.go index ff712bdc1dc..26582cecbd9 100644 --- a/go/mysql/query.go +++ b/go/mysql/query.go @@ -23,8 +23,6 @@ import ( "strconv" "strings" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" @@ -350,12 +348,6 @@ func (c *Conn) drainMoreResults(more bool, err error) error { // It returns an additional 'more' flag. If it is set, you must fetch the additional // results using ReadQueryResult. func (c *Conn) ExecuteFetchMulti(query string, maxrows int, wantfields bool) (result *sqltypes.Result, more bool, err error) { - if strings.Contains(query, "rollback") || strings.Contains(query, "commit") || - strings.Contains(query, "begin") || - strings.Contains(query, "admins") && strings.Contains(query, "update") { - - log.Infof("$$$$$$$$$$$$$$ ExecuteFetch: %s", query) - } defer func() { if err != nil { if sqlerr, ok := err.(*sqlerror.SQLError); ok { diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 119843651bc..3dc1c9758fe 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -345,7 +345,7 @@ func getClusterConfig(idx int, dataRootDir string) *ClusterConfig { func init() { // for local debugging set this variable so that each run uses VTDATAROOT instead of a random dir // and also does not teardown the cluster for inspecting logs and the databases - if os.Getenv("VREPLICATION_E2E_DEBUG") != "" { + if os.Getenv("VREPLICATION_E2E_DEBUG") != "on" { debugMode = true } originalVtdataroot = os.Getenv("VTDATAROOT") @@ -440,7 +440,10 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) { // This is always set to "true" on GitHub Actions runners: // https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables ci, ok := os.LookupEnv("CI") + fmt.Printf(">>>>>>>>>>>>>>. os.LookupEnv(\"CI\") returns ci, ok: %s, %t\n", ci, ok) if !ok || strings.ToLower(ci) != "true" { + fmt.Println("Not running in CI, skipping cleanup") + //panic("leaving directory") // Leave the directory in place to support local debugging. return } @@ -459,6 +462,7 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) { time.Sleep(1 * time.Second) } require.NoError(t, err) + log.Infof("Recreating vtdataroot %q: %t", dir, recreate) if recreate { err = os.Mkdir(dir, 0700) require.NoError(t, err) diff --git a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go index 0105106a7a4..81c2bb3a905 100644 --- a/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go +++ b/go/test/endtoend/vreplication/workflow_duplicate_key_backoff_test.go @@ -14,6 +14,16 @@ import ( ) func TestWorkflowDuplicateKeyBackoff(t *testing.T) { + t.Run("TestWorkflowDuplicateKeyBackoff with batching off", func(t *testing.T) { + testWorkflowDuplicateKeyBackoff(t, false) + }) + t.Run("TestWorkflowDuplicateKeyBackoff with batching on", func(t *testing.T) { + testWorkflowDuplicateKeyBackoff(t, true) + }) +} + +func testWorkflowDuplicateKeyBackoff(t *testing.T, setExperimentalFlags bool) { + debugMode = false setSidecarDBName("_vt") origDefaultRdonly := defaultRdonly origDefailtReplica := defaultReplicas @@ -23,7 +33,9 @@ func TestWorkflowDuplicateKeyBackoff(t *testing.T) { }() defaultRdonly = 0 defaultReplicas = 0 - setAllVTTabletExperimentalFlags() + if setExperimentalFlags { + setAllVTTabletExperimentalFlags() + } setupMinimalCluster(t) vttablet.InitVReplicationConfigDefaults() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index 5a0f7255ace..d52ac306c39 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -103,6 +103,10 @@ func (vc *vdbClient) CommitTrxQueryBatch() error { return nil } +func (vc *vdbClient) GetQueries() []string { + return vc.queries +} + func (vc *vdbClient) Rollback() error { if !vc.InTransaction { return nil diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index b813b6d006a..25696c05afd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -63,6 +63,8 @@ type vplayer struct { replicatorPlan *ReplicatorPlan tablePlans map[string]*TablePlan + ctx context.Context + // These are set when creating the VPlayer based on whether the VPlayer // is in batch (stmt and trx) execution mode or not. query func(ctx context.Context, sql string) (*sqltypes.Result, error) @@ -184,11 +186,45 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map } return nil, vr.dbClient.AddQueryToTrxBatch(sql) // Should become part of the trx batch } + unrollBatch := func() error { + batchedQueries := vr.dbClient.GetQueries() + if len(batchedQueries) == 0 { + return nil + } + for _, query := range batchedQueries { + log.Infof("Unrolling batch: exec %v", query) + + _, err := vr.dbClient.Execute(query) + if err != nil { + log.Infof("Unrolling batch: failed to exec %v: %v", query, err) + if vp.mustBackoff(err) { + log.Infof("Unrolling batch: backoff needed for query: %v", query) + if vp.hasSkippedCommit { + log.Infof("Unrolling batch: found skipped Commit, issuing a commit before retrying the query: %v", query) + if err := vr.dbClient.Commit(); err != nil { + return err + } + if err := vr.dbClient.Begin(); err != nil { + return err + } + } + _, err2 := vp.backoffAndRetry(vp.ctx, query) + if err2 != nil { + return err2 + } + } + } else { + log.Infof("Unrolling batch: exec %v succeeded", query) + } + } + return vr.dbClient.Commit() + } commitFunc = func() error { - log.Infof("Batch Commit func: In Transaction? %v", vr.dbClient.InTransaction) + log.Infof("Batch Commit func: In Transaction %v", vr.dbClient.InTransaction) if vp.inBackoff { - // We get into backoff when there is a ERDupQuery error. - vr.dbClient.PopLastQueryFromBatch() + // We get into backoff when there is a ERDupQuery error. So one of the queries in the batch is + // causing the issue. We need to run all queries until that one first and then backoff/retry that one + return unrollBatch() } return vr.dbClient.CommitTrxQueryBatch() // Commit the current trx batch } @@ -197,21 +233,24 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map wrappedCommitFunc := func() error { vp.hasSkippedCommit = false - + err := commitFunc() + if !vp.batchMode { + return err + } + vp.inBackoff = true + defer func() { + vp.inBackoff = false + }() + log.Infof("In backoff in wrapped commit func for batch mode, batched queries: %v", vp.vr.dbClient.GetQueries()) return commitFunc() } vp.commit = wrappedCommitFunc wrappedQueryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { result, err := queryFunc(ctx, sql) - var sqlErr *sqlerror.SQLError - isSqlErr := errors.As(err, &sqlErr) - if err != nil { - log.Errorf(">>>>>>>>>>>> Error executing query: %v, isSqlErr %t ,err: %v", sql, isSqlErr, err) - } - if err != nil && isSqlErr && - sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow { - return vp.backoffAndRetry(ctx, sql, err) + log.Infof("wrapped query func: %v, err: %v", sql, err) + if err != nil && vp.mustBackoff(err) { + return vp.backoffAndRetry(ctx, sql) } return result, err } @@ -227,6 +266,17 @@ func (vp *vplayer) isRetryable(err error) bool { return false } +func (vp *vplayer) mustBackoff(err error) bool { + var sqlErr *sqlerror.SQLError + isSqlErr := errors.As(err, &sqlErr) + if err != nil && isSqlErr && + sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow { + log.Infof("mustBackoff for err: %v", err) + return true + } + return false +} + func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) { // We will retry the query if it fails with a duplicate entry error. Since this will be a non-recoverable error // we should wait for a longer time than we would usually do. The backoff is intended to let the other streams catch up @@ -267,7 +317,8 @@ func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) } // backoffAndRetry retries the query after a backoff period. -func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) (*sqltypes.Result, error) { +func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string) (*sqltypes.Result, error) { + vp.ctx = ctx vp.dontSkipCommits = true log.Infof("Setting inBackoff to true for query: %v", sql) vp.inBackoff = true @@ -276,17 +327,19 @@ func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string, err error) ( vp.inBackoff = false }() //FIXME : set dontSkipCommits to false after some time? - if vp.hasSkippedCommit { - log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql) - if err := vp.commit(); err != nil { - return nil, err - } // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit() - if err := vp.vr.dbClient.Begin(); err != nil { - return nil, err + if !vp.batchMode { + if vp.hasSkippedCommit { + log.Infof(">>>>>>>> found skipped Commit, issuing a commit before retrying the query: %v", sql) + if err := vp.commit(); err != nil { + return nil, err + } // vp.hasSkippedCommit is reset in the wrapped commit function vp.commit() + if err := vp.vr.dbClient.Begin(); err != nil { + return nil, err + } } + return vp.executeWithRetryAndBackoff(ctx, sql) } - return vp.executeWithRetryAndBackoff(ctx, sql) - + return nil, vp.commit() // is batch mode } // play is the entry point for playing binlogs. @@ -662,7 +715,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // applying the next set of events as part of the current transaction. This approach // also handles the case where the last transaction is partial. In that case, // we only group the transactions with commits we've seen so far. - if /*vp.dontSkipCommits && */ hasAnotherCommit(items, i, j+1) { + if vp.dontSkipCommits && hasAnotherCommit(items, i, j+1) { log.Infof(">>>>>>>> skipping commit") vp.hasSkippedCommit = true continue From b85628d3d9d2fb7c2e39680af40f1cbb20bab7e2 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 7 Nov 2024 19:44:43 +0100 Subject: [PATCH 09/15] Fix incorrect changes to look for VREPLICATION_E2E_DEBUG Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/cluster_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 3dc1c9758fe..53a727e5068 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -345,7 +345,7 @@ func getClusterConfig(idx int, dataRootDir string) *ClusterConfig { func init() { // for local debugging set this variable so that each run uses VTDATAROOT instead of a random dir // and also does not teardown the cluster for inspecting logs and the databases - if os.Getenv("VREPLICATION_E2E_DEBUG") != "on" { + if os.Getenv("VREPLICATION_E2E_DEBUG") != "" { debugMode = true } originalVtdataroot = os.Getenv("VTDATAROOT") @@ -440,10 +440,8 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) { // This is always set to "true" on GitHub Actions runners: // https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables ci, ok := os.LookupEnv("CI") - fmt.Printf(">>>>>>>>>>>>>>. os.LookupEnv(\"CI\") returns ci, ok: %s, %t\n", ci, ok) if !ok || strings.ToLower(ci) != "true" { fmt.Println("Not running in CI, skipping cleanup") - //panic("leaving directory") // Leave the directory in place to support local debugging. return } @@ -462,7 +460,6 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) { time.Sleep(1 * time.Second) } require.NoError(t, err) - log.Infof("Recreating vtdataroot %q: %t", dir, recreate) if recreate { err = os.Mkdir(dir, 0700) require.NoError(t, err) From 028b04978e4d4e0b9ff5a24d72a7289946bb28f7 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 8 Nov 2024 19:29:06 +0100 Subject: [PATCH 10/15] Fix incorrect logic for skipping commits Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 25696c05afd..ce055d20ba5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -325,6 +325,7 @@ func (vp *vplayer) backoffAndRetry(ctx context.Context, sql string) (*sqltypes.R defer func() { log.Infof("Setting inBackoff to false after query: %v", sql) vp.inBackoff = false + vp.dontSkipCommits = false }() //FIXME : set dontSkipCommits to false after some time? if !vp.batchMode { @@ -715,7 +716,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // applying the next set of events as part of the current transaction. This approach // also handles the case where the last transaction is partial. In that case, // we only group the transactions with commits we've seen so far. - if vp.dontSkipCommits && hasAnotherCommit(items, i, j+1) { + if !vp.dontSkipCommits && hasAnotherCommit(items, i, j+1) { log.Infof(">>>>>>>> skipping commit") vp.hasSkippedCommit = true continue From 405c70fad875a534268b79507de56e6e658585c2 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 8 Nov 2024 12:22:22 +0100 Subject: [PATCH 11/15] Running vdiff using both vtctlclient and vtctldclient seems to cause the following vdiff to fail randomly. Only run in vtctldclient to avoid flakiness Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/movetables_buffering_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index eed96768fc5..7ef75390fbc 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -39,7 +39,7 @@ func TestMoveTablesBuffering(t *testing.T) { catchup(t, targetTab1, workflowName, "MoveTables") catchup(t, targetTab2, workflowName, "MoveTables") - vdiffSideBySide(t, ksWorkflow, "") + vdiff(t, targetKs, workflowName, "", false, true, nil) waitForLowLag(t, "customer", workflowName) for i := 0; i < 10; i++ { tstWorkflowSwitchReadsAndWrites(t) From 779de8cd3f7e42092d8f63fa0908bb4097d46c8a Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 8 Nov 2024 22:25:10 +0100 Subject: [PATCH 12/15] Some cleanup. Attempt to fix flaky throttler test: throttle-app-customer Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/cluster_test.go | 1 - go/test/endtoend/vreplication/config_test.go | 12 +++++----- .../vreplication/vreplication_test.go | 8 ++++++- .../tabletmanager/vreplication/vdbclient.go | 24 ++----------------- .../tabletmanager/vreplication/vplayer.go | 5 ++-- 5 files changed, 18 insertions(+), 32 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 53a727e5068..119843651bc 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -441,7 +441,6 @@ func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) { // https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables ci, ok := os.LookupEnv("CI") if !ok || strings.ToLower(ci) != "true" { - fmt.Println("Not running in CI, skipping cleanup") // Leave the directory in place to support local debugging. return } diff --git a/go/test/endtoend/vreplication/config_test.go b/go/test/endtoend/vreplication/config_test.go index d091ada182b..c5f4ca6e090 100644 --- a/go/test/endtoend/vreplication/config_test.go +++ b/go/test/endtoend/vreplication/config_test.go @@ -162,12 +162,12 @@ create table nopk (name varchar(128), age int unsigned); }, "admins": { "column_vindexes": [ - { - "column": "team_id", - "name": "reverse_bits" - } - ] - }, + { + "column": "team_id", + "name": "reverse_bits" + } + ] + }, "enterprise_customer": { "column_vindexes": [ { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 04a5eabc33b..d37b85104e0 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -1259,12 +1259,18 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { // Now, throttle vreplication on the target side (vplayer), and insert some // more rows. for _, tab := range customerTablets { + { + body, err := unthrottleApp(tab, sourceThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, sourceThrottlerAppName) + waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled) + } body, err := throttleApp(tab, targetThrottlerAppName) assert.NoError(t, err) assert.Contains(t, body, targetThrottlerAppName) // Wait for throttling to take effect (caching will expire by this time): - waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusThrottled) waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled) + waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusThrottled) } insertMoreProductsForTargetThrottler(t) // To be fair to the test, we give the target time to apply the new changes. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index d52ac306c39..5eb857963f0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -151,18 +151,7 @@ func (vc *vdbClient) AddQueryToTrxBatch(query string) error { return nil } -func (vc *vdbClient) PopLastQueryFromBatch() error { - if !vc.InTransaction { - return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "cannot pop query outside of a transaction") - } - if vc.batchSize > 0 { - vc.batchSize -= 1 - vc.queries = vc.queries[:len(vc.queries)-1] - } - return nil -} - -// ExecuteQueryBatch sends the transaction's current batch of queries +// ExecuteTrxQueryBatch sends the transaction's current batch of queries // down the wire to the database. func (vc *vdbClient) ExecuteTrxQueryBatch() ([]*sqltypes.Result, error) { defer vc.stats.Timings.Record(binlogplayer.BlplMultiQuery, time.Now()) @@ -184,16 +173,7 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) { return vc.ExecuteFetch(query, vc.relayLogMaxItems) } -func (vc *vdbClient) IsRetryable(err error) bool { - if sqlErr, ok := err.(*sqlerror.SQLError); ok { - return sqlErr.Number() == sqlerror.ERDupEntry - } - return false -} - func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) { - ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() qr, err := vc.Execute(query) for err != nil { var sqlErr *sqlerror.SQLError @@ -206,7 +186,7 @@ func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqlty time.Sleep(dbLockRetryDelay) // Check context here. Otherwise this can become an infinite loop. select { - case <-ctx2.Done(): + case <-ctx.Done(): return nil, io.EOF default: } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index ce055d20ba5..91729fa27cf 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -292,8 +292,9 @@ func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) for { i++ query = fmt.Sprintf("%s /* backoff:: %d */", origQuery, i) - qr, err := vp.vr.dbClient.ExecuteWithRetry(ctx, query) - log.Flush() + shorterCtx, cancel2 := context.WithTimeout(shortCtx, time.Duration(backoffSeconds)*time.Second) + defer cancel2() + qr, err := vp.vr.dbClient.ExecuteWithRetry(shorterCtx, query) if err == nil { vp.vr.dbClient.Commit() return qr, nil From c68d6f694fbcb3881dc8fdbb70e39837d31aecea Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 9 Nov 2024 09:29:45 +0100 Subject: [PATCH 13/15] Re-enable throttler. Allow tests that depend on DBTypeVersion to run locally on Mac with installed version Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/vreplication_test.go | 14 ++++++++++---- .../vttablet/tabletmanager/vreplication/vplayer.go | 10 +++++----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index d37b85104e0..a68f586af0f 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -29,6 +29,8 @@ import ( "testing" "time" + "github.com/prometheus/common/version" + "github.com/buger/jsonparser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -294,14 +296,18 @@ func TestVreplicationCopyThrottling(t *testing.T) { func TestBasicVreplicationWorkflow(t *testing.T) { defer setAllVTTabletExperimentalFlags() - sourceKsOpts["DBTypeVersion"] = "mysql-8.0" - targetKsOpts["DBTypeVersion"] = "mysql-8.0" + if version.GoOS != "darwin" { // allow running locally on Mac + sourceKsOpts["DBTypeVersion"] = "mysql-8.0" + targetKsOpts["DBTypeVersion"] = "mysql-8.0" + } testBasicVreplicationWorkflow(t, "noblob") } func TestVreplicationCopyParallel(t *testing.T) { - sourceKsOpts["DBTypeVersion"] = "mysql-5.7" - targetKsOpts["DBTypeVersion"] = "mysql-5.7" + if version.GoOS != "darwin" { // allow running locally on Mac + sourceKsOpts["DBTypeVersion"] = "mysql-5.7" + targetKsOpts["DBTypeVersion"] = "mysql-5.7" + } extraVTTabletArgs = []string{ parallelInsertWorkers, } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 91729fa27cf..83517405018 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -658,11 +658,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { return ctx.Err() } // Check throttler. - //if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok { - // _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary()) - // estimateLag() - // continue - //} + if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok { + _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary()) + estimateLag() + continue + } items, err := relay.Fetch() if err != nil { From 88408ca0bbdc8b4cdb392ecbddf9640bbeb8aa9d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 9 Nov 2024 10:36:11 +0100 Subject: [PATCH 14/15] Only retry movetables/reshard workflows Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 83517405018..c50354e0c9f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -260,6 +260,14 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map } func (vp *vplayer) isRetryable(err error) bool { + workflowType := binlogdatapb.VReplicationWorkflowType(vp.vr.WorkflowType) + // Only retry on duplicate entry errors for merge workflows and if it is a movetables/reshard workflow. + if !vp.isMergeWorkflow || + !(workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables || + workflowType == binlogdatapb.VReplicationWorkflowType_Reshard) { + + return false + } if sqlErr, ok := err.(*sqlerror.SQLError); ok { return sqlErr.Number() == sqlerror.ERDupEntry } From 79178e1b7e6678829a7572daf98f7b638809f1fa Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 9 Nov 2024 10:43:15 +0100 Subject: [PATCH 15/15] Don't try backoff for online ddl Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vplayer.go | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index c50354e0c9f..170e4801d1c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -259,9 +259,9 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return vp } -func (vp *vplayer) isRetryable(err error) bool { +// Only use backoff/retry logic for duplicate entry errors for merge workflows and if it is a movetables/reshard workflow. +func (vp *vplayer) mustBackoff(err error) bool { workflowType := binlogdatapb.VReplicationWorkflowType(vp.vr.WorkflowType) - // Only retry on duplicate entry errors for merge workflows and if it is a movetables/reshard workflow. if !vp.isMergeWorkflow || !(workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables || workflowType == binlogdatapb.VReplicationWorkflowType_Reshard) { @@ -274,17 +274,6 @@ func (vp *vplayer) isRetryable(err error) bool { return false } -func (vp *vplayer) mustBackoff(err error) bool { - var sqlErr *sqlerror.SQLError - isSqlErr := errors.As(err, &sqlErr) - if err != nil && isSqlErr && - sqlErr.Number() == sqlerror.ERDupEntry && vp.isMergeWorkflow { - log.Infof("mustBackoff for err: %v", err) - return true - } - return false -} - func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) (*sqltypes.Result, error) { // We will retry the query if it fails with a duplicate entry error. Since this will be a non-recoverable error // we should wait for a longer time than we would usually do. The backoff is intended to let the other streams catch up @@ -310,7 +299,7 @@ func (vp *vplayer) executeWithRetryAndBackoff(ctx context.Context, query string) if err := vp.vr.dbClient.Rollback(); err != nil { return nil, err } - if !vp.isRetryable(err) { + if !vp.mustBackoff(err) { return nil, err } attempts++