From 92c3641478ed4f52769a5997e484669aaf0d0333 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 19 Mar 2024 14:49:32 +0530 Subject: [PATCH 1/4] test: add failing test for full status connection pooling Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/general/vtorc_test.go | 73 ++++++++++++++++++++ go/test/endtoend/vtorc/utils/utils.go | 2 +- 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index d79e2964f3e..38bc5f34df9 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -495,3 +495,76 @@ func TestDurabilityPolicySetLater(t *testing.T) { assert.NotNil(t, primary, "should have elected a primary") utils.CheckReplication(t, newCluster, primary, shard0.Vttablets, 10*time.Second) } + +func TestFullStatusConnectionPooling(t *testing.T) { + defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance) + defer cluster.PanicHandler(t) + utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{ + "--tablet_manager_grpc_concurrency=1", + }, cluster.VTOrcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1, "") + keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] + + // find primary from topo + curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) + assert.NotNil(t, curPrimary, "should have elected a primary") + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) + utils.WaitForSuccessfulPRSCount(t, vtOrcProcess, keyspace.Name, shard0.Name, 1) + + // Kill the current primary. + _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + time.Sleep(1 * time.Minute) + + // Change the primaries ports and restart it. + curPrimary.VttabletProcess.Port = clusterInfo.ClusterInstance.GetAndReservePort() + curPrimary.VttabletProcess.GrpcPort = clusterInfo.ClusterInstance.GetAndReservePort() + err := curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. + // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) + + // REPEATED + // Kill the current primary. + _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + time.Sleep(1 * time.Minute) + + // Change the primaries ports back to original and restart it. + curPrimary.VttabletProcess.Port = curPrimary.HTTPPort + curPrimary.VttabletProcess.GrpcPort = curPrimary.GrpcPort + err = curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. + // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) +} diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index dca2c7b1e26..00f75740338 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -733,7 +733,7 @@ func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status // The function provided takes in the status and response and returns if we should continue to retry or not func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, retry func(int, string) bool) (status int, response string) { t.Helper() - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) for { select { case <-timeout: From 937a20d43bcf05aa81fac433c09efbf6f5791b6a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 24 Mar 2024 12:49:28 +0200 Subject: [PATCH 2/4] specialized dialPool, reduce concurrency to 1 Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 98 +++++++++++++++++---------- 1 file changed, 63 insertions(+), 35 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 4c1a8f1e41f..4732ae1b616 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -45,6 +45,17 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +type DialPoolGroup int + +const ( + // DialPoolGroupDefault is the default group for dialing. + DialPoolGroupDefault DialPoolGroup = iota + DialPoolGroupThrottler + DialPoolGroupVTOrc +) + +type invalidatorFunc func(error) + var ( concurrency = 8 cert string @@ -92,14 +103,17 @@ type tmc struct { client tabletmanagerservicepb.TabletManagerClient } +// rpcClientMap maps an address to a tmc +type rpcClientMap map[string]*tmc + // grpcClient implements both dialer and poolDialer. type grpcClient struct { // This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App}, // CheckThrottler and FullStatus. Note we'll keep the clients open and close them upon Close() only. // But that's OK because usually the tasks that use them are one-purpose only. // The map is protected by the mutex. - mu sync.Mutex - rpcClientMap map[string]chan *tmc + mu sync.Mutex + rpcClientMaps map[DialPoolGroup]rpcClientMap } type dialer interface { @@ -108,7 +122,7 @@ type dialer interface { } type poolDialer interface { - dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) + dialPool(ctx context.Context, dialPoolGroup DialPoolGroup, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, invalidatorFunc, error) } // Client implements tmclient.TabletManagerClient. @@ -152,53 +166,59 @@ func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) ( return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil } -func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) { +func (client *grpcClient) dialPool(ctx context.Context, dialPoolGroup DialPoolGroup, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, invalidatorFunc, error) { addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"])) opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name) if err != nil { - return nil, err + return nil, nil, err } client.mu.Lock() - if client.rpcClientMap == nil { - client.rpcClientMap = make(map[string]chan *tmc) + defer client.mu.Unlock() + + if client.rpcClientMaps == nil { + client.rpcClientMaps = make(map[DialPoolGroup]rpcClientMap) } - c, ok := client.rpcClientMap[addr] + m, ok := client.rpcClientMaps[dialPoolGroup] if !ok { - c = make(chan *tmc, concurrency) - client.rpcClientMap[addr] = c - client.mu.Unlock() - - for i := 0; i < cap(c); i++ { - cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt) - if err != nil { - return nil, err - } - c <- &tmc{ - cc: cc, - client: tabletmanagerservicepb.NewTabletManagerClient(cc), - } + m = make(rpcClientMap) + client.rpcClientMaps[dialPoolGroup] = m + } + invalidator := func(error) { + if err == nil { + return } - } else { - client.mu.Unlock() + m[addr].cc.Close() + delete(m, addr) + } + + if tm, ok := m[addr]; ok { + return tm.client, invalidator, nil + } + + cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt) + if err != nil { + return nil, nil, err } + tm := &tmc{ + cc: cc, + client: tabletmanagerservicepb.NewTabletManagerClient(cc), + } + m[addr] = tm - result := <-c - c <- result - return result.client, nil + return tm.client, invalidator, nil } // Close is part of the tmclient.TabletManagerClient interface. func (client *grpcClient) Close() { client.mu.Lock() defer client.mu.Unlock() - for _, c := range client.rpcClientMap { - close(c) - for ch := range c { - ch.cc.Close() + for _, m := range client.rpcClientMaps { + for _, tm := range m { + tm.cc.Close() } } - client.rpcClientMap = nil + client.rpcClientMaps = nil } // @@ -472,7 +492,7 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, DialPoolGroupDefault, tablet) if err != nil { return nil, err } @@ -528,7 +548,7 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, DialPoolGroupDefault, tablet) if err != nil { return nil, err } @@ -575,9 +595,10 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb. // and dialing the other tablet every time is not practical. func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) { var c tabletmanagerservicepb.TabletManagerClient + var invalidator invalidatorFunc var err error if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, DialPoolGroupVTOrc, tablet) if err != nil { return nil, err } @@ -593,6 +614,9 @@ func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) } response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{}) + if invalidator != nil { + invalidator(err) + } if err != nil { return nil, err } @@ -1065,9 +1089,10 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req // and dialing the other tablet every time is not practical. func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) { var c tabletmanagerservicepb.TabletManagerClient + var invalidator invalidatorFunc var err error if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, DialPoolGroupThrottler, tablet) if err != nil { return nil, err } @@ -1083,6 +1108,9 @@ func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tab } response, err := c.CheckThrottler(ctx, req) + if invalidator != nil { + invalidator(err) + } if err != nil { return nil, err } From 2df2144aa204ed65358bf5a45fd0e4b4d986a65f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 24 Mar 2024 13:29:22 +0200 Subject: [PATCH 3/4] resovle conflict Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 7233ec128a5..ca55ec7d521 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -528,7 +528,7 @@ func (client *Client) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topoda var err error if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, DialPoolGroupDefault, tablet) if err != nil { return nil, err } From 803592679a5cf47f1de9a197f5ae45bad36e0e43 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 24 Mar 2024 13:51:58 +0200 Subject: [PATCH 4/4] mutex protect Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index ca55ec7d521..45ba031b23c 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -188,6 +188,8 @@ func (client *grpcClient) dialPool(ctx context.Context, dialPoolGroup DialPoolGr if err == nil { return } + client.mu.Lock() + defer client.mu.Unlock() m[addr].cc.Close() delete(m, addr) }