From e96822df02cdc3f9c7be0fdc71c18f65648df5b9 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Fri, 8 Nov 2024 17:11:44 -0500 Subject: [PATCH] Close zookeeper topo connection on disconnect (#17136) Signed-off-by: shanth96 --- go/vt/topo/zk2topo/zk_conn.go | 12 +++---- go/vt/topo/zk2topo/zk_conn_test.go | 57 ++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 8 deletions(-) create mode 100644 go/vt/topo/zk2topo/zk_conn_test.go diff --git a/go/vt/topo/zk2topo/zk_conn.go b/go/vt/topo/zk2topo/zk_conn.go index a0eec8b4340..6aff7af5dbb 100644 --- a/go/vt/topo/zk2topo/zk_conn.go +++ b/go/vt/topo/zk2topo/zk_conn.go @@ -277,6 +277,8 @@ func (c *ZkConn) withRetry(ctx context.Context, action func(conn *zk.Conn) error c.conn = nil } c.mu.Unlock() + log.Infof("zk conn: got ErrConnectionClosed for addr %v: closing", c.addr) + conn.Close() } return } @@ -327,13 +329,9 @@ func (c *ZkConn) maybeAddAuth(ctx context.Context) { // clears out the connection record. func (c *ZkConn) handleSessionEvents(conn *zk.Conn, session <-chan zk.Event) { for event := range session { - closeRequired := false switch event.State { - case zk.StateExpired, zk.StateConnecting: - closeRequired = true - fallthrough - case zk.StateDisconnected: + case zk.StateDisconnected, zk.StateExpired, zk.StateConnecting: c.mu.Lock() if c.conn == conn { // The ZkConn still references this @@ -341,9 +339,7 @@ func (c *ZkConn) handleSessionEvents(conn *zk.Conn, session <-chan zk.Event) { c.conn = nil } c.mu.Unlock() - if closeRequired { - conn.Close() - } + conn.Close() log.Infof("zk conn: session for addr %v ended: %v", c.addr, event) return } diff --git a/go/vt/topo/zk2topo/zk_conn_test.go b/go/vt/topo/zk2topo/zk_conn_test.go new file mode 100644 index 00000000000..b0b94c07074 --- /dev/null +++ b/go/vt/topo/zk2topo/zk_conn_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package zk2topo + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/z-division/go-zookeeper/zk" + + "vitess.io/vitess/go/testfiles" + "vitess.io/vitess/go/vt/zkctl" +) + +func TestZkConnClosedOnDisconnect(t *testing.T) { + zkd, serverAddr := zkctl.StartLocalZk(testfiles.GoVtTopoZk2topoZkID, testfiles.GoVtTopoZk2topoPort) + defer zkd.Teardown() + + conn := Connect(serverAddr) + defer conn.Close() + + _, _, err := conn.Get(context.Background(), "/") + require.NoError(t, err, "Get() failed") + + require.True(t, conn.conn.State().IsConnected(), "Connection not connected") + + oldConn := conn.conn + + // force a disconnect + zkd.Shutdown() + zkd.Start() + + // do another get to trigger a new connection + _, _, err = conn.Get(context.Background(), "/") + require.NoError(t, err, "Get() failed") + + // Check that old connection is closed + _, _, err = oldConn.Get("/") + require.ErrorContains(t, err, "zookeeper is closing") + + require.Equal(t, zk.StateDisconnected, oldConn.State(), "Connection is not in disconnected state") +}