diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index df604c003cc..17cf3e6dd01 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -45,6 +45,12 @@ create table t1( primary key(id1) ) Engine=InnoDB; +create table t1_copy_resume( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_id2_idx( id2 bigint, keyspace_id varbinary(10), @@ -133,6 +139,12 @@ create table t1_sharded( Name: "t1_id2_vdx", }}, }, + "t1_copy_resume": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, "t1_sharded": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 477bb2518b5..a13aac8291d 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sort" "sync" "testing" @@ -232,6 +233,119 @@ func TestVStreamCopyBasic(t *testing.T) { } } +func TestVStreamCopyResume(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + _, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + t.Fatal(err) + } + + // Any subsequent GTIDs will be part of the stream + mpos, err := mconn.PrimaryPosition() + require.NoError(t, err) + + // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) + lastPK := sqltypes.Result{ + Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}}, + Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}}, + } + tableLastPK := []*binlogdatapb.TableLastPK{{ + TableName: "t1_copy_resume", + Lastpk: sqltypes.ResultToProto3(&lastPK), + }} + + catchupQueries := []string{ + "insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy + "update t1_copy_resume set id2 = 10 where id1 = 1", + "insert into t1(id1, id2) values(100,100)", + "delete from t1_copy_resume where id1 = 1", + "update t1_copy_resume set id2 = 90 where id1 = 9", + } + for _, query := range catchupQueries { + _, err = conn.ExecuteFetch(query, 1, false) + require.NoError(t, err) + } + + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "-80", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tableLastPK, + }) + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "80-", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tableLastPK, + }) + vgtid.ShardGtids = shardGtids + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1_copy_resume", + Filter: "select * from t1_copy_resume", + }}, + } + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + if err != nil { + t.Fatal(err) + } + require.NotNil(t, reader) + + expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) + expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach + rowCopyEvents, replCatchupEvents := 0, 0 + expectedEvents := []string{ + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + } + var evs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_ROW { + evs = append(evs, ev) + if ev.Timestamp == 0 { + rowCopyEvents++ + } else { + replCatchupEvents++ + } + printEvents(evs) // for debugging ci failures + } + } + if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { + sort.Sort(VEventSorter(evs)) + for i, ev := range evs { + require.Regexp(t, expectedEvents[i], ev.String()) + } + t.Logf("TestVStreamCopyResume was successful") + return + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } + } +} + func TestVStreamCurrent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -396,3 +510,31 @@ func printEvents(evs []*binlogdatapb.VEvent) { s += "===END===" + "\n" log.Infof("%s", s) } + +// Sort the VEvents by the first row change's after value bytes primarily, with +// secondary ordering by timestamp (ASC). Note that row copy events do not have +// a timestamp and the value will be 0. +type VEventSorter []*binlogdatapb.VEvent + +func (v VEventSorter) Len() int { + return len(v) +} +func (v VEventSorter) Swap(i, j int) { + v[i], v[j] = v[j], v[i] +} +func (v VEventSorter) Less(i, j int) bool { + valsI := v[i].GetRowEvent().RowChanges[0].After + if valsI == nil { + valsI = v[i].GetRowEvent().RowChanges[0].Before + } + valsJ := v[j].GetRowEvent().RowChanges[0].After + if valsJ == nil { + valsJ = v[j].GetRowEvent().RowChanges[0].Before + } + valI := string(valsI.Values) + valJ := string(valsJ.Values) + if valI == valJ { + return v[i].Timestamp < v[j].Timestamp + } + return valI < valJ +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 2590300f7c2..056d5da1822 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -218,7 +218,9 @@ func getQuery(tableName string, filter string) string { query = buf.String() case key.IsKeyRange(filter): buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter)) + // note: sqlparser.NewTableIdent is renamed to NewIdentifierCS in v15 + buf.Myprintf("select * from %v where in_keyrange(%v)", + sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter)) query = buf.String() } return query @@ -229,7 +231,40 @@ func (uvs *uvstreamer) Cancel() { uvs.cancel() } -// during copy phase only send streaming events (during catchup/fastforward) for pks already seen +// We have not yet implemented the logic to check if an event is for a row that is already copied, +// so we always return true so that we send all events for this table and so we don't miss events. +func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool { + return true +} + +// Only send catchup/fastforward events for tables whose copy phase is complete or in progress. +// This ensures we fulfill the at-least-once delivery semantics for events. +// TODO: filter out events for rows not yet copied. Note that we can only do this as a best-effort +// for comparable PKs. +func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { + table, ok := uvs.plans[tableName] + // Event is for a table which is not in its copy phase. + if !ok { + return true + } + + // if table copy was not started and no tablePK was specified we can ignore catchup/fastforward events for it + if table.tablePK == nil || table.tablePK.Lastpk == nil { + return false + } + + // Table is currently in its copy phase. We have not yet implemented the logic to + // check if an event is for a row that is already copied, so we always return true + // there so that we don't miss events. + // We may send duplicate insert events or update/delete events for rows not yet seen + // to the client for the table being copied. This is ok as the client is expected to be + // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once). + // Aside: vreplication workflows handle at-least-once by adding where clauses that render + // DML queries, related to events for rows not yet copied, as no-ops. + return uvs.isRowCopied(tableName, ev) +} + +// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started. func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent { if len(uvs.plans) == 0 { return evs @@ -239,25 +274,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. var shouldSend bool for _, ev := range evs { - shouldSend = false - tableName = "" switch ev.Type { case binlogdatapb.VEventType_ROW: tableName = ev.RowEvent.TableName case binlogdatapb.VEventType_FIELD: tableName = ev.FieldEvent.TableName + default: + tableName = "" + } + switch ev.Type { case binlogdatapb.VEventType_HEARTBEAT: shouldSend = false default: - shouldSend = true - } - if !shouldSend && tableName != "" { - shouldSend = true - _, ok := uvs.plans[tableName] - if ok { - shouldSend = false - } + shouldSend = uvs.shouldSendEventForTable(tableName, ev) } + if shouldSend { evs2 = append(evs2, ev) } @@ -331,7 +362,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error { } if !curPos.AtLeast(pos) { uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1) - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", + mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) } uvs.pos = pos return nil @@ -346,17 +379,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { return conn.PrimaryPosition() } +// Possible states: +// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos +// 2. TablePKs nil, startPos empty => full table copy of tables matching filter +// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK) +// 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK) func (uvs *uvstreamer) init() error { - if uvs.startPos != "" { - if err := uvs.setStreamStartPosition(); err != nil { + if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ { + if err := uvs.buildTablePlan(); err != nil { return err } - } else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { - if err := uvs.buildTablePlan(); err != nil { + } + if uvs.startPos != "" { + if err := uvs.setStreamStartPosition(); err != nil { return err } } - if uvs.pos.IsZero() && (len(uvs.plans) == 0) { return fmt.Errorf("stream needs a position or a table to copy") } @@ -378,7 +416,8 @@ func (uvs *uvstreamer) Stream() error { } uvs.sendTestEvent("Copy Done") } - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), + uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index fdd60b8207f..1ed673ebf90 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { uvstreamerTestMode = true defer func() { uvstreamerTestMode = false }() initialize(t) + if err := engine.se.Reload(context.Background()); err != nil { t.Fatal("Error reloading schema") } @@ -190,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { var tablePKs []*binlogdatapb.TableLastPK for i, table := range testState.tables { rules = append(rules, getRule(table)) + + // for table t2, let tablepk be nil, so that we don't send events for the insert in initTables() + if table == "t2" { + continue + } + tablePKs = append(tablePKs, getTablePK(table, i+1)) } filter := &binlogdatapb.Filter{ @@ -246,7 +253,7 @@ commit;" numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) numCopyEvents += 2 /* GTID + Test event after all copy is done */ - numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ + numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */