Skip to content

Commit

Permalink
CURATOR-724. Fix LeaderLatch recover on reconnected and missing leaderPath (#515)
Browse files Browse the repository at this point in the history

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Dec 21, 2024
1 parent eb99124 commit dd67936
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<extension>
<groupId>com.gradle</groupId>
<artifactId>develocity-maven-extension</artifactId>
<version>1.21.4</version>
<version>1.23</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
getChildren();
}
} else {
log.error("getChildren() failed. rc = {}", event.getResultCode());
log.error("[id={}] creatingParentContainersIfNeeded() failed (rc = {})", id, event.getResultCode());
}
}
};
Expand All @@ -528,7 +528,7 @@ private synchronized void internalStart() {
reset();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
log.error("An error occurred checking resetting leadership.", e);
log.error("[id={}] failed to check resetting leadership.", id, e);
}
}
}
Expand All @@ -545,10 +545,10 @@ private void checkLeadership(List<String> children) throws Exception {
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;

log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);
log.debug("[id={}] checkLeadership with ourPath: {}, children: {}", id, localOurPath, sortedChildren);

if (ourIndex < 0) {
log.error("Can't find our node. Resetting. Index: {}", ourIndex);
log.error("[id={}] failed to find our node; resetting (index: {})", id, ourIndex);
reset();
return;
}
Expand Down Expand Up @@ -582,7 +582,7 @@ public void process(WatchedEvent event) {
getChildren();
} catch (Exception ex) {
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
log.error("[id={}] failed to check the leadership.", id, ex);
}
}
}
Expand All @@ -592,7 +592,7 @@ public void process(WatchedEvent event) {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
// previous node is gone - retry getChildren
log.debug("[id={}] previous node is gone; retry getChildren", id);
getChildren();
}
}
Expand All @@ -607,29 +607,37 @@ private void getChildren() throws Exception {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
checkLeadership(event.getChildren());
} else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
log.debug("[id={}] latchPath has gone; reset", id);
// This is possible when RECONNECTED during:
// (1) Scale the zk cluster to 0 nodes.
// (2) Scale it back.
//
// See also https://issues.apache.org/jira/browse/CURATOR-724
reset();
} else {
log.error("[id={}] getChildren() failed (rc = {})", id, event.getResultCode());
}
}
};
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}

/**
 * Test-only hook. When non-null, the {@code RECONNECTED} branch of
 * {@code handleStateChange} awaits this latch before re-checking leadership
 * through {@code getChildren()}, letting a test hold this instance back
 * until it counts the latch down. {@code null} (the default) disables the hook.
 */
@VisibleForTesting
volatile CountDownLatch debugHandleReconnectedLatch = null;

@VisibleForTesting
protected void handleStateChange(ConnectionState newState) {
switch (newState) {
default: {
// NOP
break;
}

case RECONNECTED: {
try {
if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)
|| !hasLeadership.get()) {
getChildren();
if (debugHandleReconnectedLatch != null) {
debugHandleReconnectedLatch.await();
}
getChildren();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
log.error("Could not reset leader latch", e);
log.error("[id={}] failed to recheck leadership on reconnected", id, e);
setLeadership(false);
}
break;
Expand All @@ -646,6 +654,11 @@ protected void handleStateChange(ConnectionState newState) {
setLeadership(false);
break;
}

default: {
// NOP
break;
}
}
}

Expand All @@ -662,7 +675,7 @@ private synchronized void setLeadership(boolean newValue) {

private void setNode(String newValue) throws Exception {
String oldPath = ourPath.getAndSet(newValue);
log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue);
log.debug("[id={}] setNode with oldPath: {}, newValue: {}", id, oldPath, newValue);
if (oldPath != null) {
client.delete().guaranteed().inBackground().forPath(oldPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,63 @@ public void testUncreatedPathGetLeader() throws Exception {
}
}

/**
 * Verifies that leadership moves to the second latch when the server is closed and
 * recreated on the same port, and that the former leader then reports
 * {@code LOST_LEADERSHIP} once its reconnect handling is allowed to proceed.
 *
 * <p>The former leader ({@code latch0}) is held back via its
 * {@code debugHandleReconnectedLatch} hook so that {@code latch1} is observed gaining
 * leadership first.
 *
 * @see <a href="https://issues.apache.org/jira/browse/CURATOR-724">CURATOR-724</a>
 */
@Test
public void testGetChildrenHitsNoNode() throws Exception {
    final String latchPath = "/testGetChildrenHitsNoNode";
    final Timing2 timing = new Timing2();
    final BlockingQueue<TestEvent> events0 = new LinkedBlockingQueue<>();
    final BlockingQueue<TestEvent> events1 = new LinkedBlockingQueue<>();

    // Gate handed to latch0 further down. Kept in a local so the finally block can
    // always release it, even when an assertion fails before the explicit countDown().
    final CountDownLatch reconnectGate = new CountDownLatch(1);

    final List<Closeable> closeableResources = new ArrayList<>();
    try {
        final String id0 = "id0";
        final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null);
        closeableResources.add(client0);
        final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events0);
        closeableResources.add(latch0);

        assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
                .isNotNull()
                .isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP));

        final String id1 = "id1";
        final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null);
        closeableResources.add(client1);
        final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events1);
        closeableResources.add(latch1);

        // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation
        // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset
        // call
        timing.forWaiting().sleepABit();

        assertTrue(latch0.hasLeadership());
        assertFalse(latch1.hasLeadership());

        // ensure we can observe the leadership transferred to latch1:
        // latch0 waits on this gate when it handles RECONNECTED
        latch0.debugHandleReconnectedLatch = reconnectGate;

        // scale to zero - recreate the cluster
        final int port = server.getPort();
        server.close();
        server = new TestingServer(port, true);

        assertThat(events1.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
                .isNotNull()
                .isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP));

        // let latch0 continue its reconnect handling; it should now give up leadership
        reconnectGate.countDown();
        assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
                .isNotNull()
                .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP));
    } finally {
        // Release the gate on failure paths too, so latch0 is not left waiting on it
        // while resources are closed. countDown() on a latch already at zero is a no-op.
        reconnectGate.countDown();

        // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client
        Collections.reverse(closeableResources);
        closeableResources.forEach(CloseableUtils::closeQuietly);
    }
}

@Test
public void testWatchedNodeDeletedOnReconnect() throws Exception {
final String latchPath = "/foo/bar";
Expand Down
2 changes: 1 addition & 1 deletion curator-recipes/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ log4j.additivity.org.apache.curator=false

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c %x %m [%t]%n

0 comments on commit dd67936

Please sign in to comment.