-
Notifications
You must be signed in to change notification settings - Fork 2
Zookeeper 会话
- 初始化Zookeeper对象 会创建一个客户端的Watcher管理器:ClientWatchManager
// Client-side watcher manager, created together with the ZooKeeper object.
private final ZKWatchManager watchManager = new ZKWatchManager();
Watcher管理器:
/**
 * Client-side watcher registry (excerpt).
 *
 * Keeps three String-keyed maps of registered Watcher sets -- data, exist and
 * child watches -- plus the default watcher.
 * NOTE(review): the String keys are presumably znode paths; confirm against callers.
 */
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
// Default watcher; assigned from outside this class.
private volatile Watcher defaultWatcher;
// Other logic omitted in this excerpt.
}
- 设置默认watcher 如果构造函数中传入了Watcher则会将其设置为默认的Watcher,即ZKWatchManager中的defaultWatcher属性
// Install the supplied Watcher as the watcher manager's default watcher.
watchManager.defaultWatcher = watcher;
- 设置服务器地址列表
// Parse the chroot path and the server address list out of the connect string.
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
// Hand the parsed addresses to StaticHostProvider, whose constructor shuffles
// the list with Collections.shuffle.
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
地址列表的处理
/**
 * Builds a host provider from the given server addresses (excerpt; the rest
 * of the constructor is omitted).
 *
 * @param serverAddresses the candidate server addresses
 */
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
// Other code omitted in this excerpt.
// Randomize the order of the address list with Collections.shuffle.
Collections.shuffle(this.serverAddresses);
}
打乱后的地址会组成一个圆环
每次获取地址时,即调用next时,currentIndex进行+1操作
获取服务器地址
/**
 * Returns the next server address to try, walking the address list as a ring.
 *
 * @param spinDelay how long to sleep, in milliseconds (the value is passed
 *                  straight to {@link Thread#sleep(long)}), when the cursor
 *                  lands on {@code lastIndex}; no sleep happens unless it is
 *                  positive
 * @return the address at the advanced cursor position
 */
public InetSocketAddress next(long spinDelay) {
    // Move the cursor one slot forward, wrapping to 0 past the last address.
    int advanced = currentIndex + 1;
    if (advanced == serverAddresses.size()) {
        advanced = 0;
    }
    currentIndex = advanced;

    if (currentIndex != lastIndex || spinDelay <= 0) {
        if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }
    } else {
        // The cursor is back on the slot recorded in lastIndex: pause before
        // handing the same address out again.
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    }
    return serverAddresses.get(currentIndex);
}
- 创建并初始化客户端网络连接器 ClientCnxn
// 4. Create the ClientCnxn instance that manages the network interaction between
// the client and the server.
// Constructing it also initializes its two packet queues: outgoingQueue (packets
// waiting to be sent) and pendingQueue (packets already sent that are waiting for
// the server's response).
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
初始化核心队列
/**
 * Packets that have already been sent and are waiting for the server's
 * response (doIO moves a packet here once it has been fully written).
 */
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
/**
 * Packets that are waiting to be sent to the server (doIO picks the next
 * packet to write from this queue).
 */
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
- 创建 SendThread 线程,用于管理客户端与服务器端的所有网络IO操作
/**
 * Stores the connection parameters, derives the initial timeouts and creates
 * (without starting) the two worker threads.
 *
 * @param chrootPath       chroot path parsed from the connect string
 * @param hostProvider     source of server addresses; its size divides the connect timeout
 * @param sessionTimeout   requested session timeout
 * @param zooKeeper        owning ZooKeeper handle
 * @param watcher          client watcher manager
 * @param clientCnxnSocket socket abstraction handed to the send thread
 * @param sessionId        session id to start with
 * @param sessionPasswd    session password to start with
 * @param canBeReadOnly    stored as the {@code readOnly} flag
 */
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    // Collaborators.
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    // Session identity and mode.
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.readOnly = canBeReadOnly;

    // Initial timeouts derived from the requested session timeout.
    this.connectTimeout = sessionTimeout / hostProvider.size();
    this.readTimeout = sessionTimeout * 2 / 3;

    // 5. SendThread handles all network I/O between client and server;
    //    EventThread handles client-side event processing.
    this.sendThread = new SendThread(clientCnxnSocket);
    this.eventThread = new EventThread();
}
-
启动SendThread和EventThread
-
获取一个服务器地址
// Choose the address for this connection attempt: ask the host provider for
// the next one unless a read-write server address has been remembered, in
// which case that address is used once and then cleared.
if (rwServerAddress == null) {
    serverAddress = hostProvider.next(1000);
} else {
    serverAddress = rwServerAddress;
    rwServerAddress = null;
}
- 创建TCP连接
// ClientCnxnSocket is responsible for opening the long-lived TCP connection to the server.
startConnect(serverAddress);
/**
 * Prepares client state for a new connection attempt and asks the client
 * socket to connect to the given address.
 *
 * @param addr the server address to connect to
 * @throws IOException propagated from the underlying connect
 */
