Skip to content

Commit

Permalink
Add semi-sync monitor to unblock primaries blocked on semi-sync ACKs (#…
Browse files Browse the repository at this point in the history
…17763)

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Feb 24, 2025
1 parent 80707f7 commit 81ce29c
Show file tree
Hide file tree
Showing 36 changed files with 1,642 additions and 30 deletions.
9 changes: 9 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Semi-sync monitor in vttablet](#semi-sync-monitor)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
- **[VTTablet ACL enforcement and reloading](#reloading-vttablet-acl)**
Expand Down Expand Up @@ -162,6 +163,14 @@ Users can continue to specify exact keyranges. The new feature is backward compa

The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.

### <a id="semi-sync-monitor"/>Semi-sync monitor in vttablet</a>

A new component has been added to the vttablet binary to monitor the semi-sync status of primary vttablets. We've observed cases where a brief network disruption can cause the primary to get stuck indefinitely waiting for semi-sync ACKs. In rare scenarios, this can block reparent operations and render the primary unresponsive. More information can be found in the issues https://github.com/vitessio/vitess/issues/17709 and https://github.com/vitessio/vitess/issues/17749.

To address this, the new component continuously monitors the semi-sync status. If the primary becomes stuck on semi-sync ACKs, it generates writes to unblock it. If this fails, VTOrc is notified of the issue and initiates an emergency reparent operation.

The monitoring interval can be adjusted using the `--semi-sync-monitor-interval` flag, which defaults to 10 seconds.

## <a id="minor-changes"/>Minor Changes</a>

#### <a id="flags-vttablet"/>VTTablet Flags</a>
Expand Down
7 changes: 7 additions & 0 deletions go/cmd/vtctldclient/command/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/grpctmserver"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand Down Expand Up @@ -158,6 +160,10 @@ func NewFakeTablet(t *testing.T, ts *topo.Server, cell string, uid uint32, table
}
}

var (
exporter = servenv.NewExporter("TestVtctldClientCommand", "")
)

// StartActionLoop will start the action loop for a fake tablet,
// using ft.FakeMysqlDaemon as the backing mysqld.
func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
Expand Down Expand Up @@ -203,6 +209,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
VREngine: vreplication.NewTestEngine(ts, ft.Tablet.Alias.Cell, ft.FakeMysqlDaemon, binlogplayer.NewFakeDBClient, binlogplayer.NewFakeDBClient, topoproto.TabletDbName(ft.Tablet), nil),
SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter),
Env: vtenv.NewTestEnv(),
}
if err := ft.TM.Start(ft.Tablet, nil); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/vttablet/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
Expand Down Expand Up @@ -168,6 +169,7 @@ func run(cmd *cobra.Command, args []string) error {
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine(), env.Parser()),
VREngine: vreplication.NewEngine(env, config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
SemiSyncMonitor: semisyncmonitor.NewMonitor(config, qsc.Exporter()),
VDiffEngine: vdiff.NewEngine(ts, tablet, env.CollationEnv(), env.Parser()),
}
if err := tm.Start(tablet, config); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ Flags:
--schema_change_signal Enable the schema tracker; requires queryserver-config-schema-change-signal to be enabled on the underlying vttablets for this to work (default true)
--schema_dir string Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary.
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ Flags:
--schema-change-reload-timeout duration query server schema change reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--semi-sync-monitor-interval duration How frequently the semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
Expand Down
101 changes: 101 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ package newfeaturetest
import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
)

Expand Down Expand Up @@ -234,3 +239,99 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) {
// Wait for all the writes to have succeeded.
wg.Wait()
}

