Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-17.0] SHOW VITESS_REPLICATION_STATUS: Only use replication tracker when it's enabled (#15348) #15360

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ import (
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
)

func TestVtgateHealthCheck(t *testing.T) {
Expand All @@ -58,7 +57,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
time.Sleep(2 * time.Second)
verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
conn, err := mysql.Connect(ctx, &vtParams) // VTGate
require.NoError(t, err)
defer conn.Close()

Expand All @@ -67,6 +66,38 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
expectNumRows := 2
numRows := len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows))

// Disable VTOrc(s) recoveries so that it doesn't immediately repair/restart replication.
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.DisableGlobalRecoveries(t, vtorcProcess)
}
// Re-enable recoveries afterward as the cluster is re-used.
defer func() {
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
vtorcutils.EnableGlobalRecoveries(t, vtorcProcess)
}
}()
// Stop replication on the non-PRIMARY tablets.
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave")
require.NoError(t, err)
// Restart replication afterward as the cluster is re-used.
defer func() {
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave")
require.NoError(t, err)
}()
// Give replication a moment to fall behind now that it has been stopped
// on the non-PRIMARY tablets.
time.Sleep(2 * time.Second) // Build up some replication lag
res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false)
require.NoError(t, err)
expectNumRows = 2
// Count the rows of the result we just fetched (res), not the stale result
// from the earlier query (qr); otherwise this check can never fail.
numRows = len(res.Rows)
// Use require here so that we stop before indexing into the rows below if
// the result is unexpectedly empty, rather than panicking.
require.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows))
rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row
lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL"
assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString())
}

func verifyVtgateVariables(t *testing.T, url string) {
Expand Down
16 changes: 16 additions & 0 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,3 +1020,19 @@ func PrintVTOrcLogsOnFailure(t *testing.T, clusterInstance *cluster.LocalProcess
log.Errorf("%s", string(content))
}
}

// EnableGlobalRecoveries turns global recoveries on for the given VTOrc by
// calling its "/api/enable-global-recoveries" endpoint. The test is stopped
// if the call returns an error; it is marked failed if the returned status
// is not 200 or the response is not the expected confirmation message.
func EnableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) {
	const (
		endpoint = "/api/enable-global-recoveries"
		wantBody = "Global recoveries enabled\n"
	)

	code, body, callErr := MakeAPICall(t, vtorc, endpoint)
	require.NoError(t, callErr)

	assert.Equal(t, 200, code)
	assert.Equal(t, wantBody, body)
}

// DisableGlobalRecoveries turns global recoveries off for the given VTOrc by
// calling its "/api/disable-global-recoveries" endpoint. The test is stopped
// if the call returns an error; it is marked failed if the returned status
// is not 200 or the response is not the expected confirmation message.
func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) {
	const (
		endpoint = "/api/disable-global-recoveries"
		wantBody = "Global recoveries disabled\n"
	)

	code, body, callErr := MakeAPICall(t, vtorc, endpoint)
	require.NoError(t, callErr)

	assert.Equal(t, 200, code)
	assert.Equal(t, wantBody, body)
}
32 changes: 26 additions & 6 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,15 +868,15 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
tabletHostPort := ts.GetTabletHostPort()
throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort)
if err != nil {
log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err)
log.Warningf("Could not get throttler status from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err)
}

replSourceHost := ""
replSourcePort := int64(0)
replIOThreadHealth := ""
replSQLThreadHealth := ""
replLastError := ""
replLag := int64(-1)
replLag := "-1" // A string to support NULL as a value
sql := "show slave status"
results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil)
if err != nil || results == nil {
Expand All @@ -887,8 +887,25 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
replIOThreadHealth = row["Slave_IO_Running"].ToString()
replSQLThreadHealth = row["Slave_SQL_Running"].ToString()
replLastError = row["Last_Error"].ToString()
if ts.Stats != nil {
replLag = int64(ts.Stats.ReplicationLagSeconds)
// We cannot check the tablet's tabletenv config from here so
// we only use the tablet's stat -- which is managed by the
// ReplicationTracker -- if we can tell that it's enabled,
// meaning that it has a non-zero value. If it's actually
// enabled AND zero (rather than the zeroval), then mysqld
// should also return 0 so in this case the value is correct
// and equivalent either way. The only reason that we would
// want to use the ReplicationTracker based value, when we
// can, is because the polling method allows us to get the
// estimated lag value when replication is not running (based
// on how long we've seen that it's not been running).
if ts.Stats != nil && ts.Stats.ReplicationLagSeconds > 0 { // Use the value we get from the ReplicationTracker
replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds)
} else { // Use the value from mysqld
if row["Seconds_Behind_Master"].IsNull() {
replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS
} else {
replLag = row["Seconds_Behind_Master"].ToString()
}
}
}
replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError)
Expand All @@ -901,7 +918,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
ts.Tablet.Hostname,
fmt.Sprintf("%s:%d", replSourceHost, replSourcePort),
replicationHealth,
fmt.Sprintf("%d", replLag),
replLag,
throttlerStatus,
))
}
Expand Down Expand Up @@ -1401,11 +1418,14 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument")
}

// getTabletThrottlerStatus uses HTTP to get the throttler status
// on a tablet. It uses HTTP because the CheckThrottler RPC is a
// tmclient RPC and you cannot use tmclient outside of a tablet.
func getTabletThrottlerStatus(tabletHostPort string) (string, error) {
client := http.Client{
Timeout: 100 * time.Millisecond,
}
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort))
resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check-self", tabletHostPort))
if err != nil {
return "", err
}
Expand Down
23 changes: 8 additions & 15 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,22 @@ import (
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/safehtml/template"

"vitess.io/vitess/go/vt/vtgate/logstats"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/collations"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/vtgate/engine"

"vitess.io/vitess/go/vt/topo"

"github.com/google/go-cmp/cmp"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vtgate/vschemaacl"

Expand All @@ -56,9 +52,6 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestExecutorResultsExceeded(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/mysqlctl"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
Expand Down
Loading