-
Notifications
You must be signed in to change notification settings - Fork 2
Zookeeper 服务器
litter-fish edited this page Dec 13, 2019
·
1 revision
启动流程图:
预启动
- 预启动, QuorumPeerMain 启动入口
// --QuorumPeerMain
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
}
// ......
LOG.info("Exiting normally");
System.exit(0);
}
- 解析配置文件
public void parse(String path) throws ConfigException {
File configFile = new File(path);
LOG.info("Reading configuration from: " + configFile);
try {
if (!configFile.exists()) {
throw new IllegalArgumentException(configFile.toString()
+ " file is missing");
}
// 加载指定配置文件
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
cfg.load(in);
} finally {
in.close();
}
// 解析配置文件内容
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
}
- 创建历史文件清理器
// --DatadirCleanupManager
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running.");
return;
}
// Don't schedule the purge task with zero or negative purge interval.
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}
// 创建一定时任务进行清理工作
timer = new Timer("PurgeTask", true);
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
purgeTaskStatus = PurgeTaskStatus.STARTED;
}
创建一个独立线程执行清理历史文件记录
// --PurgeTask
static class PurgeTask extends TimerTask {
private String logsDir;
private String snapsDir;
private int snapRetainCount;
public PurgeTask(String dataDir, String snapDir, int count) {
logsDir = dataDir;
snapsDir = snapDir;
snapRetainCount = count;
}
@Override
public void run() {
LOG.info("Purge task started.");
try {
// 清理事务日志文件和快照文件
PurgeTxnLog.purge(new File(logsDir), new File(snapsDir), snapRetainCount);
} catch (Exception e) {
LOG.error("Error occurred while purging.", e);
}
LOG.info("Purge task completed.");
}
}
清理文件
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException(COUNT_ERR_MSG);
}
// 创建操作数据文件的接口
FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
// 找到最后的num个快照文件
List<File> snaps = txnLog.findNRecentSnapshots(num);
int numSnaps = snaps.size();
if (numSnaps > 0) {
// 清理旧的快照文件
purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
}
}
- 判断当前是集群模式还是单机模式 根据服务器地址列表判断是否是单机还是集群模式
// --QuorumPeerMain
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
// 省略其他
if (args.length == 1 && config.servers.size() > 0) {
//集群模式
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
// 单机模式下委托 ZooKeeperServerMain 启动服务器
ZooKeeperServerMain.main(args);
}
}
- 再次解析配置文件
// --ZooKeeperServerMain
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
// 再次解析配置文件
ServerConfig config = new ServerConfig();
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args);
}
// 运行服务器根据配置文件
runFromConfig(config);
}
- 创建服务器实例 ZooKeeperServer
// --ZooKeeperServerMain
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// 创建服务器实例,创建服务器统计器
final ZooKeeperServer zkServer = new ZooKeeperServer();
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
// 创建ZK数据管理器 FileTxnSnapLog
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir));
zkServer.setTxnLogFactory(txnLog);
// 设置服务器tickTime及超时时间
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
// 创建ZK服务器连接工厂
cnxnFactory = ServerCnxnFactory.createFactory();
// 初始化zk连接工厂
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
// 启动ZK连接工厂
cnxnFactory.startup(zkServer);
shutdownLatch.await();
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
初始化
- 创建服务器统计器 ServerStats
public ZooKeeperServer() {
serverStats = new ServerStats(this);
listener = new ZooKeeperServerListenerImpl(this);
}
ServerStats 中的一些计数器,都是从Zookeeper启动开始,最近一次重置服务端统计信息开始的
public class ServerStats {
// 服务器向客户端发送的响应包次数
private long packetsSent;
// 服务器接收客户端请求包次数
private long packetsReceived;
// 服务器端请求处理的最大延迟
private long maxLatency;
// 服务器端请求处理的最小延迟
private long minLatency = Long.MAX_VALUE;
// 服务器端请求处理的总延迟
private long totalLatency = 0;
// 服务端处理客户端的总请求次数
private long count = 0;
}
- 创建Zookeeper数据管理器 FileTxnSnapLog 是上层服务器和底层数据存储之间的对接层,主要包括事务日志文件和快照数据文件, 通过配置文件中dataDir指定快照文件的存储目录,即dataLogDir指定事务日志文件存储位置
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
// 省略目录的读写检查
// check content of transaction log and snapshot dirs if they are two different directories
// See ZOOKEEPER-2967 for more details
if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
checkLogDir();
checkSnapDir();
}
// 事务日志文件管理器
txnLog = new FileTxnLog(this.dataDir);
// 快照文件管理器
snapLog = new FileSnap(this.snapDir);
}
- 设置服务器tickTime和会话超时时间限制
// 设置服务器tickTime
zkServer.setTickTime(config.tickTime);
// 设置会话超时时间
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
- 创建 ServerCnxnFactory
static public ServerCnxnFactory createFactory() throws IOException {
// 可以通过系统配置属性(zookeeper.serverCnxnFactory)进行指定使用
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
- 初始化 ServerCnxnFactory
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
// 初始化 ZooKeeperThread 线程
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
maxClientCnxns = maxcc;
// 初始化 NIO服务器
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
- 启动 ServerCnxnFactory 主线程
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
// 启动 ZooKeeperThread 线程
start();
setZooKeeperServer(zks);
zks.startdata();
zks.startup();
}
- 恢复本地文件
// -- ZooKeeperServer
public void startdata() throws IOException, InterruptedException {
// check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
}
// 如果是未初始化这进行本地数据恢复
if (!zkDb.isInitialized()) {
loadData();
}
}
- 创建并启动会话管理器
public synchronized void startup() {
if (sessionTracker == null) {
// 创建会话管理器
createSessionTracker();
}
// 启动会话管理器
startSessionTracker();
// 省略其他
}
创建会话管理器
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1, getZooKeeperServerListener());
}
启动会话管理器,用于定期清理超时会话
protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
- 初始化Zookeeper的请求处理链
public synchronized void startup() {
// 省略其他
// 初始化Zookeeper的请求处理链
setupRequestProcessors();
}
初始化Zookeeper的请求处理链
protected void setupRequestProcessors() {
// 最终的请求处理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
// 第一个请求处理器
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
处理链: PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
- 注册JMX
public synchronized void startup() {
// 省略其他
// 注册JMX
registerJMX();
// 省略其他
}
注册JMX
protected void registerJMX() {
// register with JMX
try {
jmxServerBean = new ZooKeeperServerBean(this);
MBeanRegistry.getInstance().register(jmxServerBean, null);
try {
jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxDataTreeBean = null;
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxServerBean = null;
}
}
- 注册Zookeeper服务器实例
public synchronized void startup() {
if (sessionTracker == null) {
// 创建会话管理器
createSessionTracker();
}
// 启动会话管理器
startSessionTracker();
// 初始化Zookeeper的请求处理链
setupRequestProcessors();
// 注册JMX
registerJMX();
// 注册Zookeeper服务器实例
setState(State.RUNNING);
notifyAll();
}
启动流程图:
预启动
- QuorumPeerMain 启动入口
- 解析配置文件
- 创建历史文件清理器
- 判断当前是集群模式还是单机模式
初始化
- 创建 ServerCnxnFactory
- 初始化 ServerCnxnFactory
- 创建Zookeeper数据管理器 FileTxnSnapLog
- 创建 QuorumPeer 实例 QuorumPeer代表Zookeeper服务器实例(ZookeeperServer)的托管者,从集群角度看,代表着一台机器,QuorumPeer 会不断的检测服务器运行状态,同时根据情况触发Leader选举
protected QuorumPeer getQuorumPeer() throws SaslException {
return new QuorumPeer();
}
protected QuorumPeer() throws SaslException {
super("QuorumPeer");
quorumStats = new QuorumStats(this);
initialize();
}
- 创建内存数据库ZKDatabase ZKDatabase是Zookeeper的内存数据库,负责管理Zookeeper的所有会话记录及DataTree的事务日志存储
public ZKDatabase(FileTxnSnapLog snapLog) {
// 初始化DataTree
dataTree = new DataTree();
// 初始化sessionsWithTimeouts
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
this.snapLog = snapLog;
}
初始化DataTree,创建一些默认的节点
public DataTree() {
/* Rather than fight it, let root have an alias */
nodes.put("", root);
nodes.put(rootZookeeper, root);
/** add the proc node and quota node */
root.addChild(procChildZookeeper);
// 创建 /zookeeper
nodes.put(procZookeeper, procDataNode);
procDataNode.addChild(quotaChildZookeeper);
// 创建 /zookeeper/quota
nodes.put(quotaZookeeper, quotaDataNode);
}
- 初始化 QuorumPeer
public void runFromConfig(QuorumPeerConfig config) throws IOException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
// 创建 QuorumPeer 实例,代表Zookeeper服务器实例(ZookeeperServer)的托管者
// 从集群角度看,代表着一台机器,QuorumPeer 会不断的检测服务器运行状态,同时根据情况触发Leader选举
quorumPeer = getQuorumPeer();
// 初始化 QuorumPeer
quorumPeer.setQuorumPeers(config.getServers());
// 创建FileTxnSnapLog
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
// 创建ZKDatabase
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
- 恢复本地数据
public synchronized void start() {
// 加载本地数据
loadDataBase();
// 启动 ServerCnxnFactory 主线程
cnxnFactory.start();
// leader 选举
startLeaderElection();
super.start();
}
加载本地数据
- 启动ServerCnxnFactory 主线程
Leader选举
- 初始化Leader选举
synchronized public void startLeaderElection() {
try {
// 生成一个初始化投票
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// 省略无关代码
// 创建选举算法
this.electionAlg = createElectionAlgorithm(electionType);
}
创建选举算法
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
// 省略不支持的选举算法
case 3:
// 创建Leader选举所需的网络I/O层 QuorumCnxManager
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
// 开启对Leader选举端口的监听,等待集群中的其他从服务器创建连接
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
开启对Leader选举端口的监听,等待集群中的其他从服务器创建连接
public class Listener extends ZooKeeperThread {
volatile ServerSocket ss = null;
public Listener() {
// During startup of thread, thread name will be overridden to
// specific election address
super("ListenerThread");
}
/**
* Sleeps on accept().
*/
@Override
public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
// 等待其他服务器连接
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
// 接受其他服务器的连接请求
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
// 省略其他代码
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ view.get(QuorumCnxManager.this.mySid).electionAddr);
}
}
/**
* Halts this listener thread.
*/
void halt(){
try{
LOG.debug("Trying to close listener: " + ss);
if(ss != null) {
LOG.debug("Closing listener: "
+ QuorumCnxManager.this.mySid);
ss.close();
}
} catch (IOException e){
LOG.warn("Exception when shutting down listener: " + e);
}
}
}
其他内容请查看选举章节
- 注册JMX服务
- 检测服务器状态
- Leader选举 以上三个步骤主要在QuorumPeer的主线程中进行
public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
// 注册JMX服务
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
p = new RemotePeerBean(s);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
try {
/*
* Main loop
*/
while (running) {
// 检测服务器状态
switch (getPeerState()) {
case LOOKING: // Leader选举
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING: // LEADING服务器的启动
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}
Leader服务器与Learner服务器启动的交互 具体参考“Learner与Leader启动期的交互过程”章节内容 Leader和 Follower服务器启动
- 创建并启动会话管理器
public synchronized void startup() {
if (sessionTracker == null) {
// 创建会话管理器
createSessionTracker();
}
// 启动会话管理器
startSessionTracker();
// 初始化Zookeeper的请求处理链
setupRequestProcessors();
// 注册JMX
registerJMX();
// 注册Zookeeper服务器实例
setState(State.RUNNING);
notifyAll();
}
对于Leader Session的创建和启动:
// Leader 创建Session管理器
public void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, getZKDatabase()
.getSessionWithTimeOuts(), tickTime, self.getId(),
getZooKeeperServerListener());
}
// Leader 启动session管理器
@Override
protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
对于Learner Session的创建和启动:
// Learner 创建Session
public void createSessionTracker() {
sessionTracker = new LearnerSessionTracker(this, getZKDatabase()
.getSessionWithTimeOuts(), self.getId(),
getZooKeeperServerListener());
}
// Learner 的session交给Leader管理
@Override
protected void startSessionTracker() {}
-
Zookeeper 处理器链的初始化
-
注册JMS 服务