diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 3b3f5142c97..30fe5e41172 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -411,6 +411,8 @@ Flags: --vreplication_healthcheck_topology_refresh duration refresh interval for re-reading the topology (default 30s) --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1) --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence + --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300) + --vreplication_net_write_timeout int Session value of net_write_timeout for vreplication, in seconds (default 600) --vreplication_replica_lag_tolerance duration Replica lag threshold duration: once lag is below this we switch from copy phase to the replication (streaming) phase (default 1m0s) --vreplication_retry_delay duration delay before retrying a failed workflow event in the replication phase (default 5s) --vreplication_store_compressed_gtid Store compressed gtids in the pos column of the sidecar database's vreplication table diff --git a/go/vt/vttablet/flags.go b/go/vt/vttablet/flags.go index 460a5427358..87453e2ebcd 100644 --- a/go/vt/vttablet/flags.go +++ b/go/vt/vttablet/flags.go @@ -27,7 +27,11 @@ const ( VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2) ) -var VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage +var ( + VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage + VReplicationNetReadTimeout = 300 + VReplicationNetWriteTimeout = 600 +) func init() { servenv.OnParseFor("vttablet", registerFlags) @@ -36,4 +40,6 @@ func init() { func registerFlags(fs *pflag.FlagSet) { fs.Int64Var(&VReplicationExperimentalFlags, "vreplication_experimental_flags", VReplicationExperimentalFlags, "(Bitmask) of experimental features in vreplication to enable") + fs.IntVar(&VReplicationNetReadTimeout, "vreplication_net_read_timeout", VReplicationNetReadTimeout, "Session value of net_read_timeout for vreplication, in seconds") + fs.IntVar(&VReplicationNetWriteTimeout, "vreplication_net_write_timeout", VReplicationNetWriteTimeout, "Session value of net_write_timeout for vreplication, in seconds") } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index f3f01b289b8..1f985733371 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -25,6 +25,8 @@ import ( "strings" "testing" + "vitess.io/vitess/go/vt/vttablet" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/constants/sidecar" @@ -84,7 +86,9 @@ var ( }, }, } - position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition) + position = fmt.Sprintf("%s/%s", gtidFlavor, gtidPosition) + setNetReadTimeout = fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout) + setNetWriteTimeout = fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout) ) // TestCreateVReplicationWorkflow tests the query generated @@ -324,6 +328,8 @@ func TestMoveTables(t *testing.T) { ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(setNetReadTimeout, &sqltypes.Result{}, nil) + ftc.vrdbClient.ExpectRequest(setNetWriteTimeout, &sqltypes.Result{}, nil) ftc.vrdbClient.ExpectRequest(getRowsCopied, sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -895,6 +901,8 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) { &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setNetReadTimeout, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(setNetWriteTimeout, &sqltypes.Result{}, nil) targetTablet.vrdbClient.ExpectRequest(getRowsCopied, sqltypes.MakeTestResult( sqltypes.MakeTestFields( diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 94e4741eeee..3262dca9bc6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/vt/vttablet" + "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/vt/discovery" @@ -227,6 +229,12 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if _, err := dbClient.ExecuteFetch("set names 'binary'", 10000); err != nil { return err } + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 10000); err != nil { + return err + } + if _, err := dbClient.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 10000); err != nil { + return err + } // We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid. if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil { return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 25a5cd7bf87..f595f680d4d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/vttablet" + "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqlescape" @@ -137,6 +139,12 @@ func (rs *rowStreamer) Stream() error { if _, err := rs.conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { + return err + } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { + return err + } } return rs.streamQuery(rs.send) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go index 1039f21b4d6..0bbd265435b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer.go @@ -23,6 +23,8 @@ import ( "strings" "sync/atomic" + "vitess.io/vitess/go/vt/vttablet" + "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" @@ -108,6 +110,12 @@ func (ts *tableStreamer) Stream() error { if _, err := conn.ExecuteFetch("set names 'binary'", 1, false); err != nil { return err } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_read_timeout = %v", vttablet.VReplicationNetReadTimeout), 1, false); err != nil { + return err + } + if _, err := conn.ExecuteFetch(fmt.Sprintf("set @@session.net_write_timeout = %v", vttablet.VReplicationNetWriteTimeout), 1, false); err != nil { + return err + } rs, err := conn.ExecuteFetch("show tables", -1, true) if err != nil {