Skip to content

Zookeeper 会话

litter-fish edited this page Jan 20, 2020 · 2 revisions

Zookeeper的会话创建过程

初始化阶段

  1. 初始化Zookeeper对象 会创建一个客户端的Watcher管理器:ClientWatchManager
// Client-side watcher manager, instantiated by this field initializer.
private final ZKWatchManager watchManager = new ZKWatchManager();

Watcher管理器:

/**
 * Client-side watcher registry (excerpt): one String-to-watcher-set map per
 * watch kind (data, exist, child) plus a default watcher.
 */
private static class ZKWatchManager implements ClientWatchManager {
    // Watchers registered for data watches (keys presumably znode paths).
    private final Map<String, Set<Watcher>> dataWatches =
        new HashMap<String, Set<Watcher>>();
    // Watchers registered for exist watches.
    private final Map<String, Set<Watcher>> existWatches =
        new HashMap<String, Set<Watcher>>();
    // Watchers registered for child watches.
    private final Map<String, Set<Watcher>> childWatches =
        new HashMap<String, Set<Watcher>>();

    // Default watcher; volatile, assigned from outside this class excerpt.
    private volatile Watcher defaultWatcher;

    // Remaining logic omitted
}
  1. 设置默认watcher 如果构造函数中传入了Watcher则会将其设置为默认的Watcher,即ZKWatchManager中的defaultWatcher属性
// Install the Watcher passed to the constructor as the manager's default watcher.
watchManager.defaultWatcher = watcher;
  1. 设置服务器地址列表
// Parse the chroot path and the server address list out of the connect string
ConnectStringParser connectStringParser = new ConnectStringParser(
        connectString);
// StaticHostProvider shuffles the address list with Collections.shuffle
HostProvider hostProvider = new StaticHostProvider(
        connectStringParser.getServerAddresses());

地址列表的处理

/**
 * Builds the host provider from the given server addresses (excerpt).
 *
 * @param serverAddresses candidate server addresses
 */
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
    // Other code omitted
    // Randomize the order of the address list with Collections.shuffle
    Collections.shuffle(this.serverAddresses);
}

打乱后的地址会组成一个圆环

3054b5a00475507cbc8132149764ce1d.jpeg

每次获取地址时,即调用next时,currentIndex进行+1操作

获取服务器地址

/**
 * Returns the next server address, treating the address list as a ring.
 *
 * @param spinDelay milliseconds to sleep when the cursor lands on
 *                  {@code lastIndex} again; values {@code <= 0} disable the sleep
 * @return the address at the advanced cursor position
 */
public InetSocketAddress next(long spinDelay) {
    // Advance the cursor by one and wrap to the start when it runs off the end.
    currentIndex = currentIndex + 1;
    if (currentIndex == serverAddresses.size()) {
        currentIndex = 0;
    }

    // Landing on lastIndex again means the ring has been walked once:
    // back off for spinDelay milliseconds before handing out the address.
    final boolean backAtLastIndex = currentIndex == lastIndex;
    if (backAtLastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }

    final InetSocketAddress candidate = serverAddresses.get(currentIndex);
    return candidate;
}
  1. 创建并初始化客户端网络连接器 ClientCnxn
// 4. Create the ClientCnxn instance that manages network interaction between client and server.
// This also initializes cnxn's outgoingQueue (packets waiting to be sent) and
// pendingQueue (packets already sent and waiting for the server's response).
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
        hostProvider, sessionTimeout, this, watchManager,
        getClientCnxnSocket(), canBeReadOnly);

初始化核心队列

/**
 * Packets that have been written to the socket and are waiting for the
 * server's response (doIO appends a packet here after it is fully sent).
 */
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

/**
 * Packets queued by the client that still have to be sent to the server.
 */
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
  1. 创建 SendThread 线程,用于管理客户端与服务器端的所有网络IO操作
