Skip to content

Commit

Permalink
VStream API: allow keyspace-level heartbeats to be streamed (#16593)
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Aug 27, 2024
1 parent f11d430 commit bc6fb1c
Show file tree
Hide file tree
Showing 34 changed files with 1,679 additions and 572 deletions.
11 changes: 9 additions & 2 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type ClusterConfig struct {
vtorcPort int

vreplicationCompressGTID bool
// Set overrideHeartbeatOptions to true to override the default heartbeat options:
// which are set to only on demand (5s) and 250ms interval.
overrideHeartbeatOptions bool
}

// enableGTIDCompression enables GTID compression for the cluster and returns a function
Expand Down Expand Up @@ -514,11 +517,15 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
// AddTablet creates new tablet with specified attributes
func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, shard *Shard, tabletType string, tabletID int) (*Tablet, *exec.Cmd, error) {
tablet := &Tablet{}

options := []string{
var options []string
defaultHeartbeatOptions := []string{
"--heartbeat_on_demand_duration", "5s",
"--heartbeat_interval", "250ms",
}
if !mainClusterConfig.overrideHeartbeatOptions {
options = append(options, defaultHeartbeatOptions...)
}

options = append(options, extraVTTabletArgs...)

if mainClusterConfig.vreplicationCompressGTID {
Expand Down
136 changes: 136 additions & 0 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,3 +747,139 @@ func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
require.NotZero(t, ne.numDash40Events)
require.NotZero(t, ne.num40DashEvents)
}

const (
vstreamHeartbeatsTestContextTimeout = 20 * time.Second
// Expect a reasonable number of heartbeats to be received in the test duration, should ideally be ~ timeout
// since the heartbeat interval is set to 1s. But we set it to 10 to be conservative to avoid CI flakiness.
numExpectedHeartbeats = 10
)

func doVStream(t *testing.T, vc *VitessCluster, flags *vtgatepb.VStreamFlags) (numRowEvents map[string]int, numFieldEvents map[string]int) {
// Stream for a while to ensure heartbeats are sent.
ctx, cancel := context.WithTimeout(context.Background(), vstreamHeartbeatsTestContextTimeout)
defer cancel()

numRowEvents = make(map[string]int)
numFieldEvents = make(map[string]int)
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

done := false
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "product",
Shard: "0",
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer",
}},
}
// Stream events from the VStream API.
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for !done {
evs, err := reader.Recv()
switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
rowEvent := ev.RowEvent
arr := strings.Split(rowEvent.TableName, ".")
require.Equal(t, len(arr), 2)
tableName := arr[1]
require.Equal(t, "product", rowEvent.Keyspace)
require.Equal(t, "0", rowEvent.Shard)
numRowEvents[tableName]++

case binlogdatapb.VEventType_FIELD:
fieldEvent := ev.FieldEvent
arr := strings.Split(fieldEvent.TableName, ".")
require.Equal(t, len(arr), 2)
tableName := arr[1]
require.Equal(t, "product", fieldEvent.Keyspace)
require.Equal(t, "0", fieldEvent.Shard)
numFieldEvents[tableName]++
default:
}
}
case io.EOF:
log.Infof("Stream Ended")
done = true
default:
log.Errorf("remote error: %v", err)
done = true
}
}
return numRowEvents, numFieldEvents
}

// TestVStreamHeartbeats enables streaming of the internal Vitess heartbeat tables in the VStream API and
// ensures that the heartbeat events are received as expected by the client.
func TestVStreamHeartbeats(t *testing.T) {
// Enable continuous heartbeats.
extraVTTabletArgs = append(extraVTTabletArgs,
"--heartbeat_enable",
"--heartbeat_interval", "1s",
"--heartbeat_on_demand_duration", "0",
)
setSidecarDBName("_vt")
config := *mainClusterConfig
config.overrideHeartbeatOptions = true
vc = NewVitessCluster(t, &clusterOptions{
clusterConfig: &config,
})
defer vc.TearDown()

require.NotNil(t, vc)
defaultReplicas = 0
defaultRdonly = 0

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema,
defaultReplicas, defaultRdonly, 100, nil)
verifyClusterHealth(t, vc)
insertInitialData(t)

expectedNumRowEvents := make(map[string]int)
expectedNumRowEvents["customer"] = 3 // 3 rows inserted in the customer table in insertInitialData()

type testCase struct {
name string
flags *vtgatepb.VStreamFlags
expectedHeartbeats int
}
testCases := []testCase{
{
name: "With Keyspace Heartbeats On",
flags: &vtgatepb.VStreamFlags{
StreamKeyspaceHeartbeats: true,
},
expectedHeartbeats: numExpectedHeartbeats,
},
{
name: "With Keyspace Heartbeats Off",
flags: nil,
expectedHeartbeats: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gotNumRowEvents, gotNumFieldEvents := doVStream(t, vc, tc.flags)
for k := range expectedNumRowEvents {
require.Equalf(t, 1, gotNumFieldEvents[k], "incorrect number of field events for table %s, got %d", k, gotNumFieldEvents[k])
}
require.GreaterOrEqual(t, gotNumRowEvents["heartbeat"], tc.expectedHeartbeats, "incorrect number of heartbeat events received")
log.Infof("Total number of heartbeat events received: %v", gotNumRowEvents["heartbeat"])
delete(gotNumRowEvents, "heartbeat")
require.Equal(t, expectedNumRowEvents, gotNumRowEvents)
})
}
}
Loading

0 comments on commit bc6fb1c

Please sign in to comment.