diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java index 4a3ca9c55b..cfc6cbd91b 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java @@ -96,7 +96,8 @@ public void before() throws Exception { final ZooKeeperHolder zkHolder = mock(ZooKeeperHolder.class); when(zkHolder.get()).thenReturn(CURATOR); - when(zkHolder.getSubscriptionCurator(anyInt())).thenReturn(CURATOR); + when(zkHolder.getSubscriptionCurator(anyInt())) + .thenReturn(new ZooKeeperHolder.DisposableCuratorFramework(CURATOR)); final TopicRepository topicRepository = mock(TopicRepository.class); final TimelineService timelineService = mock(TimelineService.class); diff --git a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java index 0ff1d2b952..c10cb60edd 100644 --- a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java +++ b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java @@ -10,7 +10,10 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.zalando.nakadi.config.NakadiSettings; +import org.zalando.nakadi.exceptions.runtime.ZookeeperException; +import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.TimeUnit; @@ -47,62 +50,94 @@ public ZooKeeperHolder(final String zookeeperBrokers, this.connectionTimeoutMs = connectionTimeoutMs; this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()); - initExhibitor(); - } - - private void initExhibitor() throws Exception { - final RetryPolicy retryPolicy = new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX); - final EnsembleProvider ensembleProvider; - if (exhibitorAddresses != null) { - final Collection exhibitorHosts = Arrays.asList(exhibitorAddresses.split("\\s*,\\s*")); - final Exhibitors exhibitors = new Exhibitors(exhibitorHosts, exhibitorPort, - () -> zookeeperBrokers + zookeeperKafkaNamespace); - final ExhibitorRestClient exhibitorRestClient = new DefaultExhibitorRestClient(); - ensembleProvider = new ExhibitorEnsembleProvider(exhibitors, - exhibitorRestClient, "/exhibitor/v1/cluster/list", EXHIBITOR_POLLING_MS, retryPolicy); - ((ExhibitorEnsembleProvider) ensembleProvider).pollForInitialEnsemble(); - } else { - ensembleProvider = new FixedEnsembleProvider(zookeeperBrokers + zookeeperKafkaNamespace); - } - zooKeeper = CuratorFrameworkFactory.builder() - .ensembleProvider(ensembleProvider) - .retryPolicy(retryPolicy) - .sessionTimeoutMs(sessionTimeoutMs) - .connectionTimeoutMs(connectionTimeoutMs) - .build(); - zooKeeper.start(); - - subscriptionCurator = CuratorFrameworkFactory.builder() - .ensembleProvider(ensembleProvider) - .retryPolicy(retryPolicy) - // max commit timeout is not higher than 60 seconds, it is safe to cast to integer - .sessionTimeoutMs((int) maxCommitTimeoutMs) - .connectionTimeoutMs(connectionTimeoutMs) - .build(); - subscriptionCurator.start(); + zooKeeper = createCuratorFramework(sessionTimeoutMs, connectionTimeoutMs); + subscriptionCurator = createCuratorFramework((int) maxCommitTimeoutMs, connectionTimeoutMs); } public CuratorFramework get() { return zooKeeper; } - public CuratorFramework getSubscriptionCurator(final long sessionTimeoutMs) { + public CloseableCuratorFramework getSubscriptionCurator(final long sessionTimeoutMs) throws ZookeeperException { // most of the clients use default max timeout, subscriptionCurator client saves zookeeper resource if (sessionTimeoutMs == maxCommitTimeoutMs) { - return subscriptionCurator; + return new StaticCuratorFramework(subscriptionCurator); + } + + try { + // max commit timeout is not higher than 60 seconds, it is safe to cast to integer + return new DisposableCuratorFramework(createCuratorFramework((int) sessionTimeoutMs, connectionTimeoutMs)); + } catch (final Exception e) { + throw new ZookeeperException("Failed to create curator framework", e); } + } + + public abstract static class CloseableCuratorFramework implements Closeable { + + private final CuratorFramework curatorFramework; + + public CloseableCuratorFramework(final CuratorFramework curatorFramework) { + this.curatorFramework = curatorFramework; + } + + public CuratorFramework getCuratorFramework() { + return curatorFramework; + } + } + public static class StaticCuratorFramework extends CloseableCuratorFramework { + + public StaticCuratorFramework(final CuratorFramework curatorFramework) { + super(curatorFramework); + } + + @Override + public void close() throws IOException { + // do not ever close this particular instance of curator + } + } + + public static class DisposableCuratorFramework extends CloseableCuratorFramework { + + public DisposableCuratorFramework(final CuratorFramework curatorFramework) { + super(curatorFramework); + } + + @Override + public void close() throws IOException { + getCuratorFramework().close(); + } + } + + private CuratorFramework createCuratorFramework(final int sessionTimeoutMs, + final int connectionTimeoutMs) throws Exception { final CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() - .connectString(zooKeeper.getZookeeperClient().getCurrentConnectionString()) + .ensembleProvider(createEnsembleProvider()) .retryPolicy(new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX)) - // max commit timeout is not higher than 60 seconds, it is safe to cast to integer - .sessionTimeoutMs((int) sessionTimeoutMs) + .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .build(); curatorFramework.start(); return curatorFramework; } + private EnsembleProvider createEnsembleProvider() throws Exception { + final EnsembleProvider ensembleProvider; + final RetryPolicy retryPolicy = new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX); + if (exhibitorAddresses != null) { + final Collection exhibitorHosts = Arrays.asList(exhibitorAddresses.split("\\s*,\\s*")); + final Exhibitors exhibitors = new Exhibitors(exhibitorHosts, exhibitorPort, + () -> zookeeperBrokers + zookeeperKafkaNamespace); + final ExhibitorRestClient exhibitorRestClient = new DefaultExhibitorRestClient(); + ensembleProvider = new ExhibitorEnsembleProvider(exhibitors, + exhibitorRestClient, "/exhibitor/v1/cluster/list", EXHIBITOR_POLLING_MS, retryPolicy); + ((ExhibitorEnsembleProvider) ensembleProvider).pollForInitialEnsemble(); + } else { + ensembleProvider = new FixedEnsembleProvider(zookeeperBrokers + zookeeperKafkaNamespace); + } + return ensembleProvider; + } + private class ExhibitorEnsembleProvider extends org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider { ExhibitorEnsembleProvider(final Exhibitors exhibitors, final ExhibitorRestClient restClient, diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java index 1bc2b25d73..be517f2172 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/CleanupState.java @@ -3,6 +3,7 @@ import org.zalando.nakadi.service.subscription.StreamingContext; import javax.annotation.Nullable; +import java.io.IOException; public class CleanupState extends State { private final Exception exception; @@ -19,8 +20,11 @@ public CleanupState(@Nullable final Exception e) { public void onEnter() { try { getContext().unregisterAuthorizationUpdates(); + getContext().getZkClient().close(); } catch (final RuntimeException ex) { getLog().error("Unexpected fail during removing callback for registration updates", ex); + } catch (IOException e) { + getLog().error("Unexpected fail to release zk connection", e); } try { if (null != exception) { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index b553dd2a7f..767dbdb43c 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -55,7 +55,7 @@ public abstract class AbstractZkSubscriptionClient implements ZkSubscriptionClie private final String subscriptionId; private final CuratorFramework defaultCurator; - private final CuratorFramework subscriptionCurator; + private final ZooKeeperHolder.CloseableCuratorFramework closeableCuratorFramework; private final String resetCursorPath; private final Logger log; private InterProcessSemaphoreMutex lock; @@ -64,16 +64,20 @@ public AbstractZkSubscriptionClient( final String subscriptionId, final ZooKeeperHolder zooKeeperHolder, final String loggingPath, - final long zkSessionTimeout) { + final long zkSessionTimeout) throws ZookeeperException { this.subscriptionId = subscriptionId; this.defaultCurator = zooKeeperHolder.get(); - this.subscriptionCurator = zooKeeperHolder.getSubscriptionCurator(zkSessionTimeout); + this.closeableCuratorFramework = zooKeeperHolder.getSubscriptionCurator(zkSessionTimeout); this.resetCursorPath = getSubscriptionPath("/cursor_reset"); this.log = LoggerFactory.getLogger(loggingPath + ".zk"); } protected CuratorFramework getCurator() { - return this.subscriptionCurator; + return this.closeableCuratorFramework.getCuratorFramework(); + } + + protected ZooKeeperHolder.CloseableCuratorFramework getCloseableCuratorFramework() { + return this.closeableCuratorFramework; } protected String getSubscriptionId() { @@ -474,4 +478,9 @@ protected abstract byte[] createTopologyAndOffsets(Collection partitions) /** * Retrieves subscription data like partitions and sessions from ZK without a lock + * * @return list of partitions and sessions wrapped in * {@link org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode} */