Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Optimize replication on target tablets #17166

Merged
merged 20 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/common/scripts/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ vttablet \
--service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \
--pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \
--heartbeat_on_demand_duration=5s \
--pprof-http \
mattlord marked this conversation as resolved.
Show resolved Hide resolved
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

# Block waiting for the tablet to be listening
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ Flags:
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
--vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 7)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ Flags:
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
--vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 7)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand Down Expand Up @@ -145,9 +144,6 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "2s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type WriteMetrics struct {
Expand Down Expand Up @@ -184,9 +183,6 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type testcase struct {
Expand Down Expand Up @@ -436,9 +435,6 @@ func TestMain(m *testing.M) {
"--migration_check_interval", "5s",
"--vstream_packet_size", "4096", // Keep this value small and below 10k to ensure multilple vstream iterations
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
13 changes: 0 additions & 13 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)
Expand Down Expand Up @@ -101,18 +100,6 @@ func (cc *ClusterConfig) enableGTIDCompression() func() {
}
}

// setAllVTTabletExperimentalFlags sets all the experimental flags for vttablet and returns a function
// that can be used to reset them in a defer.
func setAllVTTabletExperimentalFlags() func() {
experimentalArgs := fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching)
oldArgs := extraVTTabletArgs
extraVTTabletArgs = append(extraVTTabletArgs, experimentalArgs)
return func() {
extraVTTabletArgs = oldArgs
}
}

// VitessCluster represents all components within the test cluster
type VitessCluster struct {
t *testing.T
Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand All @@ -43,9 +42,6 @@ func TestFKWorkflow(t *testing.T) {
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
defer func() { extraVTTabletArgs = nil }()

Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -140,9 +139,6 @@ func TestVDiff2(t *testing.T) {
extraVTTabletArgs = []string{
// This forces us to use multiple vstream packets even with small test tables.
"--vstream_packet_size=1",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}

vc = NewVitessCluster(t, &clusterOptions{cells: strings.Split(cellNames, ",")})
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

const (
vdiffTimeout = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffTimeout = 180 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
Copy link
Contributor Author

@mattlord mattlord Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can see the reason for the vdiff test changes in the commit message: f0f61db

vdiffRetryTimeout = 30 * time.Second
vdiffStatusCheckInterval = 5 * time.Second
vdiffRetryInterval = 5 * time.Second
Expand Down Expand Up @@ -71,7 +71,8 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctlclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats")
uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry",
"--update-table-stats", fmt.Sprintf("--filtered_replication_wait_time=%v", vdiffTimeout/2))
info := waitForVDiff2ToComplete(t, true, ksWorkflow, cells, uuid, time.Time{})
require.NotNil(t, info)
require.Equal(t, workflow, info.Workflow)
Expand Down Expand Up @@ -164,7 +165,7 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
flags := []string{"--auto-retry", "--update-table-stats"}
flags := []string{"--auto-retry", "--update-table-stats", fmt.Sprintf("--filtered-replication-wait-time=%v", vdiffTimeout/2)}
if len(extraFlags) > 0 {
flags = append(flags, extraFlags...)
}
Expand Down
3 changes: 0 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ func TestVreplicationCopyThrottling(t *testing.T) {
}

func TestBasicVreplicationWorkflow(t *testing.T) {
defer setAllVTTabletExperimentalFlags()
sourceKsOpts["DBTypeVersion"] = "mysql-8.0"
targetKsOpts["DBTypeVersion"] = "mysql-8.0"
testBasicVreplicationWorkflow(t, "noblob")
Expand Down Expand Up @@ -595,8 +594,6 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
resetCompression := mainClusterConfig.enableGTIDCompression()
defer resetCompression()
resetExperimentalFlags := setAllVTTabletExperimentalFlags()
defer resetExperimentalFlags()
vc = NewVitessCluster(t, &clusterOptions{cells: cells})
defer vc.TearDown()

Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const (
)

var (
// Default flags: currently VReplicationExperimentalFlagVPlayerBatching is not enabled by default.
vreplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
vreplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage | VReplicationExperimentalFlagVPlayerBatching
vreplicationNetReadTimeout = 300
vreplicationNetWriteTimeout = 600
vreplicationCopyPhaseDuration = 1 * time.Hour
Expand Down
72 changes: 34 additions & 38 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,67 +617,59 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
// on the source: sum/count for aggregation queries, for example.
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(tp.Fields), len(bindLocations))
}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
offset += row.Lengths[i]
usedFieldCnt := len(tp.Fields) - len(tp.FieldsToSkip)
if usedFieldCnt != len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations",
usedFieldCnt, len(bindLocations))
}

// Bind field values to locations.
var (
offset int64
offsetQuery int
fieldsIndex int
field *querypb.Field
)
for i, loc := range bindLocations {
field = tp.Fields[fieldsIndex]
length := row.Lengths[fieldsIndex]
for tp.FieldsToSkip[strings.ToLower(field.Name)] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why we are using the for loop to skip. This feels more intuitive.

if tp.FieldsToSkip[strings.ToLower(field.Name)] {
    if length > 0 {
          offset += length
    }
    fieldsIndex++
    continue
}

Copy link
Contributor Author

@mattlord mattlord Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we don't want to move to the next bind location. We have a bind location X, and we want to get the corresponding next non-skipped field to bind to that location.

if length > 0 {
offset += length
}
fieldsIndex++
field = tp.Fields[fieldsIndex]
length = row.Lengths[fieldsIndex]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks fine. I don't have the context to understand why this change is necessarily faster. Can you please explain?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It cuts out the overhead of the intermediate data structure colInfo: mainly in memory allocation and also a bit of cpu because of the large number of rows inserted during the copy phase.

}
}

// bind field values to locations
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ
typ := field.Type

switch typ {
case querypb.Type_TUPLE:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
case querypb.Type_JSON:
if col.length < 0 { // An SQL NULL and not an actual JSON value
if length < 0 { // An SQL NULL and not an actual JSON value
buf.WriteString(sqltypes.NullStr)
} else { // A JSON value (which may be a JSON null literal value)
buf2 := row.Values[col.offset : col.offset+col.length]
buf2 := row.Values[offset : offset+length]
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
}
buf.WriteString(vv.RawStr())
}
default:
if col.length < 0 {
if length < 0 {
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
raw := row.Values[col.offset : col.offset+col.length]
raw := row.Values[offset : offset+length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
out, err := tp.convertStringCharset(raw, conversion, field.Name)
if err != nil {
return err
}
Expand All @@ -690,6 +682,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
}
}
offsetQuery = loc.Offset + loc.Length
if length > 0 {
offset += length
}
fieldsIndex++
}
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
Expand Down
Loading
Loading