/**
 * Stores the connection parameters, derives the initial connect/read
 * timeouts from the requested session timeout, and creates (but does not
 * start) the send and event threads.
 */
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    // Per-server connect budget: the session timeout split across all known servers.
    connectTimeout = sessionTimeout / hostProvider.size();
    // Read timeout is two thirds of the session timeout.
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
    // 5. Create the SendThread, which handles all network I/O between client and server,
    // and the EventThread, which handles client-side events (it owns the waitingEvents queue)
    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
}

会话创建阶段

  1. 启动SendThread和EventThread

  2. 获取一个服务器地址

// Pick a server address: use rwServerAddress once if it is set (then clear it),
// otherwise ask the host provider for the next address with a 1000 ms spin delay.
if (rwServerAddress != null) {
    serverAddress = rwServerAddress;
    rwServerAddress = null;
} else {
    serverAddress = hostProvider.next(1000);
}
  1. 创建TCP连接
// ClientCnxnSocket is responsible for opening the long-lived TCP connection to the server
startConnect(serverAddress);
/**
 * Switches to CONNECTING, renames the thread after the target server,
 * optionally creates the SASL client, and asks the socket layer to connect.
 *
 * @param addr server to connect to
 * @throws IOException if the socket layer fails to start the connection
 */
private void startConnect(InetSocketAddress addr) throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    state = States.CONNECTING;

    // Replace the "(...)" part of the thread name with "(host:port)" of the target server.
    setName(getName().replaceAll("\\(.*\\)",
            "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
    if (ZooKeeperSaslClient.isEnabled()) {
        try {
            // Server principal is "<username>/<host>", username defaulting to "zookeeper".
            String principalUserName = System.getProperty(
                    ZK_SASL_CLIENT_USERNAME, "zookeeper");
            zooKeeperSaslClient =
                new ZooKeeperSaslClient(
                        principalUserName+"/"+addr.getHostName());
        } catch (LoginException e) {
            // ......
        }
    }
    logStartConnect(addr);

    clientCnxnSocket.connect(addr);
}
/**
 * Creates a socket channel, registers it and starts connecting to addr,
 * then resets the read state for the new connection.
 *
 * @param addr server to connect to
 * @throws IOException if registration or connect fails; the channel is closed first
 */
void connect(InetSocketAddress addr) throws IOException {
    // create a socket channel.
    SocketChannel sock = createSock();
    try {
        // register with the selection and connect
       registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        // Close the half-open channel before propagating the failure.
        sock.close();
        throw e;
    }
    // The session is not established yet; doIO reads the connect result while this is false.
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
/**
 * Registers the channel for OP_CONNECT and initiates the connection.
 * When the connect completes immediately, the session handshake is
 * primed right away.
 *
 * @param sock channel to register and connect
 * @param addr server address
 * @throws IOException if registration, connect, or priming fails
 */
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // Start connecting; a false result means the connect has not completed yet.
    if (!sock.connect(addr)) {
        return;
    }
    // Connected immediately: queue the ConnectRequest so the client can create a session.
    sendThread.primeConnection();
}
  1. 构造ConnectRequest
/**
 * Queues the session-establishment packets once the socket is connected:
 * optional SetWatches batches, one auth packet per entry in authInfo, and
 * finally the ConnectRequest. Every packet is inserted with addFirst, so the
 * ConnectRequest ends up at the head of outgoingQueue.
 *
 * @throws IOException declared by the signature; source not visible in this excerpt
 */
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
             + clientCnxnSocket.getRemoteSocketAddress()
             + ", initiating session");
    isFirstConnect = false;
    // Reuse the known session id only if a read-write server has been seen before; otherwise 0.
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    // Build the ConnectRequest
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
            sessionTimeout, sessId, sessionPasswd);
    synchronized (outgoingQueue) {
        // We add backwards since we are pushing into the front
        // Only send if there's a pending watch
        // TODO: here we have the only remaining use of zooKeeper in
        // this class. It's to be eliminated!
        if (!disableAutoWatchReset) {
            List<String> dataWatches = zooKeeper.getDataWatches();
            List<String> existWatches = zooKeeper.getExistWatches();
            List<String> childWatches = zooKeeper.getChildWatches();
            if (!dataWatches.isEmpty()
                        || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                long setWatchesLastZxid = lastZxid;

                // Emit one SetWatches packet per batch until all three iterators are drained.
                while (dataWatchesIter.hasNext()
                               || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                    List<String> dataWatchesBatch = new ArrayList<String>();
                    List<String> existWatchesBatch = new ArrayList<String>();
                    List<String> childWatchesBatch = new ArrayList<String>();
                    int batchLength = 0;

                    // Note, we may exceed our max length by a bit when we add the last
                    // watch in the batch. This isn't ideal, but it makes the code simpler.
                    while (batchLength < SET_WATCHES_MAX_LENGTH) {
                        final String watch;
                        // Drain data watches first, then exist watches, then child watches.
                        if (dataWatchesIter.hasNext()) {
                            watch = dataWatchesIter.next();
                            dataWatchesBatch.add(watch);
                        } else if (existWatchesIter.hasNext()) {
                            watch = existWatchesIter.next();
                            existWatchesBatch.add(watch);
                        } else if (childWatchesIter.hasNext()) {
                            watch = childWatchesIter.next();
                            childWatchesBatch.add(watch);
                        } else {
                            break;
                        }
                        batchLength += watch.length();
                    }

                    SetWatches sw = new SetWatches(setWatchesLastZxid,
                            dataWatchesBatch,
                            existWatchesBatch,
                            childWatchesBatch);
                    RequestHeader h = new RequestHeader();
                    h.setType(ZooDefs.OpCode.setWatches);
                    // Fixed xid -8 is used for the SetWatches request.
                    h.setXid(-8);
                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                    outgoingQueue.addFirst(packet);
                }
            }
        }

        // One auth packet (fixed xid -4) per stored credential.
        for (AuthData id : authInfo) {
            outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                    OpCode.auth), null, new AuthPacket(0, id.scheme,
                    id.data), null, null));
        }
        // Wrap the ConnectRequest in a Packet and push it onto the front of outgoingQueue
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                    null, null, readOnly));
    }
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                + clientCnxnSocket.getRemoteSocketAddress());
    }
}
  1. 发送请求

