Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHOW VITESS_REPLICATION_STATUS: Only use replication tracker when it's enabled #15348

Merged
merged 12 commits into from
Feb 26, 2024
41 changes: 40 additions & 1 deletion go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
time.Sleep(2 * time.Second)
verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
conn, err := mysql.Connect(ctx, &vtParams) // VTGate
require.NoError(t, err)
defer conn.Close()

Expand All @@ -68,6 +68,40 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
expectNumRows := 2
numRows := len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status. Expected %d, got %d", expectNumRows, numRows))

// Stop any VTOrc(s) so that it doesn't immediately repair/restart replication.
mattlord marked this conversation as resolved.
Show resolved Hide resolved
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
err := vtorcProcess.TearDown()
require.NoError(t, err)
}
// Restart them afterward as the cluster is re-used.
defer func() {
for _, vtorcProcess := range clusterInstance.VTOrcProcesses {
err := vtorcProcess.Setup()
require.NoError(t, err)
}
}()
// Stop replication on the non-PRIMARY tablets.
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave")
require.NoError(t, err)
// Restart replication as the cluster is re-used.
defer func() {
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave")
require.NoError(t, err)
}()
time.Sleep(2 * time.Second) // Build up some replication lag
res, err := conn.ExecuteFetch("show vitess_replication_status", 2, false)
require.NoError(t, err)
expectNumRows = 2
numRows = len(qr.Rows)
assert.Equal(t, expectNumRows, numRows, fmt.Sprintf("wrong number of results from show vitess_replication_status, expected %d, got %d", expectNumRows, numRows))
rawLag := res.Named().Rows[0]["ReplicationLag"] // Let's just look at the first row
lagInt, _ := rawLag.ToInt64() // Don't check the error as the value could be "NULL"
assert.True(t, rawLag.IsNull() || lagInt > 0, "replication lag should be NULL or greater than 0 but was: %s", rawLag.ToString())
}

func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
Expand All @@ -90,6 +124,11 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly()
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
require.NoError(t, err)
// Change it back to RDONLY afterward as the cluster is re-used.
defer func() {
err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY)
require.NoError(t, err)
}()

