From fb78e23d5228c7c5c850a2aa638c7e11caa931e9 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sun, 19 Jan 2025 18:18:54 +0800 Subject: [PATCH] CURATOR-710: Fix leaking watch in EnsembleTracker (#508) CURATOR-667(#474) fixes asynchronous event path for `getConfig` to "/zookeeper/config" by using `CuratorFramework::usingNamespace(null)` to fetch data. It causes watcher not registering to possible `WatcherRemovalManager`, so leaking in `WatcherRemoveCuratorFramework::removeWatchers`. Signed-off-by: tison Co-authored-by: tison --- .../framework/imps/GetConfigBuilderImpl.java | 4 +- .../framework/imps/WatcherRemovalFacade.java | 9 ++- .../curator/framework/imps/Watching.java | 6 +- .../imps/TestWatcherRemovalManager.java | 64 +++++++++++++++++++ 4 files changed, 74 insertions(+), 9 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index 91f20d4d3..af215cbad 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -39,9 +39,7 @@ public class GetConfigBuilderImpl private Stat stat; public GetConfigBuilderImpl(CuratorFrameworkImpl client) { - this.client = (CuratorFrameworkImpl) client.usingNamespace(null); - backgrounding = new Backgrounding(); - watching = new Watching(this.client); + this(client, new Backgrounding(), null, null); } public GetConfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Watcher watcher, Stat stat) { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java index ba5ce42cc..01e96f5e3 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java @@ -37,9 +37,13 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove private final WatcherRemovalManager removalManager; WatcherRemovalFacade(CuratorFrameworkImpl client) { + this(client, new WatcherRemovalManager(client)); + } + + private WatcherRemovalFacade(CuratorFrameworkImpl client, WatcherRemovalManager removalManager) { super(client); this.client = client; - removalManager = new WatcherRemovalManager(client); + this.removalManager = removalManager; } @Override @@ -73,7 +77,8 @@ public CuratorFramework nonNamespaceView() { @Override public CuratorFramework usingNamespace(String newNamespace) { - return client.usingNamespace(newNamespace); + final CuratorFrameworkImpl newClient = (CuratorFrameworkImpl) client.usingNamespace(newNamespace); + return new WatcherRemovalFacade(newClient, removalManager); } @Override diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index 92b16731c..b381f4136 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -85,10 +85,8 @@ void commitWatcher(int rc, boolean isExists) { doCommit = (rc == KeeperException.Code.OK.intValue()); } - if (doCommit && (namespaceWatcher != null)) { - if (client.getWatcherRemovalManager() != null) { - client.getWatcherRemovalManager().add(namespaceWatcher); - } + if (doCommit && namespaceWatcher != null && client.getWatcherRemovalManager() != null) { + client.getWatcherRemovalManager().add(namespaceWatcher); } } } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java index 960a86b9c..b5e90c628 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -34,12 +35,27 @@ import org.apache.curator.test.Timing; import org.apache.curator.test.WatchersDebug; import org.apache.curator.test.compatibility.CuratorTestBase; +import org.apache.curator.utils.DebugUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class TestWatcherRemovalManager extends CuratorTestBase { + private static final String superUserPasswordDigest = "curator-test:zghsj3JfJqK7DbWf0RQ1BgbJH9w="; // ran from + private static final String superUserPassword = "curator-test"; + + @BeforeEach + @Override + public void setup() throws Exception { + System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", superUserPasswordDigest); + super.setup(); + } + @Test public void testSameWatcherDifferentPaths1Triggered() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); @@ -302,6 +318,54 @@ public void testBasicNamespace3() throws Exception { } } + @Test + public void testEnsembleTracker() throws Exception { + // given: client with ensemble tracker + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .namespace("hey") + .ensembleTracker(true) + .authorization("digest", superUserPassword.getBytes()) + .build(); + try { + client.start(); + + // We are using standalone, so "/zookeeper/config" will be empty. + // So let's set it directly. + QuorumMaj quorumMaj = new QuorumMaj(Collections.singletonMap( + 1L, + new QuorumPeer.QuorumServer(1, "127.0.0.1:2182:2183:participant;" + server.getConnectString()))); + quorumMaj.setVersion(1); + client.usingNamespace(null) + .setData() + .forPath(ZooDefs.CONFIG_NODE, quorumMaj.toString().getBytes()); + + // when: zookeeper config node data fetched + while (client.getCurrentConfig().getVersion() == 0) { + Thread.sleep(100); + } + + // then: the watcher must be attached + assertEquals( + 1, + WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper()) + .size()); + + // when: ensemble tracker closed + System.setProperty(DebugUtils.PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true"); + ((CuratorFrameworkImpl) client).getEnsembleTracker().close(); + + // then: the watcher must be removed + assertEquals( + 0, + WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper()) + .size()); + } finally { + TestCleanState.closeAndTestClean(client); + } + } + @Test public void testSameWatcher() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));