clientCnxnSocket.doTransport => doIO =>

/**
 * Write half of doIO (excerpt): when the key is writable, sends the next
 * sendable packet from outgoingQueue and, once fully written, moves it to
 * pendingQueue unless it has no request header or is a ping/auth packet.
 */
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
  throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    // Other code omitted
    if (sockKey.isWritable()) {
        // Take a packet from the outgoing queue and send it
        synchronized(outgoingQueue) {
            // Look for a packet that may be sent right now
            Packet p = findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    // Ping and auth packets keep their preset xid; everything else gets a fresh one.
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    // Serialize the packet into its byte buffer
                    p.createBB();
                }
                sock.write(p.bb);
                // Only when the whole buffer has been written is the packet considered sent.
                if (!p.bb.hasRemaining()) {
                    sentCount++;
                    outgoingQueue.removeFirstOccurrence(p);
                    // Packets that expect a reply wait in pendingQueue for the response.
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // Not initialized yet and a packet was just fully sent: stop writing.
                disableWrite();
            } else {
                enableWrite();
            }
        }
    }
}

响应处理阶段

  1. 接收服务端响应
/**
 * Read half of doIO (excerpt): reads into incomingBuffer and, once it is
 * full, interprets it as a length prefix, the connect result (before
 * initialization), or a regular response.
 */
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
  throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        // A negative read count is treated as the server having closed the socket.
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // The length prefix is complete.
                recvCount++;
                readLength();
            // Not initialized yet: this payload is the connect result
            } else if (!initialized) {
                // Read the server's reply to the connect request
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                // Go back to reading the next length prefix.
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // Read a regular response
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    // Packet-sending logic omitted
}
  1. 处理Response
/**
 * Deserializes the ConnectResponse from incomingBuffer, reads the optional
 * read-only flag, records the session id and notifies the send thread.
 *
 * @throws IOException if the ConnectResponse cannot be deserialized
 */
