From ebabc22c07dd9276b0e9a58cb2fcf42b9aa5738e Mon Sep 17 00:00:00 2001 From: HuangWei Date: Mon, 8 Jan 2024 10:50:40 +0800 Subject: [PATCH] fix: taskmanager ha reconnect (#3668) --- .../taskmanager/server/TaskManagerServer.java | 7 +- .../taskmanager/zk/FailoverWatcher.java | 584 ++++++++++-------- .../taskmanager/zk/RecoverableZooKeeper.java | 181 ++++++ .../taskmanager/zk/ZooKeeperUtil.java | 197 +++--- 4 files changed, 593 insertions(+), 376 deletions(-) create mode 100644 java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/RecoverableZooKeeper.java diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java index 0a75c2e37b2..031b02764e7 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/server/TaskManagerServer.java @@ -70,9 +70,12 @@ public void start(Boolean blocking) throws ConfigException, IOException, Interru logger.info("The server becomes active master and prepare to do business logic"); if (TaskManagerConfig.getTrackUnfinishedJobs()) { // Start threads to track unfinished jobs - JobTrackerService.startTrackerThreads(); + JobTrackerService.startTrackerThreads(); // may throw exception + } + // if blocking, start a bg thread to reconnect zk + if (blocking) { + failoverWatcher.startReconnectThread(); } - // Start brpc server startRpcServer(blocking); } diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java index 69c7689bd45..c176919d92c 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/FailoverWatcher.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -32,316 +32,354 @@ * blocking and notification for leader election. */ public class FailoverWatcher implements Watcher { - private static final Log LOG = LogFactory.getLog(FailoverWatcher.class); + private static final Log LOG = LogFactory.getLog(FailoverWatcher.class); - private final String baseZnode; - private final String masterZnode; - private final String zkQuorum; - private final int sessionTimeout; - private final int connectRetryTimes; - private final HostPort hostPort; - private ZooKeeper zooKeeper; - private final AtomicBoolean hasActiveServer = new AtomicBoolean(false); - private final AtomicBoolean becomeActiveServer = new AtomicBoolean(false); + private final String baseZnode; + private final String masterZnode; + private final String zkQuorum; + private final int sessionTimeout; + private final int connectRetryTimes; + private final HostPort hostPort; + private RecoverableZooKeeper zooKeeper; // thread-safe + private final AtomicBoolean connected = new AtomicBoolean(false); // record zookeeper connection status + private final AtomicBoolean hasActiveServer = new AtomicBoolean(false); + private final AtomicBoolean becomeActiveServer = new AtomicBoolean(false); - /** - * Initialize FailoverWatcher with properties. - * - * @throws IOException throw when can't connect with ZooKeeper - */ - public FailoverWatcher() throws IOException { + /** + * Initialize FailoverWatcher with properties. + * + * @throws IOException throw when can't connect with ZooKeeper + */ + public FailoverWatcher() throws IOException { - baseZnode = TaskManagerConfig.getZkRootPath() + "/taskmanager"; - masterZnode = baseZnode + "/leader"; - zkQuorum = TaskManagerConfig.getZkCluster(); - sessionTimeout = TaskManagerConfig.getZkSessionTimeout(); - connectRetryTimes = 3; - String serverHost = TaskManagerConfig.getServerHost(); - int serverPort = TaskManagerConfig.getServerPort(); - hostPort = new HostPort(serverHost, serverPort); + baseZnode = TaskManagerConfig.getZkRootPath() + "/taskmanager"; + masterZnode = baseZnode + "/leader"; + zkQuorum = TaskManagerConfig.getZkCluster(); + sessionTimeout = TaskManagerConfig.getZkSessionTimeout(); + connectRetryTimes = 3; + String serverHost = TaskManagerConfig.getServerHost(); + int serverPort = TaskManagerConfig.getServerPort(); + hostPort = new HostPort(serverHost, serverPort); - connectZooKeeper(); + connectZooKeeper(); - initZnode(); - } + initZnode(); + } - /** - * Connect with ZooKeeper with retries. - * - * @throws IOException when error to construct ZooKeeper object after retrying - */ - protected void connectZooKeeper() throws IOException { - LOG.info("Connecting ZooKeeper " + zkQuorum); + /** + * Connect with ZooKeeper with retries. + * + * @throws IOException when error to construct ZooKeeper object after retrying + */ + protected void connectZooKeeper() throws IOException { + LOG.info("Connecting ZooKeeper " + zkQuorum); - for (int i = 0; i <= connectRetryTimes; i++) { - try { - zooKeeper = new ZooKeeper(zkQuorum, sessionTimeout, this); - break; - } catch (IOException e) { - if (i == connectRetryTimes) { - throw new IOException("Can't connect ZooKeeper after retrying", e); + for (int i = 0; i <= connectRetryTimes; i++) { + try { + zooKeeper = new RecoverableZooKeeper(zkQuorum, sessionTimeout, this); + break; + } catch (IOException e) { + if (i == connectRetryTimes) { + throw new IOException("Can't connect ZooKeeper after retrying", e); + } + LOG.error("Exception to connect ZooKeeper, retry " + (i + 1) + " times"); + } } - LOG.error("Exception to connect ZooKeeper, retry " + (i + 1) + " times"); - } } - } - /** - * Initialize the base znodes. - */ - protected void initZnode() { - try { - ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.getZkRootPath()); - ZooKeeperUtil.createAndFailSilent(this, baseZnode); - } catch (Exception e) { - LOG.fatal("Error to create znode " + baseZnode - + ", exit immediately", e); - System.exit(0); + /** + * Initialize the base znodes. + */ + protected void initZnode() { + try { + ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.getZkRootPath()); + ZooKeeperUtil.createAndFailSilent(this, baseZnode); + } catch (Exception e) { + LOG.fatal("Error to create znode " + baseZnode + ", exit immediately", e); + System.exit(0); + } } - } - /** - * Override this mothod to deal with events for leader election. - * - * @param event the ZooKeeper event - */ - @Override - public void process(WatchedEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state=" - + event.getState() + ", " + "path=" + event.getPath()); - } + /** + * Override this mothod to deal with events for leader election. + * + * @param event the ZooKeeper event + */ + @Override + public void process(WatchedEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received ZooKeeper Event, " + "type=" + event.getType() + ", " + "state=" + event.getState() + + ", " + "path=" + event.getPath()); + } - switch (event.getType()) { - case None: { - processConnection(event); - break; - } - case NodeCreated: { - processNodeCreated(event.getPath()); - break; - } - case NodeDeleted: { - processNodeDeleted(event.getPath()); - break; - } - case NodeDataChanged: { - processDataChanged(event.getPath()); - break; - } - case NodeChildrenChanged: { - processNodeChildrenChanged(event.getPath()); - break; - } - default: - break; + switch (event.getType()) { + case None: { + processConnection(event); + break; + } + case NodeCreated: { + processNodeCreated(event.getPath()); + break; + } + case NodeDeleted: { + processNodeDeleted(event.getPath()); + break; + } + case NodeDataChanged: { + processDataChanged(event.getPath()); + break; + } + case NodeChildrenChanged: { + processNodeChildrenChanged(event.getPath()); + break; + } + default: + break; + } } - } - - /** - * Deal with connection event, exit current process if auth fails or session expires. - * - * @param event the ZooKeeper event - */ - protected void processConnection(WatchedEvent event) { - switch (event.getState()) { - case SyncConnected: - LOG.info(hostPort.getHostPort() + " sync connect from ZooKeeper"); - try { - waitToInitZooKeeper(2000); // init zookeeper in another thread, wait for a while - } catch (Exception e) { - LOG.fatal("Error to init ZooKeeper object after sleeping 2000 ms, exit immediately"); - System.exit(0); - } - break; - /* - case Disconnected: // be triggered when kill the server or the leader of zk cluster - LOG.warn(hostPort.getHostPort() + " received disconnected from ZooKeeper"); - if (becomeActiveServer.get()) { - // Exit if this is master and disconnect from ZK - System.exit(0); - } - break; - */ - case AuthFailed: - LOG.fatal(hostPort.getHostPort() + " auth fail, exit immediately"); - System.exit(0); - case Expired: - LOG.fatal(hostPort.getHostPort() + " received expired from ZooKeeper, exit immediately"); - System.exit(0); - break; - default: - break; + /** + * Deal with connection event, exit current process if auth fails or session expires. + * + * @param event the ZooKeeper event + */ + protected void processConnection(WatchedEvent event) { + switch (event.getState()) { + case SyncConnected: + LOG.info(hostPort.getHostPort() + " sync connect from ZooKeeper"); + try { + waitToInitZooKeeper(2000); // init zookeeper in another thread, wait for a while + } catch (Exception e) { + LOG.fatal("Error to init ZooKeeper object after sleeping 2000 ms, exit immediately"); + System.exit(0); + } + LOG.info(hostPort.getHostPort() + " init ZooKeeper object successfully, session id is 0x" + Long.toHexString( + zooKeeper.getSessionId())); + connected.set(true); + break; + /* + case Disconnected: // be triggered when kill the server or the leader of zk cluster + LOG.warn(hostPort.getHostPort() + " received disconnected from ZooKeeper"); + + if (becomeActiveServer.get()) { + // Exit if this is master and disconnect from ZK + System.exit(0); + } + break; + */ + case AuthFailed: + LOG.fatal(hostPort.getHostPort() + " auth fail, exit immediately"); + System.exit(0); + case Expired: + LOG.warn(hostPort.getHostPort() + " received expired from ZooKeeper"); + default: + // expired or other unknown state: mark as disconnected + connected.set(false); + break; + } } - } - /** - * Deal with create node event, just call the leader election. - * - * @param path which znode is created - */ - protected void processNodeCreated(String path) { - if (path.equals(masterZnode)) { - LOG.info(masterZnode + " created and try to become active master"); - handleMasterNodeChange(); + /** + * Deal with create node event, just call the leader election. + * + * @param path which znode is created + */ + protected void processNodeCreated(String path) { + if (path.equals(masterZnode)) { + LOG.info(masterZnode + " created and try to become active master"); + handleMasterNodeChange(); + } } - } - /** - * Deal with delete node event, just call the leader election. - * - * @param path which znode is deleted - */ - protected void processNodeDeleted(String path) { - if (path.equals(masterZnode)) { - LOG.info(masterZnode + " deleted and try to become active master"); - handleMasterNodeChange(); + /** + * Deal with delete node event, just call the leader election. + * + * @param path which znode is deleted + */ + protected void processNodeDeleted(String path) { + if (path.equals(masterZnode)) { + LOG.info(masterZnode + " deleted and try to become active master"); + handleMasterNodeChange(); + } } - } - /** - * Do nothing when data changes, should be overrided. - * - * @param path which znode's data is changed - */ - protected void processDataChanged(String path) { + /** + * Do nothing when data changes, should be overrided. + * + * @param path which znode's data is changed + */ + protected void processDataChanged(String path) { - } + } - /** - * Do nothing when children znode changes, should be overrided. - * - * @param path which znode's children is changed. - */ - protected void processNodeChildrenChanged(String path) { + /** + * Do nothing when children znode changes, should be overrided. + * + * @param path which znode's children is changed. + */ + protected void processNodeChildrenChanged(String path) { - } + } - /** - * Implement the logic of leader election. - */ - private void handleMasterNodeChange() { - try { - synchronized (hasActiveServer) { - if (ZooKeeperUtil.watchAndCheckExists(this, masterZnode)) { - // A master node exists, there is an active master - if (LOG.isDebugEnabled()) { - LOG.debug("A master is now available"); - } - hasActiveServer.set(true); - } else { - // Node is no longer there, cluster does not have an active master - if (LOG.isDebugEnabled()) { - LOG.debug("No master available. Notifying waiting threads"); - } - hasActiveServer.set(false); - // Notify any thread waiting to become the active master - hasActiveServer.notifyAll(); + /** + * Implement the logic of leader election. + */ + private void handleMasterNodeChange() { + try { + synchronized (hasActiveServer) { + if (ZooKeeperUtil.watchAndCheckExists(this, masterZnode)) { + // A master node exists, there is an active master + if (LOG.isDebugEnabled()) { + LOG.debug("A master is now available"); + } + hasActiveServer.set(true); + } else { + // Node is no longer there, cluster does not have an active master + if (LOG.isDebugEnabled()) { + LOG.debug("No master available. Notifying waiting threads"); + } + hasActiveServer.set(false); + // Notify any thread waiting to become the active master + hasActiveServer.notifyAll(); + } + } + } catch (KeeperException ke) { + LOG.error("Received an unexpected KeeperException, aborting", ke); } - } - } catch (KeeperException ke) { - LOG.error("Received an unexpected KeeperException, aborting", ke); } - } - /** - * Implement the logic of server to wait to become active master. - * - * @return false if error to wait to become active master - */ - public boolean blockUntilActive() { - while (true) { - try { - if (ZooKeeperUtil.createEphemeralNodeAndWatch(this, masterZnode, hostPort.getHostPort() - .getBytes())) { + /** + * Implement the logic of server to wait to become active master. + * + * @return false if error to wait to become active master + */ + public boolean blockUntilActive() { + while (true) { + try { + if (ZooKeeperUtil.createEphemeralNodeAndWatch(this, masterZnode, hostPort.getHostPort().getBytes())) { + // We are the master, return + hasActiveServer.set(true); + becomeActiveServer.set(true); + LOG.info("Become active master in " + hostPort.getHostPort()); + return true; + } - // We are the master, return - hasActiveServer.set(true); - becomeActiveServer.set(true); - LOG.info("Become active master in " + hostPort.getHostPort()); - return true; - } + hasActiveServer.set(true); - hasActiveServer.set(true); + // we start the server with the same ip_port stored in master znode, that means we want to + // restart the server? + String msg; + byte[] bytes = ZooKeeperUtil.getDataAndWatch(this, masterZnode); + if (bytes == null) { + msg = ("A master was detected, but went down before its address " + + "could be read. Attempting to become the next active master"); + } else { + if (hostPort.getHostPort().equals(new String(bytes))) { + msg = ("Current master has this master's address, " + hostPort.getHostPort() + + "; master was restarted? Deleting node."); + // Hurry along the expiration of the znode. + ZooKeeperUtil.deleteNode(this, masterZnode); + } else { + msg = "Another master " + new String(bytes) + " is the active master, " + hostPort.getHostPort() + + "; waiting to become the next active master"; + } + } + LOG.info(msg); + } catch (KeeperException ke) { + LOG.error("Received an unexpected KeeperException when block to become active, aborting", ke); + return false; + } - // we start the server with the same ip_port stored in master znode, that means we want to - // restart the server? - String msg; - byte[] bytes = ZooKeeperUtil.getDataAndWatch(this, masterZnode); - if (bytes == null) { - msg = ("A master was detected, but went down before its address " - + "could be read. Attempting to become the next active master"); - } else { - if (hostPort.getHostPort().equals(new String(bytes))) { - msg = ("Current master has this master's address, " + hostPort.getHostPort() + "; master was restarted? Deleting node."); - // Hurry along the expiration of the znode. - ZooKeeperUtil.deleteNode(this, masterZnode); - } else { - msg = "Another master " + new String(bytes) + " is the active master, " - + hostPort.getHostPort() + "; waiting to become the next active master"; - } + synchronized (hasActiveServer) { + while (hasActiveServer.get()) { + try { + hasActiveServer.wait(); + } catch (InterruptedException e) { + // We expect to be interrupted when a master dies, will fall out if so + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted while waiting to be master"); + } + return false; + } + } + } } - LOG.info(msg); - } catch (KeeperException ke) { - LOG.error("Received an unexpected KeeperException when block to become active, aborting", - ke); - return false; - } + } - synchronized (hasActiveServer) { - while (hasActiveServer.get()) { - try { - hasActiveServer.wait(); - } catch (InterruptedException e) { - // We expect to be interrupted when a master dies, will fall out if so - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted while waiting to be master"); + /** + * Close the ZooKeeper object. + */ + public void close() { + if (zooKeeper != null) { + try { + connected.set(false); + zooKeeper.close(); + } catch (InterruptedException e) { + LOG.error("Interrupt when closing zookeeper connection", e); } - return false; - } } - } } - } - /** - * Close the ZooKeeper object. - */ - public void close() { - if (zooKeeper != null) { - try { - zooKeeper.close(); - } catch (InterruptedException e) { - LOG.error("Interrupt when closing zookeeper connection", e); - } - } - } + /** + * Wait to init ZooKeeper object, only sleep when it's null. + * + * @param maxWaitMillis the max sleep time + * @throws Exception if ZooKeeper object is still null + */ + public void waitToInitZooKeeper(long maxWaitMillis) throws Exception { + long finished = System.currentTimeMillis() + maxWaitMillis; + while (System.currentTimeMillis() < finished) { + if (this.zooKeeper != null) { + return; + } - /** - * Wait to init ZooKeeper object, only sleep when it's null. - * - * @param maxWaitMillis the max sleep time - * @throws Exception if ZooKeeper object is still null - */ - public void waitToInitZooKeeper(long maxWaitMillis) throws Exception { - long finished = System.currentTimeMillis() + maxWaitMillis; - while (System.currentTimeMillis() < finished) { - if (this.zooKeeper != null) { - return; - } - - try { - Thread.sleep(1); - } catch (InterruptedException e) { - throw new Exception(e); - } + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new Exception(e); + } + } + throw new Exception(); } - throw new Exception(); - } - public ZooKeeper getZooKeeper() { - return zooKeeper; - } + public ZooKeeper getZooKeeper() throws KeeperException { + return zooKeeper.checkZk(); // return raw zookeeper here + } -} \ No newline at end of file + // we can't know from the zookeeper client whether it's connected or not, so we use a flag to record it + // Another way is reconnect after get expired exception when get/set from zk + public void startReconnectThread() { + // TODO: just create a thread now, if more threads for new tasks, should use thread pool + // becomeActiveServer will be set only in this thread + new Thread(new Runnable() { + @Override + public void run() { + while (true) { + try { + Thread.sleep(2000); + synchronized (connected) { + if (connected.get() == false) { + LOG.info("Try to reconnect ZooKeeper"); + // set connected in event + zooKeeper.reconnectAfterExpiration(); + becomeActiveServer.set(false); + // Won't init znode again to avoid exit in initZnode, it's already created in init. + // If znode is deleted, should restart taskmanager + } + } + synchronized(connected) { + if (connected.get() == true && becomeActiveServer.get() == false) { + LOG.info("Available cxn, try to become active master after reconnecting ZooKeeper"); + if(!blockUntilActive()) { + LOG.warn("block failed, try later"); + } + } + } + } catch (Exception e) { + LOG.error("Error to reconnect ZooKeeper", e); + } + } + } + }).start(); + } +} diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/RecoverableZooKeeper.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/RecoverableZooKeeper.java new file mode 100644 index 00000000000..9ff2b9349b4 --- /dev/null +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/RecoverableZooKeeper.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com._4paradigm.openmldb.taskmanager.zk; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.proto.SetDataRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * ref + * https://github.com/apache/hbase/blob/25e9228e2c0a9a752db02e48d55010e0197fd203/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java + * It's a thread-safe class. No opentelemetry trace, and no retry mechanism. If + * we need retry, we can include RetryCounter. + */ +@ThreadSafe +public class RecoverableZooKeeper { + private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class); + // the actual ZooKeeper client instance + private ZooKeeper zk; + // private final RetryCounterFactory retryCounterFactory; + // An identifier of this process in the cluster + private final String identifier; + private final byte[] id; + private final Watcher watcher; + private final int sessionTimeout; + private final String quorumServers; + private final int maxMultiSize; // unused now + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", justification = "None. Its always been this way.") + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher) throws IOException { + // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers + // String as we should. + String identifier = null; + if (identifier == null || identifier.length() == 0) { + // the identifier = processID@hostName + identifier = ManagementFactory.getRuntimeMXBean().getName(); + } + LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier, quorumServers); + this.identifier = identifier; + this.id = identifier.getBytes(StandardCharsets.UTF_8.name()); + + this.watcher = watcher; + this.sessionTimeout = sessionTimeout; + this.quorumServers = quorumServers; + this.maxMultiSize = 1024 * 1024; + + try { + checkZk(); + } catch (Exception x) { + /* ignore */ + } + } + + /** + * Returns the maximum size (in bytes) that should be included in any single + * multi() call. NB: + * This is an approximation, so there may be variance in the msg actually sent + * over the wire. + * Please be sure to set this approximately, with respect to your ZK server + * configuration for + * jute.maxbuffer. + */ + public int getMaxMultiSizeLimit() { + return maxMultiSize; + } + + /** + * Try to create a ZooKeeper connection. Turns any exception encountered into a + * KeeperException.OperationTimeoutException so it can retried. + * + * @return The created ZooKeeper connection object + * @throws KeeperException if a ZooKeeper operation fails + */ + protected synchronized ZooKeeper checkZk() throws KeeperException { + if (this.zk == null) { + try { + this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); + } catch (IOException ex) { + LOG.warn("Unable to create ZooKeeper Connection", ex); + throw new KeeperException.OperationTimeoutException(); + } + } + return zk; + } + + public synchronized void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { + if (zk != null) { + LOG.info("Closing dead ZooKeeper connection, session" + " was: 0x" + Long.toHexString(zk.getSessionId())); + zk.close(); + // reset the ZooKeeper connection + zk = null; + } + checkZk(); + LOG.info("Recreated a ZooKeeper, session" + " is: 0x" + Long.toHexString(zk.getSessionId())); + } + + public synchronized long getSessionId() { + return zk == null ? -1 : zk.getSessionId(); + } + + public synchronized void close() throws InterruptedException { + if (zk != null) { + zk.close(); + } + } + + public synchronized States getState() { + return zk == null ? null : zk.getState(); + } + + public synchronized ZooKeeper getZooKeeper() { + return zk; + } + + public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { + checkZk().sync(path, cb, ctx); + } + + /** + * Filters the given node list by the given prefixes. This method is + * all-inclusive--if any element + * in the node list starts with any of the given prefixes, then it is included + * in the result. + * + * @param nodes the nodes to filter + * @param prefixes the prefixes to include in the result + * @return list of every element that starts with one of the prefixes + */ + private static List filterByPrefix(List nodes, String... prefixes) { + List lockChildren = new ArrayList<>(); + for (String child : nodes) { + for (String prefix : prefixes) { + if (child.startsWith(prefix)) { + lockChildren.add(child); + break; + } + } + } + return lockChildren; + } + + public String getIdentifier() { + return identifier; + } +} diff --git a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/ZooKeeperUtil.java b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/ZooKeeperUtil.java index 406488292a7..c2283629ee7 100644 --- a/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/ZooKeeperUtil.java +++ b/java/openmldb-taskmanager/src/main/java/com/_4paradigm/openmldb/taskmanager/zk/ZooKeeperUtil.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -31,119 +31,114 @@ * Most code is from HBase ZKUtil and ZooKeeperWatcher is replaced with FailoverWatcher. */ public class ZooKeeperUtil { - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class); + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class); - public static boolean watchAndCheckExists(FailoverWatcher failoverWatcher, String znode) - throws KeeperException { - try { - Stat s = failoverWatcher.getZooKeeper().exists(znode, failoverWatcher); - boolean exists = s != null; - if (LOG.isDebugEnabled()) { - if (exists) { - LOG.debug("Set watcher on existing znode " + znode); - } else { - LOG.debug(znode + " does not exist. Watcher is set."); + public static boolean watchAndCheckExists(FailoverWatcher failoverWatcher, String znode) throws KeeperException { + try { + Stat s = failoverWatcher.getZooKeeper().exists(znode, failoverWatcher); + boolean exists = s != null; + if (LOG.isDebugEnabled()) { + if (exists) { + LOG.debug("Set watcher on existing znode " + znode); + } else { + LOG.debug(znode + " does not exist. Watcher is set."); + } + } + return exists; + } catch (KeeperException e) { + LOG.warn("Unable to set watcher on znode " + znode, e); + LOG.warn("Received unexpected KeeperException, re-throwing exception"); + throw e; + } catch (InterruptedException e) { + LOG.warn("Unable to set watcher on znode " + znode, e); + return false; } - } - return exists; - } catch (KeeperException e) { - LOG.warn("Unable to set watcher on znode " + znode, e); - LOG.warn("Received unexpected KeeperException, re-throwing exception"); - throw e; - } catch (InterruptedException e) { - LOG.warn("Unable to set watcher on znode " + znode, e); - return false; } - } - public static boolean createEphemeralNodeAndWatch(FailoverWatcher failoverWatcher, String znode, - byte[] data) throws KeeperException { - try { - LOG.info("Try to create emphemeral znode " + znode); - failoverWatcher.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } catch (KeeperException.NodeExistsException nee) { - if (!watchAndCheckExists(failoverWatcher, znode)) { - // It did exist but now it doesn't, try again - return createEphemeralNodeAndWatch(failoverWatcher, znode, data); - } - return false; - } catch (InterruptedException e) { - LOG.info("Interrupted", e); - Thread.currentThread().interrupt(); + public static boolean createEphemeralNodeAndWatch(FailoverWatcher failoverWatcher, String znode, byte[] data) + throws KeeperException { + try { + LOG.info("Try to create emphemeral znode " + znode); + failoverWatcher.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } catch (KeeperException.NodeExistsException nee) { + if (!watchAndCheckExists(failoverWatcher, znode)) { + // It did exist but now it doesn't, try again + return createEphemeralNodeAndWatch(failoverWatcher, znode, data); + } + return false; + } catch (InterruptedException e) { + LOG.info("Interrupted", e); + Thread.currentThread().interrupt(); + } + return true; } - return true; - } - public static void deleteNode(FailoverWatcher failoverWatcher, String node) - throws KeeperException { - deleteNode(failoverWatcher, node, -1); - } + public static void deleteNode(FailoverWatcher failoverWatcher, String node) throws KeeperException { + deleteNode(failoverWatcher, node, -1); + } - public static boolean deleteNode(FailoverWatcher failoverWatcher, String node, int version) - throws KeeperException { - try { - failoverWatcher.getZooKeeper().delete(node, version); - return true; - } catch (KeeperException.BadVersionException bve) { - LOG.debug("Bad version exception when delete node '{}'", node, bve); - return false; - } catch (InterruptedException ie) { - LOG.debug("Received InterruptedException, doing nothing here", ie); - return false; + public static boolean deleteNode(FailoverWatcher failoverWatcher, String node, int version) throws KeeperException { + try { + failoverWatcher.getZooKeeper().delete(node, version); + return true; + } catch (KeeperException.BadVersionException bve) { + LOG.debug("Bad version exception when delete node '{}'", node, bve); + return false; + } catch (InterruptedException ie) { + LOG.debug("Received InterruptedException, doing nothing here", ie); + return false; + } } - } - public static byte[] getDataAndWatch(FailoverWatcher failoverWatcher, String znode) { - return getDataInternal(failoverWatcher, znode, null); - } + public static byte[] getDataAndWatch(FailoverWatcher failoverWatcher, String znode) { + return getDataInternal(failoverWatcher, znode, null); + } - @Nullable - private static byte[] getDataInternal(FailoverWatcher failoverWatcher, String znode, Stat stat) { - try { - byte[] data = failoverWatcher.getZooKeeper().getData(znode, failoverWatcher, stat); - if (LOG.isDebugEnabled()) { - LOG.debug("Retrieved " + ((data == null) ? 0 : data.length) - + " byte(s) of data from znode " + znode); - } - return data; - } catch (KeeperException.NoNodeException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Unable to get data of znode " + znode + " " - + "because node does not exist (not an error)"); - } - return null; - } catch (KeeperException | InterruptedException e) { - LOG.warn("Unable to get data of znode " + znode, e); - LOG.warn("Received unexpected KeeperException, re-throwing exception"); - return null; + @Nullable + private static byte[] getDataInternal(FailoverWatcher failoverWatcher, String znode, Stat stat) { + try { + byte[] data = failoverWatcher.getZooKeeper().getData(znode, failoverWatcher, stat); + if (LOG.isDebugEnabled()) { + LOG.debug("Retrieved " + ((data == null) ? 0 : data.length) + " byte(s) of data from znode " + znode); + } + return data; + } catch (KeeperException.NoNodeException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unable to get data of znode " + znode + " " + "because node does not exist (not an error)"); + } + return null; + } catch (KeeperException | InterruptedException e) { + LOG.warn("Unable to get data of znode " + znode, e); + LOG.warn("Received unexpected KeeperException, re-throwing exception"); + return null; + } } - } - public static void createAndFailSilent(FailoverWatcher failoverWatcher, String znode) - throws KeeperException, InterruptedException { - try { - LOG.info("Try to create persistent znode " + znode); - ZooKeeper zk = failoverWatcher.getZooKeeper(); - if (zk.exists(znode, false) == null) { - zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (KeeperException.NodeExistsException ignore) { - //we just ignore result if the node already exist - LOG.info("Znode " + znode + " already exist"); - } catch (KeeperException.NoAuthException nee) { - try { - if (null == failoverWatcher.getZooKeeper().exists(znode, false)) { - // If we failed to create the file and it does not already exist. - throw nee; + public static void createAndFailSilent(FailoverWatcher failoverWatcher, String znode) + throws KeeperException, InterruptedException { + try { + LOG.info("Try to create persistent znode " + znode); + ZooKeeper zk = failoverWatcher.getZooKeeper(); + if (zk.exists(znode, false) == null) { + zk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (KeeperException.NodeExistsException ignore) { + // we just ignore result if the node already exist + LOG.info("Znode " + znode + " already exist"); + } catch (KeeperException.NoAuthException nee) { + try { + if (null == failoverWatcher.getZooKeeper().exists(znode, false)) { + // If we failed to create the file and it does not already exist. + throw nee; + } + } catch (InterruptedException ie) { + LOG.debug("Received InterruptedException, re-throw the exception", ie); + throw ie; + } + } catch (InterruptedException ie) { + LOG.debug("Received InterruptedException, re-throw the exception", ie); + throw ie; } - } catch (InterruptedException ie) { - LOG.debug("Received InterruptedException, re-throw the exception", ie); - throw ie; - } - } catch (InterruptedException ie) { - LOG.debug("Received InterruptedException, re-throw the exception", ie); - throw ie; } - } }