diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 47b08555476..86a4001d33f 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -60,6 +60,7 @@ Usage of vtgate: --grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s) --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) -h, --help display usage and exit diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index a1b7b7d023f..a9fcffcb7d7 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -243,6 +243,7 @@ Usage of vttablet: --grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s) --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) --health_check_interval duration Interval between health checks (default 20s) + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks. --heartbeat_interval duration How frequently to read and write replication heartbeat. (default 1s) --heartbeat_on_demand_duration duration If non-zero, heartbeats are only written upon consumer request, and only run for up to given duration following the request. Frequent requests can keep the heartbeat running consistently; when requests are infrequent heartbeat may completely stop between requests (default 0s) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index ad93336e98e..0441fc71df1 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -47,6 +47,7 @@ import ( "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" @@ -81,6 +82,8 @@ var ( refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") // topoReadConcurrency tells us how many topo reads are allowed in parallel topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads") + // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. + healthCheckDialConcurrency = flag.Int("healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. 
This should be less than the golang max thread limit of 10000.") ) // See the documentation for NewHealthCheck below for an explanation of these parameters. @@ -260,6 +263,8 @@ type HealthCheckImpl struct { subMu sync.Mutex // subscribers subscribers map[chan *TabletHealth]struct{} + // healthCheckDialSem is used to limit how many healthchecks initiate in parallel. + healthCheckDialSem *sync2.Semaphore } // NewHealthCheck creates a new HealthCheck object. @@ -294,6 +299,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, + healthCheckDialSem: sync2.NewSemaphore(*healthCheckDialConcurrency, 0), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -780,7 +786,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } - return thc.Connection(), nil + return thc.Connection(hc), nil } // getAliasByCell should only be called while holding hc.mu diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index f0ad9b0a2ac..62a4407fd9e 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -19,6 +19,7 @@ package discovery import ( "context" "fmt" + "net" "strings" "sync" "time" @@ -34,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/proto/query" @@ -123,8 +125,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) { } // stream streams healthcheck responses to callback. -func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error { - conn := thc.Connection() +func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error { + conn := thc.Connection(hc) if conn == nil { // This signals the caller to retry return nil @@ -137,14 +139,30 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S return err } -func (thc *tabletHealthCheck) Connection() queryservice.QueryService { +func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService { thc.connMu.Lock() defer thc.connMu.Unlock() - return thc.connectionLocked() + return thc.connectionLocked(hc) } -func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService { +func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) { + return func(ctx context.Context, addr string) (net.Conn, error) { + // Limit the number of healthcheck connections opened in parallel to avoid high OS-thread + // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, + // etc). Without this limit it is possible for vtgates watching >10k tablets to hit + // the panic: 'runtime: program exceeds 10000-thread limit'. 
+ hc.healthCheckDialSem.Acquire() + defer hc.healthCheckDialSem.Release() + var dialer net.Dialer + return dialer.DialContext(ctx, "tcp", addr) + } +} + +func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService { if thc.Conn == nil { + grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { + return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil + }) conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true)) if err != nil { thc.LastError = err @@ -273,7 +291,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { }() // Read stream health responses. - err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error { + err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay // Don't block on send to avoid deadlocks. diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index c870967168b..910a16be1b5 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -22,6 +22,7 @@ import ( "context" "crypto/tls" "flag" + "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -39,6 +40,7 @@ import ( ) var ( + grpcDialOptionsMu sync.Mutex keepaliveTime = flag.Duration("grpc_keepalive_time", 10*time.Second, "After a duration of this time, if the client doesn't see any activity, it pings the server to see if the transport is still alive.") keepaliveTimeout = flag.Duration("grpc_keepalive_timeout", 10*time.Second, "After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that the connection is closed.") initialConnWindowSize = flag.Int("grpc_initial_conn_window_size", 0, "gRPC initial connection window size") @@ -53,6 +55,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error) // RegisterGRPCDialOptions registers an implementation of AuthServer. func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) { + grpcDialOptionsMu.Lock() + defer grpcDialOptionsMu.Unlock() grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc) } @@ -101,12 +105,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ... newopts = append(newopts, opts...) var err error + grpcDialOptionsMu.Lock() for _, grpcDialOptionInitializer := range grpcDialOptions { newopts, err = grpcDialOptionInitializer(newopts) if err != nil { log.Fatalf("There was an error initializing client grpc.DialOption: %v", err) } } + grpcDialOptionsMu.Unlock() newopts = append(newopts, interceptors()...) diff --git a/go/vt/grpcclient/client_auth_static.go b/go/vt/grpcclient/client_auth_static.go index 53be18cc4ff..5b0596d6353 100644 --- a/go/vt/grpcclient/client_auth_static.go +++ b/go/vt/grpcclient/client_auth_static.go @@ -17,29 +17,39 @@ limitations under the License. 
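Note (illustration, not part of the diff): the healthcheck change above hinges on grpc.WithContextDialer, which routes every outbound connection attempt through a custom dialer so that a semaphore can cap how many DNS lookups and TCP connects block OS threads at once. The standalone sketch below shows that pattern with a buffered channel standing in for Vitess's sync2.Semaphore; all names, the port, and the limit are invented for the example.

package main

import (
	"context"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
)

// limitedDialer returns a gRPC context dialer that allows at most
// maxConcurrent connection attempts (DNS resolution plus TCP connect)
// to block in syscalls at any one time.
func limitedDialer(maxConcurrent int) func(ctx context.Context, addr string) (net.Conn, error) {
	sem := make(chan struct{}, maxConcurrent)
	return func(ctx context.Context, addr string) (net.Conn, error) {
		select {
		case sem <- struct{}{}: // acquire a slot
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		defer func() { <-sem }() // release the slot once the dial attempt finishes
		var d net.Dialer
		return d.DialContext(ctx, "tcp", addr)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := grpc.DialContext(ctx, "localhost:15999",
		grpc.WithInsecure(),
		grpc.WithContextDialer(limitedDialer(1024)))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}

Throttling inside the dialer, as the PR does, limits only the blocking connection-setup phase; established streams and the number of open connections are unaffected.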
package grpcclient import ( + "context" "encoding/json" "flag" "os" - - "context" + "os/signal" + "sync" + "syscall" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + + "vitess.io/vitess/go/vt/servenv" ) var ( credsFile = flag.String("grpc_auth_static_client_creds", "", "when using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server") // StaticAuthClientCreds implements client interface to be able to WithPerRPCCredentials _ credentials.PerRPCCredentials = (*StaticAuthClientCreds)(nil) + + clientCreds *StaticAuthClientCreds + clientCredsCancel context.CancelFunc + clientCredsErr error + clientCredsMu sync.Mutex + clientCredsSigChan chan os.Signal ) -// StaticAuthClientCreds holder for client credentials +// StaticAuthClientCreds holder for client credentials. type StaticAuthClientCreds struct { Username string Password string } -// GetRequestMetadata gets the request metadata as a map from StaticAuthClientCreds +// GetRequestMetadata gets the request metadata as a map from StaticAuthClientCreds. func (c *StaticAuthClientCreds) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { return map[string]string{ "username": c.Username, @@ -49,30 +59,82 @@ func (c *StaticAuthClientCreds) GetRequestMetadata(context.Context, ...string) ( // RequireTransportSecurity indicates whether the credentials requires transport security. // Given that people can use this with or without TLS, at the moment we are not enforcing -// transport security +// transport security. func (c *StaticAuthClientCreds) RequireTransportSecurity() bool { return false } // AppendStaticAuth optionally appends static auth credentials if provided. func AppendStaticAuth(opts []grpc.DialOption) ([]grpc.DialOption, error) { - if *credsFile == "" { - return opts, nil - } - data, err := os.ReadFile(*credsFile) + creds, err := getStaticAuthCreds() if err != nil { return nil, err } - clientCreds := &StaticAuthClientCreds{} - err = json.Unmarshal(data, clientCreds) + if creds != nil { + grpcCreds := grpc.WithPerRPCCredentials(creds) + opts = append(opts, grpcCreds) + } + return opts, nil +} + +// ResetStaticAuth resets the static auth credentials. +func ResetStaticAuth() { + clientCredsMu.Lock() + defer clientCredsMu.Unlock() + if clientCredsCancel != nil { + clientCredsCancel() + clientCredsCancel = nil + } + clientCreds = nil + clientCredsErr = nil +} + +// getStaticAuthCreds returns the static auth creds and error. +func getStaticAuthCreds() (*StaticAuthClientCreds, error) { + clientCredsMu.Lock() + defer clientCredsMu.Unlock() + if *credsFile != "" && clientCreds == nil { + var ctx context.Context + ctx, clientCredsCancel = context.WithCancel(context.Background()) + go handleClientCredsSignals(ctx) + clientCreds, clientCredsErr = loadStaticAuthCredsFromFile(*credsFile) + } + return clientCreds, clientCredsErr +} + +// handleClientCredsSignals handles signals to reload client creds. +func handleClientCredsSignals(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-clientCredsSigChan: + if newCreds, err := loadStaticAuthCredsFromFile(*credsFile); err == nil { + clientCredsMu.Lock() + clientCreds = newCreds + clientCredsErr = err + clientCredsMu.Unlock() + } + } + } +} + +// loadStaticAuthCredsFromFile loads static auth credentials from a file. 
+func loadStaticAuthCredsFromFile(path string) (*StaticAuthClientCreds, error) { + data, err := os.ReadFile(path) if err != nil { return nil, err } - creds := grpc.WithPerRPCCredentials(clientCreds) - opts = append(opts, creds) - return opts, nil + creds := &StaticAuthClientCreds{} + err = json.Unmarshal(data, creds) + return creds, err } func init() { + servenv.OnInit(func() { + clientCredsSigChan = make(chan os.Signal, 1) + signal.Notify(clientCredsSigChan, syscall.SIGHUP) + _, _ = getStaticAuthCreds() // preload static auth credentials + }) RegisterGRPCDialOptions(AppendStaticAuth) } diff --git a/go/vt/grpcclient/client_auth_static_test.go b/go/vt/grpcclient/client_auth_static_test.go new file mode 100644 index 00000000000..99c8db5e2f1 --- /dev/null +++ b/go/vt/grpcclient/client_auth_static_test.go @@ -0,0 +1,127 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpcclient + +import ( + "errors" + "fmt" + "os" + "reflect" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +func TestAppendStaticAuth(t *testing.T) { + { + clientCreds = nil + clientCredsErr = nil + opts, err := AppendStaticAuth([]grpc.DialOption{}) + assert.Nil(t, err) + assert.Len(t, opts, 0) + } + { + clientCreds = nil + clientCredsErr = errors.New("test err") + opts, err := AppendStaticAuth([]grpc.DialOption{}) + assert.NotNil(t, err) + assert.Len(t, opts, 0) + } + { + clientCreds = &StaticAuthClientCreds{Username: "test", Password: "123456"} + clientCredsErr = nil + opts, err := AppendStaticAuth([]grpc.DialOption{}) + assert.Nil(t, err) + assert.Len(t, opts, 1) + } +} + +func TestGetStaticAuthCreds(t *testing.T) { + tmp, err := os.CreateTemp("", t.Name()) + assert.Nil(t, err) + defer os.Remove(tmp.Name()) + credsFileTmp := tmp.Name() + credsFile = &credsFileTmp + clientCredsSigChan = make(chan os.Signal, 1) + + // load old creds + fmt.Fprint(tmp, `{"Username": "old", "Password": "123456"}`) + ResetStaticAuth() + creds, err := getStaticAuthCreds() + assert.Nil(t, err) + assert.Equal(t, &StaticAuthClientCreds{Username: "old", Password: "123456"}, creds) + + // write new creds to the same file + _ = tmp.Truncate(0) + _, _ = tmp.Seek(0, 0) + fmt.Fprint(tmp, `{"Username": "new", "Password": "123456789"}`) + + // test the creds did not change yet + creds, err = getStaticAuthCreds() + assert.Nil(t, err) + assert.Equal(t, &StaticAuthClientCreds{Username: "old", Password: "123456"}, creds) + + // test SIGHUP signal triggers reload + credsOld := creds + clientCredsSigChan <- syscall.SIGHUP + timeoutChan := time.After(time.Second * 10) + for { + select { + case <-timeoutChan: + assert.Fail(t, "timed out waiting for SIGHUP reload of static auth creds") + return + default: + // confirm new creds get loaded + creds, err = getStaticAuthCreds() + if reflect.DeepEqual(creds, credsOld) { + continue // not changed yet + } + assert.Nil(t, err) + assert.Equal(t, &StaticAuthClientCreds{Username: "new", Password: "123456789"}, creds) + return + } + } +} + 
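Note (illustration, not part of the diff): the static-auth change above is an instance of a common reload pattern, which is to cache the parsed credentials behind a mutex and swap them from a goroutine woken by SIGHUP, so a malformed file never replaces working credentials. A reduced, standalone sketch of that pattern follows; the type and function names and the file path are invented and do not mirror the Vitess implementation.

package main

import (
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

type staticCreds struct {
	Username string
	Password string
}

type credStore struct {
	mu    sync.RWMutex
	path  string
	creds *staticCreds
}

func newCredStore(path string) (*credStore, error) {
	s := &credStore{path: path}
	if err := s.reload(); err != nil {
		return nil, err
	}
	// Reload on SIGHUP so operators can rotate the file without a restart.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP)
	go func() {
		for range ch {
			if err := s.reload(); err != nil {
				log.Printf("keeping previous credentials, reload failed: %v", err)
			}
		}
	}()
	return s, nil
}

func (s *credStore) reload() error {
	data, err := os.ReadFile(s.path)
	if err != nil {
		return err
	}
	c := &staticCreds{}
	if err := json.Unmarshal(data, c); err != nil {
		return err
	}
	s.mu.Lock()
	s.creds = c
	s.mu.Unlock()
	return nil
}

func (s *credStore) get() *staticCreds {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.creds
}

func main() {
	store, err := newCredStore("/path/to/creds.json") // placeholder path
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("loaded user %q", store.get().Username)
	select {} // wait forever; send SIGHUP to trigger a reload
}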
+func TestLoadStaticAuthCredsFromFile(t *testing.T) { + { + f, err := os.CreateTemp("", t.Name()) + if !assert.Nil(t, err) { + assert.FailNowf(t, "cannot create temp file: %s", err.Error()) + } + defer os.Remove(f.Name()) + fmt.Fprint(f, `{ + "Username": "test", + "Password": "correct horse battery staple" + }`) + if !assert.Nil(t, err) { + assert.FailNowf(t, "cannot read auth file: %s", err.Error()) + } + + creds, err := loadStaticAuthCredsFromFile(f.Name()) + assert.Nil(t, err) + assert.Equal(t, "test", creds.Username) + assert.Equal(t, "correct horse battery staple", creds.Password) + } + { + _, err := loadStaticAuthCredsFromFile(`does-not-exist`) + assert.NotNil(t, err) + } +} diff --git a/go/vt/vtgate/grpcvtgateconn/conn_rpc_test.go b/go/vt/vtgate/grpcvtgateconn/conn_rpc_test.go index 28f9573634e..2c908c980d7 100644 --- a/go/vt/vtgate/grpcvtgateconn/conn_rpc_test.go +++ b/go/vt/vtgate/grpcvtgateconn/conn_rpc_test.go @@ -27,6 +27,7 @@ import ( "context" + "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vtgate/grpcvtgateservice" "vitess.io/vitess/go/vt/vtgate/vtgateconn" @@ -105,6 +106,7 @@ func TestGRPCVTGateConnAuth(t *testing.T) { // Create a Go RPC client connecting to the server ctx := context.Background() flag.Set("grpc_auth_static_client_creds", f.Name()) + grpcclient.ResetStaticAuth() client, err := dial(ctx, listener.Addr().String()) if err != nil { t.Fatalf("dial failed: %v", err) @@ -138,6 +140,7 @@ func TestGRPCVTGateConnAuth(t *testing.T) { // Create a Go RPC client connecting to the server ctx = context.Background() flag.Set("grpc_auth_static_client_creds", f.Name()) + grpcclient.ResetStaticAuth() client, err = dial(ctx, listener.Addr().String()) if err != nil { t.Fatalf("dial failed: %v", err) diff --git a/go/vt/vttablet/tabletmanager/replmanager.go b/go/vt/vttablet/tabletmanager/replmanager.go index 3f949494be4..4985e2401ba 100644 --- a/go/vt/vttablet/tabletmanager/replmanager.go +++ b/go/vt/vttablet/tabletmanager/replmanager.go @@ -17,13 +17,12 @@ limitations under the License. package tabletmanager import ( + "context" "os" "path" "sync" "time" - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" @@ -105,6 +104,7 @@ func (rm *replManager) check() { func (rm *replManager) checkActionLocked() { status, err := rm.tm.MysqlDaemon.ReplicationStatus() if err != nil { + log.Infof("slack-debug: %v", err) if err != mysql.ErrNotReplica { return } @@ -116,12 +116,14 @@ func (rm *replManager) checkActionLocked() { } } + log.Infof("slack-debug: rm.failed=%v", rm.failed) if !rm.failed { log.Infof("Replication is stopped, reconnecting to primary.") } ctx, cancel := context.WithTimeout(rm.ctx, 5*time.Second) defer cancel() if err := rm.tm.repairReplication(ctx); err != nil { + log.Infof("slack-debug: repairReplication failed with=%v", err) if !rm.failed { rm.failed = true log.Infof("Failed to reconnect to primary: %v, will keep retrying.", err) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 7f15a3cf26e..1b5dc22348d 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -17,6 +17,7 @@ limitations under the License. 
package tabletmanager import ( + "context" "flag" "fmt" "strconv" @@ -25,8 +26,6 @@ import ( "vitess.io/vitess/go/vt/proto/vtrpc" - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -476,10 +475,10 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab // // It attemps to idempotently ensure the following guarantees upon returning // successfully: -// * No future writes will be accepted. -// * No writes are in-flight. -// * MySQL is in read-only mode. -// * Semi-sync settings are consistent with a REPLICA tablet. +// - No future writes will be accepted. +// - No writes are in-flight. +// - MySQL is in read-only mode. +// - Semi-sync settings are consistent with a REPLICA tablet. // // If necessary, it waits for all in-flight writes to complete or time out. // @@ -703,6 +702,7 @@ func (tm *TabletManager) setReplicationSourceRepairReplication(ctx context.Conte return err } + log.Infof("slack-debug: calling tm.TopoServer.LockShard") ctx, unlock, lockErr := tm.TopoServer.LockShard(ctx, parent.Tablet.GetKeyspace(), parent.Tablet.GetShard(), fmt.Sprintf("repairReplication to %v as parent)", topoproto.TabletAliasString(parentAlias))) if lockErr != nil { return lockErr @@ -724,6 +724,12 @@ func (tm *TabletManager) setReplicationSourceSemiSyncNoAction(ctx context.Contex } func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool, semiSync SemiSyncAction) (err error) { + tm._isSetReplicationSourceLockedRunning = true + + defer func() { + tm._isSetReplicationSourceLockedRunning = false + }() + // End orchestrator maintenance at the end of fixing replication. // This is a best effort operation, so it should happen in a goroutine defer func() { @@ -745,6 +751,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA // unintentionally change the type of RDONLY tablets tablet := tm.Tablet() if tablet.Type == topodatapb.TabletType_PRIMARY { + log.Infof("slack-debug: calling tm.tmState.ChangeTabletType") if err := tm.tmState.ChangeTabletType(ctx, topodatapb.TabletType_REPLICA, DBActionNone); err != nil { return err } @@ -755,6 +762,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA shouldbeReplicating := false status, err := tm.MysqlDaemon.ReplicationStatus() if err == mysql.ErrNotReplica { + log.Infof("slack-debug: err == mysql.ErrNotReplica") // This is a special error that means we actually succeeded in reading // the status, but the status is empty because replication is not // configured. We assume this means we used to be a primary, so we always @@ -781,6 +789,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA if tabletType == topodatapb.TabletType_PRIMARY { tabletType = topodatapb.TabletType_REPLICA } + log.Infof("slack-debug: calling tm.fixSemiSync") if err := tm.fixSemiSync(tabletType, semiSync); err != nil { return err } @@ -797,6 +806,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA host := parent.Tablet.MysqlHostname port := int(parent.Tablet.MysqlPort) if status.SourceHost != host || status.SourcePort != port { + log.Infof("slack-debug: calling tm.MysqlDaemon.SetReplicationSource") // This handles both changing the address and starting replication. 
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, wasReplicating, shouldbeReplicating); err != nil { if err := tm.handleRelayLogError(err); err != nil { @@ -1053,18 +1063,18 @@ func (tm *TabletManager) fixSemiSync(tabletType topodatapb.TabletType, semiSync // This following code will be uncommented and the above deleted when we are ready to use the // durability policies for setting the semi_sync information - //switch semiSync { - //case SemiSyncActionNone: + // switch semiSync { + // case SemiSyncActionNone: // return nil - //case SemiSyncActionSet: + // case SemiSyncActionSet: // // Always enable replica-side since it doesn't hurt to keep it on for a primary. // // The primary-side needs to be off for a replica, or else it will get stuck. // return tm.MysqlDaemon.SetSemiSyncEnabled(tabletType == topodatapb.TabletType_PRIMARY, true) - //case SemiSyncActionUnset: + // case SemiSyncActionUnset: // return tm.MysqlDaemon.SetSemiSyncEnabled(false, false) - //default: + // default: // return vterrors.Errorf(vtrpc.Code_INTERNAL, "Unknown SemiSyncAction - %v", semiSync) - //} + // } } func (tm *TabletManager) isPrimarySideSemiSyncEnabled() bool { @@ -1077,10 +1087,10 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT // Semi-sync handling is not enabled. return nil } - //if semiSync == SemiSyncActionNone { + // if semiSync == SemiSyncActionNone { // // Semi-sync handling is not required. // return nil - //} + // } if tabletType == topodatapb.TabletType_PRIMARY { // Primary is special. It is always handled at the @@ -1106,7 +1116,7 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT return nil } - //shouldAck := semiSync == SemiSyncActionSet + // shouldAck := semiSync == SemiSyncActionSet shouldAck := isPrimaryEligible(tabletType) acking, err := tm.MysqlDaemon.SemiSyncReplicationStatus() if err != nil { @@ -1144,6 +1154,17 @@ func (tm *TabletManager) handleRelayLogError(err error) error { // repairReplication tries to connect this server to whoever is // the current primary of the shard, and start replicating. 
func (tm *TabletManager) repairReplication(ctx context.Context) error { + log.Infof("slack-debug: entering repairReplication") + + if tm._isSetReplicationSourceLockedRunning { + // we are actively setting replication source, + // repairReplication will block due to higher + // authority holding a shard lock (PRS on vtctld) + log.Infof("slack-debug: we are actively setting replication source, exiting") + + return nil + } + tablet := tm.Tablet() si, err := tm.TopoServer.GetShard(ctx, tablet.Keyspace, tablet.Shard) @@ -1164,6 +1185,7 @@ func (tm *TabletManager) repairReplication(ctx context.Context) error { // If Orchestrator is configured and if Orchestrator is actively reparenting, we should not repairReplication if tm.orc != nil { + log.Infof("slack-debug: tm.orc != nil") re, err := tm.orc.InActiveShardRecovery(tablet) if err != nil { return err diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index b56d1a31bbe..b04eda587ca 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -195,6 +195,8 @@ type TabletManager struct { _lockTablesTimer *time.Timer // _isBackupRunning tells us whether there is a backup that is currently running _isBackupRunning bool + // _isSetReplicationSourceLockedRunning indicates we are actively running setReplicationSourceLocked + _isSetReplicationSourceLockedRunning bool } // BuildTabletFromInput builds a tablet record from input parameters.
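Note (illustration, not part of the diff): the _isSetReplicationSourceLockedRunning guard added above lets the periodic repair loop skip its work while a source change is already in flight, instead of queueing up behind the shard lock that operation holds. The standalone sketch below shows the same shape; names are invented, and it uses atomic.Bool for the flag where the PR uses a plain struct field.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type manager struct {
	settingSource atomic.Bool // true while a source change is in progress
}

func (m *manager) setReplicationSource() {
	m.settingSource.Store(true)
	defer m.settingSource.Store(false)
	time.Sleep(2 * time.Second) // stand-in for the real work done under the shard lock
}

func (m *manager) repairReplication() {
	if m.settingSource.Load() {
		fmt.Println("source change in progress, skipping repair")
		return
	}
	fmt.Println("repairing replication")
}

func main() {
	m := &manager{}
	go m.setReplicationSource()
	time.Sleep(100 * time.Millisecond)
	m.repairReplication() // skipped while the source change runs
	time.Sleep(3 * time.Second)
	m.repairReplication() // runs once the source change has finished
}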