void readConnectResult() throws IOException {
    if (LOG.isTraceEnabled()) {
        // Dump the raw buffer as hex for trace logging.
        // NOTE(review): Integer.toHexString on a negative byte prints the
        // sign-extended int (e.g. "ffffff80"), so the dump is not two digits per byte.
        StringBuilder buf = new StringBuilder("0x[");
        for (byte b : incomingBuffer.array()) {
            buf.append(Integer.toHexString(b) + ",");
        }
        buf.append("]");
        LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                + buf.toString());
    }
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    // Deserialize the ConnectResponse
    conRsp.deserialize(bbia, "connect");

    // read "is read-only" flag
    boolean isRO = false;
    try {
        isRO = bbia.readBool("readOnly");
    } catch (IOException e) {
        // this is ok -- just a packet from an old server which
        // doesn't contain readOnly field
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
    }
    // Record the session id from the response
    this.sessionId = conRsp.getSessionId();
    
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
            conRsp.getPasswd(), isRO);
}
  1. 连接成功
/**
 * Applies the result of session negotiation: rejects an expired session,
 * otherwise updates timeouts, session credentials and state, and queues
 * the connected event.
 *
 * @param _negotiatedSessionTimeout timeout returned by the server; {@code <= 0} is treated as expired
 * @param _sessionId session id returned by the server
 * @param _sessionPasswd session password returned by the server
 * @param isRO whether the server reported itself as read-only
 * @throws IOException SessionExpiredException when the negotiated timeout is {@code <= 0}
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;
    if (negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;

        // Deliver an Expired event, then tell the event thread to shut down.
        eventThread.queueEvent(new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.Expired, null));
        eventThread.queueEventOfDeath();

        String warnInfo;
        warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
            + Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);
    }
    if (!readOnly && isRO) {
        LOG.error("Read/write client got connected to read-only server");
    }
    // Update client-side parameters from the negotiated values
    readTimeout = negotiatedSessionTimeout * 2 / 3;
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();
    hostProvider.onConnected();
    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;
    state = (isRO) ?
            States.CONNECTEDREADONLY : States.CONNECTED;
    // Remember once a read-write server has been seen.
    seenRwServerBefore |= !isRO;
    LOG.info("Session establishment complete on server "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", sessionid = 0x" + Long.toHexString(sessionId)
            + ", negotiated timeout = " + negotiatedSessionTimeout
            + (isRO ? " (READ-ONLY mode)" : ""));
    KeeperState eventState = (isRO) ?
            KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
    // Queue the SyncConnected (or ConnectedReadOnly) event with EventType.None
    eventThread.queueEvent(new WatchedEvent(
            Watcher.Event.EventType.None,
            eventState, null));
}
  1. 生成事件:SyncConnected - None
/**
 * Queues an event together with the watchers that should receive it.
 * A session-level event (type None) whose state equals the current
 * session state is dropped.
 *
 * @param event the event to queue
 */
public void queueEvent(WatchedEvent event) {
    final boolean repeatedSessionEvent = event.getType() == EventType.None
            && sessionState == event.getState();
    if (repeatedSessionEvent) {
        return;
    }
    sessionState = event.getState();

    // Resolve the interested watchers now and queue them with the event
    // for later processing.
    waitingEvents.add(new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event));
}
  1. 查询Watcher
/**
 * Event loop: takes entries from waitingEvents and processes them until
 * the death marker has been seen and the queue is empty.
 */
public void run() {
   try {
      isRunning = true;
      while (true) {
         // Blocks until an entry is available.
         Object event = waitingEvents.take();
         if (event == eventOfDeath) {
            // Death marker: remember it, but keep draining what is still queued.
            wasKilled = true;
         } else {
            processEvent(event);
         }
         if (wasKilled)
            synchronized (waitingEvents) {
               // Exit only once nothing is left to process.
               if (waitingEvents.isEmpty()) {
                  isRunning = false;
                  break;
               }
            }
      }
   } catch (InterruptedException e) {
      LOG.error("Event thread exiting due to interruption", e);
   }

    LOG.info("EventThread shut down for session: 0x{}",
             Long.toHexString(getSessionId()));
}
  1. 处理事件

