Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CURATOR-724. Fix LeaderLatch recover on reconnected and missing leaderPath #515

Merged
merged 3 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<extension>
<groupId>com.gradle</groupId>
<artifactId>develocity-maven-extension</artifactId>
<version>1.21.4</version>
<version>1.23</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
getChildren();
}
} else {
log.error("getChildren() failed. rc = {}", event.getResultCode());
log.error("[id={}] creatingParentContainersIfNeeded() failed (rc = {})", id, event.getResultCode());
}
}
};
Expand All @@ -528,7 +528,7 @@ private synchronized void internalStart() {
reset();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
log.error("An error occurred checking resetting leadership.", e);
log.error("[id={}] failed to check resetting leadership.", id, e);
}
}
}
Expand All @@ -545,10 +545,10 @@ private void checkLeadership(List<String> children) throws Exception {
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;

log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);
log.debug("[id={}] checkLeadership with ourPath: {}, children: {}", id, localOurPath, sortedChildren);

if (ourIndex < 0) {
log.error("Can't find our node. Resetting. Index: {}", ourIndex);
log.error("[id={}] failed to find our node; resetting (index: {})", id, ourIndex);
reset();
return;
}
Expand Down Expand Up @@ -582,7 +582,7 @@ public void process(WatchedEvent event) {
getChildren();
} catch (Exception ex) {
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
log.error("[id={}] failed to check the leadership.", id, ex);
}
}
}
Expand All @@ -592,7 +592,7 @@ public void process(WatchedEvent event) {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
// previous node is gone - retry getChildren
log.debug("[id={}] previous node is gone; retry getChildren", id);
getChildren();
}
}
Expand All @@ -607,29 +607,37 @@ private void getChildren() throws Exception {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
checkLeadership(event.getChildren());
} else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
log.debug("[id={}] latchPath has gone; reset", id);
// This is possible when RECONNECTED during:
// (1) Scale the zk cluster to 0 nodes.
// (2) Scale it back.
//
// See also https://issues.apache.org/jira/browse/CURATOR-724
reset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, before #430, the leaderLatch calls reset method after reconnection, and reset will whatever create the missing parent path nodes. So I didn't find this issue in the old versions. -- CURATOR-724

Just for sure, so, in case of RECONNECTED, we have no idea what happened to the cluster and getChildren is a conservative option comparing to reset.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Updated add a test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also ensure that the test failed without this fix.

} else {
log.error("[id={}] getChildren() failed (rc = {})", id, event.getResultCode());
}
}
};
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}

@VisibleForTesting
volatile CountDownLatch debugHandleReconnectedLatch = null;

@VisibleForTesting
protected void handleStateChange(ConnectionState newState) {
switch (newState) {
default: {
// NOP
break;
}

case RECONNECTED: {
try {
if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED)
|| !hasLeadership.get()) {
getChildren();
if (debugHandleReconnectedLatch != null) {
debugHandleReconnectedLatch.await();
}
getChildren();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it doesn't hurt we always call getChildren to check our leadership. It's not reset now anyway.

} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
log.error("Could not reset leader latch", e);
log.error("[id={}] failed to recheck leadership on reconnected", id, e);
setLeadership(false);
}
break;
Expand All @@ -646,6 +654,11 @@ protected void handleStateChange(ConnectionState newState) {
setLeadership(false);
break;
}

default: {
// NOP
break;
}
}
}

Expand All @@ -662,7 +675,7 @@ private synchronized void setLeadership(boolean newValue) {

private void setNode(String newValue) throws Exception {
String oldPath = ourPath.getAndSet(newValue);
log.debug("setNode with id: {}, oldPath: {}, newValue: {}", id, oldPath, newValue);
log.debug("[id={}] setNode with oldPath: {}, newValue: {}", id, oldPath, newValue);
if (oldPath != null) {
client.delete().guaranteed().inBackground().forPath(oldPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,63 @@ public void testUncreatedPathGetLeader() throws Exception {
}
}

// @see https://issues.apache.org/jira/browse/CURATOR-724
@Test
public void testGetChildrenHitsNoNode() throws Exception {
final String latchPath = "/testGetChildrenHitsNoNode";
final Timing2 timing = new Timing2();
final BlockingQueue<TestEvent> events0 = new LinkedBlockingQueue<>();
final BlockingQueue<TestEvent> events1 = new LinkedBlockingQueue<>();

final List<Closeable> closeableResources = new ArrayList<>();
try {
final String id0 = "id0";
final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null);
closeableResources.add(client0);
final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events0);
closeableResources.add(latch0);

assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
.isNotNull()
.isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP));

final String id1 = "id1";
final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null);
closeableResources.add(client1);
final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events1);
closeableResources.add(latch1);

// wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation
// this call is time-consuming but necessary because we don't have a handle to detect the end of the reset
// call
timing.forWaiting().sleepABit();

assertTrue(latch0.hasLeadership());
assertFalse(latch1.hasLeadership());

// ensure we can observe the leadership transferred to latch1
latch0.debugHandleReconnectedLatch = new CountDownLatch(1);

// scale to zero - recreate the cluster
final int port = server.getPort();
server.close();
server = new TestingServer(port, true);

assertThat(events1.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
.isNotNull()
.isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP));

latch0.debugHandleReconnectedLatch.countDown();
assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
.isNotNull()
.isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP));
} finally {
// reverse is necessary for closing the LeaderLatch instances before closing the corresponding client
Collections.reverse(closeableResources);
closeableResources.forEach(CloseableUtils::closeQuietly);
}
}

@Test
public void testWatchedNodeDeletedOnReconnect() throws Exception {
final String latchPath = "/foo/bar";
Expand Down
2 changes: 1 addition & 1 deletion curator-recipes/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ log4j.additivity.org.apache.curator=false

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-5p %c %x %m [%t]%n
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c %x %m [%t]%n
Loading