diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go
index d79e2964f3e..cfbb6a69a67 100644
--- a/go/test/endtoend/vtorc/general/vtorc_test.go
+++ b/go/test/endtoend/vtorc/general/vtorc_test.go
@@ -495,3 +495,78 @@ func TestDurabilityPolicySetLater(t *testing.T) {
 	assert.NotNil(t, primary, "should have elected a primary")
 	utils.CheckReplication(t, newCluster, primary, shard0.Vttablets, 10*time.Second)
 }
+
+// TestFullStatusConnectionPooling tests that the FullStatus RPC keeps succeeding when a vttablet restarts on a
+// different port (simulating an address change) and then again on its original one. It was added in response to a similar bug seen in production.
+func TestFullStatusConnectionPooling(t *testing.T) {
+	defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
+	defer cluster.PanicHandler(t)
+	utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{
+		"--tablet_manager_grpc_concurrency=1",
+	}, cluster.VTOrcConfiguration{
+		PreventCrossDataCenterPrimaryFailover: true,
+	}, 1, "")
+	keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
+	shard0 := &keyspace.Shards[0]
+	vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
+
+	// Find the primary tablet from the topo.
+	curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
+	assert.NotNil(t, curPrimary, "should have elected a primary")
+	vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0]
+	utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1)
+	utils.WaitForSuccessfulPRSCount(t, vtOrcProcess, keyspace.Name, shard0.Name, 1)
+
+	// Kill the current primary.
+	_ = curPrimary.VttabletProcess.Kill()
+
+	// Wait until VTOrc notices some problems.
+	status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
+		return response == "null"
+	})
+	assert.Equal(t, 200, status)
+	assert.Contains(t, resp, "UnreachablePrimary")
+
+	// We have to wait for some time to ensure the gRPC connections from VTOrc to the vttablet
+	// are broken and closed by keep-alives. Without this wait, the gRPC connections stay open and the test passes trivially.
+	time.Sleep(1 * time.Minute)
+
+	// Change the primary's ports and restart it.
+	curPrimary.VttabletProcess.Port = clusterInfo.ClusterInstance.GetAndReservePort()
+	curPrimary.VttabletProcess.GrpcPort = clusterInfo.ClusterInstance.GetAndReservePort()
+	err := curPrimary.VttabletProcess.Setup()
+	require.NoError(t, err)
+
+	// See that VTOrc eventually reports no errors.
+	// Wait until there are no problems and the API endpoint returns null.
+	status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
+		return response != "null"
+	})
+	assert.Equal(t, 200, status)
+	assert.Equal(t, "null", resp)
+
+	// Repeat the same scenario.
+	// Kill the current primary.
+	_ = curPrimary.VttabletProcess.Kill()
+
+	// Wait until VTOrc notices some problems.
+	status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
+		return response == "null"
+	})
+	assert.Equal(t, 200, status)
+	assert.Contains(t, resp, "UnreachablePrimary")
+
+	// Change the primary's ports back to the original ones and restart it.
+	curPrimary.VttabletProcess.Port = curPrimary.HTTPPort
+	curPrimary.VttabletProcess.GrpcPort = curPrimary.GrpcPort
+	err = curPrimary.VttabletProcess.Setup()
+	require.NoError(t, err)
+
+	// See that VTOrc eventually reports no errors.
+	// Wait until there are no problems and the API endpoint returns null.
+	status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool {
+		return response != "null"
+	})
+	assert.Equal(t, 200, status)
+	assert.Equal(t, "null", resp)
+}
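
The one-minute sleep in the test above is what gives gRPC keepalives time to detect the dead peer and tear down VTOrc's cached connections; only then does the test exercise the reconnection path. The diff does not show which keepalive settings Vitess configures, so the following is only a minimal sketch of the client-side mechanism the test relies on, with illustrative values:

	package main

	import (
		"time"

		"google.golang.org/grpc"
		"google.golang.org/grpc/credentials/insecure"
		"google.golang.org/grpc/keepalive"
	)

	// dialWithKeepalive dials a gRPC server with client-side keepalive pings.
	// With parameters like these, an unanswered ping marks the transport as
	// dead and closes it, which is the state the test waits a minute to reach.
	func dialWithKeepalive(addr string) (*grpc.ClientConn, error) {
		return grpc.Dial(addr,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                10 * time.Second, // ping after 10s of inactivity
				Timeout:             10 * time.Second, // wait this long for the ping ack
				PermitWithoutStream: true,             // ping even with no active RPCs
			}))
	}
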
diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go
index dca2c7b1e26..00f75740338 100644
--- a/go/test/endtoend/vtorc/utils/utils.go
+++ b/go/test/endtoend/vtorc/utils/utils.go
@@ -733,7 +733,7 @@ func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status
 // The function provided takes in the status and response and returns if we should continue to retry or not
 func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, retry func(int, string) bool) (status int, response string) {
 	t.Helper()
-	timeout := time.After(10 * time.Second)
+	timeout := time.After(30 * time.Second)
 	for {
 		select {
 		case <-timeout:
diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go
index 4c1a8f1e41f..12d399aaf4a 100644
--- a/go/vt/vttablet/grpctmclient/client.go
+++ b/go/vt/vttablet/grpctmclient/client.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/spf13/pflag"
@@ -98,8 +99,10 @@ type grpcClient struct {
 	// CheckThrottler and FullStatus. Note we'll keep the clients open and close them upon Close() only.
 	// But that's OK because usually the tasks that use them are one-purpose only.
 	// The map is protected by the mutex.
-	mu           sync.Mutex
-	rpcClientMap map[string]chan *tmc
+	mu                    sync.Mutex
+	rpcClientMap          map[string]chan *tmc
+	rpcClientPoolMap      map[string](chan *tmc)
+	rpcClientPoolRequests atomic.Int32
 }
 
 type dialer interface {
@@ -109,6 +112,7 @@ type dialer interface {
 
 type poolDialer interface {
 	dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error)
+	dialPoolPrivate(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, func(error), error)
 }
 
 // Client implements tmclient.TabletManagerClient.
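
The two new grpcClient fields above implement a per-tablet-address connection pool: rpcClientPoolMap holds one buffered channel of ready clients per address, and rpcClientPoolRequests counts pool slots that still need (re)filling after an errored connection has been discarded. A minimal, self-contained sketch of the buffered-channel-as-pool idiom in isolation (the names here are illustrative, not Vitess APIs):

	package main

	// conn stands in for the pooled *tmc client wrapper.
	type conn struct{ addr string }

	// newPool builds a fixed-size free list out of a buffered channel:
	// receiving from it acquires a connection, sending returns one. The
	// channel buffer provides both the capacity bound and the locking.
	func newPool(addr string, size int, dial func(string) (*conn, error)) (chan *conn, error) {
		pool := make(chan *conn, size)
		for i := 0; i < size; i++ {
			c, err := dial(addr)
			if err != nil {
				return nil, err
			}
			pool <- c
		}
		return pool, nil
	}

The atomic counter exists so that a connection closed on error can be replaced lazily on a later dialPoolPrivate call, rather than redialing inside the error path.
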
@@ -151,6 +155,69 @@ func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) (
 	return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil
 }
 
+func (client *grpcClient) dialPoolPrivate(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, func(error), error) {
+	addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
+	opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
+	if err != nil {
+		return nil, nil, err
+	}
+	client.mu.Lock()
+	defer client.mu.Unlock()
+
+	if client.rpcClientPoolMap == nil {
+		client.rpcClientPoolMap = make(map[string](chan *tmc))
+	}
+
+	createNewPooledClient := func() (*tmc, error) {
+		cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
+		if err != nil {
+			return nil, err
+		}
+		return &tmc{
+			cc:     cc,
+			client: tabletmanagerservicepb.NewTabletManagerClient(cc),
+		}, nil
+	}
+	pool, ok := client.rpcClientPoolMap[addr]
+	if !ok {
+		// One-time population of the pool.
+		client.rpcClientPoolRequests.Add(int32(concurrency))
+		pool = make(chan *tmc, concurrency)
+		client.rpcClientPoolMap[addr] = pool
+	}
+	for client.rpcClientPoolRequests.Load() > 0 {
+		tmClient, err := createNewPooledClient()
+		if err != nil {
+			return nil, nil, err
+		}
+		pool <- tmClient
+		client.rpcClientPoolRequests.Add(-1)
+	}
+	var tmClient *tmc
+	var recycle func(error)
+	if len(pool) == 0 {
+		// By choice, if the pool is empty, we do not block. We instead create a new
+		// client on the fly. Since this client was not taken from the pool, it will not be returned to it either.
+		var err error
+		tmClient, err = createNewPooledClient()
+		if err != nil {
+			return nil, nil, err
+		}
+	} else {
+		tmClient = <-pool
+		recycle = func(err error) {
+			if err == nil {
+				pool <- tmClient
+			} else {
+				// The connection experienced an error. We close it and do not return it to the pool.
+				tmClient.cc.Close()
+				// Indicate that the pool needs to be populated with another client:
+				client.rpcClientPoolRequests.Add(1)
+			}
+		}
+	}
+	return tmClient.client, recycle, nil
+}
 func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) {
 	addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
 	opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
@@ -575,9 +642,10 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb.
 // and dialing the other tablet every time is not practical.
 func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) {
 	var c tabletmanagerservicepb.TabletManagerClient
+	var recycle func(error)
 	var err error
 	if poolDialer, ok := client.dialer.(poolDialer); ok {
-		c, err = poolDialer.dialPool(ctx, tablet)
+		c, recycle, err = poolDialer.dialPoolPrivate(ctx, tablet)
 		if err != nil {
 			return nil, err
 		}
@@ -593,6 +661,9 @@ func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet)
 	}
 
 	response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{})
+	if recycle != nil {
+		defer recycle(err)
+	}
 	if err != nil {
 		return nil, err
 	}
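
Two details of the scheme above are worth calling out. First, when the pool is drained, dialPoolPrivate deliberately returns a throwaway client rather than blocking, and recycle stays nil; that is why FullStatus guards with `if recycle != nil`. Second, `defer recycle(err)` is registered only after the RPC has returned, so the deferred call captures the RPC's error and either returns the connection to the pool (nil error) or closes it and bumps the refill counter (non-nil error). A condensed, self-contained sketch of that caller-side contract, with illustrative names rather than Vitess APIs:

	package main

	// conn stands in for the pooled client wrapper.
	type conn struct{}

	// callWithPooledConn shows the acquire / call / recycle shape that
	// FullStatus (above) and CheckThrottler (below) both follow.
	func callWithPooledConn(
		acquire func() (c *conn, recycle func(error), err error),
		rpc func(*conn) error,
	) error {
		c, recycle, err := acquire()
		if err != nil {
			return err
		}
		rpcErr := rpc(c)
		if recycle != nil {
			// nil error: the connection goes back into the pool.
			// non-nil error: it is closed and replaced on a later dial.
			recycle(rpcErr)
		}
		return rpcErr
	}
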
@@ -1065,9 +1136,10 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req
 // and dialing the other tablet every time is not practical.
 func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
 	var c tabletmanagerservicepb.TabletManagerClient
+	var recycle func(error)
 	var err error
 	if poolDialer, ok := client.dialer.(poolDialer); ok {
-		c, err = poolDialer.dialPool(ctx, tablet)
+		c, recycle, err = poolDialer.dialPoolPrivate(ctx, tablet)
 		if err != nil {
 			return nil, err
 		}
@@ -1083,6 +1155,9 @@ func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tab
 	}
 
 	response, err := c.CheckThrottler(ctx, req)
+	if recycle != nil {
+		defer recycle(err)
+	}
 	if err != nil {
 		return nil, err
 	}
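
For completeness, a hedged usage sketch of the wrapped RPC from the caller's side, VTOrc-style; it assumes a resolved *topodatapb.Tablet and standard Vitess imports, and is not code from this change:

	package main

	import (
		"context"
		"time"

		"vitess.io/vitess/go/vt/log"
		"vitess.io/vitess/go/vt/topo/topoproto"
		"vitess.io/vitess/go/vt/vttablet/tmclient"

		topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	)

	// pollFullStatus polls one tablet through the tablet manager client.
	// When the RPC fails, the pooled connection behind it is closed and
	// replaced on a later dial, so a tablet that restarts on a different
	// port, as in the test above, recovers without a VTOrc restart.
	func pollFullStatus(tablet *topodatapb.Tablet) {
		tmc := tmclient.NewTabletManagerClient()
		defer tmc.Close()

		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		fullStatus, err := tmc.FullStatus(ctx, tablet)
		if err != nil {
			log.Errorf("FullStatus failed for %v: %v", topoproto.TabletAliasString(tablet.Alias), err)
			return
		}
		log.Infof("tablet %v reports server UUID %v", topoproto.TabletAliasString(tablet.Alias), fullStatus.ServerUuid)
	}
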