Session 相关

服务端 SessionID 的生成

/**
 * Computes the initial session id used by a server.
 *
 * <p>Layout of the 64-bit result: the top 8 bits carry the server id, the
 * next 40 bits carry the low 40 bits of the current time in milliseconds,
 * and the low 16 bits are zero.
 *
 * @param id server (machine) id
 * @return the initial session id
 */
public static long initializeNextSession(long id) {
    // Example, current time in milliseconds (24 high bits | 40 low bits):
    //   000000000000000000000001 0110110010001101100100011110000010100001
    // << 24 discards the 24 high bits and zero-fills the low end:
    //   0110110010001101100100011110000010100001 000000000000000000000000
    // >>> 8 (unsigned) clears the top byte for the server id:
    //   00000000 01101100100011011001000111100000101000010000000000000000
    final long timestampBits = (System.currentTimeMillis() << 24) >>> 8;

    // Server id moved into the top byte, e.g. id = 2:
    //   00000010 00000000000000000000000000000000000000000000000000000000
    final long serverIdBits = id << 56;

    // Combined:
    //   00000010 01101100100011011001000111100000101000010000000000000000
    return timestampBits | serverIdBits;
}

SessionTracker:Zookeeper服务器的会话管理器,负责会话的创建、管理和清理等工作。

// Session objects indexed by session id
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
// Sessions grouped by their next expiration time point, to simplify management and expiry checks
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
// Session timeout indexed by session id; this structure is persisted into the snapshot file
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;

创建连接

主要分为:处理ConnectRequest请求、会话创建、处理器链路处理和会话响应四个步骤。 首先由 NIOServerCnxn 接收客户端请求,反序列化出ConnectRequest请求,然后根据Zookeeper服务端的配置完成会话超时时间的协商。 随后,SessionTracker将会为该会话分配一个sessionID,并将其加入到sessionsById、sessionSets中,同时进行会话激活。 接着请求在处理链中进行流转,最终完成会话创建。

会话管理

分桶策略 将类似的会话放在同一块中进行管理,以便Zookeeper对会话进行不同区块的隔离处理以及同一区块的统一处理

481082fc8788725934ddc072a3a34fa6.png

Zookeeper按照“下次超时时间点” (ExpirationTime)原则,将会话都分配在不同的区块中,ExpirationTime = currentTime + SessionTimeOut

/**
 * Rounds a time up to a multiple of {@code expirationInterval}, always
 * moving at least into the next interval.
 *
 * @param time time to round
 * @return the start of the interval following the one containing {@code time}
 */
private long roundToInterval(long time) {
    final long intervalIndex = time / expirationInterval;
    // We give a one interval grace period
    return (intervalIndex + 1) * expirationInterval;
}

会话激活 Leader服务器激活客户端会话过程

fcc028b29b625eeb4939e00d854b8be5.png

/**
 * Activates (touches) a session: recomputes its expiration bucket and, if
 * the new bucket is later than the current one, moves the session there.
 *
 * @param sessionId id of the session to touch
 * @param timeout session timeout added to the current elapsed time
 * @return {@code false} if the session is unknown or closing, {@code true} otherwise
 */
synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                                 ZooTrace.CLIENT_PING_TRACE_MASK,
                                 "SessionTrackerImpl --- Touch session: 0x"
                + Long.toHexString(sessionId) + " with timeout " + timeout);
    }

    final SessionImpl session = sessionsById.get(sessionId);
    // Unknown or closing sessions cannot be activated.
    if (session == null || session.isClosing()) {
        return false;
    }

    // Recompute the expiration bucket for this session.
    final long newExpireTime = roundToInterval(Time.currentElapsedTime() + timeout);
    if (session.tickTime >= newExpireTime) {
        // Nothing needs to be done
        return true;
    }

    // Migrate: take the session out of its current bucket, if that bucket still exists...
    final SessionSet currentBucket = sessionSets.get(session.tickTime);
    if (currentBucket != null) {
        currentBucket.sessions.remove(session);
    }

    // ...and put it into the bucket of the new expiration time, creating it on demand.
    session.tickTime = newExpireTime;
    SessionSet targetBucket = sessionSets.get(newExpireTime);
    if (targetBucket == null) {
        targetBucket = new SessionSet();
        sessionSets.put(newExpireTime, targetBucket);
    }
    targetBucket.sessions.add(session);
    return true;
}

