From 81ce29c91d349602ee58782d847d59f6e3dc50b3 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Mon, 24 Feb 2025 16:53:43 +0530 Subject: [PATCH] Add semi-sync monitor to unblock primaries blocked on semi-sync ACKs (#17763) Signed-off-by: Manan Gupta --- changelog/22.0/22.0.0/summary.md | 9 + go/cmd/vtctldclient/command/framework_test.go | 7 + go/cmd/vttablet/cli/cli.go | 2 + go/flags/endtoend/vtcombo.txt | 1 + go/flags/endtoend/vttablet.txt | 1 + .../reparent/newfeaturetest/reparent_test.go | 101 +++ .../endtoend/vreplication/sidecardb_test.go | 2 +- .../vtorc/readtopologyinstance/main_test.go | 2 + go/vt/mysqlctl/fakemysqldaemon.go | 5 + .../replicationdata/replicationdata.pb.go | 29 +- .../replicationdata_vtproto.pb.go | 36 + .../schema/misc/semisync_heartbeat.sql | 21 + go/vt/vtorc/db/generate_base.go | 1 + go/vt/vtorc/inst/analysis.go | 2 + go/vt/vtorc/inst/analysis_dao.go | 11 + go/vt/vtorc/inst/analysis_dao_test.go | 64 +- go/vt/vtorc/inst/instance.go | 1 + go/vt/vtorc/inst/instance_dao.go | 4 + go/vt/vtorc/inst/instance_dao_test.go | 20 +- go/vt/vtorc/logic/topology_recovery.go | 2 +- go/vt/vtorc/logic/topology_recovery_test.go | 15 + go/vt/vtorc/test/recovery_analysis.go | 2 + .../vttablet/tabletmanager/rpc_replication.go | 25 +- .../tabletmanager/rpc_replication_test.go | 66 +- .../tabletmanager/semisyncmonitor/monitor.go | 406 ++++++++++ .../semisyncmonitor/monitor_test.go | 728 ++++++++++++++++++ go/vt/vttablet/tabletmanager/tm_init.go | 2 + go/vt/vttablet/tabletmanager/tm_init_test.go | 9 +- .../tabletserver/state_manager_test.go | 1 - .../vttablet/tabletserver/tabletenv/config.go | 44 ++ .../tabletserver/tabletenv/config_test.go | 9 + go/vt/wrangler/fake_tablet_test.go | 7 + go/vt/wrangler/testlib/fake_tablet.go | 7 + proto/replicationdata.proto | 1 + web/vtadmin/src/proto/vtadmin.d.ts | 6 + web/vtadmin/src/proto/vtadmin.js | 23 + 36 files changed, 1642 insertions(+), 30 deletions(-) create mode 100644 
go/vt/sidecardb/schema/misc/semisync_heartbeat.sql create mode 100644 go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go create mode 100644 go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md index 1e7d713b60e..2acac12c694 100644 --- a/changelog/22.0/22.0.0/summary.md +++ b/changelog/22.0/22.0.0/summary.md @@ -18,6 +18,7 @@ - **[Update lite images to Debian Bookworm](#debian-bookworm)** - **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)** - **[Support for Filtering Query logs on Error](#query-logs)** + - **[Semi-sync monitor in vttablet](#semi-sync-monitor)** - **[Minor Changes](#minor-changes)** - **[VTTablet Flags](#flags-vttablet)** - **[VTTablet ACL enforcement and reloading](#reloading-vttablet-acl)** @@ -162,6 +163,14 @@ Users can continue to specify exact keyranges. The new feature is backward compa The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet. +### Semi-sync monitor in vttablet + +A new component has been added to the vttablet binary to monitor the semi-sync status of primary vttablets. We've observed cases where a brief network disruption can cause the primary to get stuck indefinitely waiting for semi-sync ACKs. In rare scenarios, this can block reparent operations and render the primary unresponsive. More information can be found in the issues https://github.com/vitessio/vitess/issues/17709 and https://github.com/vitessio/vitess/issues/17749. + +To address this, the new component continuously monitors the semi-sync status. If the primary becomes stuck on semi-sync ACKs, it generates writes to unblock it. If this fails, VTOrc is notified of the issue and initiates an emergency reparent operation. + +The monitoring interval can be adjusted using the `--semi-sync-monitor-interval` flag, which defaults to 10 seconds. 
+ ## Minor Changes #### VTTablet Flags diff --git a/go/cmd/vtctldclient/command/framework_test.go b/go/cmd/vtctldclient/command/framework_test.go index 351356ea3a0..b4c06cdd014 100644 --- a/go/cmd/vtctldclient/command/framework_test.go +++ b/go/cmd/vtctldclient/command/framework_test.go @@ -30,12 +30,14 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/grpctmserver" "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletservermock" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -158,6 +160,10 @@ func NewFakeTablet(t *testing.T, ts *topo.Server, cell string, uid uint32, table } } +var ( + exporter = servenv.NewExporter("TestVtctldClientCommand", "") +) + // StartActionLoop will start the action loop for a fake tablet, // using ft.FakeMysqlDaemon as the backing mysqld. 
func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) { @@ -203,6 +209,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) { DBConfigs: &dbconfigs.DBConfigs{}, QueryServiceControl: tabletservermock.NewController(), VREngine: vreplication.NewTestEngine(ts, ft.Tablet.Alias.Cell, ft.FakeMysqlDaemon, binlogplayer.NewFakeDBClient, binlogplayer.NewFakeDBClient, topoproto.TabletDbName(ft.Tablet), nil), + SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter), Env: vtenv.NewTestEnv(), } if err := ft.TM.Start(ft.Tablet, nil); err != nil { diff --git a/go/cmd/vttablet/cli/cli.go b/go/cmd/vttablet/cli/cli.go index e48a11c79dc..a953ed2b448 100644 --- a/go/cmd/vttablet/cli/cli.go +++ b/go/cmd/vttablet/cli/cli.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/tabletmanager" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver" @@ -168,6 +169,7 @@ func run(cmd *cobra.Command, args []string) error { QueryServiceControl: qsc, UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine(), env.Parser()), VREngine: vreplication.NewEngine(env, config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()), + SemiSyncMonitor: semisyncmonitor.NewMonitor(config, qsc.Exporter()), VDiffEngine: vdiff.NewEngine(ts, tablet, env.CollationEnv(), env.Parser()), } if err := tm.Start(tablet, config); err != nil { diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 04520a8d1ad..9807595b6a6 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -322,6 +322,7 @@ Flags: --schema_change_signal Enable the schema tracker; requires queryserver-config-schema-change-signal to be enabled on the 
underlying vttablets for this to work (default true) --schema_dir string Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary. --security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only) + --semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s) --service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice --serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state --shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 727448ce3de..e37d1ccc6af 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -322,6 +322,7 @@ Flags: --schema-change-reload-timeout duration query server schema change reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s) --schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian --security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only) + --semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s) --service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice --serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state 
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s) diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index fc5db965847..ca9cbcefcf5 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -19,15 +19,20 @@ package newfeaturetest import ( "context" "fmt" + "os" + "os/exec" + "strings" "sync" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" ) @@ -234,3 +239,99 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) { // Wait for all the writes to have succeeded. wg.Wait() } + +// TestSemiSyncBlockDueToDisruption tests that Vitess can recover from a situation +// where a primary is stuck waiting for semi-sync ACKs due to a network issue, +// even if no new writes from the user arrive. 
+func TestSemiSyncBlockDueToDisruption(t *testing.T) { + // This is always set to "true" on GitHub Actions runners: + // https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables + ci, ok := os.LookupEnv("CI") + if ok && strings.ToLower(ci) == "true" { + t.Skip("Test not meant to be run on CI") + } + clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync) + defer utils.TeardownCluster(clusterInstance) + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]}) + + // stop heartbeats on all the replicas + for idx, tablet := range tablets { + if idx == 0 { + continue + } + utils.RunSQLs(context.Background(), t, []string{ + "stop slave;", + "change master to MASTER_HEARTBEAT_PERIOD = 0;", + "start slave;", + }, tablet) + } + + // Take a backup of the pf.conf file + runCommandWithSudo(t, "cp", "/etc/pf.conf", "/etc/pf.conf.backup") + defer func() { + // Restore the file from backup + runCommandWithSudo(t, "mv", "/etc/pf.conf.backup", "/etc/pf.conf") + runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf") + }() + // Disrupt the network between the primary and the replicas + runCommandWithSudo(t, "sh", "-c", fmt.Sprintf("echo 'block in proto tcp from any to any port %d' | sudo tee -a /etc/pf.conf > /dev/null", tablets[0].MySQLPort)) + + // This following command is only required if pfctl is not already enabled + //runCommandWithSudo(t, "pfctl", "-e") + runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf") + rules := runCommandWithSudo(t, "pfctl", "-s", "rules") + log.Errorf("Rules enforced - %v", rules) + + // Start a write that will be blocked by the primary waiting for semi-sync ACKs + ch := make(chan any) + go func() { + defer func() { + close(ch) + }() + utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]}) + }() + + // Starting VTOrc later now, because we don't want it 
to fix the heartbeat interval + // on the replicas before the disruption has been introduced. + err := clusterInstance.StartVTOrc(clusterInstance.Keyspaces[0].Name) + require.NoError(t, err) + go func() { + for { + select { + case <-ch: + return + case <-time.After(1 * time.Second): + str, isPresent := tablets[0].VttabletProcess.GetVars()["SemiSyncMonitorWritesBlocked"] + if isPresent { + log.Errorf("SemiSyncMonitorWritesBlocked - %v", str) + } + } + } + }() + // If the network disruption is too long lived, then we will end up running ERS from VTOrc. + networkDisruptionDuration := 43 * time.Second + time.Sleep(networkDisruptionDuration) + + // Restore the network + runCommandWithSudo(t, "cp", "/etc/pf.conf.backup", "/etc/pf.conf") + runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf") + + // We expect the problem to be resolved in less than 30 seconds. + select { + case <-time.After(30 * time.Second): + t.Errorf("Timed out waiting for semi-sync to be unblocked") + case <-ch: + log.Errorf("Woohoo, write finished!") + } +} + +// runCommandWithSudo runs the provided command with sudo privileges. +// When the command is run, it prompts the user for the password, and it must be +// entered for the program to resume. +func runCommandWithSudo(t *testing.T, args ...string) string { + cmd := exec.Command("sudo", args...) 
+ out, err := cmd.CombinedOutput() + assert.NoError(t, err, string(out)) + return string(out) +} diff --git a/go/test/endtoend/vreplication/sidecardb_test.go b/go/test/endtoend/vreplication/sidecardb_test.go index f908d66a2ec..54c1a10130f 100644 --- a/go/test/endtoend/vreplication/sidecardb_test.go +++ b/go/test/endtoend/vreplication/sidecardb_test.go @@ -38,7 +38,7 @@ var ddls1, ddls2 []string func init() { sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action", - "redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", + "redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "semisync_heartbeat", "tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"} numSidecarDBTables = len(sidecarDBTables) ddls1 = []string{ diff --git a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go index 6a565ac046f..531b069535a 100644 --- a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go +++ b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go @@ -91,6 +91,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.True(t, primaryInstance.SemiSyncReplicaEnabled) assert.True(t, primaryInstance.SemiSyncPrimaryStatus) assert.False(t, primaryInstance.SemiSyncReplicaStatus) + assert.False(t, primaryInstance.SemiSyncBlocked) assert.EqualValues(t, 2, primaryInstance.SemiSyncPrimaryClients) assert.EqualValues(t, 1, primaryInstance.SemiSyncPrimaryWaitForReplicaCount) assert.EqualValues(t, 1000000000000000000, primaryInstance.SemiSyncPrimaryTimeout) @@ -142,6 +143,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) { assert.False(t, replicaInstance.SemiSyncPrimaryEnabled) assert.True(t, replicaInstance.SemiSyncReplicaEnabled) assert.False(t, replicaInstance.SemiSyncPrimaryStatus) + assert.False(t, 
replicaInstance.SemiSyncBlocked) assert.True(t, replicaInstance.SemiSyncReplicaStatus) assert.EqualValues(t, 0, replicaInstance.SemiSyncPrimaryClients) assert.EqualValues(t, 1, replicaInstance.SemiSyncPrimaryWaitForReplicaCount) diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index e6afe7917f1..a2b5e66bd49 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -217,6 +217,11 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon { return result } +// DB returns the fakesqldb.DB object. +func (fmd *FakeMysqlDaemon) DB() *fakesqldb.DB { + return fmd.db +} + // Start is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error { if fmd.Running { diff --git a/go/vt/proto/replicationdata/replicationdata.pb.go b/go/vt/proto/replicationdata/replicationdata.pb.go index d99881e2cad..2cd8424135a 100644 --- a/go/vt/proto/replicationdata/replicationdata.pb.go +++ b/go/vt/proto/replicationdata/replicationdata.pb.go @@ -504,6 +504,7 @@ type FullStatus struct { SuperReadOnly bool `protobuf:"varint,21,opt,name=super_read_only,json=superReadOnly,proto3" json:"super_read_only,omitempty"` ReplicationConfiguration *Configuration `protobuf:"bytes,22,opt,name=replication_configuration,json=replicationConfiguration,proto3" json:"replication_configuration,omitempty"` DiskStalled bool `protobuf:"varint,23,opt,name=disk_stalled,json=diskStalled,proto3" json:"disk_stalled,omitempty"` + SemiSyncBlocked bool `protobuf:"varint,24,opt,name=semi_sync_blocked,json=semiSyncBlocked,proto3" json:"semi_sync_blocked,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -699,6 +700,13 @@ func (x *FullStatus) GetDiskStalled() bool { return false } +func (x *FullStatus) GetSemiSyncBlocked() bool { + if x != nil { + return x.SemiSyncBlocked + } + return false +} + var File_replicationdata_proto protoreflect.FileDescriptor var 
file_replicationdata_proto_rawDesc = string([]byte{ @@ -786,7 +794,7 @@ var file_replicationdata_proto_rawDesc = string([]byte{ 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x55, 0x75, 0x69, 0x64, 0x22, 0xeb, 0x08, 0x0a, 0x0a, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61, + 0x55, 0x75, 0x69, 0x64, 0x22, 0x97, 0x09, 0x0a, 0x0a, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, @@ -857,14 +865,17 @@ var file_replicationdata_proto_rawDesc = string([]byte{ 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x64, 0x69, 0x73, 0x6b, 0x5f, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x18, 0x17, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x64, 0x69, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x6c, 0x6c, - 0x65, 0x64, 0x2a, 0x3b, 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41, - 0x4e, 0x44, 0x53, 0x51, 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, - 0x0c, 0x49, 0x4f, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42, - 0x2e, 0x5a, 0x2c, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, - 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 
0x65, 0x64, 0x12, 0x2a, 0x0a, 0x11, 0x73, 0x65, 0x6d, 0x69, 0x5f, 0x73, 0x79, 0x6e, 0x63, 0x5f, + 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x18, 0x18, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73, + 0x65, 0x6d, 0x69, 0x53, 0x79, 0x6e, 0x63, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x65, 0x64, 0x2a, 0x3b, + 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41, 0x4e, 0x44, 0x53, 0x51, + 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x4f, 0x54, + 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42, 0x2e, 0x5a, 0x2c, 0x76, + 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, + 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x65, 0x70, 0x6c, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, }) var ( diff --git a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go index 92f8e3074c3..d4e2af4496e 100644 --- a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go +++ b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go @@ -143,6 +143,7 @@ func (m *FullStatus) CloneVT() *FullStatus { r.SuperReadOnly = m.SuperReadOnly r.ReplicationConfiguration = m.ReplicationConfiguration.CloneVT() r.DiskStalled = m.DiskStalled + r.SemiSyncBlocked = m.SemiSyncBlocked if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -553,6 +554,18 @@ func (m *FullStatus) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.SemiSyncBlocked { + i-- + if m.SemiSyncBlocked { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xc0 + } if m.DiskStalled { i-- if 
m.DiskStalled { @@ -991,6 +1004,9 @@ func (m *FullStatus) SizeVT() (n int) { if m.DiskStalled { n += 3 } + if m.SemiSyncBlocked { + n += 3 + } n += len(m.unknownFields) return n } @@ -2587,6 +2603,26 @@ func (m *FullStatus) UnmarshalVT(dAtA []byte) error { } } m.DiskStalled = bool(v != 0) + case 24: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SemiSyncBlocked", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SemiSyncBlocked = bool(v != 0) default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/sidecardb/schema/misc/semisync_heartbeat.sql b/go/vt/sidecardb/schema/misc/semisync_heartbeat.sql new file mode 100644 index 00000000000..01b8e5d21c2 --- /dev/null +++ b/go/vt/sidecardb/schema/misc/semisync_heartbeat.sql @@ -0,0 +1,21 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +CREATE TABLE IF NOT EXISTS semisync_heartbeat +( + ts BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (`ts`) +) ENGINE = InnoDB CHARSET = utf8mb4 diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index 8baa9a12476..cdbc943690e 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -105,6 +105,7 @@ CREATE TABLE database_instance ( semi_sync_primary_status TINYint NOT NULL DEFAULT 0, semi_sync_replica_status TINYint NOT NULL DEFAULT 0, semi_sync_primary_clients int NOT NULL DEFAULT 0, + semi_sync_blocked tinyint NOT NULL DEFAULT 0, is_disk_stalled TINYint NOT NULL DEFAULT 0, PRIMARY KEY (alias) )`, diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index 6a800e5ee0b..3647134cd54 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -55,6 +55,7 @@ const ( AllPrimaryReplicasNotReplicatingOrDead AnalysisCode = "AllPrimaryReplicasNotReplicatingOrDead" LockedSemiSyncPrimaryHypothesis AnalysisCode = "LockedSemiSyncPrimaryHypothesis" LockedSemiSyncPrimary AnalysisCode = "LockedSemiSyncPrimary" + PrimarySemiSyncBlocked AnalysisCode = "PrimarySemiSyncBlocked" ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected" PrimaryDiskStalled AnalysisCode = "PrimaryDiskStalled" ) @@ -115,6 +116,7 @@ type ReplicationAnalysis struct { SemiSyncPrimaryWaitForReplicaCount uint SemiSyncPrimaryClients uint SemiSyncReplicaEnabled bool + SemiSyncBlocked bool CountSemiSyncReplicasEnabled uint CountLoggingReplicas uint CountStatementBasedLoggingReplicas uint diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index d487973b0f0..685d84b8a00 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -152,6 +152,9 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna MIN( primary_instance.semi_sync_primary_status ) AS semi_sync_primary_status, + MIN( + primary_instance.semi_sync_blocked + ) AS 
semi_sync_blocked, MIN( primary_instance.semi_sync_replica_enabled ) AS semi_sync_replica_enabled, @@ -333,6 +336,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.BinlogServerImmediateTopology = countValidBinlogServerReplicas == a.CountValidReplicas && a.CountValidReplicas > 0 a.SemiSyncPrimaryEnabled = m.GetBool("semi_sync_primary_enabled") a.SemiSyncPrimaryStatus = m.GetBool("semi_sync_primary_status") + a.SemiSyncBlocked = m.GetBool("semi_sync_blocked") a.SemiSyncReplicaEnabled = m.GetBool("semi_sync_replica_enabled") a.CountSemiSyncReplicasEnabled = m.GetUint("count_semi_sync_replicas") // countValidSemiSyncReplicasEnabled := m.GetUint("count_valid_semi_sync_replicas") @@ -458,6 +462,13 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.Analysis = PrimaryTabletDeleted a.Description = "Primary tablet has been deleted" ca.hasClusterwideAction = true + } else if a.IsPrimary && a.SemiSyncBlocked && a.CountSemiSyncReplicasEnabled >= a.SemiSyncPrimaryWaitForReplicaCount { + // The primary is reporting that semi-sync monitor is blocked on writes. + // There are enough replicas configured to send semi-sync ACKs such that the primary shouldn't be blocked. + // There is some network disruption in progress. We should run an ERS. 
+ a.Analysis = PrimarySemiSyncBlocked + a.Description = "Writes seem to be blocked on semi-sync acks on the primary, even though sufficient replicas are configured to send ACKs" + ca.hasClusterwideAction = true } else if topo.IsReplicaType(a.TabletType) && !a.IsReadOnly { a.Analysis = ReplicaIsWritable a.Description = "Replica is writable" diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index baa1121b776..a58414af3e8 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -34,10 +34,10 @@ var ( // The initialSQL is a set of insert commands copied from a dump of an actual running VTOrc instances. The relevant insert commands are here. // This is a dump taken from a test running 4 tablets, zone1-101 is the primary, zone1-100 is a replica, zone1-112 is a rdonly and zone2-200 is a cross-cell replica. initialSQL = []string{ - `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0,false);`, - `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 
07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0,false);`, - `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2,false);`, - `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0,false);`, + `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 
07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0,false,false);`, + `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0,false,false);`, + `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2,false,false);`, + `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 
07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0,false,false);`, `INSERT INTO vitess_tablet VALUES('zone1-0000000100','localhost',6711,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731307d20706f72745f6d61703a7b6b65793a227674222076616c75653a363730397d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363731312064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, `INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, `INSERT INTO vitess_tablet 
VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, @@ -119,6 +119,62 @@ func TestGetReplicationAnalysisDecision(t *testing.T) { keyspaceWanted: "ks", shardWanted: "0", codeWanted: PrimaryDiskStalled, + }, { + name: "PrimarySemiSyncBlocked", + info: []*test.InfoForRecoveryAnalysis{{ + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: "localhost", + MysqlPort: 6709, + }, + DurabilityPolicy: "semi_sync", + LastCheckValid: 1, + CountReplicas: 4, + CountValidReplicas: 4, + CountValidReplicatingReplicas: 4, + IsPrimary: 1, + SemiSyncPrimaryEnabled: 1, + SemiSyncPrimaryStatus: 1, + SemiSyncPrimaryWaitForReplicaCount: 2, + CountSemiSyncReplicasEnabled: 2, + SemiSyncPrimaryClients: 0, + SemiSyncBlocked: 1, + }}, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: PrimarySemiSyncBlocked, + }, { + name: "LockedSemiSync", + info: []*test.InfoForRecoveryAnalysis{{ + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: "localhost", + MysqlPort: 6709, + }, + DurabilityPolicy: "semi_sync", + LastCheckValid: 1, + CountReplicas: 4, + CountValidReplicas: 4, + CountValidReplicatingReplicas: 4, + IsPrimary: 1, + SemiSyncPrimaryEnabled: 1, + 
SemiSyncPrimaryStatus: 1, + SemiSyncPrimaryWaitForReplicaCount: 2, + CountSemiSyncReplicasEnabled: 1, + SemiSyncPrimaryClients: 1, + SemiSyncBlocked: 1, + }}, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: LockedSemiSyncPrimaryHypothesis, }, { name: "DeadPrimary", info: []*test.InfoForRecoveryAnalysis{{ diff --git a/go/vt/vtorc/inst/instance.go b/go/vt/vtorc/inst/instance.go index b7b097bb14d..f8329e62288 100644 --- a/go/vt/vtorc/inst/instance.go +++ b/go/vt/vtorc/inst/instance.go @@ -85,6 +85,7 @@ type Instance struct { SemiSyncPrimaryStatus bool SemiSyncPrimaryClients uint SemiSyncReplicaStatus bool + SemiSyncBlocked bool LastSeenTimestamp string IsLastCheckValid bool diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 916f4757722..82a28370069 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -240,6 +240,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named instance.SemiSyncPrimaryClients = uint(fs.SemiSyncPrimaryClients) instance.SemiSyncPrimaryStatus = fs.SemiSyncPrimaryStatus instance.SemiSyncReplicaStatus = fs.SemiSyncReplicaStatus + instance.SemiSyncBlocked = fs.SemiSyncBlocked if instance.IsOracleMySQL() || instance.IsPercona() { // Stuff only supported on Oracle / Percona MySQL @@ -579,6 +580,7 @@ func readInstanceRow(m sqlutils.RowMap) *Instance { instance.SemiSyncPrimaryStatus = m.GetBool("semi_sync_primary_status") instance.SemiSyncPrimaryClients = m.GetUint("semi_sync_primary_clients") instance.SemiSyncReplicaStatus = m.GetBool("semi_sync_replica_status") + instance.SemiSyncBlocked = m.GetBool("semi_sync_blocked") instance.ReplicationDepth = m.GetUint("replication_depth") instance.IsCoPrimary = m.GetBool("is_co_primary") instance.HasReplicationCredentials = m.GetBool("has_replication_credentials") @@ -879,6 +881,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, "semi_sync_primary_status", 
"semi_sync_primary_clients", "semi_sync_replica_status", + "semi_sync_blocked", "last_discovery_latency", "is_disk_stalled", } @@ -959,6 +962,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, args = append(args, instance.SemiSyncPrimaryStatus) args = append(args, instance.SemiSyncPrimaryClients) args = append(args, instance.SemiSyncReplicaStatus) + args = append(args, instance.SemiSyncBlocked) args = append(args, instance.LastDiscoveryLatency.Nanoseconds()) args = append(args, instance.StalledDisk) } diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index c3b99455741..e518d563739 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -64,13 +64,13 @@ func TestMkInsertSingle(t *testing.T) { version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, - source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen) + source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, 
relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, semi_sync_blocked, last_discovery_latency, is_disk_stalled, last_seen) VALUES - (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, - false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,` + false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false,` sql1, args1, err := mkInsertForInstances(instances[:1], false, true) require.NoError(t, err) @@ -87,16 +87,16 @@ func TestMkInsertThree(t *testing.T) { version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, 
replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, - source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen) + source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, semi_sync_blocked, last_discovery_latency, is_disk_stalled, last_seen) VALUES - (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), - (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), - (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` a3 := ` - zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false, - zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false, - zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false, + zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , 
, 0, false, false, false, false, false, 0, 0, false, false, 0, false ,false, 0, false, + zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false, + zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, false, 0, false, ` sql3, args3, err := mkInsertForInstances(instances[:3], true, true) diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 41c0ca5d398..82894e8b369 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -293,7 +293,7 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias string) recoveryFunction { switch analysisCode { // primary - case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas, inst.PrimaryDiskStalled: + case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas, inst.PrimaryDiskStalled, inst.PrimarySemiSyncBlocked: // If ERS is disabled, we have no way of repairing the cluster. 
if !config.ERSEnabled() { log.Infof("VTOrc not configured to run ERS, skipping recovering %v", analysisCode) diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go index 9df5fc989f0..68579dd05cb 100644 --- a/go/vt/vtorc/logic/topology_recovery_test.go +++ b/go/vt/vtorc/logic/topology_recovery_test.go @@ -49,6 +49,11 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) { prevAnalysisCode: inst.DeadPrimary, newAnalysisCode: inst.PrimaryDiskStalled, shouldBeEqual: true, + }, { + // PrimarySemiSyncBlocked and PrimaryDiskStalled have the same recovery + prevAnalysisCode: inst.PrimarySemiSyncBlocked, + newAnalysisCode: inst.PrimaryDiskStalled, + shouldBeEqual: true, }, { // DeadPrimary and PrimaryTabletDeleted are different recoveries. prevAnalysisCode: inst.DeadPrimary, @@ -232,6 +237,16 @@ func TestGetCheckAndRecoverFunctionCode(t *testing.T) { ersEnabled: false, analysisCode: inst.PrimaryDiskStalled, wantRecoveryFunction: noRecoveryFunc, + }, { + name: "PrimarySemiSyncBlocked with ERS enabled", + ersEnabled: true, + analysisCode: inst.PrimarySemiSyncBlocked, + wantRecoveryFunction: recoverDeadPrimaryFunc, + }, { + name: "PrimarySemiSyncBlocked with ERS disabled", + ersEnabled: false, + analysisCode: inst.PrimarySemiSyncBlocked, + wantRecoveryFunction: noRecoveryFunc, }, { name: "PrimaryTabletDeleted with ERS enabled", ersEnabled: true, diff --git a/go/vt/vtorc/test/recovery_analysis.go b/go/vt/vtorc/test/recovery_analysis.go index bb6e4132243..eaf0dac2258 100644 --- a/go/vt/vtorc/test/recovery_analysis.go +++ b/go/vt/vtorc/test/recovery_analysis.go @@ -65,6 +65,7 @@ type InfoForRecoveryAnalysis struct { CountValidBinlogServerReplicas uint SemiSyncPrimaryEnabled int SemiSyncPrimaryStatus int + SemiSyncBlocked int SemiSyncPrimaryWaitForReplicaCount uint SemiSyncPrimaryClients uint SemiSyncReplicaEnabled int @@ -142,6 +143,7 @@ func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap { 
rowMap["semi_sync_primary_clients"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryClients), Valid: true} rowMap["semi_sync_primary_enabled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryEnabled), Valid: true} rowMap["semi_sync_primary_status"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryStatus), Valid: true} + rowMap["semi_sync_blocked"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncBlocked), Valid: true} rowMap["semi_sync_primary_wait_for_replica_count"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncPrimaryWaitForReplicaCount), Valid: true} rowMap["semi_sync_replica_enabled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncReplicaEnabled), Valid: true} res, _ := prototext.Marshal(info.TabletInfo) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 070eab9a38a..e8536521ede 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -187,6 +187,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful SemiSyncPrimaryClients: semiSyncClients, SemiSyncPrimaryTimeout: semiSyncTimeout, SemiSyncWaitForReplicaCount: semiSyncNumReplicas, + SemiSyncBlocked: tm.SemiSyncMonitor.AllWritesBlocked(), SuperReadOnly: superReadOnly, ReplicationConfiguration: replConfiguration, }, nil @@ -592,8 +593,14 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure }() } - // Now that we know no writes are in-flight and no new writes can occur, - // set MySQL to super_read_only mode. If we are already super_read_only because of a + // Now we know no writes are in-flight and no new writes can occur. + // We just need to wait for no write being blocked on semi-sync ACKs. 
+ err = tm.SemiSyncMonitor.WaitUntilSemiSyncUnblocked(ctx) + if err != nil { + return nil, err + } + + // We can now set MySQL to super_read_only mode. If we are already super_read_only because of a // previous demotion, or because we are not primary anyway, this should be // idempotent. if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, true); err != nil { @@ -1052,10 +1059,24 @@ func (tm *TabletManager) fixSemiSync(ctx context.Context, tabletType topodatapb. case SemiSyncActionNone: return nil case SemiSyncActionSet: + if tm.SemiSyncMonitor != nil { + // We want to enable the semi-sync monitor only if the tablet is going to start + // expecting semi-sync ACKs. + if tabletType == topodatapb.TabletType_PRIMARY { + tm.SemiSyncMonitor.Open() + } else { + tm.SemiSyncMonitor.Close() + } + } // Always enable replica-side since it doesn't hurt to keep it on for a primary. // The primary-side needs to be off for a replica, or else it will get stuck. return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, tabletType == topodatapb.TabletType_PRIMARY, true) case SemiSyncActionUnset: + // The nil check is required for vtcombo, which doesn't run the semi-sync monitor + // but does try to turn off semi-sync. 
+ if tm.SemiSyncMonitor != nil { + tm.SemiSyncMonitor.Close() + } return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, false, false) default: return vterrors.Errorf(vtrpc.Code_INTERNAL, "Unknown SemiSyncAction - %v", semiSync) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index 4efb7b13081..98600b5f9b5 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -25,7 +25,12 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/mysqlctl" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" "vitess.io/vitess/go/vt/vttablet/tabletserver" ) @@ -77,15 +82,17 @@ func TestDemotePrimaryStalled(t *testing.T) { qsWaitChan: make(chan any), } // Create a tablet manager with a replica type tablet. + fakeDb := newTestMysqlDaemon(t, 1) tm := &TabletManager{ actionSema: semaphore.NewWeighted(1), - MysqlDaemon: newTestMysqlDaemon(t, 1), + MysqlDaemon: fakeDb, tmState: &tmState{ displayState: displayState{ tablet: newTestTablet(t, 100, "ks", "-", map[string]string{}), }, }, QueryServiceControl: qsc, + SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(fakeDb.DB(), exporter), } go func() { @@ -105,3 +112,60 @@ func TestDemotePrimaryStalled(t *testing.T) { return !qsc.primaryStalled.Load() }, 5*time.Second, 100*time.Millisecond) } + +// TestDemotePrimaryWaitingForSemiSyncUnblock tests that demote primary unblocks if the primary is blocked on semi-sync ACKs +// and doesn't issue the set super read-only query until all writes waiting on semi-sync ACKs have gone through. 
+func TestDemotePrimaryWaitingForSemiSyncUnblock(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ts := memorytopo.NewServer(ctx, "cell1") + tm := newTestTM(t, ts, 1, "ks", "0", nil) + // Make the tablet a primary. + err := tm.ChangeType(ctx, topodatapb.TabletType_PRIMARY, false) + require.NoError(t, err) + fakeMysqlDaemon := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) + fakeDb := fakeMysqlDaemon.DB() + fakeDb.SetNeverFail(true) + + tm.SemiSyncMonitor.Open() + // Add a universal insert query pattern that would block until we make it unblock. + ch := make(chan int) + fakeDb.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { + <-ch + }) + // Add a fake query that makes the semi-sync monitor believe that the tablet is blocked on semi-sync ACKs. + fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1")) + + // Verify that in the beginning the tablet is serving. + require.True(t, tm.QueryServiceControl.IsServing()) + + // Start the demote primary operation in a go routine. + var demotePrimaryFinished atomic.Bool + go func() { + _, err := tm.demotePrimary(ctx, false) + require.NoError(t, err) + demotePrimaryFinished.Store(true) + }() + + // Wait for the demote primary operation to have changed the serving state. + // After that point, we can assume that the demote primary gets blocked on writes waiting for semi-sync ACKs. + require.Eventually(t, func() bool { + return !tm.QueryServiceControl.IsServing() + }, 5*time.Second, 100*time.Millisecond) + + // DemotePrimary shouldn't have finished yet. + require.False(t, demotePrimaryFinished.Load()) + // We shouldn't have seen the super-read only query either. 
+ require.False(t, fakeMysqlDaemon.SuperReadOnly.Load()) + + // Now we unblock the semi-sync monitor. + fakeDb.AddQuery("select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')", sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + close(ch) + + // This should unblock the demote primary operation eventually. + require.Eventually(t, func() bool { + return demotePrimaryFinished.Load() + }, 5*time.Second, 100*time.Millisecond) + // We should have also seen the super-read only query. + require.True(t, fakeMysqlDaemon.SuperReadOnly.Load()) +} diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go new file mode 100644 index 00000000000..0ea37e29284 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor.go @@ -0,0 +1,406 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package semisyncmonitor + +import ( + "context" + "errors" + "sync" + "time" + + "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/timer" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/dbconnpool" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" +) + +const ( + semiSyncWaitSessionsRead = "select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')" + semiSyncHeartbeatWrite = "INSERT INTO %s.semisync_heartbeat (ts) VALUES (NOW())" + semiSyncHeartbeatClear = "TRUNCATE TABLE %s.semisync_heartbeat" + maxWritesPermitted = 15 + clearTimerDuration = 24 * time.Hour +) + +var ( + // waitBetweenWrites is the time to wait between consecutive writes. + // This is a variable instead of a constant only to be tweaked in tests. + waitBetweenWrites = 1 * time.Second +) + +// Monitor is a monitor that checks if the primary tablet +// is blocked on a semi-sync ack from the replica. +// If the semi-sync ACK is lost in the network, +// it is possible that the primary is indefinitely stuck, +// blocking PRS. The monitor looks for this situation and manufactures a write +// periodically to unblock the primary. +type Monitor struct { + // config is used to get the connection parameters. + config *tabletenv.TabletConfig + // ticks is the ticker on which we'll check + // if the primary is blocked on semi-sync ACKs or not. + ticks *timer.Timer + // clearTicks is the ticker to clear the data in + // the semisync_heartbeat table. + clearTicks *timer.Timer + + // mu protects the fields below. + mu sync.Mutex + appPool *dbconnpool.ConnectionPool + isOpen bool + // isWriting stores if the monitor is currently writing to the DB. 
+ // We don't want two different threads initiating writes, so we use this + // for synchronization. + isWriting bool + // inProgressWriteCount is the number of writes currently in progress. + // The writes from the monitor themselves might get blocked and hence a count for them is required. + // After enough writes are blocked, we want to notify VTOrc to run an ERS. + inProgressWriteCount int + // isBlocked stores if the primary is blocked on semi-sync ack. + isBlocked bool + // waiters stores the list of waiters that are waiting for the primary to be unblocked. + waiters []chan struct{} + // writesBlockedGauge is a gauge tracking the number of writes the monitor is blocked on. + writesBlockedGauge *stats.Gauge + // errorCount is the number of errors that the semi-sync monitor ran into. + // We ignore some of the errors, so the counter is a good way to track how many errors we have seen. + errorCount *stats.Counter +} + +// NewMonitor creates a new Monitor. +func NewMonitor(config *tabletenv.TabletConfig, exporter *servenv.Exporter) *Monitor { + return &Monitor{ + config: config, + ticks: timer.NewTimer(config.SemiSyncMonitor.Interval), + // We clear the data every day. We can make it configurable in the future, + // but this seems fine for now. + clearTicks: timer.NewTimer(clearTimerDuration), + writesBlockedGauge: exporter.NewGauge("SemiSyncMonitorWritesBlocked", "Number of writes blocked in the semi-sync monitor"), + errorCount: exporter.NewCounter("SemiSyncMonitorErrorCount", "Number of errors encountered by the semi-sync monitor"), + appPool: dbconnpool.NewConnectionPool("SemiSyncMonitorAppPool", exporter, maxWritesPermitted+5, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution), + waiters: make([]chan struct{}, 0), + } +} + +// CreateTestSemiSyncMonitor creates a monitor for testing. +// It takes an optional fake db.
+func CreateTestSemiSyncMonitor(db *fakesqldb.DB, exporter *servenv.Exporter) *Monitor { + var dbc *dbconfigs.DBConfigs + if db != nil { + params := db.ConnParams() + cp := *params + dbc = dbconfigs.NewTestDBConfigs(cp, cp, "") + } + return NewMonitor(&tabletenv.TabletConfig{ + DB: dbc, + SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{ + Interval: 1 * time.Second, + }, + }, exporter) +} + +// Open starts the monitor. +func (m *Monitor) Open() { + m.mu.Lock() + defer m.mu.Unlock() + // The check for config being nil is only requried for tests. + if m.isOpen || m.config == nil || m.config.DB == nil { + // If we are already open, then there is nothing to do + return + } + // Set the monitor to be open. + m.isOpen = true + log.Info("SemiSync Monitor: opening") + + // This function could be running from within a unit test scope, in which case we use + // mock pools that are already open. This is why we test for the pool being open. + if !m.appPool.IsOpen() { + m.appPool.Open(m.config.DB.AppWithDB()) + } + m.clearTicks.Start(m.clearAllData) + m.ticks.Start(m.checkAndFixSemiSyncBlocked) +} + +// Close stops the monitor. +func (m *Monitor) Close() { + m.mu.Lock() + defer m.mu.Unlock() + if !m.isOpen { + // If we are already closed, then there is nothing to do + return + } + m.isOpen = false + log.Info("SemiSync Monitor: closing") + m.clearTicks.Stop() + m.ticks.Stop() + m.appPool.Close() +} + +// checkAndFixSemiSyncBlocked checks if the primary is blocked on semi-sync ack +// and manufactures a write to unblock the primary. This function is safe to +// be called multiple times in parallel. 
+func (m *Monitor) checkAndFixSemiSyncBlocked() { + // Check if semi-sync is blocked or not + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + isBlocked, err := m.isSemiSyncBlocked(ctx) + if err != nil { + m.errorCount.Add(1) + // If we are unable to determine whether the primary is blocked or not, + // then we can just abort the function and try again later. + log.Errorf("SemiSync Monitor: failed to check if primary is blocked on semi-sync: %v", err) + return + } + // Set the isBlocked state. + m.setIsBlocked(isBlocked) + if isBlocked { + // If we are blocked, then we want to start the writes. + // That function is re-entrant. If we are already writing, then it will just return. + // We start it in a go-routine, because we want to continue to check for when + // we get unblocked. + go m.startWrites() + } +} + +// isSemiSyncBlocked checks if the primary is blocked on semi-sync. +func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) { + // Get a connection from the pool + conn, err := m.appPool.Get(ctx) + if err != nil { + return false, err + } + defer conn.Recycle() + + // Execute the query to check if the primary is blocked on semi-sync. + res, err := conn.Conn.ExecuteFetch(semiSyncWaitSessionsRead, 1, false) + if err != nil { + return false, err + } + // If we have no rows, then the primary doesn't have semi-sync enabled. + // It then follows, that the primary isn't blocked :) + if len(res.Rows) == 0 { + return false, nil + } + + // Read the status value and check if it is non-zero. + if len(res.Rows) != 1 || len(res.Rows[0]) != 2 { + return false, errors.New("unexpected number of rows received") + } + value, err := res.Rows[0][1].ToCastInt64() + return value != 0, err +} + +// isClosed returns if the monitor is currently closed or not. 
+func (m *Monitor) isClosed() bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return !m.isOpen
+}
+
+// WaitUntilSemiSyncUnblocked waits until the primary is not blocked
+// on semi-sync or until the context expires. It returns nil when the
+// monitor is closed or the primary is (or becomes) unblocked, and
+// ctx.Err() when the context ends first.
+func (m *Monitor) WaitUntilSemiSyncUnblocked(ctx context.Context) error {
+	// SemiSyncMonitor is closed, which means semi-sync is not enabled.
+	// We don't have anything to wait for.
+	if m.isClosed() {
+		return nil
+	}
+	// Run one iteration of checking if semi-sync is blocked or not, so that
+	// the decision below is based on a fresh reading.
+	m.checkAndFixSemiSyncBlocked()
+
+	// Decide whether we need to wait and register the waiter under a single
+	// lock acquisition. If the check and the registration were separate
+	// steps, setIsBlocked(false) could run in between and close the existing
+	// waiters before ours is added. We would then miss an unblock that has
+	// already happened and only be released by a later unblock signal or by
+	// the context expiring.
+	m.mu.Lock()
+	if !m.isOpen || !m.isBlocked {
+		m.mu.Unlock()
+		// If we find that the primary isn't blocked, we're good,
+		// we don't need to wait for anything.
+		log.Infof("Primary not blocked on semi-sync ACKs")
+		return nil
+	}
+	ch := make(chan struct{})
+	m.waiters = append(m.waiters, ch)
+	m.mu.Unlock()
+
+	log.Infof("Waiting for semi-sync to be unblocked")
+	// The primary is blocked. We need to wait for it to be unblocked
+	// or the context to expire.
+	select {
+	case <-ch:
+		log.Infof("Finished waiting for semi-sync to be unblocked")
+		return nil
+	case <-ctx.Done():
+		log.Infof("Error while waiting for semi-sync to be unblocked - %s", ctx.Err().Error())
+		return ctx.Err()
+	}
+}
+
+// stillBlocked returns true if the monitor should continue writing to the DB
+// because the monitor is still open, and the primary is still blocked.
+func (m *Monitor) stillBlocked() bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.isOpen && m.isBlocked
+}
+
+// checkAndSetIsWriting checks if the monitor is already writing to the DB.
+// If it is not, then it sets the isWriting field and returns true to signal
+// the caller that it now owns the write loop. It returns false otherwise.
+func (m *Monitor) checkAndSetIsWriting() bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.isWriting {
+		return false
+	}
+	m.isWriting = true
+	return true
+}
+
+// clearIsWriting clears the isWriting field.
+func (m *Monitor) clearIsWriting() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.isWriting = false
+}
+
+// startWrites starts writing to the DB.
+// It is re-entrant and will return if we are already writing.
+func (m *Monitor) startWrites() {
+	// Only one write loop may run at a time. If another caller already
+	// owns it, there is nothing for us to do.
+	if !m.checkAndSetIsWriting() {
+		return
+	}
+	// Give up ownership of the write loop on the way out.
+	defer m.clearIsWriting()
+
+	for {
+		// Stop as soon as the monitor is closed or the primary is unblocked.
+		if !m.stillBlocked() {
+			return
+		}
+		// Each write runs in its own go-routine because, if the network
+		// disruption is somewhat long-lived, the writes themselves can also
+		// block. Pausing for waitBetweenWrites between attempts gives the
+		// system more time to recover. We will not do more than
+		// maxWritesPermitted writes at once, and once all of them are
+		// blocked, we'll wait for VTOrc to run an ERS.
+		go m.write()
+		time.Sleep(waitBetweenWrites)
+	}
+}
+
+// incrementWriteCount tries to reserve a slot for one more in-progress
+// write. It refuses when maxWritesPermitted writes are already outstanding.
+// It returns whether the slot was reserved and keeps the gauge in step with
+// the counter.
+func (m *Monitor) incrementWriteCount() bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	reserved := m.inProgressWriteCount != maxWritesPermitted
+	if reserved {
+		m.inProgressWriteCount++
+		m.writesBlockedGauge.Set(int64(m.inProgressWriteCount))
+	}
+	return reserved
+}
+
+// AllWritesBlocked returns true when the monitor is open and
+// maxWritesPermitted number of writes are already outstanding.
+func (m *Monitor) AllWritesBlocked() bool {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if !m.isOpen {
+		return false
+	}
+	return m.inProgressWriteCount == maxWritesPermitted
+}
+
+// decrementWriteCount releases one in-progress write slot and
+// publishes the new count on the gauge.
+func (m *Monitor) decrementWriteCount() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.inProgressWriteCount--
+	m.writesBlockedGauge.Set(int64(m.inProgressWriteCount))
+}
+
+// write writes a heartbeat to unblock semi-sync being stuck.
+func (m *Monitor) write() {
+	// Respect the cap on the number of concurrent heartbeat writes.
+	if !m.incrementWriteCount() {
+		return
+	}
+	defer m.decrementWriteCount()
+
+	// Get a connection from the pool. The deadline bounds how long we wait for it.
+	ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
+	defer cancel()
+	conn, err := m.appPool.Get(ctx)
+	if err != nil {
+		m.errorCount.Add(1)
+		log.Errorf("SemiSync Monitor: failed to get a connection when writing to semisync_heartbeat table: %v", err)
+		return
+	}
+	defer conn.Recycle()
+
+	if _, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncHeartbeatWrite), 0, false); err != nil {
+		m.errorCount.Add(1)
+		log.Errorf("SemiSync Monitor: failed to write to semisync_heartbeat table: %v", err)
+		return
+	}
+	// One of the writes went through without an error.
+	// This means that we aren't blocked on semi-sync anymore.
+	m.setIsBlocked(false)
+}
+
+// setIsBlocked sets the isBlocked field. Setting it to false also
+// releases every registered waiter and empties the waiter list.
+func (m *Monitor) setIsBlocked(val bool) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.isBlocked = val
+	if val {
+		return
+	}
+	// If we are unblocked, then we need to signal all the waiters.
+	for _, ch := range m.waiters {
+		close(ch)
+	}
+	// We also empty the list of current waiters, so that no channel
+	// is ever closed twice.
+	m.waiters = nil
+}
+
+// clearAllData clears all the data in the table so that it never
+// consumes too much space on the MySQL instance.
+func (m *Monitor) clearAllData() {
+	// Get a connection from the pool. The wait is bounded in the same way as
+	// in write and checkAndFixSemiSyncBlocked, so that an exhausted pool
+	// cannot make this periodic task hang without a deadline.
+	ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
+	defer cancel()
+	conn, err := m.appPool.Get(ctx)
+	if err != nil {
+		m.errorCount.Add(1)
+		log.Errorf("SemiSync Monitor: failed get a connection to clear semisync_heartbeat table: %v", err)
+		return
+	}
+	defer conn.Recycle()
+	_, err = conn.Conn.ExecuteFetch(m.bindSideCarDBName(semiSyncHeartbeatClear), 0, false)
+	if err != nil {
+		m.errorCount.Add(1)
+		log.Errorf("SemiSync Monitor: failed to clear semisync_heartbeat table: %v", err)
+	}
+}
+
+// addWaiter adds a waiter to the list of waiters
+// that will be unblocked when the primary is no longer blocked.
+func (m *Monitor) addWaiter() chan struct{} {
+	// The channel carries no data; it is closed to release the waiter.
+	waiter := make(chan struct{})
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.waiters = append(m.waiters, waiter)
+	return waiter
+}
+
+// bindSideCarDBName substitutes the sidecar database identifier
+// into the given query template and returns the resulting query text.
+func (m *Monitor) bindSideCarDBName(query string) string {
+	parsed := sqlparser.BuildParsedQuery(query, sidecar.GetIdentifier())
+	return parsed.Query
+}
diff --git a/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go
new file mode 100644
index 00000000000..97716569563
--- /dev/null
+++ b/go/vt/vttablet/tabletmanager/semisyncmonitor/monitor_test.go
@@ -0,0 +1,728 @@
+/*
+Copyright 2025 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package semisyncmonitor
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"vitess.io/vitess/go/mysql/fakesqldb"
+	"vitess.io/vitess/go/sqltypes"
+	"vitess.io/vitess/go/vt/dbconfigs"
+	"vitess.io/vitess/go/vt/servenv"
+	"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
+)
+
+var (
+	exporter = servenv.NewExporter("TestSemiSyncMonitor", "")
+)
+
+// createFakeDBAndMonitor creates a fake DB and a monitor for testing.
+func createFakeDBAndMonitor(t *testing.T) (*fakesqldb.DB, *Monitor) { + db := fakesqldb.New(t) + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "") + config := &tabletenv.TabletConfig{ + DB: dbc, + SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{ + Interval: 10 * time.Second, + }, + } + monitor := NewMonitor(config, exporter) + monitor.mu.Lock() + defer monitor.mu.Unlock() + monitor.isOpen = true + monitor.appPool.Open(config.DB.AppWithDB()) + return db, monitor + +} + +// TestMonitorIsSemiSyncBlocked tests the functionality of isSemiSyncBlocked. +func TestMonitorIsSemiSyncBlocked(t *testing.T) { + tests := []struct { + name string + res *sqltypes.Result + want bool + wantErr string + }{ + { + name: "no rows", + res: &sqltypes.Result{}, + want: false, + }, + { + name: "incorrect number of rows", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1", "Rpl_semi_sync_master_wait_sessions|1"), + wantErr: "Row count exceeded 1", + }, + { + name: "incorrect number of fields", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value|a", "varchar|varchar|int"), "Rpl_semi_sync_source_wait_sessions|1|2"), + wantErr: "unexpected number of rows received", + }, + { + name: "Unblocked", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0"), + want: false, + }, + { + name: "Blocked", + res: sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1"), + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + db.AddQuery(semiSyncWaitSessionsRead, tt.res) + got, err := m.isSemiSyncBlocked(context.Background()) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + return + 
} + require.NoError(t, err) + require.EqualValues(t, tt.want, got) + }) + } +} + +func TestMonitorBindSideCarDBName(t *testing.T) { + tests := []struct { + query string + expected string + }{ + { + query: semiSyncHeartbeatWrite, + expected: "INSERT INTO _vt.semisync_heartbeat (ts) VALUES (NOW())", + }, + { + query: semiSyncHeartbeatClear, + expected: "TRUNCATE TABLE _vt.semisync_heartbeat", + }, + } + for _, tt := range tests { + t.Run(tt.query, func(t *testing.T) { + m := &Monitor{} + require.EqualValues(t, tt.expected, m.bindSideCarDBName(tt.query)) + }) + } +} + +func TestMonitorClearAllData(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + db.SetNeverFail(true) + m.clearAllData() + ql := db.QueryLog() + require.EqualValues(t, "truncate table _vt.semisync_heartbeat", ql) +} + +// TestMonitorWaitMechanism tests that the wait mechanism works as intended. +// Setting the monitor to unblock state should unblock the waiters. +func TestMonitorWaitMechanism(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + + // Add a waiter. + ch := m.addWaiter() + var waiterUnblocked atomic.Bool + go func() { + <-ch + waiterUnblocked.Store(true) + }() + + // Ensure that the waiter is currently blocked. + require.False(t, waiterUnblocked.Load()) + + // Verify that setting again to being blocked doesn't unblock the waiter. + m.setIsBlocked(true) + require.False(t, waiterUnblocked.Load()) + require.False(t, m.isClosed()) + require.True(t, m.stillBlocked()) + + // Verify that setting we are no longer blocked, unblocks the waiter. 
+ m.setIsBlocked(false) + require.Eventually(t, func() bool { + return waiterUnblocked.Load() + }, 2*time.Second, time.Millisecond*100) + require.False(t, m.stillBlocked()) + require.False(t, m.isClosed()) +} + +func TestMonitorIncrementWriteCount(t *testing.T) { + tests := []struct { + initVal int + finalVal int + want bool + }{ + { + initVal: maxWritesPermitted - 2, + finalVal: maxWritesPermitted - 1, + want: true, + }, { + initVal: maxWritesPermitted - 1, + finalVal: maxWritesPermitted, + want: true, + }, { + initVal: maxWritesPermitted, + finalVal: maxWritesPermitted, + want: false, + }, { + initVal: 0, + finalVal: 1, + want: true, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + m.mu.Lock() + m.inProgressWriteCount = tt.initVal + m.mu.Unlock() + got := m.incrementWriteCount() + require.EqualValues(t, tt.want, got) + m.mu.Lock() + require.EqualValues(t, tt.finalVal, m.inProgressWriteCount) + require.EqualValues(t, tt.finalVal, m.writesBlockedGauge.Get()) + m.mu.Unlock() + }) + } +} + +func TestMonitorDecrementWriteCount(t *testing.T) { + tests := []struct { + initVal int + finalVal int + }{ + { + initVal: maxWritesPermitted - 1, + finalVal: maxWritesPermitted - 2, + }, { + initVal: maxWritesPermitted, + finalVal: maxWritesPermitted - 1, + }, { + initVal: 1, + finalVal: 0, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + m.mu.Lock() + m.inProgressWriteCount = tt.initVal + m.mu.Unlock() + m.decrementWriteCount() + m.mu.Lock() + require.EqualValues(t, tt.finalVal, m.inProgressWriteCount) + require.EqualValues(t, tt.finalVal, m.writesBlockedGauge.Get()) + m.mu.Unlock() + }) + } +} + +func TestMonitorAllWritesBlocked(t *testing.T) { + tests := []struct { + initVal int + expected bool + }{ + { + initVal: 
maxWritesPermitted - 1, + expected: false, + }, { + initVal: maxWritesPermitted, + expected: true, + }, { + initVal: 1, + expected: false, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + m.mu.Lock() + m.inProgressWriteCount = tt.initVal + m.mu.Unlock() + require.EqualValues(t, tt.expected, m.AllWritesBlocked()) + }) + } +} + +func TestMonitorWrite(t *testing.T) { + tests := []struct { + initVal int + queryLog string + }{ + { + initVal: maxWritesPermitted - 2, + queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())", + }, { + initVal: maxWritesPermitted - 1, + queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())", + }, { + initVal: maxWritesPermitted, + queryLog: "", + }, { + initVal: 0, + queryLog: "insert into _vt.semisync_heartbeat (ts) values (now())", + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%d", tt.initVal), func(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + db.SetNeverFail(true) + m.mu.Lock() + m.inProgressWriteCount = tt.initVal + m.writesBlockedGauge.Set(int64(tt.initVal)) + m.mu.Unlock() + m.write() + m.mu.Lock() + require.EqualValues(t, tt.initVal, m.inProgressWriteCount) + require.EqualValues(t, tt.initVal, m.writesBlockedGauge.Get()) + m.mu.Unlock() + require.EqualValues(t, tt.queryLog, db.QueryLog()) + }) + } +} + +// TestMonitorWriteBlocked tests the write function when the writes are blocked. +func TestMonitorWriteBlocked(t *testing.T) { + initialVal := waitBetweenWrites + waitBetweenWrites = 250 * time.Millisecond + defer func() { + waitBetweenWrites = initialVal + }() + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + + // Check the initial value of the inProgressWriteCount. 
+ m.mu.Lock() + require.EqualValues(t, 0, m.inProgressWriteCount) + m.mu.Unlock() + + // Add a universal insert query pattern that would block until we make it unblock. + ch := make(chan int) + db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { + <-ch + }) + + // Do a write, which we expect to block. + var writeFinished atomic.Bool + go func() { + m.write() + writeFinished.Store(true) + }() + // We should see the number of writes blocked to increase. + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.inProgressWriteCount == 1 + }, 2*time.Second, 100*time.Millisecond) + + // Once the writers are unblocked, we expect to see a zero value again. + close(ch) + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.inProgressWriteCount == 0 + }, 2*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return writeFinished.Load() + }, 2*time.Second, 100*time.Millisecond) +} + +// TestIsWriting checks the transitions for the isWriting field. +func TestIsWriting(t *testing.T) { + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + + // Check the initial value of the isWriting field. + m.mu.Lock() + require.False(t, m.isWriting) + m.mu.Unlock() + + // Clearing a false field does nothing. + m.clearIsWriting() + m.mu.Lock() + require.False(t, m.isWriting) + m.mu.Unlock() + + // Check and set should set the field. + set := m.checkAndSetIsWriting() + require.True(t, set) + m.mu.Lock() + require.True(t, m.isWriting) + m.mu.Unlock() + + // Checking and setting shouldn't do anything. + set = m.checkAndSetIsWriting() + require.False(t, set) + m.mu.Lock() + require.True(t, m.isWriting) + m.mu.Unlock() + + // Clearing should now make the field false. 
+ m.clearIsWriting() + m.mu.Lock() + require.False(t, m.isWriting) + m.mu.Unlock() +} + +func TestStartWrites(t *testing.T) { + initialVal := waitBetweenWrites + waitBetweenWrites = 250 * time.Millisecond + defer func() { + waitBetweenWrites = initialVal + }() + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + + // Add a universal insert query pattern that would block until we make it unblock. + ch := make(chan int) + db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { + <-ch + }) + + // If we aren't blocked, then start writes doesn't do anything. + m.startWrites() + require.EqualValues(t, "", db.QueryLog()) + + // Now we set the monitor to be blocked. + m.setIsBlocked(true) + + var writesFinished atomic.Bool + go func() { + m.startWrites() + writesFinished.Store(true) + }() + + // We should see the number of writes blocked to increase. + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.inProgressWriteCount >= 1 + }, 2*time.Second, 100*time.Millisecond) + + // Once the writes have started, another call to startWrites shouldn't do anything + m.startWrites() + + // We should continue to see the number of writes blocked increase. + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.inProgressWriteCount >= 2 + }, 2*time.Second, 100*time.Millisecond) + + // Check that the writes are still going. + require.False(t, writesFinished.Load()) + + // Make the monitor unblocked. This should stop the writes eventually. + m.setIsBlocked(false) + close(ch) + + require.Eventually(t, func() bool { + return writesFinished.Load() + }, 2*time.Second, 100*time.Millisecond) + + // Check that no writes are in progress anymore. 
+ require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.inProgressWriteCount == 0 + }, 2*time.Second, 100*time.Millisecond) +} + +func TestCheckAndFixSemiSyncBlocked(t *testing.T) { + initialVal := waitBetweenWrites + waitBetweenWrites = 250 * time.Millisecond + defer func() { + waitBetweenWrites = initialVal + }() + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + + // Initially everything is unblocked. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + // Add a universal insert query pattern that would block until we make it unblock. + ch := make(chan int) + db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { + <-ch + }) + + // Check that the monitor thinks we are unblocked. + m.checkAndFixSemiSyncBlocked() + m.mu.Lock() + require.False(t, m.isBlocked) + m.mu.Unlock() + + // Now we set the monitor to be blocked. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|2")) + m.checkAndFixSemiSyncBlocked() + + m.mu.Lock() + require.True(t, m.isBlocked) + m.mu.Unlock() + + // Checking again shouldn't make a difference. + m.checkAndFixSemiSyncBlocked() + m.mu.Lock() + require.True(t, m.isBlocked) + m.mu.Unlock() + + // Meanwhile writes should have started and should be getting blocked. + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.inProgressWriteCount >= 2 + }, 2*time.Second, 100*time.Millisecond) + + // Now we set the monitor to be unblocked. 
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + close(ch) + m.checkAndFixSemiSyncBlocked() + + // We expect the writes to clear out and also the monitor should think its unblocked. + m.mu.Lock() + require.False(t, m.isBlocked) + m.mu.Unlock() + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.inProgressWriteCount == 0 && m.isWriting == false + }, 2*time.Second, 100*time.Millisecond) +} + +func TestWaitUntilSemiSyncUnblocked(t *testing.T) { + initialVal := waitBetweenWrites + waitBetweenWrites = 250 * time.Millisecond + defer func() { + waitBetweenWrites = initialVal + }() + db, m := createFakeDBAndMonitor(t) + defer db.Close() + defer m.Close() + + db.SetNeverFail(true) + // Initially everything is unblocked. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + + // When everything is unblocked, then this should return without blocking. + err := m.WaitUntilSemiSyncUnblocked(context.Background()) + require.NoError(t, err) + + // Add a universal insert query pattern that would block until we make it unblock. + ch := make(chan int) + db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { + <-ch + }) + // Now we set the monitor to be blocked. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|3")) + + // wg is used to keep track of all the go routines. + wg := sync.WaitGroup{} + // Start a cancellable context and use that to wait. 
+ ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg.Add(1) + var ctxErr error + var mu sync.Mutex + go func() { + defer wg.Done() + err := m.WaitUntilSemiSyncUnblocked(ctx) + mu.Lock() + ctxErr = err + mu.Unlock() + }() + + // Start another go routine, also waiting for semi-sync being unblocked, but not using the cancellable context. + wg.Add(1) + go func() { + defer wg.Done() + err := m.WaitUntilSemiSyncUnblocked(context.Background()) + require.NoError(t, err) + }() + + // Wait until the writes have started. + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.isWriting + }, 2*time.Second, 100*time.Millisecond) + + // Now we cancel the context. This should fail the first wait. + cancel() + // Since we cancel the context before the semi-sync has been unblocked, we expect a context timeout error. + require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return ctxErr != nil + }, 2*time.Second, 100*time.Millisecond) + mu.Lock() + require.EqualError(t, ctxErr, "context canceled") + mu.Unlock() + + // Now we set the monitor to be unblocked. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + close(ch) + err = m.WaitUntilSemiSyncUnblocked(context.Background()) + require.NoError(t, err) + // This should unblock the second wait. + wg.Wait() + // Eventually the writes should also stop. + require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return !m.isWriting + }, 2*time.Second, 100*time.Millisecond) + + // Also verify that if the monitor is closed, we don't wait. + m.Close() + err = m.WaitUntilSemiSyncUnblocked(context.Background()) + require.NoError(t, err) + require.True(t, m.isClosed()) +} + +// TestSemiSyncMonitor tests the semi-sync monitor as a black box. +// It only calls the exported methods to see they work as intended. 
+func TestSemiSyncMonitor(t *testing.T) { + initialVal := waitBetweenWrites + waitBetweenWrites = 250 * time.Millisecond + defer func() { + waitBetweenWrites = initialVal + }() + db := fakesqldb.New(t) + defer db.Close() + params := db.ConnParams() + cp := *params + dbc := dbconfigs.NewTestDBConfigs(cp, cp, "") + config := &tabletenv.TabletConfig{ + DB: dbc, + SemiSyncMonitor: tabletenv.SemiSyncMonitorConfig{ + Interval: 1 * time.Second, + }, + } + m := NewMonitor(config, exporter) + defer m.Close() + + db.SetNeverFail(true) + // Initially everything is unblocked. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + + // Open the monitor. + m.Open() + + // Initially writes aren't blocked and the wait returns immediately. + require.False(t, m.AllWritesBlocked()) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err := m.WaitUntilSemiSyncUnblocked(ctx) + require.NoError(t, err) + + // Add a universal insert query pattern that would block until we make it unblock. + ch := make(chan int) + db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { + <-ch + }) + // Now we set the monitor to be blocked. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1")) + + // Start a go routine waiting for semi-sync being unblocked. + var waitFinished atomic.Bool + go func() { + err := m.WaitUntilSemiSyncUnblocked(context.Background()) + require.NoError(t, err) + waitFinished.Store(true) + }() + + // Even if we wait a second, the wait shouldn't be over. + time.Sleep(1 * time.Second) + require.False(t, waitFinished.Load()) + + // If we unblock the semi-sync, then the wait should finish. 
+ db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + close(ch) + require.Eventually(t, func() bool { + return waitFinished.Load() + }, 2*time.Second, 100*time.Millisecond) + require.False(t, m.AllWritesBlocked()) + + // Add a universal insert query pattern that would block until we make it unblock. + ch = make(chan int) + db.AddQueryPatternWithCallback("^INSERT INTO.*", sqltypes.MakeTestResult(nil), func(s string) { + <-ch + }) + // We block the semi-sync again. + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|1")) + + // Start another go routine, also waiting for semi-sync being unblocked. + waitFinished.Store(false) + go func() { + err := m.WaitUntilSemiSyncUnblocked(context.Background()) + require.NoError(t, err) + waitFinished.Store(true) + }() + + // Since the writes are now blocking, eventually all the writes should block. + require.Eventually(t, func() bool { + return m.AllWritesBlocked() + }, 10*time.Second, 100*time.Millisecond) + + // The wait should still not have ended. + require.False(t, waitFinished.Load()) + + // Now we unblock the writes and semi-sync. + close(ch) + db.AddQuery(semiSyncWaitSessionsRead, sqltypes.MakeTestResult(sqltypes.MakeTestFields("Variable_name|Value", "varchar|varchar"), "Rpl_semi_sync_source_wait_sessions|0")) + + // The wait should now finish. + require.Eventually(t, func() bool { + return waitFinished.Load() + }, 2*time.Second, 100*time.Millisecond) + require.False(t, m.AllWritesBlocked()) + + // Close the monitor. + m.Close() + // The test is technically over, but we wait for the writes to have stopped to prevent any data races. 
+ require.Eventually(t, func() bool { + m.mu.Lock() + defer m.mu.Unlock() + return !m.isWriting + }, 2*time.Second, 100*time.Millisecond) +} diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index fbef04de357..4c7b706bc4b 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -73,6 +73,7 @@ import ( "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver" @@ -162,6 +163,7 @@ type TabletManager struct { QueryServiceControl tabletserver.Controller UpdateStream binlog.UpdateStreamControl VREngine *vreplication.Engine + SemiSyncMonitor *semisyncmonitor.Monitor VDiffEngine *vdiff.Engine Env *vtenv.Environment diff --git a/go/vt/vttablet/tabletmanager/tm_init_test.go b/go/vt/vttablet/tabletmanager/tm_init_test.go index b8c9c54dcc2..a6c5d33c975 100644 --- a/go/vt/vttablet/tabletmanager/tm_init_test.go +++ b/go/vt/vttablet/tabletmanager/tm_init_test.go @@ -39,6 +39,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletservermock" "vitess.io/vitess/go/vt/vttest" @@ -683,6 +684,10 @@ func newTestMysqlDaemon(t *testing.T, port int32) *mysqlctl.FakeMysqlDaemon { return mysqld } +var ( + exporter = servenv.NewExporter("TestTabletManager", "") +) + func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string, tags map[string]string) *TabletManager { // reset stats statsTabletTags.ResetAll() @@ -691,11 +696,13 @@ func newTestTM(t *testing.T, ts *topo.Server, uid int, 
keyspace, shard string, t t.Helper() ctx := context.Background() tablet := newTestTablet(t, uid, keyspace, shard, tags) + fakeDb := newTestMysqlDaemon(t, 1) tm := &TabletManager{ BatchCtx: ctx, TopoServer: ts, - MysqlDaemon: newTestMysqlDaemon(t, 1), + MysqlDaemon: fakeDb, DBConfigs: &dbconfigs.DBConfigs{}, + SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(fakeDb.DB(), exporter), QueryServiceControl: tabletservermock.NewController(), } err := tm.Start(tablet, nil) diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 9af2e061502..53b0db271a2 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -79,7 +79,6 @@ func TestStateManagerServePrimary(t *testing.T) { assert.Equal(t, testNow, sm.ptsTimestamp) verifySubcomponent(t, 1, sm.watcher, testStateClosed) - verifySubcomponent(t, 2, sm.se, testStateOpen) verifySubcomponent(t, 3, sm.vstreamer, testStateOpen) verifySubcomponent(t, 4, sm.qe, testStateOpen) diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index ddab935d393..47a9acfc930 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -75,6 +75,7 @@ var ( heartbeatInterval time.Duration heartbeatOnDemandDuration time.Duration healthCheckInterval time.Duration + semiSyncMonitorInterval time.Duration degradedThreshold time.Duration unhealthyThreshold time.Duration transitionGracePeriod time.Duration @@ -203,6 +204,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.DurationVar(°radedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded") fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy") 
fs.DurationVar(&transitionGracePeriod, "serving_state_grace_period", 0, "how long to pause after broadcasting health to vtgate, before enforcing a new serving state") + fs.DurationVar(&semiSyncMonitorInterval, "semi-sync-monitor-interval", defaultConfig.SemiSyncMonitor.Interval, "How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs") fs.BoolVar(&enableReplicationReporter, "enable_replication_reporter", false, "Use polling to track replication lag.") fs.BoolVar(¤tConfig.EnableOnlineDDL, "queryserver_enable_online_ddl", true, "Enable online DDL.") @@ -278,6 +280,7 @@ func Init() { currentConfig.Healthcheck.DegradedThreshold = degradedThreshold currentConfig.Healthcheck.UnhealthyThreshold = unhealthyThreshold currentConfig.GracePeriods.Transition = transitionGracePeriod + currentConfig.SemiSyncMonitor.Interval = semiSyncMonitorInterval logFormat := streamlog.GetQueryLogConfig().Format switch logFormat { @@ -317,6 +320,8 @@ type TabletConfig struct { Healthcheck HealthcheckConfig `json:"healthcheck,omitempty"` GracePeriods GracePeriodsConfig `json:"gracePeriods,omitempty"` + SemiSyncMonitor SemiSyncMonitorConfig `json:"semiSyncMonitor,omitempty"` + ReplicationTracker ReplicationTrackerConfig `json:"replicationTracker,omitempty"` // Consolidator can be enable, disable, or notOnPrimary. Default is enable. @@ -613,6 +618,42 @@ type HotRowProtectionConfig struct { MaxConcurrency int `json:"maxConcurrency,omitempty"` } +// SemiSyncMonitorConfig contains the config for the semi-sync monitor. 
+type SemiSyncMonitorConfig struct { + Interval time.Duration +} + +func (cfg *SemiSyncMonitorConfig) MarshalJSON() ([]byte, error) { + var tmp struct { + IntervalSeconds string `json:"intervalSeconds,omitempty"` + } + + if d := cfg.Interval; d != 0 { + tmp.IntervalSeconds = d.String() + } + + return json.Marshal(&tmp) +} + +func (cfg *SemiSyncMonitorConfig) UnmarshalJSON(data []byte) (err error) { + var tmp struct { + Interval string `json:"intervalSeconds,omitempty"` + } + + if err = json.Unmarshal(data, &tmp); err != nil { + return err + } + + if tmp.Interval != "" { + cfg.Interval, err = time.ParseDuration(tmp.Interval) + if err != nil { + return err + } + } + + return nil +} + // HealthcheckConfig contains the config for healthcheck. type HealthcheckConfig struct { Interval time.Duration @@ -1008,6 +1049,9 @@ var defaultConfig = TabletConfig{ DegradedThreshold: 30 * time.Second, UnhealthyThreshold: 2 * time.Hour, }, + SemiSyncMonitor: SemiSyncMonitorConfig{ + Interval: 10 * time.Second, + }, ReplicationTracker: ReplicationTrackerConfig{ Mode: Disable, HeartbeatInterval: 250 * time.Millisecond, diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 9ae653bafb9..20f4541cdc8 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -97,6 +97,7 @@ rowStreamer: maxMySQLReplLagSecs: 400 schemaChangeReloadTimeout: 30s schemaReloadIntervalSeconds: 30m0s +semiSyncMonitor: {} txPool: {} ` assert.Equal(t, wantBytes, string(gotBytes)) @@ -164,6 +165,8 @@ rowStreamer: maxMySQLReplLagSecs: 43200 schemaChangeReloadTimeout: 30s schemaReloadIntervalSeconds: 30m0s +semiSyncMonitor: + intervalSeconds: 10s signalWhenSchemaChange: true streamBufferSize: 32768 txPool: @@ -300,6 +303,12 @@ func TestFlags(t *testing.T) { want.Healthcheck.Interval = time.Second assert.Equal(t, want, currentConfig) + semiSyncMonitorInterval = time.Second + 
currentConfig.SemiSyncMonitor.Interval = 0 + Init() + want.SemiSyncMonitor.Interval = time.Second + assert.Equal(t, want, currentConfig) + degradedThreshold = 2 * time.Second currentConfig.Healthcheck.DegradedThreshold = 0 Init() diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index b70a64d644e..2eec782f301 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/mysqlctl" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtenv" @@ -41,6 +42,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/vttablet/tabletservermock" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -158,6 +160,10 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy } } +var ( + exporter = servenv.NewExporter("TestWrangler", "") +) + // StartActionLoop will start the action loop for a fake tablet, // using ft.FakeMysqlDaemon as the backing mysqld. 
func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) { @@ -199,6 +205,7 @@ func (ft *fakeTablet) StartActionLoop(t *testing.T, wr *Wrangler) { DBConfigs: &dbconfigs.DBConfigs{}, QueryServiceControl: tabletservermock.NewController(), VDiffEngine: vdiff2.NewEngine(wr.TopoServer(), ft.Tablet, collations.MySQL8(), sqlparser.NewTestParser()), + SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter), Env: vtenv.NewTestEnv(), } if err := ft.TM.Start(ft.Tablet, nil); err != nil { diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go index 9649d717d73..7881ebf6a2a 100644 --- a/go/vt/wrangler/testlib/fake_tablet.go +++ b/go/vt/wrangler/testlib/fake_tablet.go @@ -33,12 +33,14 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/grpctmserver" "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletservermock" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -159,6 +161,10 @@ func NewFakeTablet(t *testing.T, wr *wrangler.Wrangler, cell string, uid uint32, } } +var ( + exporter = servenv.NewExporter("TestWranglerTestLib", "") +) + // StartActionLoop will start the action loop for a fake tablet, // using ft.FakeMysqlDaemon as the backing mysqld. 
func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) { @@ -202,6 +208,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, wr *wrangler.Wrangler) { DBConfigs: &dbconfigs.DBConfigs{}, QueryServiceControl: tabletservermock.NewController(), VREngine: vreplication.NewTestEngine(wr.TopoServer(), ft.Tablet.Alias.Cell, ft.FakeMysqlDaemon, binlogplayer.NewFakeDBClient, binlogplayer.NewFakeDBClient, topoproto.TabletDbName(ft.Tablet), nil), + SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter), Env: vtenv.NewTestEnv(), } if err := ft.TM.Start(ft.Tablet, nil); err != nil { diff --git a/proto/replicationdata.proto b/proto/replicationdata.proto index eba4d323ee6..0c9ad87e210 100644 --- a/proto/replicationdata.proto +++ b/proto/replicationdata.proto @@ -106,4 +106,5 @@ message FullStatus { bool super_read_only = 21; replicationdata.Configuration replication_configuration = 22; bool disk_stalled = 23; + bool semi_sync_blocked = 24; } diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 527adc01326..55fb86d17e0 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -48801,6 +48801,9 @@ export namespace replicationdata { /** FullStatus disk_stalled */ disk_stalled?: (boolean|null); + + /** FullStatus semi_sync_blocked */ + semi_sync_blocked?: (boolean|null); } /** Represents a FullStatus. */ @@ -48881,6 +48884,9 @@ export namespace replicationdata { /** FullStatus disk_stalled. */ public disk_stalled: boolean; + /** FullStatus semi_sync_blocked. */ + public semi_sync_blocked: boolean; + /** * Creates a new FullStatus instance using the specified properties. 
* @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 1209c59cbe9..4cfc709fa9b 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -118503,6 +118503,7 @@ export const replicationdata = $root.replicationdata = (() => { * @property {boolean|null} [super_read_only] FullStatus super_read_only * @property {replicationdata.IConfiguration|null} [replication_configuration] FullStatus replication_configuration * @property {boolean|null} [disk_stalled] FullStatus disk_stalled + * @property {boolean|null} [semi_sync_blocked] FullStatus semi_sync_blocked */ /** @@ -118704,6 +118705,14 @@ export const replicationdata = $root.replicationdata = (() => { */ FullStatus.prototype.disk_stalled = false; + /** + * FullStatus semi_sync_blocked. + * @member {boolean} semi_sync_blocked + * @memberof replicationdata.FullStatus + * @instance + */ + FullStatus.prototype.semi_sync_blocked = false; + /** * Creates a new FullStatus instance using the specified properties. 
* @function create @@ -118774,6 +118783,8 @@ export const replicationdata = $root.replicationdata = (() => { $root.replicationdata.Configuration.encode(message.replication_configuration, writer.uint32(/* id 22, wireType 2 =*/178).fork()).ldelim(); if (message.disk_stalled != null && Object.hasOwnProperty.call(message, "disk_stalled")) writer.uint32(/* id 23, wireType 0 =*/184).bool(message.disk_stalled); + if (message.semi_sync_blocked != null && Object.hasOwnProperty.call(message, "semi_sync_blocked")) + writer.uint32(/* id 24, wireType 0 =*/192).bool(message.semi_sync_blocked); return writer; }; @@ -118900,6 +118911,10 @@ export const replicationdata = $root.replicationdata = (() => { message.disk_stalled = reader.bool(); break; } + case 24: { + message.semi_sync_blocked = reader.bool(); + break; + } default: reader.skipType(tag & 7); break; @@ -119010,6 +119025,9 @@ export const replicationdata = $root.replicationdata = (() => { if (message.disk_stalled != null && message.hasOwnProperty("disk_stalled")) if (typeof message.disk_stalled !== "boolean") return "disk_stalled: boolean expected"; + if (message.semi_sync_blocked != null && message.hasOwnProperty("semi_sync_blocked")) + if (typeof message.semi_sync_blocked !== "boolean") + return "semi_sync_blocked: boolean expected"; return null; }; @@ -119087,6 +119105,8 @@ export const replicationdata = $root.replicationdata = (() => { } if (object.disk_stalled != null) message.disk_stalled = Boolean(object.disk_stalled); + if (object.semi_sync_blocked != null) + message.semi_sync_blocked = Boolean(object.semi_sync_blocked); return message; }; @@ -119131,6 +119151,7 @@ export const replicationdata = $root.replicationdata = (() => { object.super_read_only = false; object.replication_configuration = null; object.disk_stalled = false; + object.semi_sync_blocked = false; } if (message.server_id != null && message.hasOwnProperty("server_id")) object.server_id = message.server_id; @@ -119181,6 +119202,8 @@ export const 
replicationdata = $root.replicationdata = (() => { object.replication_configuration = $root.replicationdata.Configuration.toObject(message.replication_configuration, options); if (message.disk_stalled != null && message.hasOwnProperty("disk_stalled")) object.disk_stalled = message.disk_stalled; + if (message.semi_sync_blocked != null && message.hasOwnProperty("semi_sync_blocked")) + object.semi_sync_blocked = message.semi_sync_blocked; return object; };