private void startConnect(InetSocketAddress addr) throws IOException {
// initializing it for new connection
saslLoginFailed = false;
state = States.CONNECTING;
// Replace the parenthesized part of the thread name with "(host:port)" of the target.
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
if (ZooKeeperSaslClient.isEnabled()) {
try {
// The principal is "<user>/<host>"; the user name comes from a system
// property and defaults to "zookeeper".
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
// ...... (exception handling omitted in this excerpt)
}
}
logStartConnect(addr);
clientCnxnSocket.connect(addr);
}
/**
 * Opens a socket channel to {@code addr} and resets the per-connection read state.
 *
 * @param addr the server address to connect to
 * @throws IOException if registering or connecting fails; the channel is
 *                     closed before the exception is rethrown
 */
void connect(InetSocketAddress addr) throws IOException {
    // create a socket channel.
    final SocketChannel channel = createSock();
    try {
        // register with the selection and connect
        registerAndConnect(channel, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        channel.close();
        throw e;
    }

    // New connection: not initialized yet, and incomingBuffer goes back to
    // the cleared length buffer.
    initialized = false;
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
/**
 * Registers the channel with the selector for connect events and starts the connect.
 *
 * @param sock the channel to register and connect
 * @param addr the server address to connect to
 * @throws IOException propagated from register, connect or primeConnection
 */
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // Start connecting; nothing more to do here unless the connect completed immediately.
    if (!sock.connect(addr)) {
        return;
    }
    // Connected right away: queue the ConnectRequest with which the client
    // tries to establish a session with the server.
    sendThread.primeConnection();
}
- 构造ConnectRequest
/**
 * Queues the packets that open a session on a freshly connected socket: the
 * ConnectRequest, the stored auth packets and, unless auto watch reset is
 * disabled, SetWatches packets for the currently registered watches.
 *
 * Every packet is pushed with addFirst, so the packet pushed last (the
 * ConnectRequest) ends up at the head of outgoingQueue.
 *
 * @throws IOException declared by the method; see the callee signatures
 */
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
// Reuse the existing session id only if a read-write server has been seen before; otherwise send 0.
long sessId = (seenRwServerBefore) ? sessionId : 0;
// Build the ConnectRequest.
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!disableAutoWatchReset) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
// Emit one SetWatches packet per batch until all three iterators are drained.
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
// Running total of the lengths of the watch strings in this batch.
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
// Drain data watches first, then exist watches, then child watches.
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
// SetWatches packets are sent with the fixed xid -8.
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet);
}
}
}
// Queue one auth packet (fixed xid -4) per stored AuthData entry.
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
// Wrap the ConnectRequest in a Packet and put it at the head of outgoingQueue.
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
- 发送请求
clientCnxnSocket.doTransport => doIO =>
/**
 * Write half of doIO (excerpt; the read half is omitted here): when the
 * selection key is writable, sends one packet from outgoingQueue and, once it
 * is fully written, moves it to pendingQueue if it is a request that is
 * neither a ping nor an auth packet.
 *
 * @param pendingQueue  packets already sent and waiting for a response
 * @param outgoingQueue packets waiting to be sent
 * @param cnxn          the owning client connection
 */
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// Other code omitted in this excerpt.
if (sockKey.isWritable()) {
// Take a packet from the outgoing queue and send it.
synchronized(outgoingQueue) {
// Find a packet that may be sent right now.
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
// Requests other than ping and auth get a fresh xid from cnxn.getXid().
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
// Serialize the packet into its byte buffer.
p.createBB();
}
sock.write(p.bb);
// The packet counts as sent only when its buffer has been fully written.
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
// Requests other than ping and auth wait in pendingQueue for their response.
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
// Write interest: off when nothing is left to send, or when the connection is
// not initialized yet and the packet just handled has been fully written;
// otherwise on.
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
enableWrite();
}
}
}
}
- 接收服务端响应
/**
 * Read half of doIO (excerpt; the write half is omitted here): reads into
 * incomingBuffer and, once the buffer is full, treats it as a length prefix,
 * as the connect result (first payload on a connection) or as an ordinary
 * response.
 *
 * @param pendingQueue  packets already sent and waiting for a response
 * @param outgoingQueue packets waiting to be sent
 * @param cnxn          the owning client connection
 */
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
// A negative read count is reported as the server having closed the socket.
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
// Act only once the current buffer has been completely filled.
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
// The buffer just filled was the length prefix.
recvCount++;
readLength();
// Not initialized yet: this payload is the reply to the connect request.
} else if (!initialized) {
// Read the result the server returned for the connect request.
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
// Go back to reading a length prefix.
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// Read an ordinary response.
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// Packet-sending logic omitted in this excerpt.
}
- 处理Response
/**
 * Decodes the server's reply to the connect request from {@code incomingBuffer}
 * and reports the outcome to the send thread.
 *
 * @throws IOException if the ConnectResponse cannot be deserialized, or as
 *                     propagated from {@code onConnected}
 */