会话迁移示意图

41a78cc17f4ecc27e2034c86b8294277.png

触发会话激活的情况:

  1. 只要客户端向服务器发送请求,包括读或写请求,就会触发一次会话激活。
  2. 如果客户端发现在 sessionTimeout / 3 时间内尚未和服务器进行过任何通信,即没有向服务器发送任何请求,那么会主动发起一个ping 请求。

会话超时检查 使用一个单独线程进行清理:首先获取当前时间(毫秒),再与下一次过期时间点(nextExpirationTime)比较,如果当前时间小于该时间点则wait相应的时间差,否则清理掉该时间点区块中未被迁移的会话。

// Session expiry check: wakes up at every expiration time point and expires
// the sessions still left in that point's bucket.
@Override
synchronized public void run() {
    try {
        while (running) {
            // Current elapsed time (the difference below is passed to wait(long), i.e. milliseconds)
            currentTime = Time.currentElapsedTime();
            if (nextExpirationTime > currentTime) {
                this.wait(nextExpirationTime - currentTime);
                continue;
            }
            SessionSet set;
            // Remove the bucket: whatever is still in it was not migrated, so it has expired
            set = sessionSets.remove(nextExpirationTime);
            if (set != null) {
                // Close every session in the bucket
                for (SessionImpl s : set.sessions) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
            // Advance to the next expiration time point.
            nextExpirationTime += expirationInterval;
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}

会话清理

由 SessionTracker 实现类 SessionTrackerImpl 负责检测哪些会话需要进行清理

  1. 标记会话状态为已关闭
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
	synchronized public void run() {
        try {
            while (running) {
                // 超时检查
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        // 标记会话状态为已关闭
                        setSessionClosing(s.sessionId);
                        // 发起会话关闭请求
                        expirer.expire(s);
                    }
                }
                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }
}
  1. 发起会话关闭请求,并将会话关闭请求交给 PrepRequestProcessor 处理器处理
// --ZooKeeperServer
/**
 * Logs that the session timed out and submits its close request.
 *
 * @param session the expired session
 */
public void expire(Session session) {
    final long sessionId = session.getSessionId();
    final String message = "Expiring session 0x" + Long.toHexString(sessionId)
            + ", timeout of " + session.getTimeout() + "ms exceeded";
    LOG.info(message);
    close(sessionId);
}

// --ZooKeeperServer
// Submits a closeSession request for the given session id (no connection attached).
private void close(long sessionId) {
    submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}
  1. PrepRequestProcessor 处理器处理
// --PrepRequestProcessor
// Excerpt: only the closeSession branch is shown; the enclosing switch is omitted.
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException
{
	case OpCode.closeSession:
        // Collect the ephemeral nodes that have to be cleaned up
        HashSet<String> es = zks.getZKDatabase()
                .getEphemerals(request.sessionId);
        // The collected set may be stale with respect to changes still in flight; two cases:
        synchronized (zks.outstandingChanges) {
            for (ChangeRecord c : zks.outstandingChanges) {
                // 1. A delete of this node is already being processed
                if (c.stat == null) {
                    // Doing a delete
                    es.remove(c.path);
                // 2. An ephemeral node owned by this session is being created
                } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                    es.add(c.path);
                }
            }
            // Turn each path into a "delete node" change record and add it to outstandingChanges
            for (String path2Delete : es) {
                addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                        path2Delete, null, 0, null));
            }

            zks.sessionTracker.setSessionClosing(request.sessionId);
        }

        LOG.info("Processed session termination for sessionid: 0x"
                + Long.toHexString(request.sessionId));
        break;
}
  1. 将其转换成“节点删除”请求,放到事务变更队列 outstandingChanges
