Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1069 from zalando/ARUHA-2359
Browse files Browse the repository at this point in the history
zk session timeout the same as commit timeout
  • Loading branch information
adyach authored Jun 20, 2019
2 parents 5a36c91 + 712c4a9 commit 46e5b18
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -95,6 +96,7 @@ public void before() throws Exception {

final ZooKeeperHolder zkHolder = mock(ZooKeeperHolder.class);
when(zkHolder.get()).thenReturn(CURATOR);
when(zkHolder.getSubscriptionCurator(anyInt())).thenReturn(CURATOR);

final TopicRepository topicRepository = mock(TopicRepository.class);
final TimelineService timelineService = mock(TimelineService.class);
Expand All @@ -109,7 +111,8 @@ public void before() throws Exception {
when(subscriptionRepo.getSubscription(sid)).thenReturn(subscription);
final SubscriptionCache subscriptionCache = mock(SubscriptionCache.class);
when(subscriptionCache.getSubscription(sid)).thenReturn(subscription);
final SubscriptionClientFactory zkSubscriptionFactory = new SubscriptionClientFactory(zkHolder, MAPPER);
final SubscriptionClientFactory zkSubscriptionFactory = new SubscriptionClientFactory(
zkHolder, MAPPER, mock(NakadiSettings.class));
uuidGenerator = mock(UUIDGenerator.class);
when(uuidGenerator.isUUID(any())).thenReturn(true);
cursorsService = new CursorsService(subscriptionRepo, subscriptionCache, null, mock(NakadiSettings.class),
Expand Down
8 changes: 1 addition & 7 deletions src/main/java/org/zalando/nakadi/config/NakadiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
import org.springframework.scheduling.annotation.EnableScheduling;
import org.zalando.nakadi.domain.DefaultStorage;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.DuplicatedStorageException;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.repository.db.StorageDbRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory;
import org.zalando.nakadi.service.StorageService;

import java.util.Optional;
Expand All @@ -34,11 +33,6 @@ public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}

@Bean
public ZooKeeperLockFactory zooKeeperLockFactory(final ZooKeeperHolder zooKeeperHolder) {
return new ZooKeeperLockFactory(zooKeeperHolder);
}

@Bean
@Qualifier("default_storage")
public DefaultStorage defaultStorage(final StorageDbRepository storageDbRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic
kafkaConfiguration.getExhibitorAddress(),
kafkaConfiguration.getExhibitorPort(),
zookeeperSettings.getZkSessionTimeoutMs(),
zookeeperSettings.getZkConnectionTimeoutMs());
zookeeperSettings.getZkConnectionTimeoutMs(),
nakadiSettings);
final KafkaFactory kafkaFactory =
new KafkaFactory(new KafkaLocationManager(zooKeeperHolder, kafkaSettings), metricRegistry);
final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zooKeeperHolder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.zalando.nakadi.config.NakadiSettings;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

public class ZooKeeperHolder {

Expand All @@ -25,21 +27,25 @@ public class ZooKeeperHolder {
private final Integer exhibitorPort;
private final Integer sessionTimeoutMs;
private final Integer connectionTimeoutMs;
private final long maxCommitTimeoutMs;

private CuratorFramework zooKeeper;
private CuratorFramework subscriptionCurator;

public ZooKeeperHolder(final String zookeeperBrokers,
final String zookeeperKafkaNamespace,
final String exhibitorAddresses,
final Integer exhibitorPort,
final Integer sessionTimeoutMs,
final Integer connectionTimeoutMs) throws Exception {
final Integer connectionTimeoutMs,
final NakadiSettings nakadiSettings) throws Exception {
this.zookeeperBrokers = zookeeperBrokers;
this.zookeeperKafkaNamespace = zookeeperKafkaNamespace;
this.exhibitorAddresses = exhibitorAddresses;
this.exhibitorPort = exhibitorPort;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout());

initExhibitor();
}
Expand All @@ -65,12 +71,38 @@ private void initExhibitor() throws Exception {
.connectionTimeoutMs(connectionTimeoutMs)
.build();
zooKeeper.start();

subscriptionCurator = CuratorFrameworkFactory.builder()
.ensembleProvider(ensembleProvider)
.retryPolicy(retryPolicy)
// max commit timeout is not higher than 60 seconds, it is safe to cast to integer
.sessionTimeoutMs((int) maxCommitTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.build();
subscriptionCurator.start();
}

public CuratorFramework get() {
return zooKeeper;
}

public CuratorFramework getSubscriptionCurator(final long sessionTimeoutMs) {
// most of the clients use default max timeout, subscriptionCurator client saves zookeeper resource
if (sessionTimeoutMs == maxCommitTimeoutMs) {
return subscriptionCurator;
}

final CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(zooKeeper.getZookeeperClient().getCurrentConnectionString())
.retryPolicy(new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX))
// max commit timeout is not higher than 60 seconds, it is safe to cast to integer
.sessionTimeoutMs((int) sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.build();
curatorFramework.start();
return curatorFramework;
}

private class ExhibitorEnsembleProvider extends org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider {

ExhibitorEnsembleProvider(final Exhibitors exhibitors, final ExhibitorRestClient restClient,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.zalando.nakadi.config.NakadiSettings;

@Configuration
@Profile("!test")
public class ZookeeperConfig {

@Bean
public ZooKeeperHolder zooKeeperHolder(final Environment environment) throws Exception {
public ZooKeeperHolder zooKeeperHolder(final Environment environment,
final NakadiSettings nakadiSettings) throws Exception {
return new ZooKeeperHolder(
environment.getProperty("nakadi.zookeeper.brokers"),
environment.getProperty("nakadi.zookeeper.kafkaNamespace", ""),
environment.getProperty("nakadi.zookeeper.exhibitor.brokers"),
Integer.parseInt(environment.getProperty("nakadi.zookeeper.exhibitor.port", "0")),
Integer.parseInt(environment.getProperty("nakadi.zookeeper.sessionTimeoutMs")),
Integer.parseInt(environment.getProperty("nakadi.zookeeper.connectionTimeoutMs"))
Integer.parseInt(environment.getProperty("nakadi.zookeeper.connectionTimeoutMs")),
nakadiSettings
);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.zalando.nakadi.service.NakadiKpiPublisher;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.timeline.TimelineService;

import java.util.concurrent.Executors;
Expand Down Expand Up @@ -86,15 +87,19 @@ public SubscriptionStreamer build(
final BlacklistService blacklistService)
throws InternalNakadiException, NoSuchEventTypeException {
final Session session = Session.generate(1, streamParameters.getPartitions());
final ZkSubscriptionClient zkClient = zkClientFactory.createClient(
subscription,
LogPathBuilder.build(subscription.getId(), session.getId()),
streamParameters.commitTimeoutMillis);

// Create streaming context
return new StreamingContext.Builder()
.setOut(output)
.setStreamMemoryLimitBytes(streamMemoryLimitBytes)
.setParameters(streamParameters)
.setSession(session)
.setTimer(executorService)
.setZkClient(zkClientFactory.createClient(
subscription, LogPathBuilder.build(subscription.getId(), session.getId())))
.setZkClient(zkClient)
.setRebalancer(new SubscriptionRebalancer())
.setKafkaPollTimeout(kafkaPollTimeout)
.setConnectionReady(connectionReady)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

Expand All @@ -46,30 +47,33 @@
import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;

public abstract class AbstractZkSubscriptionClient implements ZkSubscriptionClient {
public static final int SECONDS_TO_WAIT_FOR_LOCK = 15;
protected static final String NODE_TOPOLOGY = "/topology";
private static final int SECONDS_TO_WAIT_FOR_LOCK = 15;
private static final String STATE_INITIALIZED = "INITIALIZED";
private static final int COMMIT_CONFLICT_RETRY_TIMES = 5;
private static final int MAX_ZK_RESPONSE_SECONDS = 5;
protected static final String NODE_TOPOLOGY = "/topology";

private final String subscriptionId;
private final CuratorFramework curatorFramework;
private final CuratorFramework defaultCurator;
private final CuratorFramework subscriptionCurator;
private final String resetCursorPath;
private final Logger log;
private InterProcessSemaphoreMutex lock;

public AbstractZkSubscriptionClient(
final String subscriptionId,
final CuratorFramework curatorFramework,
final String loggingPath) {
final ZooKeeperHolder zooKeeperHolder,
final String loggingPath,
final long zkSessionTimeout) {
this.subscriptionId = subscriptionId;
this.curatorFramework = curatorFramework;
this.defaultCurator = zooKeeperHolder.get();
this.subscriptionCurator = zooKeeperHolder.getSubscriptionCurator(zkSessionTimeout);
this.resetCursorPath = getSubscriptionPath("/cursor_reset");
this.log = LoggerFactory.getLogger(loggingPath + ".zk");
}

protected CuratorFramework getCurator() {
return this.curatorFramework;
return this.subscriptionCurator;
}

protected String getSubscriptionId() {
Expand All @@ -93,7 +97,7 @@ public final <T> T runLocked(final Callable<T> function) {
try {
Exception releaseException = null;
if (null == lock) {
lock = new InterProcessSemaphoreMutex(curatorFramework, getSubscriptionLockPath());
lock = new InterProcessSemaphoreMutex(defaultCurator, getSubscriptionLockPath());
}

final boolean acquired = lock.acquire(SECONDS_TO_WAIT_FOR_LOCK, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
Expand Down Expand Up @@ -65,10 +65,11 @@ public class NewZkSubscriptionClient extends AbstractZkSubscriptionClient {

public NewZkSubscriptionClient(
final String subscriptionId,
final CuratorFramework curatorFramework,
final ZooKeeperHolder zooKeeperHolder,
final String loggingPath,
final ObjectMapper objectMapper) {
super(subscriptionId, curatorFramework, loggingPath);
final ObjectMapper objectMapper,
final long zkSessionTimeout) {
super(subscriptionId, zooKeeperHolder, loggingPath, zkSessionTimeout);
this.objectMapper = objectMapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,46 @@
import com.google.common.base.Preconditions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;

import java.util.concurrent.TimeUnit;

@Service
public class SubscriptionClientFactory {

private final ZooKeeperHolder zkHolder;
private final ObjectMapper objectMapper;
private final long maxCommitTimeoutMs;

@Autowired
public SubscriptionClientFactory(
final ZooKeeperHolder zkHolder,
final ObjectMapper objectMapper) {
final ObjectMapper objectMapper,
final NakadiSettings nakadiSettings) {
this.zkHolder = zkHolder;
this.objectMapper = objectMapper;
this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout());
}

public ZkSubscriptionClient createClient(final Subscription subscription, final String loggingPath)
throws InternalNakadiException, NoSuchEventTypeException {
return createClient(subscription, loggingPath, maxCommitTimeoutMs);
}

public ZkSubscriptionClient createClient(final Subscription subscription,
final String loggingPath,
final long commitTimeoutMillis)
throws InternalNakadiException, NoSuchEventTypeException {
Preconditions.checkNotNull(subscription.getId());
return new NewZkSubscriptionClient(
subscription.getId(),
zkHolder.get(),
zkHolder,
loggingPath,
objectMapper);
objectMapper,
commitTimeoutMillis);
}
}

0 comments on commit 46e5b18

Please sign in to comment.