diff --git a/pom.xml b/pom.xml
index cb512ec..d810e0e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.sonian
elasticsearch-zookeeper
- 1.2.0
+ 1.2.3
jar
ZooKeeper plugin for Elasticsearch
2011
@@ -30,7 +30,7 @@
- 1.2.1
+ 1.2.3
3.4.6
diff --git a/src/test/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperDiscoveryTests.java b/src/test/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperDiscoveryTests.java
index 9e60079..3e46508 100644
--- a/src/test/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperDiscoveryTests.java
+++ b/src/test/java/com/sonian/elasticsearch/zookeeper/discovery/ZooKeeperDiscoveryTests.java
@@ -215,13 +215,27 @@ public void shutdownZooKeeper() {
buildNode("node2");
// Ensure node1 is master
- ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1");
+ ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node1").start();
+ ClusterState cs = nodeMonitor.await();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
// Start all other nodes
- ClusterStateMonitor clientMonitor = new ClusterStateMonitor("client");
- nodeMonitor = new ClusterStateMonitor("node2");
+ ClusterStateMonitor clientMonitor = new ClusterStateMonitor("client", new ClusterStateCondition() {
+ @Override
+ public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
+ nodeMonitor = new ClusterStateMonitor("node2", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node2").start();
node("client").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
@@ -280,7 +294,11 @@ public void shutdownZooKeeper() {
buildNode("node1");
// Ensure node1 is master
- ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1");
+ ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node1").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
@@ -298,12 +316,20 @@ public void shutdownZooKeeper() {
buildNode("node2");
// Ensure node1 is master
- ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1");
+ ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node1").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
// Wait for the second node to start
- nodeMonitor = new ClusterStateMonitor("node2");
+ nodeMonitor = new ClusterStateMonitor("node2", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node2").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
@@ -324,12 +350,20 @@ public void shutdownZooKeeper() {
buildNode("node2");
// Ensure node1 is master
- ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1");
+ ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node1").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
// Wait for the second node to start
- nodeMonitor = new ClusterStateMonitor("node2");
+ nodeMonitor = new ClusterStateMonitor("node2", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node2").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
@@ -368,17 +402,29 @@ public void shutdownZooKeeper() {
);
// Ensure node1 is master
- ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1");
+ ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node1").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
// Wait for the second node to start
- nodeMonitor = new ClusterStateMonitor("node2");
+ nodeMonitor = new ClusterStateMonitor("node2", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node2").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
// Wait for the third node to start
- nodeMonitor = new ClusterStateMonitor("node3");
+ nodeMonitor = new ClusterStateMonitor("node3", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node3").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
@@ -531,17 +577,29 @@ private long countResults(int indexCount) throws InterruptedException {
);
// Ensure node1 is master
- ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1");
+ ClusterStateMonitor nodeMonitor = new ClusterStateMonitor("node1", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node1").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
// Wait for the second node to start
- nodeMonitor = new ClusterStateMonitor("node2");
+ nodeMonitor = new ClusterStateMonitor("node2", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node2").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
// Wait for the third node to start
- nodeMonitor = new ClusterStateMonitor("node3");
+ nodeMonitor = new ClusterStateMonitor("node3", new ClusterStateCondition() {
+ @Override public boolean check(ClusterChangedEvent event) {
+ return event.state().nodes().masterNode() != null;
+ }
+ });
node("node3").start();
assertThat(nodeMonitor.await().nodes().masterNode().name(), equalTo("node1"));
@@ -636,8 +694,8 @@ public ClusterStateMonitor(final String id, ClusterStateCondition condition) {
clusterStateListener = new ClusterStateListener() {
@Override public void clusterChanged(ClusterChangedEvent event) {
if (checkCondition(event)) {
- logger.info("clusterChangedEvent {} state {} ", event.source(), state);
state = event.state();
+ logger.info("clusterChangedEvent {} state {} ", event.source(), state);
clusterService(id).remove(this);
latchNode.countDown();
} else {