// Only returns rows for REPLICA and RDONLY tablets -- so should be 1 of them since we updated 1 to spare
qr = utils.Exec(t, conn, "show vitess_replication_status like '%'")
Expand Down
83 changes: 34 additions & 49 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/spf13/pflag"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/cache/theine"
Expand All @@ -40,11 +40,6 @@ import (
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
Expand All @@ -61,6 +56,16 @@ import (
"vitess.io/vitess/go/vt/vtgate/vschemaacl"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"
"vitess.io/vitess/go/vt/vthash"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var (
Expand Down Expand Up @@ -909,6 +914,7 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
rows := [][]sqltypes.Value{}

status := e.scatterConn.GetHealthCheckCacheStatus()
tmc := tmclient.NewTabletManagerClient()

for _, s := range status {
for _, ts := range s.TabletsStats {
Expand All @@ -927,17 +933,24 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
}

tabletHostPort := ts.GetTabletHostPort()
throttlerStatus, err := getTabletThrottlerStatus(tabletHostPort)

res, err := tmc.CheckThrottler(ctx, ts.Tablet, &tabletmanagerdatapb.CheckThrottlerRequest{
AppName: throttlerapp.VTGateName,
})
if err != nil {
log.Warningf("Could not get check the tablet throttler on %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err)
}
throttlerStatus, err := protojson.Marshal(res)
if err != nil {
log.Warningf("Could not get throttler status from %s: %v", tabletHostPort, err)
log.Warningf("Invalid tablet throttler response from %s: %v", topoproto.TabletAliasString(ts.Tablet.Alias), err)
}

replSourceHost := ""
replSourcePort := int64(0)
replIOThreadHealth := ""
replSQLThreadHealth := ""
replLastError := ""
replLag := int64(-1)
replLag := "-1" // A string to support NULL as a value
sql := "show slave status"
results, err := e.txConn.tabletGateway.Execute(ctx, ts.Target, sql, nil, 0, 0, nil)
if err != nil || results == nil {
Expand All @@ -948,8 +961,16 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
replIOThreadHealth = row["Slave_IO_Running"].ToString()
replSQLThreadHealth = row["Slave_SQL_Running"].ToString()
replLastError = row["Last_Error"].ToString()
if ts.Stats != nil {
replLag = int64(ts.Stats.ReplicationLagSeconds)
if tabletenv.NewCurrentConfig().ReplicationTracker.Mode == tabletenv.Disable { // Use the value from mysqld
if row["Seconds_Behind_Master"].IsNull() {
replLag = strings.ToUpper(sqltypes.NullStr) // Uppercase to match mysqld's output in SHOW REPLICA STATUS
} else {
replLag = row["Seconds_Behind_Master"].ToString()
}
} else { // Use the value we get from the replication tracker
if ts.Stats != nil {
replLag = fmt.Sprintf("%d", ts.Stats.ReplicationLagSeconds)
}
}
}
replicationHealth := fmt.Sprintf("{\"EventStreamRunning\":\"%s\",\"EventApplierRunning\":\"%s\",\"LastError\":\"%s\"}", replIOThreadHealth, replSQLThreadHealth, replLastError)
Expand All @@ -962,8 +983,8 @@ func (e *Executor) showVitessReplicationStatus(ctx context.Context, filter *sqlp
ts.Tablet.Hostname,
fmt.Sprintf("%s:%d", replSourceHost, replSourcePort),
replicationHealth,
fmt.Sprintf("%d", replLag),
throttlerStatus,
replLag,
string(throttlerStatus),
))
}
}
Expand Down Expand Up @@ -1477,42 +1498,6 @@ func (e *Executor) checkThatPlanIsValid(stmt sqlparser.Statement, plan *engine.P
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "plan includes scatter, which is disallowed using the `no_scatter` command line argument")
}

// getTabletThrottlerStatus queries the throttler check endpoint of the tablet
// listening on tabletHostPort (as the "vtgate" app) and summarizes the
// response as a small JSON document of the form:
//
//	{"state":"<HTTP status text>","load":<percent>,"message":"<message>"}
//
// state is the textual form of the StatusCode field reported in the response
// body, and load is Value expressed as a percentage of Threshold (0 when the
// threshold is not positive). An error is returned when the request fails,
// the body cannot be read, or the body is not valid JSON; the returned string
// is empty in that case.
func getTabletThrottlerStatus(tabletHostPort string) (string, error) {
	// Keep the timeout short: this is called once per tablet while building
	// the result rows and must not stall on an unresponsive tablet.
	client := http.Client{
		Timeout: 100 * time.Millisecond,
	}
	resp, err := client.Get(fmt.Sprintf("http://%s/throttler/check?app=vtgate", tabletHostPort))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}

	var elements struct {
		StatusCode int
		Value      float64
		Threshold  float64
		Message    string
	}
	if err := json.Unmarshal(body, &elements); err != nil {
		return "", err
	}

	httpStatusStr := http.StatusText(elements.StatusCode)

	load := float64(0)
	if elements.Threshold > 0 {
		load = (elements.Value / elements.Threshold) * 100
	}

	// The message is free-form text supplied by the tablet. Encode it as a
	// JSON string so that quotes, backslashes and control characters cannot
	// break the document we hand back.
	message, err := json.Marshal(elements.Message)
	if err != nil {
		return "", err
	}

	status := fmt.Sprintf("{\"state\":\"%s\",\"load\":%.2f,\"message\":%s}", httpStatusStr, load, message)
	return status, nil
}

// ReleaseLock implements the IExecutor interface
func (e *Executor) ReleaseLock(ctx context.Context, session *SafeSession) error {
return e.txConn.ReleaseLock(ctx, session)
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ import (
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtgate/buffer"
Expand All @@ -55,6 +50,12 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vtgate/vschemaacl"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestExecutorResultsExceeded(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/mysqlctl"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

var replicationLagSeconds = stats.NewGauge("replicationLagSec", "replication lag in seconds")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/heartbeat"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (n Name) Concatenate(other Name) Name {
}

const (
// DefaultName is the app name used by vitess when app doesn't indicate its name
// DefaultName is the app name used by vitess when app doesn't indicate its name.
DefaultName Name = "default"
VitessName Name = "vitess"

Expand All @@ -62,6 +62,8 @@ const (
BinlogWatcherName Name = "binlog-watcher"
MessagerName Name = "messager"
SchemaTrackerName Name = "schema-tracker"

VTGateName = "vtgate"
)

var (
Expand Down
Loading