// --PrepRequestProcessor
// Appends the change record to outstandingChanges and indexes it by path,
// both under the outstandingChanges lock.
void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}
  1. 请求交给 FinalRequestProcessor 处理器链处理
/**
 * Final processing step (excerpt): retires outstanding change records up
 * to the request's zxid, applies the transaction, and for closeSession
 * requests without a connection closes the server-side connection.
 */
public void processRequest(Request request) {
    
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
        // Drop every outstanding change whose zxid is not newer than this request.
        while (!zks.outstandingChanges.isEmpty()
                && zks.outstandingChanges.get(0).zxid <= request.zxid) {
            ChangeRecord cr = zks.outstandingChanges.remove(0);
            if (cr.zxid < request.zxid) {
                LOG.warn("Zxid outstanding "
                        + cr.zxid
                        + " is less than current " + request.zxid);
            }
            // Remove the per-path entry only if it still points at this record.
            if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                zks.outstandingChangesForPath.remove(cr.path);
            }
        }
        if (request.hdr != null) {
           TxnHeader hdr = request.hdr;
           Record txn = request.txn;
           // Apply the transaction; for closeSession this makes the database delete all ephemeral nodes
           rc = zks.processTxn(hdr, txn);
        }
        // Cluster (quorum) handling
        if (Request.isQuorum(request.type)) {
            zks.getZKDatabase().addCommittedProposal(request);
        }
    }

    if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
        ServerCnxnFactory scxn = zks.getServerCnxnFactory();
        if (scxn != null && request.cnxn == null) {
            // Close the NIOServerCnxn
            scxn.closeSession(request.sessionId);
            return;
        }
    }
}
  1. 触发数据库删除所有临时节点并移除会话
// --ZooKeeperServer
// Applies the transaction to the database, then updates session tracking
// for session-related op codes (createSession branch elided in this excerpt).
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // Let the database apply the node changes
    rc = getZKDatabase().processTxn(hdr, txn);
    if (opCode == OpCode.createSession) {
        ......
    } else if (opCode == OpCode.closeSession) {
        // Remove the session
        sessionTracker.removeSession(sessionId);
    }
    return rc;
}
  1. 移除会话
// --SessionTrackerImpl
/**
 * Forgets a session: drops it from the id index and the timeout table,
 * and from its expiration bucket if that bucket still exists.
 *
 * @param sessionId id of the session to remove
 */
synchronized public void removeSession(long sessionId) {
    final SessionImpl removed = sessionsById.remove(sessionId);
    sessionsWithTimeout.remove(sessionId);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                "SessionTrackerImpl --- Removing session 0x"
                + Long.toHexString(sessionId));
    }
    if (removed == null) {
        return;
    }
    final SessionSet bucket = sessionSets.get(removed.tickTime);
    // Session expiration has been removing the sessions
    if (bucket != null) {
        bucket.sessions.remove(removed);
    }
}
  1. 关闭 NIOServerCnxn
// --NIOServerCnxnFactory
/**
 * Closes the first connection that belongs to the given session.
 * Works on a snapshot of the connection set, taken under its lock.
 *
 * @param sessionId id of the session whose connection should be closed
 */
private void closeSessionWithoutWakeup(long sessionId) {
    HashSet<NIOServerCnxn> snapshot;
    synchronized (this.cnxns) {
        snapshot = (HashSet<NIOServerCnxn>)this.cnxns.clone();
    }

    for (NIOServerCnxn candidate : snapshot) {
        // Skip connections of other sessions.
        if (candidate.getSessionId() != sessionId) {
            continue;
        }
        // Found the session's NIOServerCnxn: close it and stop searching.
        try {
            candidate.close();
        } catch (Exception e) {
            LOG.warn("exception during session close", e);
        }
        break;
    }
}
Clone this wiki locally