Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1071 from zalando/ARUHA-2359
Browse files Browse the repository at this point in the history
create curator ensemble and close curator for short commit timeout
  • Loading branch information
adyach authored Jun 20, 2019
2 parents 46e5b18 + a788944 commit e2d7e2b
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public void before() throws Exception {

final ZooKeeperHolder zkHolder = mock(ZooKeeperHolder.class);
when(zkHolder.get()).thenReturn(CURATOR);
when(zkHolder.getSubscriptionCurator(anyInt())).thenReturn(CURATOR);
when(zkHolder.getSubscriptionCurator(anyInt()))
.thenReturn(new ZooKeeperHolder.DisposableCuratorFramework(CURATOR));

final TopicRepository topicRepository = mock(TopicRepository.class);
final TimelineService timelineService = mock(TimelineService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -47,62 +50,94 @@ public ZooKeeperHolder(final String zookeeperBrokers,
this.connectionTimeoutMs = connectionTimeoutMs;
this.maxCommitTimeoutMs = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout());

initExhibitor();
}

private void initExhibitor() throws Exception {
final RetryPolicy retryPolicy = new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX);
final EnsembleProvider ensembleProvider;
if (exhibitorAddresses != null) {
final Collection<String> exhibitorHosts = Arrays.asList(exhibitorAddresses.split("\\s*,\\s*"));
final Exhibitors exhibitors = new Exhibitors(exhibitorHosts, exhibitorPort,
() -> zookeeperBrokers + zookeeperKafkaNamespace);
final ExhibitorRestClient exhibitorRestClient = new DefaultExhibitorRestClient();
ensembleProvider = new ExhibitorEnsembleProvider(exhibitors,
exhibitorRestClient, "/exhibitor/v1/cluster/list", EXHIBITOR_POLLING_MS, retryPolicy);
((ExhibitorEnsembleProvider) ensembleProvider).pollForInitialEnsemble();
} else {
ensembleProvider = new FixedEnsembleProvider(zookeeperBrokers + zookeeperKafkaNamespace);
}
zooKeeper = CuratorFrameworkFactory.builder()
.ensembleProvider(ensembleProvider)
.retryPolicy(retryPolicy)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.build();
zooKeeper.start();

subscriptionCurator = CuratorFrameworkFactory.builder()
.ensembleProvider(ensembleProvider)
.retryPolicy(retryPolicy)
// max commit timeout is not higher than 60 seconds, it is safe to cast to integer
.sessionTimeoutMs((int) maxCommitTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.build();
subscriptionCurator.start();
zooKeeper = createCuratorFramework(sessionTimeoutMs, connectionTimeoutMs);
subscriptionCurator = createCuratorFramework((int) maxCommitTimeoutMs, connectionTimeoutMs);
}

public CuratorFramework get() {
return zooKeeper;
}

public CuratorFramework getSubscriptionCurator(final long sessionTimeoutMs) {
public CloseableCuratorFramework getSubscriptionCurator(final long sessionTimeoutMs) throws ZookeeperException {
// most of the clients use default max timeout, subscriptionCurator client saves zookeeper resource
if (sessionTimeoutMs == maxCommitTimeoutMs) {
return subscriptionCurator;
return new StaticCuratorFramework(subscriptionCurator);
}

try {
// max commit timeout is not higher than 60 seconds, it is safe to cast to integer
return new DisposableCuratorFramework(createCuratorFramework((int) sessionTimeoutMs, connectionTimeoutMs));
} catch (final Exception e) {
throw new ZookeeperException("Failed to create curator framework", e);
}
}

public abstract static class CloseableCuratorFramework implements Closeable {

private final CuratorFramework curatorFramework;

public CloseableCuratorFramework(final CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
}

public CuratorFramework getCuratorFramework() {
return curatorFramework;
}
}

public static class StaticCuratorFramework extends CloseableCuratorFramework {

public StaticCuratorFramework(final CuratorFramework curatorFramework) {
super(curatorFramework);
}

@Override
public void close() throws IOException {
// do not ever close this particular instance of curator
}
}

public static class DisposableCuratorFramework extends CloseableCuratorFramework {

public DisposableCuratorFramework(final CuratorFramework curatorFramework) {
super(curatorFramework);
}

@Override
public void close() throws IOException {
getCuratorFramework().close();
}
}

private CuratorFramework createCuratorFramework(final int sessionTimeoutMs,
final int connectionTimeoutMs) throws Exception {
final CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(zooKeeper.getZookeeperClient().getCurrentConnectionString())
.ensembleProvider(createEnsembleProvider())
.retryPolicy(new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX))
// max commit timeout is not higher than 60 seconds, it is safe to cast to integer
.sessionTimeoutMs((int) sessionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.build();
curatorFramework.start();
return curatorFramework;
}

private EnsembleProvider createEnsembleProvider() throws Exception {
final EnsembleProvider ensembleProvider;
final RetryPolicy retryPolicy = new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX);
if (exhibitorAddresses != null) {
final Collection<String> exhibitorHosts = Arrays.asList(exhibitorAddresses.split("\\s*,\\s*"));
final Exhibitors exhibitors = new Exhibitors(exhibitorHosts, exhibitorPort,
() -> zookeeperBrokers + zookeeperKafkaNamespace);
final ExhibitorRestClient exhibitorRestClient = new DefaultExhibitorRestClient();
ensembleProvider = new ExhibitorEnsembleProvider(exhibitors,
exhibitorRestClient, "/exhibitor/v1/cluster/list", EXHIBITOR_POLLING_MS, retryPolicy);
((ExhibitorEnsembleProvider) ensembleProvider).pollForInitialEnsemble();
} else {
ensembleProvider = new FixedEnsembleProvider(zookeeperBrokers + zookeeperKafkaNamespace);
}
return ensembleProvider;
}

private class ExhibitorEnsembleProvider extends org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider {

ExhibitorEnsembleProvider(final Exhibitors exhibitors, final ExhibitorRestClient restClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.zalando.nakadi.service.subscription.StreamingContext;

import javax.annotation.Nullable;
import java.io.IOException;

public class CleanupState extends State {
private final Exception exception;
Expand All @@ -19,8 +20,11 @@ public CleanupState(@Nullable final Exception e) {
public void onEnter() {
try {
getContext().unregisterAuthorizationUpdates();
getContext().getZkClient().close();
} catch (final RuntimeException ex) {
getLog().error("Unexpected fail during removing callback for registration updates", ex);
} catch (IOException e) {
getLog().error("Unexpected fail to release zk connection", e);
}
try {
if (null != exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class AbstractZkSubscriptionClient implements ZkSubscriptionClie

private final String subscriptionId;
private final CuratorFramework defaultCurator;
private final CuratorFramework subscriptionCurator;
private final ZooKeeperHolder.CloseableCuratorFramework closeableCuratorFramework;
private final String resetCursorPath;
private final Logger log;
private InterProcessSemaphoreMutex lock;
Expand All @@ -64,16 +64,20 @@ public AbstractZkSubscriptionClient(
final String subscriptionId,
final ZooKeeperHolder zooKeeperHolder,
final String loggingPath,
final long zkSessionTimeout) {
final long zkSessionTimeout) throws ZookeeperException {
this.subscriptionId = subscriptionId;
this.defaultCurator = zooKeeperHolder.get();
this.subscriptionCurator = zooKeeperHolder.getSubscriptionCurator(zkSessionTimeout);
this.closeableCuratorFramework = zooKeeperHolder.getSubscriptionCurator(zkSessionTimeout);
this.resetCursorPath = getSubscriptionPath("/cursor_reset");
this.log = LoggerFactory.getLogger(loggingPath + ".zk");
}

protected CuratorFramework getCurator() {
return this.subscriptionCurator;
return this.closeableCuratorFramework.getCuratorFramework();
}

protected ZooKeeperHolder.CloseableCuratorFramework getCloseableCuratorFramework() {
return this.closeableCuratorFramework;
}

protected String getSubscriptionId() {
Expand Down Expand Up @@ -474,4 +478,9 @@ protected abstract byte[] createTopologyAndOffsets(Collection<SubscriptionCursor
protected abstract byte[] serializeSession(Session session) throws NakadiRuntimeException;

protected abstract Session deserializeSession(String sessionId, byte[] sessionZkData) throws NakadiRuntimeException;

@Override
public void close() throws IOException {
getCloseableCuratorFramework().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.model.Session;
Expand Down Expand Up @@ -68,7 +69,7 @@ public NewZkSubscriptionClient(
final ZooKeeperHolder zooKeeperHolder,
final String loggingPath,
final ObjectMapper objectMapper,
final long zkSessionTimeout) {
final long zkSessionTimeout) throws ZookeeperException {
super(subscriptionId, zooKeeperHolder, loggingPath, zkSessionTimeout);
this.objectMapper = objectMapper;
}
Expand Down Expand Up @@ -215,5 +216,4 @@ public void transfer(final String sessionId, final Collection<EventTypePartition
changeSet.toArray(new Partition[changeSet.size()]));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;

import java.util.concurrent.TimeUnit;
Expand All @@ -30,14 +31,14 @@ public SubscriptionClientFactory(
}

public ZkSubscriptionClient createClient(final Subscription subscription, final String loggingPath)
throws InternalNakadiException, NoSuchEventTypeException {
throws InternalNakadiException, NoSuchEventTypeException, ZookeeperException {
return createClient(subscription, loggingPath, maxCommitTimeoutMs);
}

public ZkSubscriptionClient createClient(final Subscription subscription,
final String loggingPath,
final long commitTimeoutMillis)
throws InternalNakadiException, NoSuchEventTypeException {
throws InternalNakadiException, NoSuchEventTypeException, ZookeeperException {
Preconditions.checkNotNull(subscription.getId());
return new NewZkSubscriptionClient(
subscription.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.commons.codec.binary.Hex;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.NakadiBaseException;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.ZookeeperException;
Expand All @@ -25,7 +25,7 @@
import java.util.Optional;
import java.util.concurrent.Callable;

public interface ZkSubscriptionClient {
public interface ZkSubscriptionClient extends Closeable {

/**
* Makes runLocked on subscription, using zk path /nakadi/locks/subscription_{subscriptionId}
Expand Down Expand Up @@ -145,6 +145,7 @@ void transfer(String sessionId, Collection<EventTypePartition> partitions)

/**
* Retrieves subscription data like partitions and sessions from ZK without a lock
*
* @return list of partitions and sessions wrapped in
* {@link org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode}
*/
Expand Down

0 comments on commit e2d7e2b

Please sign in to comment.