void readConnectResult() throws IOException {
    if (LOG.isTraceEnabled()) {
        StringBuilder hexDump = new StringBuilder("0x[");
        for (byte b : incomingBuffer.array()) {
            hexDump.append(Integer.toHexString(b)).append(",");
        }
        hexDump.append("]");
        LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                + hexDump.toString());
    }

    // Deserialize the ConnectResponse from the buffer.
    BinaryInputArchive archive =
            BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectResponse response = new ConnectResponse();
    response.deserialize(archive, "connect");

    // read "is read-only" flag
    boolean serverIsReadOnly;
    try {
        serverIsReadOnly = archive.readBool("readOnly");
    } catch (IOException e) {
        // this is ok -- just a packet from an old server which
        // doesn't contain readOnly field
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
        serverIsReadOnly = false;
    }

    // Remember the session id from the response, then notify the send thread.
    this.sessionId = response.getSessionId();
    sendThread.onConnected(response.getTimeOut(), this.sessionId,
            response.getPasswd(), serverIsReadOnly);
}
- 连接成功
/**
 * Applies the result of a connect attempt: either marks the session expired
 * (non-positive negotiated timeout) or records the negotiated parameters,
 * switches to a connected state and queues the matching connection event.
 *
 * @param _negotiatedSessionTimeout session timeout from the server's response
 * @param _sessionId                session id from the server's response
 * @param _sessionPasswd            session password from the server's response
 * @param isRO                      whether the server reported read-only mode
 * @throws IOException SessionExpiredException is thrown when the negotiated
 *                     timeout is not positive
 */
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
// A non-positive negotiated timeout is handled as an expired session: close,
// queue an Expired event followed by the event-of-death, then throw.
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo;
warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
// Update client-side parameters from the negotiated values.
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
// Sticky flag: becomes true once any non-read-only server has been reached.
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
// Queue the connection event: type None with state SyncConnected (or ConnectedReadOnly).
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
- 生成事件:SyncConnected - None
/**
 * Queues an event, together with the watchers it should be delivered to, for
 * later processing by the event thread.
 *
 * <p>An event of type {@code None} whose state equals the last recorded
 * session state is dropped.
 *
 * @param event the event to queue
 */