// TestSemiSyncBlockDueToDisruption tests that Vitess can recover from a situation
// where a primary is stuck waiting for semi-sync ACKs due to a network issue,
// even if no new writes from the user arrives.
func TestSemiSyncBlockDueToDisruption(t *testing.T) {
// This is always set to "true" on GitHub Actions runners:
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
if ok && strings.ToLower(ci) == "true" {
t.Skip("Test not meant to be run on CI")
}
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// stop heartbeats on all the replicas
for idx, tablet := range tablets {
if idx == 0 {
continue
}
utils.RunSQLs(context.Background(), t, []string{
"stop slave;",
"change master to MASTER_HEARTBEAT_PERIOD = 0;",
"start slave;",
}, tablet)
}

// Take a backup of the pf.conf file
runCommandWithSudo(t, "cp", "/etc/pf.conf", "/etc/pf.conf.backup")
defer func() {
// Restore the file from backup
runCommandWithSudo(t, "mv", "/etc/pf.conf.backup", "/etc/pf.conf")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
}()
// Disrupt the network between the primary and the replicas
runCommandWithSudo(t, "sh", "-c", fmt.Sprintf("echo 'block in proto tcp from any to any port %d' | sudo tee -a /etc/pf.conf > /dev/null", tablets[0].MySQLPort))

// This following command is only required if pfctl is not already enabled
//runCommandWithSudo(t, "pfctl", "-e")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
rules := runCommandWithSudo(t, "pfctl", "-s", "rules")
log.Errorf("Rules enforced - %v", rules)

// Start a write that will be blocked by the primary waiting for semi-sync ACKs
ch := make(chan any)
go func() {
defer func() {
close(ch)
}()
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
}()

// Starting VTOrc later now, because we don't want it to fix the heartbeat interval
// on the replica's before the disruption has been introduced.
err := clusterInstance.StartVTOrc(clusterInstance.Keyspaces[0].Name)
require.NoError(t, err)
go func() {
for {
select {
case <-ch:
return
case <-time.After(1 * time.Second):
str, isPresent := tablets[0].VttabletProcess.GetVars()["SemiSyncMonitorWritesBlocked"]
if isPresent {
log.Errorf("SemiSyncMonitorWritesBlocked - %v", str)
}
}
}
}()
// If the network disruption is too long lived, then we will end up running ERS from VTOrc.
networkDisruptionDuration := 43 * time.Second
time.Sleep(networkDisruptionDuration)

// Restore the network
runCommandWithSudo(t, "cp", "/etc/pf.conf.backup", "/etc/pf.conf")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")

// We expect the problem to be resolved in less than 30 seconds.
select {
case <-time.After(30 * time.Second):
t.Errorf("Timed out waiting for semi-sync to be unblocked")
case <-ch:
log.Errorf("Woohoo, write finished!")
}
}

// runCommandWithSudo runs the provided command with sudo privileges
// when the command is run, it prompts the user for the password, and it must be
// entered for the program to resume.
func runCommandWithSudo(t *testing.T, args ...string) string {
cmd := exec.Command("sudo", args...)
out, err := cmd.CombinedOutput()
assert.NoError(t, err, string(out))
return string(out)
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/sidecardb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var ddls1, ddls2 []string

func init() {
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "semisync_heartbeat",
"tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
numSidecarDBTables = len(sidecarDBTables)
ddls1 = []string{
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
assert.False(t, primaryInstance.SemiSyncReplicaStatus)
assert.False(t, primaryInstance.SemiSyncBlocked)
assert.EqualValues(t, 2, primaryInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, primaryInstance.SemiSyncPrimaryWaitForReplicaCount)
assert.EqualValues(t, 1000000000000000000, primaryInstance.SemiSyncPrimaryTimeout)
Expand Down Expand Up @@ -142,6 +143,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
assert.False(t, replicaInstance.SemiSyncBlocked)
assert.True(t, replicaInstance.SemiSyncReplicaStatus)
assert.EqualValues(t, 0, replicaInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, replicaInstance.SemiSyncPrimaryWaitForReplicaCount)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
return result
}

// DB returns the fakesqldb.DB object.
func (fmd *FakeMysqlDaemon) DB() *fakesqldb.DB {
return fmd.db
}

// Start is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error {
if fmd.Running {
Expand Down
29 changes: 20 additions & 9 deletions go/vt/proto/replicationdata/replicationdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions go/vt/proto/replicationdata/replicationdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions go/vt/sidecardb/schema/misc/semisync_heartbeat.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2025 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS semisync_heartbeat
(
ts BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`ts`)
) ENGINE = InnoDB CHARSET = utf8mb4
1 change: 1 addition & 0 deletions go/vt/vtorc/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ CREATE TABLE database_instance (
semi_sync_primary_status TINYint NOT NULL DEFAULT 0,
semi_sync_replica_status TINYint NOT NULL DEFAULT 0,
semi_sync_primary_clients int NOT NULL DEFAULT 0,
semi_sync_blocked tinyint NOT NULL DEFAULT 0,
is_disk_stalled TINYint NOT NULL DEFAULT 0,
PRIMARY KEY (alias)
)`,
Expand Down
Loading

0 comments on commit 81ce29c

Please sign in to comment.