Skip to content

Commit 137bec3

Browse files
committed
close connections on disconnect
Signed-off-by: shanth96 <[email protected]>
1 parent f40e076 commit 137bec3

File tree

2 files changed

+61
-8
lines changed

2 files changed

+61
-8
lines changed

go/vt/topo/zk2topo/zk_conn.go

+4-8
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,8 @@ func (c *ZkConn) withRetry(ctx context.Context, action func(conn *zk.Conn) error
272272
c.conn = nil
273273
}
274274
c.mu.Unlock()
275+
log.Infof("zk conn: got ErrConnectionClosed for addr %v: closing", c.addr)
276+
conn.Close()
275277
}
276278
return
277279
}
@@ -322,23 +324,17 @@ func (c *ZkConn) maybeAddAuth(ctx context.Context) {
322324
// clears out the connection record.
323325
func (c *ZkConn) handleSessionEvents(conn *zk.Conn, session <-chan zk.Event) {
324326
for event := range session {
325-
closeRequired := false
326327

327328
switch event.State {
328-
case zk.StateExpired, zk.StateConnecting:
329-
closeRequired = true
330-
fallthrough
331-
case zk.StateDisconnected:
329+
case zk.StateDisconnected, zk.StateExpired, zk.StateConnecting:
332330
c.mu.Lock()
333331
if c.conn == conn {
334332
// The ZkConn still references this
335333
// connection, let's nil it.
336334
c.conn = nil
337335
}
338336
c.mu.Unlock()
339-
if closeRequired {
340-
conn.Close()
341-
}
337+
conn.Close()
342338
log.Infof("zk conn: session for addr %v ended: %v", c.addr, event)
343339
return
344340
}

go/vt/topo/zk2topo/zk_conn_test.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2024 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package zk2topo
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/stretchr/testify/require"
24+
"github.com/z-division/go-zookeeper/zk"
25+
26+
"vitess.io/vitess/go/testfiles"
27+
"vitess.io/vitess/go/vt/zkctl"
28+
)
29+
30+
func TestZkConnClosedOnDisconnect(t *testing.T) {
31+
zkd, serverAddr := zkctl.StartLocalZk(testfiles.GoVtTopoZk2topoZkID, testfiles.GoVtTopoZk2topoPort)
32+
defer zkd.Teardown()
33+
34+
conn := Connect(serverAddr)
35+
defer conn.Close()
36+
37+
_, _, err := conn.Get(context.Background(), "/")
38+
require.NoError(t, err, "Get() failed")
39+
40+
require.True(t, conn.conn.State().IsConnected(), "Connection not connected")
41+
42+
oldConn := conn.conn
43+
44+
// force a disconnect
45+
zkd.Shutdown()
46+
zkd.Start()
47+
48+
// do another get to trigger a new connection
49+
_, _, err = conn.Get(context.Background(), "/")
50+
require.NoError(t, err, "Get() failed")
51+
52+
// Check that old connection is closed
53+
_, _, err = oldConn.Get("/")
54+
require.ErrorContains(t, err, "zookeeper is closing")
55+
56+
require.Equal(t, zk.StateDisconnected, oldConn.State(), "Connection is not in disconnected state")
57+
}

0 commit comments

Comments
 (0)