diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorClosedException.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorClosedException.java new file mode 100644 index 000000000..cd1746ca7 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorClosedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.api; + +public class CuratorClosedException extends IllegalStateException { + public CuratorClosedException() { + super("Expected state [STARTED] was [STOPPED]"); + } +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 816d0bda0..3b17b6fb1 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -51,6 +51,7 @@ import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.api.CompressionProvider; import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.CuratorClosedException; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.api.CuratorListener; @@ -462,11 +463,16 @@ public String getNamespace() { private void checkState() { CuratorFrameworkState state = getState(); - Preconditions.checkState( - state == CuratorFrameworkState.STARTED, - "Expected state [%s] was [%s]", - CuratorFrameworkState.STARTED, - state); + switch (state) { + case STARTED: + return; + case STOPPED: + throw new CuratorClosedException(); + default: + String msg = String.format( + "Expected state [%s] was [%s]", CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED); + throw new IllegalStateException(msg); + } } @Override @@ -525,11 +531,13 @@ public SetACLBuilder setACL() { @Override public ReconfigBuilder reconfig() { + checkState(); return new ReconfigBuilderImpl(this); } @Override public GetConfigBuilder getConfig() { + checkState(); return new GetConfigBuilderImpl(this); } @@ -577,11 +585,13 @@ public void sync(String path, Object context) { @Override public SyncBuilder sync() { + checkState(); return new SyncBuilderImpl(this); } @Override public RemoveWatchesBuilder watches() { + checkState(); return new RemoveWatchesBuilderImpl(this); } @@ -590,6 +600,7 @@ public WatchesBuilder watchers() { Preconditions.checkState( zookeeperCompatibility.hasPersistentWatchers(), "watchers() is not supported in the ZooKeeper library and/or server being used. Use watches() instead."); + checkState(); return new WatchesBuilderImpl(this); } diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java index 59a0c6abf..d03e9d9b4 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java @@ -25,12 +25,15 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorClosedException; +import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.listen.StandardListenerManager; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +80,11 @@ public PersistentWatcher(CuratorFramework client, String basePath, boolean recur public void start() { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); client.getConnectionStateListenable().addListener(connectionStateListener); + client.getCuratorListenable().addListener(((ignored, event) -> { + if (event.getType() == CuratorEventType.CLOSING) { + onClientClosed(); + } + })); reset(); } @@ -97,6 +105,13 @@ public void close() { } } + private void onClientClosed() { + if (state.compareAndSet(State.STARTED, State.CLOSED)) { + WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Closed, null); + watcher.process(event); + } + } + /** * Container for setting listeners * @@ -135,6 +150,8 @@ private void reset() { .inBackground(callback) .usingWatcher(watcher) .forPath(basePath); + } catch (CuratorClosedException ignored) { + onClientClosed(); } catch (Exception e) { log.error("Could not reset persistent watch at path: " + basePath, e); } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java index f486bd534..902b18a0a 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java @@ -20,10 +20,12 @@ package org.apache.curator.framework.recipes.watch; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; @@ -46,6 +48,53 @@ public void testConnectionLost() throws Exception { internalTest(false); } + @Test + public void testConcurrentClientClose() throws Exception { + BlockingQueue events = new LinkedBlockingQueue<>(); + + // given: started curator client + CuratorFramework client = CuratorFrameworkFactory.newClient( + server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + + // given: started persistent watcher + PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", true); + persistentWatcher.getListenable().addListener(events::add); + persistentWatcher.start(); + + // when: curator client closed + client.close(); + + // then: listener get Closed notification + WatchedEvent event = events.poll(5, TimeUnit.SECONDS); + assertNotNull(event); + assertEquals(Watcher.Event.EventType.None, event.getType()); + assertEquals(Watcher.Event.KeeperState.Closed, event.getState()); + } + + @Test + public void testAfterClientClose() throws Exception { + BlockingQueue events = new LinkedBlockingQueue<>(); + + // given: closed client + CuratorFramework client = CuratorFrameworkFactory.newClient( + server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + client.close(); + + // when: start persistent watcher + try (PersistentWatcher persistentWatcher = new PersistentWatcher(client, "/top/main", true)) { + persistentWatcher.getListenable().addListener(events::add); + persistentWatcher.start(); + } + + // then: listener get Closed notification + WatchedEvent event = events.poll(5, TimeUnit.SECONDS); + assertNotNull(event); + assertEquals(Watcher.Event.EventType.None, event.getType()); + assertEquals(Watcher.Event.KeeperState.Closed, event.getState()); + } + private void internalTest(boolean recursive) throws Exception { try (CuratorFramework client = CuratorFrameworkFactory.newClient( server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) {