public void queueEvent(WatchedEvent event) {
    final boolean repeatsSessionState =
            event.getType() == EventType.None && sessionState == event.getState();
    if (repeatsSessionState) {
        return;
    }
    sessionState = event.getState();

    // materialize the watchers based on the event, then
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(), event.getPath()),
            event));
}
- 查询Watcher
/**
 * Event thread main loop: takes entries from waitingEvents and processes
 * them until the event-of-death has been seen and the queue is empty.
 */
public void run() {
try {
isRunning = true;
while (true) {
// Blocks until an entry is available.
Object event = waitingEvents.take();
if (event == eventOfDeath) {
// Shutdown marker: remember it, but keep draining what is still queued.
wasKilled = true;
} else {
processEvent(event);
}
// Once killed, exit only when the queue is empty (checked under the queue's monitor).
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
- 处理事件
服务端 SessionID 的生成
/**
 * Computes the server's initial session id.
 *
 * <p>Bit layout of the result (bit 63 is the most significant):
 * <ul>
 *   <li>bits 63..56 - the low 8 bits of {@code id};</li>
 *   <li>bits 55..16 - the low 40 bits of the current time in milliseconds;</li>
 *   <li>bits 15..0  - zero.</li>
 * </ul>
 * The time is shifted left by 24 bits (dropping its high 24 bits) and then
 * shifted right by 8 bits <em>unsigned</em>, so the top 8 bits are always
 * zero before the machine id is OR-ed in.
 *
 * @param id the machine (server) id
 * @return the initial session id for that machine
 */
public static long initializeNextSession(long id) {
    final long timestampBits = (System.currentTimeMillis() << 24) >>> 8;
    final long machineBits = id << 56;
    return timestampBits | machineBits;
}
SessionTracker:Zookeeper服务器的会话管理器,负责会话的创建、管理和清理等工作。
// Session entities indexed by session ID.
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
// Sessions bucketed by their next expiration time point, which simplifies
// session management and timeout checks.
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
// Session timeouts indexed by session ID. Per the original note, this structure
// is persisted into snapshot files (not visible in this excerpt).
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
会话创建主要分为:处理 ConnectRequest 请求、会话创建、处理器链路处理和会话响应四个步骤。 首先由 NIOServerCnxn 接收客户端请求,反序列化出 ConnectRequest 请求,然后根据 ZooKeeper 服务端的配置完成会话超时时间的协商。 随后,SessionTracker 会为该会话分配一个 sessionID,并将其加入到 sessionsById、sessionSets 中,同时进行会话激活。 接着请求在处理器链中进行流转,最终完成会话创建。
分桶策略 将类似的会话放在同一块中进行管理,以便Zookeeper对会话进行不同区块的隔离处理以及同一区块的统一处理
Zookeeper按照“下次超时时间点” (ExpirationTime)原则,将会话都分配在不同的区块中,ExpirationTime = currentTime + SessionTimeOut,随后再按 expirationInterval 向上取整到区块边界:(ExpirationTime / expirationInterval + 1) * expirationInterval
/**
 * Rounds a point in time up to an expiration-bucket boundary.
 *
 * @param time the time to round
 * @return the first multiple of {@code expirationInterval} strictly greater
 *         than {@code time} (for non-negative inputs)
 */
private long roundToInterval(long time) {
    // We give a one interval grace period
    final long wholeIntervals = time / expirationInterval;
    return (wholeIntervals + 1) * expirationInterval;
}
会话激活 Leader服务器激活客户端会话过程
/**
 * Session activation: pushes a session's expiration forward and migrates it
 * to the bucket of its new expiration time.
 *
 * @param sessionId the session to activate
 * @param timeout   the session timeout, added to the current elapsed time
 * @return {@code false} if the session is unknown or closing; {@code true} otherwise
 */
synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                ZooTrace.CLIENT_PING_TRACE_MASK,
                "SessionTrackerImpl --- Touch session: 0x"
                        + Long.toHexString(sessionId) + " with timeout " + timeout);
    }

    SessionImpl session = sessionsById.get(sessionId);
    // Unknown or closing sessions cannot be activated.
    if (session == null || session.isClosing()) {
        return false;
    }

    // Recompute the session's expiration time, rounded to a bucket boundary.
    long newExpiry = roundToInterval(Time.currentElapsedTime() + timeout);
    if (session.tickTime >= newExpiry) {
        // Nothing needs to be done
        return true;
    }

    // Migrate: leave the current bucket (if it still exists) ...
    SessionSet currentBucket = sessionSets.get(session.tickTime);
    if (currentBucket != null) {
        currentBucket.sessions.remove(session);
    }

    // ... and join the bucket for the new expiration time, creating it on demand.
    session.tickTime = newExpiry;
    SessionSet targetBucket = sessionSets.get(session.tickTime);
    if (targetBucket == null) {
        targetBucket = new SessionSet();
        sessionSets.put(newExpiry, targetBucket);
    }
    targetBucket.sessions.add(session);
    return true;
}
会话迁移示意图
触发会话激活的情况:
- 只要客户端向服务器发送请求,包括读或写请求,就会触发一次会话激活。
- 如果客户端发现在 sessionTimeout / 3 时间内尚未和服务器进行过任何通信,即没有向服务器发送任何请求,那么会主动发起一个 ping 请求。
会话超时检查 使用一个单独线程进行清理:首先获取当前时间(毫秒),再与下一次过期时间点比较,如果尚未到达则 wait 相应的时间,否则清理掉该时间点区块中未被迁移的会话。
/**
 * Session timeout check loop: waits until the next expiration time point,
 * then expires every session still left in that time point's bucket.
 */
