Add semi-sync monitor to unblock primaries blocked on semi-sync ACKs #17763

Merged: 28 commits, Feb 24, 2025
Commits
771dc7e
feat: add the first iteration of semi-sync watcher
GuptaManan100 Feb 12, 2025
4dcaf54
feat: add semisync recovery sql and other missing pieces in the code
GuptaManan100 Feb 13, 2025
8a47cc0
test: add a test to check semi-sync block issue
GuptaManan100 Feb 14, 2025
bf9b845
feat: address review comments
GuptaManan100 Feb 14, 2025
955d0c0
feat: wire up the monitor to the state manager and the tablet server
GuptaManan100 Feb 14, 2025
3124339
feat: fix some bugs
GuptaManan100 Feb 14, 2025
77f964f
Merge remote-tracking branch 'upstream/main' into semi-sync-watcher
GuptaManan100 Feb 17, 2025
2b2708f
feat: add the capability to the monitor to keep track of outstanding …
GuptaManan100 Feb 17, 2025
0be0743
feat: add the capability to run ERS to VTOrc when it detects semi-syn…
GuptaManan100 Feb 17, 2025
d33b460
feat: added test to verify VTOrc runs an ERS when semi-sync is blocke…
GuptaManan100 Feb 17, 2025
6092978
feat: make the monitor interval configurable
GuptaManan100 Feb 17, 2025
7286419
feat: add a gauge for the monitor
GuptaManan100 Feb 17, 2025
4725dd9
feat: move the semi-sync monitor from the tabletserver to tablet mana…
GuptaManan100 Feb 17, 2025
a8d2cbd
test: poll the new gauge in the test
GuptaManan100 Feb 17, 2025
da4a8f8
feat: fix bug in isClosed function where we locked twice :facepalm:
GuptaManan100 Feb 17, 2025
6cf02ab
test: add comprehensive testing for the semi-sync monitor
GuptaManan100 Feb 17, 2025
942eaf0
test: fix tests by doing nil checks and adding semi-sync monitor to f…
GuptaManan100 Feb 18, 2025
3a3db2a
test: add demote primary test that makes sure we block when semi-sync…
GuptaManan100 Feb 18, 2025
eaa9246
summary: add summary changes to make users aware of the change
GuptaManan100 Feb 18, 2025
a571e48
test: fix more tests by updating expectations
GuptaManan100 Feb 18, 2025
05c3d2e
feat: make the test less flaky and improve flag documentation
GuptaManan100 Feb 20, 2025
dddc6e5
feat: address review comments
GuptaManan100 Feb 20, 2025
9995a0d
feat: change the read query to be more specific
GuptaManan100 Feb 20, 2025
cdd609d
test: increase time for eventual checks
GuptaManan100 Feb 20, 2025
ffd5b50
feat: also mark semi-sync unblocked after a write succeeds
GuptaManan100 Feb 20, 2025
3bcbd3b
feat: rename some fields
GuptaManan100 Feb 20, 2025
ee098c6
feat: add a new counter for tracking number of errors
GuptaManan100 Feb 21, 2025
4373407
feat: use remote operation timeout constant instead of direct 15 seco…
GuptaManan100 Feb 24, 2025
9 changes: 9 additions & 0 deletions changelog/22.0/22.0.0/summary.md
@@ -18,6 +18,7 @@
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Semi-sync monitor in vttablet](#semi-sync-monitor)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
- **[VTTablet ACL enforcement and reloading](#reloading-vttablet-acl)**
@@ -162,6 +163,14 @@ Users can continue to specify exact keyranges. The new feature is backward compa

The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.

### <a id="semi-sync-monitor"/>Semi-sync monitor in vttablet</a>

A new component has been added to the vttablet binary to monitor the semi-sync status of primary vttablets. We've observed cases where a brief network disruption can cause the primary to get stuck indefinitely waiting for semi-sync ACKs. In rare scenarios, this can block reparent operations and render the primary unresponsive. More information can be found in the issues https://github.com/vitessio/vitess/issues/17709 and https://github.com/vitessio/vitess/issues/17749.

To address this, the new component continuously monitors the semi-sync status. If the primary becomes stuck on semi-sync ACKs, it generates writes to unblock it. If this fails, VTOrc is notified of the issue and initiates an emergency reparent operation.

The monitoring interval can be adjusted using the `--semi-sync-monitor-interval` flag, which defaults to 10 seconds.

## <a id="minor-changes"/>Minor Changes</a>

#### <a id="flags-vttablet"/>VTTablet Flags</a>
7 changes: 7 additions & 0 deletions go/cmd/vtctldclient/command/framework_test.go
@@ -30,12 +30,14 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/grpctmserver"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@@ -158,6 +160,10 @@ func NewFakeTablet(t *testing.T, ts *topo.Server, cell string, uid uint32, table
}
}

var (
exporter = servenv.NewExporter("TestVtctldClientCommand", "")
)

// StartActionLoop will start the action loop for a fake tablet,
// using ft.FakeMysqlDaemon as the backing mysqld.
func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
@@ -203,6 +209,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
VREngine: vreplication.NewTestEngine(ts, ft.Tablet.Alias.Cell, ft.FakeMysqlDaemon, binlogplayer.NewFakeDBClient, binlogplayer.NewFakeDBClient, topoproto.TabletDbName(ft.Tablet), nil),
SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter),
Env: vtenv.NewTestEnv(),
}
if err := ft.TM.Start(ft.Tablet, nil); err != nil {
2 changes: 2 additions & 0 deletions go/cmd/vttablet/cli/cli.go
@@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
@@ -168,6 +169,7 @@ func run(cmd *cobra.Command, args []string) error {
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine(), env.Parser()),
VREngine: vreplication.NewEngine(env, config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
SemiSyncMonitor: semisyncmonitor.NewMonitor(config, qsc.Exporter()),
VDiffEngine: vdiff.NewEngine(ts, tablet, env.CollationEnv(), env.Parser()),
}
if err := tm.Start(tablet, config); err != nil {
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
@@ -322,6 +322,7 @@ Flags:
--schema_change_signal Enable the schema tracker; requires queryserver-config-schema-change-signal to be enabled on the underlying vttablets for this to work (default true)
--schema_dir string Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary.
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
@@ -322,6 +322,7 @@ Flags:
--schema-change-reload-timeout duration query server schema change reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
101 changes: 101 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
@@ -19,15 +19,20 @@ package newfeaturetest
import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
)

@@ -234,3 +239,99 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) {
// Wait for all the writes to have succeeded.
wg.Wait()
}

// TestSemiSyncBlockDueToDisruption tests that Vitess can recover from a situation
// where a primary is stuck waiting for semi-sync ACKs due to a network issue,
// even if no new writes from the user arrive.
func TestSemiSyncBlockDueToDisruption(t *testing.T) {
// This is always set to "true" on GitHub Actions runners:
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
if ok && strings.ToLower(ci) == "true" {
t.Skip("Test not meant to be run on CI")
}
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// stop heartbeats on all the replicas
for idx, tablet := range tablets {
if idx == 0 {
continue
}
utils.RunSQLs(context.Background(), t, []string{
"stop slave;",
"change master to MASTER_HEARTBEAT_PERIOD = 0;",
"start slave;",
}, tablet)
}

// Take a backup of the pf.conf file
runCommandWithSudo(t, "cp", "/etc/pf.conf", "/etc/pf.conf.backup")
defer func() {
// Restore the file from backup
runCommandWithSudo(t, "mv", "/etc/pf.conf.backup", "/etc/pf.conf")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
}()
// Disrupt the network between the primary and the replicas
runCommandWithSudo(t, "sh", "-c", fmt.Sprintf("echo 'block in proto tcp from any to any port %d' | sudo tee -a /etc/pf.conf > /dev/null", tablets[0].MySQLPort))

// The following command is only required if pfctl is not already enabled.
//runCommandWithSudo(t, "pfctl", "-e")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
rules := runCommandWithSudo(t, "pfctl", "-s", "rules")
log.Errorf("Rules enforced - %v", rules)

// Start a write that will be blocked by the primary waiting for semi-sync ACKs
ch := make(chan any)
go func() {
defer func() {
close(ch)
}()
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
}()

// Start VTOrc only now, because we don't want it to fix the heartbeat interval
// on the replicas before the disruption has been introduced.
err := clusterInstance.StartVTOrc(clusterInstance.Keyspaces[0].Name)
require.NoError(t, err)
go func() {
for {
select {
case <-ch:
return
case <-time.After(1 * time.Second):
str, isPresent := tablets[0].VttabletProcess.GetVars()["SemiSyncMonitorWritesBlocked"]
if isPresent {
log.Errorf("SemiSyncMonitorWritesBlocked - %v", str)
}
}
}
}()
// If the network disruption is too long lived, then we will end up running ERS from VTOrc.
networkDisruptionDuration := 43 * time.Second
time.Sleep(networkDisruptionDuration)

// Restore the network
runCommandWithSudo(t, "cp", "/etc/pf.conf.backup", "/etc/pf.conf")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")

// We expect the problem to be resolved in less than 30 seconds.
select {
case <-time.After(30 * time.Second):
t.Errorf("Timed out waiting for semi-sync to be unblocked")
case <-ch:
log.Errorf("Woohoo, write finished!")
}
}

// runCommandWithSudo runs the provided command with sudo privileges.
// When the command runs, sudo prompts the user for a password, which must be
// entered for the program to resume.
func runCommandWithSudo(t *testing.T, args ...string) string {
cmd := exec.Command("sudo", args...)
out, err := cmd.CombinedOutput()
assert.NoError(t, err, string(out))
return string(out)
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/sidecardb_test.go
@@ -38,7 +38,7 @@ var ddls1, ddls2 []string

func init() {
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "semisync_heartbeat",
"tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
numSidecarDBTables = len(sidecarDBTables)
ddls1 = []string{
2 changes: 2 additions & 0 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
@@ -91,6 +91,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
assert.False(t, primaryInstance.SemiSyncReplicaStatus)
assert.False(t, primaryInstance.SemiSyncBlocked)
assert.EqualValues(t, 2, primaryInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, primaryInstance.SemiSyncPrimaryWaitForReplicaCount)
assert.EqualValues(t, 1000000000000000000, primaryInstance.SemiSyncPrimaryTimeout)
@@ -142,6 +143,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
assert.False(t, replicaInstance.SemiSyncBlocked)
assert.True(t, replicaInstance.SemiSyncReplicaStatus)
assert.EqualValues(t, 0, replicaInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, replicaInstance.SemiSyncPrimaryWaitForReplicaCount)
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
@@ -217,6 +217,11 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
return result
}

// DB returns the fakesqldb.DB object.
func (fmd *FakeMysqlDaemon) DB() *fakesqldb.DB {
return fmd.db
}

// Start is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error {
if fmd.Running {
29 changes: 20 additions & 9 deletions go/vt/proto/replicationdata/replicationdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions go/vt/proto/replicationdata/replicationdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions go/vt/sidecardb/schema/misc/semisync_heartbeat.sql
@@ -0,0 +1,21 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS semisync_heartbeat
(
    ts BIGINT UNSIGNED NOT NULL,
    PRIMARY KEY (`ts`)
) ENGINE = InnoDB CHARSET = utf8mb4
1 change: 1 addition & 0 deletions go/vt/vtorc/db/generate_base.go
@@ -105,6 +105,7 @@ CREATE TABLE database_instance (
semi_sync_primary_status TINYint NOT NULL DEFAULT 0,
semi_sync_replica_status TINYint NOT NULL DEFAULT 0,
semi_sync_primary_clients int NOT NULL DEFAULT 0,
semi_sync_blocked tinyint NOT NULL DEFAULT 0,
is_disk_stalled TINYint NOT NULL DEFAULT 0,
PRIMARY KEY (alias)
)`,
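The new `semi_sync_blocked` column gives VTOrc a per-instance signal it can act on. A rough sketch of the resulting decision, based only on the summary's statement that VTOrc initiates an emergency reparent when the primary stays blocked, might look like this. The `instance` type, the `action` constants, and `chooseAction` are hypothetical and do not reflect VTOrc's actual analysis code.

```go
package main

import "fmt"

// instance holds the two facts this sketch needs about a tablet.
// Hypothetical type: VTOrc's real instance struct has many more fields.
type instance struct {
	Alias           string
	IsPrimary       bool
	SemiSyncBlocked bool
}

// action names a recovery step. The values are illustrative only.
type action string

const (
	noAction          action = "none"
	emergencyReparent action = "emergency-reparent"
)

// chooseAction escalates only when the blocked instance is the primary.
// The flag on a replica is ignored in this sketch.
func chooseAction(in instance) action {
	if in.IsPrimary && in.SemiSyncBlocked {
		return emergencyReparent
	}
	return noAction
}

func main() {
	tablets := []instance{
		{Alias: "zone1-100", IsPrimary: true, SemiSyncBlocked: true},
		{Alias: "zone1-101", IsPrimary: false, SemiSyncBlocked: false},
	}
	for _, tb := range tablets {
		fmt.Printf("%s: %s\n", tb.Alias, chooseAction(tb))
	}
}
```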