Skip to content

Commit

Permalink
VTGate VStream: Ensure reasonable delivery time for reshard journal e…
Browse files Browse the repository at this point in the history
…vent (#16639)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Aug 29, 2024
1 parent 1131b7b commit d916e81
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 22 deletions.
17 changes: 10 additions & 7 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"log"
"time"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

/*
Expand Down Expand Up @@ -73,15 +73,18 @@ func main() {
}
defer conn.Close()
flags := &vtgatepb.VStreamFlags{
//MinimizeSkew: false,
//HeartbeatInterval: 60, //seconds
// MinimizeSkew: false,
// HeartbeatInterval: 60, //seconds
// StopOnReshard: true,
}
reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
log.Fatal(err)
}
for {
e, err := reader.Recv()
switch err {
case nil:
_ = e
fmt.Printf("%v\n", e)
case io.EOF:
fmt.Printf("stream ended\n")
Expand Down
218 changes: 208 additions & 10 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"

"vitess.io/vitess/go/vt/vtgate/vtgateconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

// Validates that we have a working VStream API
Expand Down Expand Up @@ -603,8 +603,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down Expand Up @@ -658,8 +657,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {

// Now start a new VStream from our previous VGTID which only has the old/original shards.
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down Expand Up @@ -694,8 +692,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
}()

// We should have a mix of events across the old and new shards.
require.NotZero(t, oldShardRowEvents)
require.NotZero(t, newShardRowEvents)
require.Greater(t, oldShardRowEvents, 0)
require.Greater(t, newShardRowEvents, 0)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
Expand All @@ -704,6 +702,206 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))
}

// TestMultiVStreamsKeyspaceStopOnReshard confirms that journal events are received
// when resuming a VStream after a reshard.
func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
ctx := context.Background()
ks := "testks"
wf := "multiVStreamsKeyspaceReshard"
baseTabletID := 100
tabletType := topodatapb.TabletType_PRIMARY.String()
oldShards := "-80,80-"
newShards := "-40,40-80,80-c0,c0-"
oldShardRowEvents, journalEvents := 0, 0
vc = NewVitessCluster(t, nil)
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
ogdr := defaultReplicas
defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

// For our sequences etc.
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
require.NoError(t, err)

// Setup the keyspace with our old/original shards.
keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
require.NoError(t, err)

// Add the new shards.
err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// Ensure that we're starting with a clean slate.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
insertRow(ks, "customer", id)
time.Sleep(250 * time.Millisecond)
id++
}
}
}()

// Create the Reshard workflow and wait for it to finish the copy phase.
reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
// Only stream the keyspace that we're resharding. Otherwise the client stream
// will continue to run with only the tablet stream from the global keyspace.
Keyspace: ks,
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// Stream all tables.
Match: "/.*",
}},
}
flags := &vtgatepb.VStreamFlags{
StopOnReshard: true,
}

// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.GetRowEvent().GetShard()
switch shard {
case "-80", "80-":
oldShardRowEvents++
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
// We want a VGTID with a ShardGtid for both of the old shards.
if len(newVGTID.GetShardGtids()) == 2 {
canStop := true
for _, sg := range newVGTID.GetShardGtids() {
if sg.GetGtid() == "" {
canStop = false
}
}
if canStop {
return
}
}
}
}
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-streamCtx.Done():
return
default:
}
}
}()

// Confirm that we have shard GTIDs for the old/original shards.
require.Len(t, newVGTID.GetShardGtids(), 2)
t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids())

// Switch the traffic to the new shards.
reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)

// Now start a new VStream from our previous VGTID which only has the old/original shards.
expectedJournalEvents := 2 // One for each old shard: -80,80-
var streamStopped bool // We expect the stream to end with io.EOF from the reshard
runResumeStream := func() {
journalEvents = 0
streamStopped = false
t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids())
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for i, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "-80", "80-":
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_JOURNAL:
t.Logf("Journal event: %+v", ev)
journalEvents++
require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event")
require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event")
require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event")
}
}
case io.EOF:
streamStopped = true
return
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-done:
return
default:
}
}
}

// Multiple VStream clients should be able to resume from where they left off and
// get the reshard journal event.
for i := 1; i <= expectedJournalEvents; i++ {
runResumeStream()
// We should have seen the journal event for each shard in the stream due to
// using StopOnReshard.
require.Equal(t, expectedJournalEvents, journalEvents,
"did not get expected journal events on resume vstream #%d", i)
// Confirm that the stream stopped on the reshard.
require.True(t, streamStopped, "the vstream did not stop with io.EOF as expected")
}
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,6 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [
})

}
ts.Logger().Infof("Creating journal %v", journal)
ts.Logger().Infof("Creating journal: %v", journal)
statement := fmt.Sprintf("insert into _vt.resharding_journal "+
"(id, db_name, val) "+
Expand Down
35 changes: 31 additions & 4 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const maxSkewTimeoutSeconds = 10 * 60
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

// stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before
// ending the stream from the tablet.
const stopOnReshardDelay = 500 * time.Millisecond

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -620,7 +624,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for _, event := range events {
for i, event := range events {
switch event.Type {
case binlogdatapb.VEventType_FIELD:
// Update table names and send.
Expand Down Expand Up @@ -670,12 +674,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
// Journal events are not sent to clients by default, but only when StopOnReshard is set
// Journal events are not sent to clients by default, but only when
// StopOnReshard is set.
if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
sendevents = append(sendevents, event)
// Read any subsequent events until we get the VGTID->COMMIT events that
// always follow the JOURNAL event which is generated as a result of
// an autocommit insert into the _vt.resharding_journal table on the
// tablet.
for j := i + 1; j < len(events); j++ {
sendevents = append(sendevents, events[j])
if events[j].Type == binlogdatapb.VEventType_COMMIT {
break
}
}
eventss = append(eventss, sendevents)
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
Expand All @@ -688,12 +702,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}
if je != nil {
// Wait till all other participants converge and return EOF.
// We're going to be ending the tablet stream, so we ensure a reasonable
// minimum amount of time is alloted for clients to Recv the journal event
// before the stream's context is cancelled (which would cause the grpc
// SendMsg or RecvMsg to fail). If the client doesn't Recv the journal
// event before the stream ends then they'll have to resume from the last
// ShardGtid they received before the journal event.
endTimer := time.NewTimer(stopOnReshardDelay)
defer endTimer.Stop()
// Wait until all other participants converge and then return EOF after
// the minimum delay has passed.
journalDone = je.done
select {
case <-ctx.Done():
return ctx.Err()
case <-journalDone:
<-endTimer.C
return io.EOF
}
}
Expand Down Expand Up @@ -966,6 +990,9 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string
return false, err
}

vs.mu.Lock()
defer vs.mu.Unlock()

// First check the typical case, where the VGTID shards match the serving shards.
// In that case it's NOT possible that an applicable reshard has happened because
// the VGTID contains shards that are all serving.
Expand Down

0 comments on commit d916e81

Please sign in to comment.