@Override
synchronized public void run() {
try {
while (running) {
// Current elapsed time. The difference to nextExpirationTime is passed to
// wait(), so the value is used as milliseconds.
currentTime = Time.currentElapsedTime();
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
SessionSet set;
// Remove the bucket for this time point; what is left in it are the expired
// sessions that were NOT migrated to a later bucket.
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
// Close the sessions one by one.
for (SessionImpl s : set.sessions) {
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
// Advance to the next bucket boundary.
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
由 SessionTracker 实现类 SessionTrackerImpl 负责检测哪些会话需要进行清理
- 标记会话状态为已关闭
/**
 * Session tracker implementation (excerpt showing only the expiry loop).
 */
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
/**
 * Waits until the next expiration time point, then marks every session left
 * in that bucket as closing and asks the expirer to close it.
 */
synchronized public void run() {
try {
while (running) {
// Timeout check: wait until the next expiration time point is reached.
currentTime = System.currentTimeMillis();
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
SessionSet set;
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
// Mark the session as closing.
setSessionClosing(s.sessionId);
// Issue the session-close request.
expirer.expire(s);
}
}
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
}
- 发起会话关闭请求,并将会话关闭请求交给 PrepRequestProcessor 处理器处理
// --ZooKeeperServer
/**
 * Logs that the session's timeout was exceeded and closes the session.
 *
 * @param session the session to expire
 */
public void expire(Session session) {
    final long expiredId = session.getSessionId();
    LOG.info("Expiring session 0x" + Long.toHexString(expiredId)
            + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(expiredId);
}
// --ZooKeeperServer
/**
 * Submits a {@code closeSession} request for the given session.
 *
 * @param sessionId the session to close
 */
private void close(long sessionId) {
    submitRequest(
            null,
            sessionId,
            OpCode.closeSession,
            0,
            null,
            null);
}
- PrepRequestProcessor 处理器处理
// --PrepRequestProcessor
// Excerpt: only the closeSession case is shown; the enclosing switch is omitted.
// It collects the session's ephemeral nodes and queues a delete change record
// for each of them.
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
case OpCode.closeSession:
// Collect the ephemeral nodes that have to be cleaned up for this session.
HashSet<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
// The collected set must be reconciled with changes that are still in flight;
// two cases are handled below.
synchronized (zks.outstandingChanges) {
for (ChangeRecord c : zks.outstandingChanges) {
// 1. A delete of the node is already being processed: drop it from the set.
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
// 2. An ephemeral node owned by this session is being created: add it to the set.
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
// Turn each path into a "delete node" change record and put it on the
// outstandingChanges transaction-change queue.
for (String path2Delete : es) {
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
path2Delete, null, 0, null));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
LOG.info("Processed session termination for sessionid: 0x"
+ Long.toHexString(request.sessionId));
break;
}
- 将其转换成“节点删除”请求,放到事务变更队列 outstandingChanges
// --PrepRequestProcessor
/**
 * Registers a pending change: appends it to {@code outstandingChanges} and
 * makes it the entry for its path in {@code outstandingChangesForPath}.
 * Both updates happen under the {@code outstandingChanges} monitor.
 *
 * @param c the change record to register
 */
void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        // Queue order first, then the per-path lookup entry.
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}
- 请求交给 FinalRequestProcessor 处理器链处理
/**
 * Final step of request processing (excerpt): retires outstanding change
 * records up to the request's zxid, applies the transaction and, for a
 * closeSession request without a connection, closes the session's server
 * connection.
 *
 * @param request the request to process
 */
public void processRequest(Request request) {
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
// Drop every outstanding change whose zxid is not newer than this request's.
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < request.zxid) {
LOG.warn("Zxid outstanding "
+ cr.zxid
+ " is less than current " + request.zxid);
}
// Remove the per-path entry only if it still refers to this very record.
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
// Apply the transaction; for closeSession this is what triggers the database
// to delete the session's ephemeral nodes.
rc = zks.processTxn(hdr, txn);
}
// Cluster (quorum) handling.
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
ServerCnxnFactory scxn = zks.getServerCnxnFactory();
if (scxn != null && request.cnxn == null) {
// Close the NIOServerCnxn that belongs to the session.
scxn.closeSession(request.sessionId);
return;
}
}
}
- 触发数据库删除所有临时节点并移除会话
// --ZooKeeperServer
// Applies a transaction to the database and, for closeSession, removes the
// session from the session tracker (excerpt; the createSession branch is elided).
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
// Let the database apply the transaction to the nodes.
rc = getZKDatabase().processTxn(hdr, txn);
if (opCode == OpCode.createSession) {
......
} else if (opCode == OpCode.closeSession) {
// Remove the session.
sessionTracker.removeSession(sessionId);
}
return rc;
}
- 移除会话
// --SessionTrackerImpl
/**
 * Forgets a session: removes it from the id index, from the timeout table
 * and, if its expiration bucket still exists, from that bucket.
 *
 * @param sessionId the session to remove
 */
