Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] VReplication Merge Workflows: add heuristic to handle valid duplicate key errors caused by unique key columns. #17074

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob,
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
create table nopk (name varchar(128), age int unsigned);
create table admins(team_id int, email varchar(128), val varchar(256), primary key(team_id), unique key(email));
`, strings.Join(customerTypes, ","))
// These should always be ignored in vreplication
internalSchema = `
Expand All @@ -85,6 +86,7 @@ create table nopk (name varchar(128), age int unsigned);
"tables": {
"product": {},
"merchant": {},
"admins": {},
"orders": {},
"loadtest": {},
"customer": {},
Expand Down Expand Up @@ -158,8 +160,16 @@ create table nopk (name varchar(128), age int unsigned);
}
]
},
"enterprise_customer": {
"admins": {
"column_vindexes": [
{
"column": "team_id",
"name": "reverse_bits"
}
]
},
"enterprise_customer": {
"column_vindexes": [
{
"column": "cid",
"name": "xxhash"
Expand Down
21 changes: 21 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,27 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
}
}

// waitForResult repeatedly runs query against database on the given tablet
// until the returned rows, formatted with %v, are equal to want. The test is
// failed right away if the query returns an error or a nil result, and it is
// failed with the last seen rows if the timeout fires before a match is seen.
func waitForResult(t *testing.T, vttablet *cluster.VttabletProcess, database string, query string, want string, timeout time.Duration) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	for {
		result, err := vttablet.QueryTablet(query, database, true)
		require.NoError(t, err)
		require.NotNil(t, result)
		// Render the rows once; the same string is compared and reported.
		got := fmt.Sprintf("%v", result.Rows)
		if got == want {
			return
		}
		select {
		case <-deadline.C:
			msg := fmt.Sprintf("query %q did not reach the expected result (%s) on tablet %q before the timeout of %s; last seen result: %s",
				query, want, vttablet.Name, timeout, got)
			require.FailNow(t, msg)
		default:
			// Not timed out yet: pause for one tick before polling again.
			time.Sleep(defaultTick)
		}
	}
}

// waitForSequenceValue queries the provided sequence name in the
// provided database using the provided vtgate connection until
// we get a next value from it. This allows us to move forward
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestMoveTablesBuffering(t *testing.T) {

catchup(t, targetTab1, workflowName, "MoveTables")
catchup(t, targetTab2, workflowName, "MoveTables")
vdiffSideBySide(t, ksWorkflow, "")
vdiff(t, targetKs, workflowName, "", false, true, nil)
waitForLowLag(t, "customer", workflowName)
for i := 0; i < 10; i++ {
tstWorkflowSwitchReadsAndWrites(t)
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/unsharded_init_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ insert into reftable (id, val1) values (2, 'b')
insert into reftable (id, val1) values (3, 'c')
insert into reftable (id, val1) values (4, 'd')
insert into reftable (id, val1) values (5, 'e')

insert into admins(team_id, email, val) values(1, '[email protected]', 'ibis-1')
insert into admins(team_id, email, val) values(2, '[email protected]', 'ibis-2')
22 changes: 17 additions & 5 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"github.com/prometheus/common/version"

"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -294,14 +296,18 @@ func TestVreplicationCopyThrottling(t *testing.T) {

// TestBasicVreplicationWorkflow runs testBasicVreplicationWorkflow with the
// "noblob" argument, re-applying the experimental vttablet flags on exit.
func TestBasicVreplicationWorkflow(t *testing.T) {
	defer setAllVTTabletExperimentalFlags()
	// Pin the MySQL version only when not running on macOS, so the test can
	// be run locally on a Mac. The pin must live solely inside this guard:
	// assigning DBTypeVersion unconditionally beforehand would turn the
	// guard into a no-op.
	if version.GoOS != "darwin" {
		sourceKsOpts["DBTypeVersion"] = "mysql-8.0"
		targetKsOpts["DBTypeVersion"] = "mysql-8.0"
	}
	testBasicVreplicationWorkflow(t, "noblob")
}

func TestVreplicationCopyParallel(t *testing.T) {
sourceKsOpts["DBTypeVersion"] = "mysql-5.7"
targetKsOpts["DBTypeVersion"] = "mysql-5.7"
if version.GoOS != "darwin" { // allow running locally on Mac
sourceKsOpts["DBTypeVersion"] = "mysql-5.7"
targetKsOpts["DBTypeVersion"] = "mysql-5.7"
}
extraVTTabletArgs = []string{
parallelInsertWorkers,
}
Expand Down Expand Up @@ -1259,12 +1265,18 @@ func materializeProduct(t *testing.T, useVtctldClient bool) {
// Now, throttle vreplication on the target side (vplayer), and insert some
// more rows.
for _, tab := range customerTablets {
{
body, err := unthrottleApp(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, sourceThrottlerAppName)
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
}
body, err := throttleApp(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
// Wait for throttling to take effect (caching will expire by this time):
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusThrottled)
waitForTabletThrottlingStatus(t, tab, sourceThrottlerAppName, throttlerStatusNotThrottled)
waitForTabletThrottlingStatus(t, tab, targetThrottlerAppName, throttlerStatusThrottled)
}
insertMoreProductsForTargetThrottler(t)
// To be fair to the test, we give the target time to apply the new changes.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package vreplication

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

// TestWorkflowDuplicateKeyBackoff runs testWorkflowDuplicateKeyBackoff as two
// sequential subtests: first with its boolean argument set to false, then
// with it set to true.
func TestWorkflowDuplicateKeyBackoff(t *testing.T) {
	variants := []struct {
		name     string
		batching bool
	}{
		{name: "TestWorkflowDuplicateKeyBackoff with batching off", batching: false},
		{name: "TestWorkflowDuplicateKeyBackoff with batching on", batching: true},
	}
	for _, variant := range variants {
		batching := variant.batching
		t.Run(variant.name, func(t *testing.T) {
			testWorkflowDuplicateKeyBackoff(t, batching)
		})
	}
}

// testWorkflowDuplicateKeyBackoff moves the customer and admins tables from
// the product keyspace to the customer keyspace, switches reads and writes,
// and then issues updates on admins while one vreplication stream on tablet
// zone1-100 is stopped. After restarting that stream it waits for the admins
// table on the product tablet to contain the expected rows.
//
// When setExperimentalFlags is true, setAllVTTabletExperimentalFlags is
// called before the cluster is brought up.
func testWorkflowDuplicateKeyBackoff(t *testing.T, setExperimentalFlags bool) {
	debugMode = false
	setSidecarDBName("_vt")
	// Run without rdonly/replica tablets, restoring the defaults on exit.
	origDefaultRdonly := defaultRdonly
	origDefaultReplicas := defaultReplicas
	defer func() {
		defaultRdonly = origDefaultRdonly
		defaultReplicas = origDefaultReplicas
	}()
	defaultRdonly = 0
	defaultReplicas = 0
	if setExperimentalFlags {
		setAllVTTabletExperimentalFlags()
	}

	setupMinimalCluster(t)
	vttablet.InitVReplicationConfigDefaults()
	defer vc.TearDown()

	const streamTablet = "zone1-100"
	sourceKeyspaceName := "product"
	targetKeyspaceName := "customer"
	workflowName := "wf1"
	setupMinimalCustomerKeyspace(t)
	tables := "customer,admins"

	// Disable the tablet throttler on both keyspaces.
	req := &vtctldatapb.UpdateThrottlerConfigRequest{
		Enable: false,
	}
	res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, targetKeyspaceName, req, nil, nil)
	require.NoError(t, err, res)
	res, err = throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, sourceKeyspaceName, req, nil, nil)
	require.NoError(t, err, res)

	mt := createMoveTables(t, sourceKeyspaceName, targetKeyspaceName, workflowName, tables, nil, nil, nil)
	waitForWorkflowState(t, vc, targetKeyspaceName+"."+workflowName, binlogdatapb.VReplicationWorkflowState_Running.String())
	mt.SwitchReadsAndWrites()
	vtgateConn, cancel := getVTGateConn()
	defer cancel()

	// team_id 1 => 80-, team_id 2 => -80
	queries := []string{
		"update admins set email = null, val = 'ibis-3' where team_id = 2",            // -80
		"update admins set email = '[email protected]', val = 'ibis-4' where team_id = 1", // 80-
		"update admins set email = '[email protected]', val = 'ibis-5' where team_id = 2", // -80
	}

	// Stop the stream for -80 (id 1). A failure here would silently invalidate
	// the scenario, so the command's error is checked rather than dropped.
	out, err := vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", streamTablet, "update _vt.vreplication set state = 'Stopped' where id = 1")
	require.NoError(t, err, out)
	for _, query := range queries {
		execVtgateQuery(t, vtgateConn, targetKeyspaceName, query)
	}
	// Since -80 is stopped, the update that sets the email for team_id = 1 will
	// fail with a duplicate key error because that email is still set for
	// team_id = 2. The stream applying that update (80-, per the shard mapping
	// noted above) should back off with the new logic, and its retry should
	// succeed once the -80 stream is restarted.
	time.Sleep(2 * time.Second) // fixme: add check that the table has the expected data after the inserts
	out, err = vc.VtctlClient.ExecuteCommandWithOutput("VReplicationExec", streamTablet, "update _vt.vreplication set state = 'Running' where id = 1")
	require.NoError(t, err, out)

	productTab := vc.Cells["zone1"].Keyspaces[sourceKeyspaceName].Shards["0"].Tablets[streamTablet].Vttablet
	waitForResult(t, productTab, sourceKeyspaceName, "select * from admins order by team_id",
		"[[INT32(1) VARCHAR(\"[email protected]\") VARCHAR(\"ibis-4\")] [INT32(2) VARCHAR(\"[email protected]\") VARCHAR(\"ibis-5\")]]", 30*time.Second)
	log.Infof("TestWorkflowDuplicateKeyBackoff passed")
}
11 changes: 9 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vreplication

import (
"context"
"errors"
"io"
"strings"
"time"
Expand Down Expand Up @@ -102,6 +103,10 @@ func (vc *vdbClient) CommitTrxQueryBatch() error {
return nil
}

// GetQueries returns the client's queries slice as-is, without copying it,
// so the returned slice shares its backing array with the client.
// NOTE(review): presumably this is the pending transaction query batch — confirm.
func (vc *vdbClient) GetQueries() []string {
return vc.queries
}

func (vc *vdbClient) Rollback() error {
if !vc.InTransaction {
return nil
Expand Down Expand Up @@ -146,7 +151,7 @@ func (vc *vdbClient) AddQueryToTrxBatch(query string) error {
return nil
}

// ExecuteTrxQueryBatch sends the transaction's current batch of queries
// down the wire to the database.
func (vc *vdbClient) ExecuteTrxQueryBatch() ([]*sqltypes.Result, error) {
defer vc.stats.Timings.Record(binlogplayer.BlplMultiQuery, time.Now())
Expand All @@ -171,7 +176,9 @@ func (vc *vdbClient) Execute(query string) (*sqltypes.Result, error) {
func (vc *vdbClient) ExecuteWithRetry(ctx context.Context, query string) (*sqltypes.Result, error) {
qr, err := vc.Execute(query)
for err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
var sqlErr *sqlerror.SQLError
if errors.As(err, &sqlErr) &&
sqlErr.Number() == sqlerror.ERLockDeadlock || sqlErr.Number() == sqlerror.ERLockWaitTimeout {
log.Infof("retryable error: %v, waiting for %v and retrying", sqlErr, dbLockRetryDelay)
if err := vc.Rollback(); err != nil {
return nil, err
Expand Down
Loading
Loading