Skip to content

Commit

Permalink
set vreplication net read and net write timeout session vars to high …
Browse files Browse the repository at this point in the history
…values (#14203)

Signed-off-by: Olga Shestopalova <[email protected]>
Co-authored-by: Olga Shestopalova <[email protected]>
  • Loading branch information
olyazavr and Olga Shestopalova authored Oct 11, 2023
1 parent 44e32cd commit 6953390
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 2 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ Flags:
--vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
--vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600)
--vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s)
--vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s)
--vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vttablet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ const (
VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2)
)

var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
var (
VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
VReplicationNetReadTimeout = 300
VReplicationNetWriteTimeout = 600
)

func init() {
servenv.OnParseFor("vttablet", registerFlags)
Expand All @@ -36,4 +40,6 @@ func init() {
func registerFlags(fs *pflag.FlagSet) {
fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags,
"(Bitmask) of experimental features in vreplication to enable")
fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds")
fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds")
}
10 changes: 9 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"testing"

"vitess.io/vitess/go/vt/vttablet"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/constants/sidecar"
Expand Down Expand Up @@ -84,7 +86,9 @@ var (
},
},
}
position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition)
position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition)
setNetReadTimeout = fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout)
setNetWriteTimeout = fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout)
)

// TestCreateVReplicationWorkflow tests the query generated
Expand Down Expand Up @@ -324,6 +328,8 @@ func TestMoveTables(t *testing.T) {
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNetReadTimeout, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNetWriteTimeout, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down Expand Up @@ -895,6 +901,8 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) {
&sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNetReadTimeout, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNetWriteTimeout, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vttablet"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/vt/discovery"
Expand Down Expand Up @@ -227,6 +229,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil {
return err
}
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil {
return err
}
if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil {
return err
}
// We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid.
if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqlescape"
Expand Down Expand Up @@ -137,6 +139,12 @@ func (rs *rowStreamer) Stream() error {
if _, err := rs.conn.ExecuteFetch("set names 'binary'", 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil {
return err
}
}
return rs.streamQuery(rs.send)
}
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"strings"
"sync/atomic"

"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
Expand Down Expand Up @@ -108,6 +110,12 @@ func (ts *tableStreamer) Stream() error {
if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil {
return err
}
if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil {
return err
}

rs, err := conn.ExecuteFetch("show tables", -1, true)
if err != nil {
Expand Down

0 comments on commit 6953390

Please sign in to comment.