synchronized public void removeSession(long sessionId) {
    SessionImpl removed = sessionsById.remove(sessionId);
    sessionsWithTimeout.remove(sessionId);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                "SessionTrackerImpl --- Removing session 0x"
                        + Long.toHexString(sessionId));
    }
    if (removed == null) {
        return;
    }
    SessionSet bucket = sessionSets.get(removed.tickTime);
    // Session expiration has been removing the sessions
    if (bucket != null) {
        bucket.sessions.remove(removed);
    }
}
- 关闭 NIOServerCnxn
// --NIOServerCnxnFactory
/**
 * Closes the first connection found for the given session.
 *
 * <p>Iterates over a copy of the connection set taken under its monitor, so
 * the close itself runs outside the lock.
 *
 * @param sessionId the session whose connection should be closed
 */
private void closeSessionWithoutWakeup(long sessionId) {
    HashSet<NIOServerCnxn> snapshot;
    synchronized (this.cnxns) {
        snapshot = (HashSet<NIOServerCnxn>) this.cnxns.clone();
    }
    for (NIOServerCnxn candidate : snapshot) {
        if (candidate.getSessionId() != sessionId) {
            continue;
        }
        // Found the NIOServerCnxn of this session: close it and stop searching.
        try {
            candidate.close();
        } catch (Exception e) {
            LOG.warn("exception during session close", e);
        }
        break;
    }
}