From 04096a06ccb1cb62f1896f7bbb4a531df4e8e455 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Wed, 20 Sep 2023 15:43:00 -0700 Subject: [PATCH] apply patch 13547 Signed-off-by: Priya Bibra --- go/vt/vtgate/endtoend/vstream_test.go | 119 ++++++++++++++++++ go/vt/vttablet/tabletserver/vstreamer/copy.go | 16 +++ .../tabletserver/vstreamer/uvstreamer.go | 18 +-- 3 files changed, 146 insertions(+), 7 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index f0de4c9582b..4735d2627b0 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -654,6 +654,125 @@ func TestVStreamSharded(t *testing.T) { } +// TestVStreamCopyTransactions tests that we are properly wrapping +// ROW events in the stream with BEGIN and COMMIT events. +func TestVStreamCopyTransactions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + keyspace := "ks" + shards := []string{"-80", "80-"} + table := "t1_copy_basic" + beginEventSeen, commitEventSeen := false, false + numResultInTrx := 0 + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + { + Keyspace: keyspace, + Shard: shards[0], + Gtid: "", // Start a vstream copy + }, + { + Keyspace: keyspace, + Shard: shards[1], + Gtid: "", // Start a vstream copy + }, + }, + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }}, + } + + gconn, conn, _, closeConnections := initialize(ctx, t) + defer closeConnections() + + // Clear any existing data. + q := fmt.Sprintf("delete from %s", table) + _, err := conn.ExecuteFetch(q, -1, false) + require.NoError(t, err, "error clearing data: %v", err) + + // Generate some test data. Enough to cross the default + // vstream_packet_size threshold. + for i := 1; i <= 100000; i++ { + values := fmt.Sprintf("(%d, %d)", i, i) + q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values) + _, err := conn.ExecuteFetch(q, 1, false) + require.NoError(t, err, "error inserting data: %v", err) + } + + // Start a vstream. + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil) + require.NoError(t, err, "error starting vstream: %v", err) + +recvLoop: + for { + vevents, err := reader.Recv() + numResultInTrx++ + eventCount := len(vevents) + t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n", + eventCount, numResultInTrx) + switch err { + case nil: + for _, event := range vevents { + switch event.Type { + case binlogdatapb.VEventType_BEGIN: + require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n", + numResultInTrx) + beginEventSeen = true + t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx) + require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n", + numResultInTrx) + case binlogdatapb.VEventType_VGTID: + t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_FIELD: + t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_ROW: + // Uncomment if you need to do more debugging. + // t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + // beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + case binlogdatapb.VEventType_COMMIT: + commitEventSeen = true + t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n", + beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event) + require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n", + numResultInTrx) + case binlogdatapb.VEventType_COPY_COMPLETED: + t.Logf("Finished vstream copy\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + break recvLoop + default: + t.Logf("Found extraneous event: %+v\n", event) + } + if beginEventSeen && commitEventSeen { + t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n") + beginEventSeen = false + commitEventSeen = false + numResultInTrx = 0 + } + } + case io.EOF: + t.Logf("vstream ended\n") + t.Logf("-------------------------------------------------------------------\n\n") + cancel() + return + default: + require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err) + return + } + } + // The last response, when the vstream copy completes, does not + // typically contain ROW events. + if beginEventSeen || commitEventSeen { + require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set") + } +} + var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 62b93cf5063..864dbd5d50c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendFieldEvent returned error %v", err) return err } + // sendFieldEvent() sends a BEGIN event first. + uvs.inTransaction = true } + if len(rows.Rows) == 0 { log.V(2).Infof("0 rows returned for table %s", tableName) return nil } + // We are about to send ROW events, so we need to ensure + // that we do so within a transaction. The COMMIT event + // will be sent in sendEventsForRows() below. + if !uvs.inTransaction { + evs := []*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_BEGIN, + }} + uvs.send(evs) + uvs.inTransaction = true + } + newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{ Fields: uvs.pkfields, Rows: []*querypb.Row{rows.Lastpk}, @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { log.Infof("sendEventsForRows returned error %v", err) return err } + // sendEventsForRows() sends a COMMIT event last. + uvs.inTransaction = false uvs.setCopyState(tableName, qrLastPK) log.V(2).Infof("NewLastPK: %v", qrLastPK) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 2508a625ea1..80695b0ae86 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -51,13 +51,17 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + // Are we currently in an explicit transaction? + // If we are not, and we're about to send ROW + // events, then we need to send a BEGIN event first. + inTransaction bool + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK vschema *localVSchema