diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java index 3ab86e40cb..4a3ca9c55b 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/CursorsServiceAT.java @@ -39,6 +39,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,6 +96,7 @@ public void before() throws Exception { final ZooKeeperHolder zkHolder = mock(ZooKeeperHolder.class); when(zkHolder.get()).thenReturn(CURATOR); + when(zkHolder.getSubscriptionCurator(anyInt())).thenReturn(CURATOR); final TopicRepository topicRepository = mock(TopicRepository.class); final TimelineService timelineService = mock(TimelineService.class); @@ -109,7 +111,8 @@ public void before() throws Exception { when(subscriptionRepo.getSubscription(sid)).thenReturn(subscription); final SubscriptionCache subscriptionCache = mock(SubscriptionCache.class); when(subscriptionCache.getSubscription(sid)).thenReturn(subscription); - final SubscriptionClientFactory zkSubscriptionFactory = new SubscriptionClientFactory(zkHolder, MAPPER); + final SubscriptionClientFactory zkSubscriptionFactory = new SubscriptionClientFactory( + zkHolder, MAPPER, mock(NakadiSettings.class)); uuidGenerator = mock(UUIDGenerator.class); when(uuidGenerator.isUUID(any())).thenReturn(true); cursorsService = new CursorsService(subscriptionRepo, subscriptionCache, null, mock(NakadiSettings.class), diff --git a/src/main/java/org/zalando/nakadi/config/NakadiConfig.java b/src/main/java/org/zalando/nakadi/config/NakadiConfig.java index f1cf7f4fb6..89f828cb8f 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiConfig.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiConfig.java @@ -14,11 +14,10 @@ import org.springframework.scheduling.annotation.EnableScheduling; import org.zalando.nakadi.domain.DefaultStorage; import org.zalando.nakadi.domain.Storage; -import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException; +import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.repository.db.StorageDbRepository; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory; import org.zalando.nakadi.service.StorageService; import java.util.Optional; @@ -34,11 +33,6 @@ public TaskExecutor taskExecutor() { return new SimpleAsyncTaskExecutor(); } - @Bean - public ZooKeeperLockFactory zooKeeperLockFactory(final ZooKeeperHolder zooKeeperHolder) { - return new ZooKeeperLockFactory(zooKeeperHolder); - } - @Bean @Qualifier("default_storage") public DefaultStorage defaultStorage(final StorageDbRepository storageDbRepository, diff --git a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java index e510c003ce..5929f92a32 100644 --- a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java +++ b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java @@ -54,7 +54,8 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic kafkaConfiguration.getExhibitorAddress(), kafkaConfiguration.getExhibitorPort(), zookeeperSettings.getZkSessionTimeoutMs(), - zookeeperSettings.getZkConnectionTimeoutMs()); + zookeeperSettings.getZkConnectionTimeoutMs(), + nakadiSettings); final KafkaFactory kafkaFactory = new KafkaFactory(new KafkaLocationManager(zooKeeperHolder, kafkaSettings), metricRegistry); final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zooKeeperHolder, diff --git a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java index dcd708001f..0ff1d2b952 100644 --- a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java +++ b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperHolder.java @@ -9,9 +9,11 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.zalando.nakadi.config.NakadiSettings; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.TimeUnit; public class ZooKeeperHolder { @@ -25,21 +27,25 @@ public class ZooKeeperHolder { private final Integer exhibitorPort; private final Integer sessionTimeoutMs; private final Integer connectionTimeoutMs; + private final long maxCommitTimeoutMs; private CuratorFramework zooKeeper; + private CuratorFramework subscriptionCurator; public ZooKeeperHolder(final String zookeeperBrokers, final String zookeeperKafkaNamespace, final String exhibitorAddresses, final Integer exhibitorPort, final Integer sessionTimeoutMs, - final Integer connectionTimeoutMs) throws Exception { + final Integer connectionTimeoutMs, + final NakadiSettings nakadiSettings) throws Exception { this.zookeeperBrokers = zookeeperBrokers; this.zookeeperKafkaNamespace = zookeeperKafkaNamespace; this.exhibitorAddresses = exhibitorAddresses; this.exhibitorPort = exhibitorPort; this.sessionTimeoutMs = sessionTimeoutMs; this.connectionTimeoutMs = connectionTimeoutMs; + this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()); initExhibitor(); } @@ -65,12 +71,38 @@ private void initExhibitor() throws Exception { .connectionTimeoutMs(connectionTimeoutMs) .build(); zooKeeper.start(); + + subscriptionCurator = CuratorFrameworkFactory.builder() + .ensembleProvider(ensembleProvider) + .retryPolicy(retryPolicy) + // max commit timeout is not higher than 60 seconds, it is safe to cast to integer + .sessionTimeoutMs((int) maxCommitTimeoutMs) + .connectionTimeoutMs(connectionTimeoutMs) + .build(); + subscriptionCurator.start(); } public CuratorFramework get() { return zooKeeper; } + public CuratorFramework getSubscriptionCurator(final long sessionTimeoutMs) { + // most of the clients use default max timeout, subscriptionCurator client saves zookeeper resource + if (sessionTimeoutMs == maxCommitTimeoutMs) { + return subscriptionCurator; + } + + final CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() + .connectString(zooKeeper.getZookeeperClient().getCurrentConnectionString()) + .retryPolicy(new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX)) + // max commit timeout is not higher than 60 seconds, it is safe to cast to integer + .sessionTimeoutMs((int) sessionTimeoutMs) + .connectionTimeoutMs(connectionTimeoutMs) + .build(); + curatorFramework.start(); + return curatorFramework; + } + private class ExhibitorEnsembleProvider extends org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider { ExhibitorEnsembleProvider(final Exhibitors exhibitors, final ExhibitorRestClient restClient, diff --git a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperLockFactory.java b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperLockFactory.java deleted file mode 100644 index ba727658ad..0000000000 --- a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZooKeeperLockFactory.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.zalando.nakadi.repository.zookeeper; - -import org.apache.curator.framework.recipes.locks.InterProcessLock; -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; - -public class ZooKeeperLockFactory { - - private final ZooKeeperHolder zkHolder; - - public ZooKeeperLockFactory(final ZooKeeperHolder zkHolder) { - this.zkHolder = zkHolder; - } - - public InterProcessLock createLock(final String path) { - return new InterProcessSemaphoreMutex(zkHolder.get(), path); - } -} diff --git a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java index 00d331c9da..4a83f3c180 100644 --- a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java +++ b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperConfig.java @@ -4,20 +4,23 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; +import org.zalando.nakadi.config.NakadiSettings; @Configuration @Profile("!test") public class ZookeeperConfig { @Bean - public ZooKeeperHolder zooKeeperHolder(final Environment environment) throws Exception { + public ZooKeeperHolder zooKeeperHolder(final Environment environment, + final NakadiSettings nakadiSettings) throws Exception { return new ZooKeeperHolder( environment.getProperty("nakadi.zookeeper.brokers"), environment.getProperty("nakadi.zookeeper.kafkaNamespace", ""), environment.getProperty("nakadi.zookeeper.exhibitor.brokers"), Integer.parseInt(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")), Integer.parseInt(environment.getProperty("nakadi.zookeeper.sessionTimeoutMs")), - Integer.parseInt(environment.getProperty("nakadi.zookeeper.connectionTimeoutMs")) + Integer.parseInt(environment.getProperty("nakadi.zookeeper.connectionTimeoutMs")), + nakadiSettings ); } } diff --git a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperUtils.java b/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperUtils.java deleted file mode 100644 index 34bdcb9953..0000000000 --- a/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperUtils.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.zalando.nakadi.repository.zookeeper; - -import org.apache.curator.framework.recipes.locks.InterProcessLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; - -public class ZookeeperUtils { - - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperUtils.class); - - private ZookeeperUtils() { - } - - public static V runLocked(final Callable callable, final InterProcessLock lock) throws Exception { - lock.acquire(); - try { - return callable.call(); - } finally { - try { - lock.release(); - } catch (final Exception e) { - LOG.warn("Error occurred when releasing ZK lock", e); - } - } - } -} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java index 343a3e9bc2..ed2ade4028 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java @@ -20,6 +20,7 @@ import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; +import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.timeline.TimelineService; import java.util.concurrent.Executors; @@ -86,6 +87,11 @@ public SubscriptionStreamer build( final BlacklistService blacklistService) throws InternalNakadiException, NoSuchEventTypeException { final Session session = Session.generate(1, streamParameters.getPartitions()); + final ZkSubscriptionClient zkClient = zkClientFactory.createClient( + subscription, + LogPathBuilder.build(subscription.getId(), session.getId()), + streamParameters.commitTimeoutMillis); + // Create streaming context return new StreamingContext.Builder() .setOut(output) @@ -93,8 +99,7 @@ public SubscriptionStreamer build( .setParameters(streamParameters) .setSession(session) .setTimer(executorService) - .setZkClient(zkClientFactory.createClient( - subscription, LogPathBuilder.build(subscription.getId(), session.getId()))) + .setZkClient(zkClient) .setRebalancer(new SubscriptionRebalancer()) .setKafkaPollTimeout(kafkaPollTimeout) .setConnectionReady(connectionReady) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index f74c33f804..b553dd2a7f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -20,6 +20,7 @@ import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.exceptions.runtime.UnableProcessException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; +import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -46,30 +47,33 @@ import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; public abstract class AbstractZkSubscriptionClient implements ZkSubscriptionClient { - public static final int SECONDS_TO_WAIT_FOR_LOCK = 15; - protected static final String NODE_TOPOLOGY = "/topology"; + private static final int SECONDS_TO_WAIT_FOR_LOCK = 15; private static final String STATE_INITIALIZED = "INITIALIZED"; private static final int COMMIT_CONFLICT_RETRY_TIMES = 5; private static final int MAX_ZK_RESPONSE_SECONDS = 5; + protected static final String NODE_TOPOLOGY = "/topology"; private final String subscriptionId; - private final CuratorFramework curatorFramework; + private final CuratorFramework defaultCurator; + private final CuratorFramework subscriptionCurator; private final String resetCursorPath; private final Logger log; private InterProcessSemaphoreMutex lock; public AbstractZkSubscriptionClient( final String subscriptionId, - final CuratorFramework curatorFramework, - final String loggingPath) { + final ZooKeeperHolder zooKeeperHolder, + final String loggingPath, + final long zkSessionTimeout) { this.subscriptionId = subscriptionId; - this.curatorFramework = curatorFramework; + this.defaultCurator = zooKeeperHolder.get(); + this.subscriptionCurator = zooKeeperHolder.getSubscriptionCurator(zkSessionTimeout); this.resetCursorPath = getSubscriptionPath("/cursor_reset"); this.log = LoggerFactory.getLogger(loggingPath + ".zk"); } protected CuratorFramework getCurator() { - return this.curatorFramework; + return this.subscriptionCurator; } protected String getSubscriptionId() { @@ -93,7 +97,7 @@ public final T runLocked(final Callable function) { try { Exception releaseException = null; if (null == lock) { - lock = new InterProcessSemaphoreMutex(curatorFramework, getSubscriptionLockPath()); + lock = new InterProcessSemaphoreMutex(defaultCurator, getSubscriptionLockPath()); } final boolean acquired = lock.acquire(SECONDS_TO_WAIT_FOR_LOCK, TimeUnit.SECONDS); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index 0b82c9da82..710ed5cd61 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -3,11 +3,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.KeeperException; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; +import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -65,10 +65,11 @@ public class NewZkSubscriptionClient extends AbstractZkSubscriptionClient { public NewZkSubscriptionClient( final String subscriptionId, - final CuratorFramework curatorFramework, + final ZooKeeperHolder zooKeeperHolder, final String loggingPath, - final ObjectMapper objectMapper) { - super(subscriptionId, curatorFramework, loggingPath); + final ObjectMapper objectMapper, + final long zkSessionTimeout) { + super(subscriptionId, zooKeeperHolder, loggingPath, zkSessionTimeout); this.objectMapper = objectMapper; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionClientFactory.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionClientFactory.java index 47f005bc84..d0e64c87a6 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionClientFactory.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/SubscriptionClientFactory.java @@ -4,31 +4,46 @@ import com.google.common.base.Preconditions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.Subscription; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; +import java.util.concurrent.TimeUnit; + @Service public class SubscriptionClientFactory { + private final ZooKeeperHolder zkHolder; private final ObjectMapper objectMapper; + private final long maxCommitTimeoutMs; @Autowired public SubscriptionClientFactory( final ZooKeeperHolder zkHolder, - final ObjectMapper objectMapper) { + final ObjectMapper objectMapper, + final NakadiSettings nakadiSettings) { this.zkHolder = zkHolder; this.objectMapper = objectMapper; + this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()); } public ZkSubscriptionClient createClient(final Subscription subscription, final String loggingPath) throws InternalNakadiException, NoSuchEventTypeException { + return createClient(subscription, loggingPath, maxCommitTimeoutMs); + } + + public ZkSubscriptionClient createClient(final Subscription subscription, + final String loggingPath, + final long commitTimeoutMillis) + throws InternalNakadiException, NoSuchEventTypeException { Preconditions.checkNotNull(subscription.getId()); return new NewZkSubscriptionClient( subscription.getId(), - zkHolder.get(), + zkHolder, loggingPath, - objectMapper); + objectMapper, + commitTimeoutMillis); } }