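ZooKeeper Data Synchronization
As the figure shows, data synchronization takes one of four forms: direct differential sync (DIFF), rollback followed by differential sync (TRUNC + DIFF), rollback only (TRUNC), and full sync (SNAP).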
// Last ZXID processed by this Learner
peerLastZxid = ss.getLastZxid();
// Largest ZXID in the Leader's CommittedLog cache queue
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
// Smallest ZXID in the Leader's CommittedLog cache queue
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
// Committed-proposal cache queue
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
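The ZXID values compared here, and used in the scenarios that follow (for example 0x500000003), are 64-bit numbers: the high 32 bits hold the epoch and the low 32 bits hold a counter within that epoch. A minimal sketch of that layout, modeled on what ZooKeeper's `ZxidUtils` does; the class name `ZxidSketch` is made up for illustration:

```java
public class ZxidSketch {
    /** Epoch = high 32 bits of the zxid. */
    public static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Counter = low 32 bits of the zxid. */
    public static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Packs an epoch and a counter back into one zxid. */
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = 0x500000003L;
        System.out.println("epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
        System.out.println("0x" + Long.toHexString(makeZxid(6, 1)));
    }
}
```

Because the epoch sits in the high bits, any ZXID from epoch 6 compares greater than every ZXID from epoch 5, which is why plain `<` and `>` comparisons are enough in the sync logic.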
Scenario (DIFF): peerLastZxid lies between minCommittedLog and maxCommittedLog
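- Leader side: the Leader first sends a DIFF command to the Learner, telling it that the differential sync phase is starting. During sync, the Leader sends a PROPOSAL packet followed by a COMMIT packet for every Proposal. It then sends a NEWLEADER command to tell the Learner that every Proposal in the cache queue has been sent, and waits for the Learner's ACK confirming that sync is complete.
- Learner side: on receiving a sync command (DIFF, TRUNC or SNAP), the Learner knows it is entering the sync process and handles the incoming sync packets in order. When it reaches the final NEWLEADER packet, it replies to the Leader with an ACK.
- The Leader receives the Learner's ACK and checks whether more than half of the servers have acknowledged. It then sends an UPTODATE command to each Learner that has finished syncing, telling it that data sync is complete.
- On receiving UPTODATE, the Learner ends the sync flow and replies to the Leader with another ACK.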
Example: the Leader's cache queue is [0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005] and the last ZXID processed by the Learner is 0x500000003. During sync, the Leader sends the two proposals 0x500000004 and 0x500000005 to the Learner, in that order.
Packet send order:
Order | Packet type | ZXID |
---|---|---|
1 | DIFF | 0x500000005 |
2 | PROPOSAL | 0x500000004 |
3 | COMMIT | 0x500000004 |
4 | PROPOSAL | 0x500000005 |
5 | COMMIT | 0x500000005 |
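The handshake and the example above can be traced with a small sketch. It models message order only, with no real networking; the class `SyncHandshakeSketch` and its string labels are illustrative assumptions, not ZooKeeper APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncHandshakeSketch {
    /** Builds the ordered message trace for a DIFF sync of the given missing proposals. */
    public static List<String> trace(long[] missingZxids) {
        List<String> steps = new ArrayList<>();
        steps.add("Leader->Learner DIFF");
        for (long zxid : missingZxids) {
            // Every proposal is followed immediately by its commit.
            steps.add("Leader->Learner PROPOSAL 0x" + Long.toHexString(zxid));
            steps.add("Leader->Learner COMMIT 0x" + Long.toHexString(zxid));
        }
        steps.add("Leader->Learner NEWLEADER");
        steps.add("Learner->Leader ACK");      // acknowledges NEWLEADER
        steps.add("Leader->Learner UPTODATE"); // sent after the Leader has seen enough ACKs
        steps.add("Learner->Leader ACK");      // acknowledges UPTODATE
        return steps;
    }

    public static void main(String[] args) {
        // The two proposals the Learner is missing in the example.
        trace(new long[] {0x500000004L, 0x500000005L}).forEach(System.out::println);
    }
}
```

The table lists only the first part of this trace; the NEWLEADER, UPTODATE and ACK exchanges follow it.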
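For the TRUNC + DIFF case, suppose there are three servers A, B and C. B starts as Leader with epoch 5, has processed proposals 0x500000001 and 0x500000002, and has committed both to the Learners.
The Leader then writes a new proposal, 0x500000003, to its local transaction log, but B crashes before the proposal is sent out. A and C hold an election and A becomes the new Leader with epoch 6, after which it processes proposals 0x600000001 and 0x600000002. B then restarts and has to sync with the new Leader.
At this point peerLastZxid = 0x500000003, maxCommittedLog = 0x600000002, minCommittedLog = 0x500000001, and proposals = [0x500000001, 0x500000002, 0x600000001, 0x600000002].
The Leader assigns: zxidToSend = 0x500000002, updates = 0x500000002.
It first sends a TRUNC command whose zxid is the last proposal from before the crash that B shares with the new Leader (0x500000002). It then sends each remaining proposal in the cache queue, following every proposal with a COMMIT command.
Packet send order:
Order | Packet type | ZXID |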
---|---|---|
1 | TRUNC | 0x500000002 |
2 | PROPOSAL | 0x600000001 |
3 | COMMIT | 0x600000001 |
4 | PROPOSAL | 0x600000002 |
5 | COMMIT | 0x600000002 |
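The zxidToSend assignment and the table above both follow from the Leader's loop over its cache queue. Below is a simplified re-creation of that loop, assuming the queue is a non-empty sorted array of ZXIDs and that peerLastZxid already lies between its first and last entries. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class TruncDiffSketch {
    private static String hex(long zxid) {
        return "0x" + Long.toHexString(zxid);
    }

    /** Returns the sync command followed by the queued PROPOSAL/COMMIT pairs. */
    public static List<String> plan(long[] committedLog, long peerLastZxid) {
        long prevProposalZxid = committedLog[0];                  // starts at minCommittedLog
        String command = "DIFF";
        long zxidToSend = committedLog[committedLog.length - 1];  // maxCommittedLog
        boolean firstPacket = true;
        List<String> queued = new ArrayList<>();
        for (long zxid : committedLog) {
            if (zxid <= peerLastZxid) {
                prevProposalZxid = zxid;  // the Learner already has this one
                continue;
            }
            if (firstPacket) {
                firstPacket = false;
                // The Learner holds a zxid the Leader never saw: roll it back first.
                if (prevProposalZxid < peerLastZxid) {
                    command = "TRUNC";
                    zxidToSend = prevProposalZxid;
                }
            }
            queued.add("PROPOSAL " + hex(zxid));
            queued.add("COMMIT " + hex(zxid));
        }
        List<String> out = new ArrayList<>();
        out.add(command + " " + hex(zxidToSend));
        out.addAll(queued);
        return out;
    }

    public static void main(String[] args) {
        long[] log = {0x500000001L, 0x500000002L, 0x600000001L, 0x600000002L};
        plan(log, 0x500000003L).forEach(System.out::println);
    }
}
```

Run against server B's state, the sketch prints the same five rows as the table, starting with TRUNC 0x500000002.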
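Scenario (TRUNC only): peerLastZxid is greater than maxCommittedLog. The Leader sends a TRUNC command with zxid = maxCommittedLog, so the Learner rolls back to that point.
Scenario 1 (SNAP): peerLastZxid is less than minCommittedLog.
Scenario 2 (SNAP): the Leader has no proposal cache queue, and peerLastZxid differs from the largest ZXID the Leader obtained after recovering its data.
The Leader first sends a SNAP command to tell the Learner that a full sync is about to start. It then takes all data nodes and the session timeout records from its in-memory database, serializes them, and transmits them to the Learner. The Learner deserializes what it receives and loads it into its own in-memory database.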
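Taken together, the four scenarios amount to one decision on the Leader. The sketch below is a condensed reading of that decision, assuming the cache queue is a sorted array of ZXIDs; `SyncModeSketch`, `Mode` and `choose` are illustrative names rather than ZooKeeper APIs.

```java
public class SyncModeSketch {
    public enum Mode { DIFF, TRUNC_THEN_DIFF, TRUNC, SNAP }

    /**
     * @param peerLastZxid   last zxid processed by the Learner
     * @param leaderLastZxid last zxid processed by the Leader's data tree
     * @param committedLog   the Leader's proposal cache queue, sorted ascending
     */
    public static Mode choose(long peerLastZxid, long leaderLastZxid, long[] committedLog) {
        if (peerLastZxid == leaderLastZxid) {
            return Mode.DIFF;                  // already in sync: an empty DIFF
        }
        if (committedLog.length == 0) {
            return Mode.SNAP;                  // nothing cached to diff against
        }
        long min = committedLog[0];
        long max = committedLog[committedLog.length - 1];
        if (peerLastZxid > max) {
            return Mode.TRUNC;                 // Learner is ahead: roll back only
        }
        if (peerLastZxid < min) {
            return Mode.SNAP;                  // Learner is too far behind
        }
        for (long zxid : committedLog) {
            if (zxid == peerLastZxid) {
                return Mode.DIFF;              // Learner's last zxid is known to the Leader
            }
        }
        return Mode.TRUNC_THEN_DIFF;           // in range, but unknown to the Leader
    }

    public static void main(String[] args) {
        long[] log = {0x500000001L, 0x500000002L, 0x600000001L, 0x600000002L};
        System.out.println(choose(0x500000003L, 0x600000002L, log));
        System.out.println(choose(0x600000005L, 0x600000002L, log));
        System.out.println(choose(0x400000009L, 0x600000002L, log));
    }
}
```

SNAP is the fallback in this sketch because the Leader code initializes its packet type to SNAP and only overrides it when a cheaper option applies.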
Source code analysis: main data sync flow on the Leader
// org/apache/zookeeper/server/quorum/LearnerHandler.java
public void run() {
try {
// (Leader/Learner state exchange omitted)
// Last ZXID processed by this Learner
peerLastZxid = ss.getLastZxid();
/* the default to send to the follower */
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;
////////////// Data sync between the Leader and the Learner
/* we are sending the diff check if we have proposals in memory to be able to
* send a diff to the
*/
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
// Largest ZXID in the Leader's CommittedLog cache queue
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
// Smallest ZXID in the Leader's CommittedLog cache queue
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + sid
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
// Committed-proposal cache queue
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
// The Leader's ZXID equals the Learner's ZXID, so no sync is needed
if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
// Follower is already sync with us, send empty diff
LOG.info("leader and follower are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else if (proposals.size() != 0) {
LOG.debug("proposal size is {}", proposals.size());
// peerLastZxid lies between minCommittedLog and maxCommittedLog
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
LOG.debug("Sending proposals to follower");
// as we look through proposals, this variable keeps track of previous
// proposal Id.
long prevProposalZxid = minCommittedLog;
// Keep track of whether we are about to send the first packet.
// Before sending the first packet, we have to tell the learner
// whether to expect a trunc or a diff
boolean firstPacket=true;
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;
for (Proposal propose: proposals) {
// Skip proposals the Learner has already processed
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else {
if (firstPacket) {
firstPacket = false;
// Roll back first, then diff sync: determine the rollback point
if (prevProposalZxid < peerLastZxid) {
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
// Queue the PROPOSAL packet for sending
queuePacket(propose.packet);
// Queue the matching COMMIT packet for sending
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
}
}
// The Learner's last ZXID is greater than the Leader's maxCommittedLog: roll back only
} else if (peerLastZxid > maxCommittedLog) {
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),
Long.toHexString(updates));
packetToSend = Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
LOG.warn("Unhandled proposal scenario");
}
} else {
// just let the state transfer happen
LOG.debug("proposals is empty");
}
LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates);
} finally {
rl.unlock();
}
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
ZxidUtils.makeZxid(newEpoch, 0), null, null);
if (getVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
// Queue NEWLEADER to tell the Learner that every proposal in the cache queue has been sent to it
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
//Need to set the zxidToSend to the latest zxid
if (packetToSend == Leader.SNAP) {
zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
}
// First send the sync command (DIFF, TRUNC or SNAP) to the Learner, carrying zxidToSend
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
// Full data sync
if (packetToSend == Leader.SNAP) {
// Take all data nodes and the session timeout records from the in-memory database and serialize them
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
// Start sending packets
// Create and start a new thread that sends the queued Proposal data
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption",e);
}
}
}.start();
qp = new QuorumPacket();
// Wait for the Learner's ACK
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK");
return;
}
LOG.info("Received NEWLEADER-ACK message from " + getSid());
// Block until more than half of the servers have ACKed NEWLEADER
leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
syncLimitCheck.start();
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Send UPTODATE to tell the Learner to end data sync
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
//
} // catch
}
Main flow on the Learner when it receives sync data
// org/apache/zookeeper/server/quorum/Learner.java
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
boolean snapshotNeeded = true;
// Block until the first sync packet arrives from the Leader
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
// Differential sync
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
// Full sync
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
// Clear the in-memory data
zk.getZKDatabase().clear();
// Deserialize the snapshot sent by the Leader
zk.getZKDatabase().deserializeSnapshot(leaderIs);
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
throw new IOException("Missing signature");
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
// Rollback sync
} else if (qp.getType() == Leader.TRUNC) {
//we need to truncate the log to the lastzxid of the leader
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
// Roll back the transaction log
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log "
+ Long.toHexString(qp.getZxid()));
System.exit(13);
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
}
else {
LOG.error("Got unexpected packet from leader "
+ qp.getType() + " exiting ... " );
System.exit(13);
}
zk.createSessionTracker();
long lastQueued = 0;
boolean isPreZAB1_0 = true;
boolean writeToTxnLog = !snapshotNeeded;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
// Block and read the next packet
readPacket(qp);
switch(qp.getType()) {
case Leader.PROPOSAL: // handle a proposal
PacketInFlight pif = new PacketInFlight();
pif.hdr = new TxnHeader();
pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(pif.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
if (!writeToTxnLog) {
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
PacketInFlight packet = new PacketInFlight();
packet.hdr = new TxnHeader();
packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
// Log warning message if txn comes out-of-order
if (packet.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(packet.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.UPTODATE: // UPTODATE tells the Learner to end data sync
if (isPreZAB1_0) {
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;
// Tells the Learner that every Proposal in the cache queue has been synced.
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
File updating = new File(self.getTxnFactory().getSnapDir(),
QuorumPeer.UPDATING_EPOCH_FILENAME);
if (!updating.exists() && !updating.createNewFile()) {
throw new IOException("Failed to create " +
updating.toString());
}
if (snapshotNeeded) {
zk.takeSnapshot();
}
self.setCurrentEpoch(newEpoch);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
// Reply to the Leader with an ACK for NEWLEADER
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
// Reply to the Leader with an ACK for UPTODATE
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
// Start the Learner server
zk.startup();
// (other code omitted)
}
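Before NEWLEADER arrives, the Learner buffers each PROPOSAL and applies it only when the matching COMMIT shows up. A minimal sketch of that pairing, assuming ZXIDs stand in for full transactions; `LearnerBufferSketch` and its methods are hypothetical, and unlike the original snippet it also guards against a COMMIT arriving while nothing is pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class LearnerBufferSketch {
    // Proposals received but not yet committed, oldest first.
    private final Deque<Long> notCommitted = new ArrayDeque<>();
    // Transactions applied to the (imaginary) in-memory database.
    private final List<Long> applied = new ArrayList<>();

    public void onProposal(long zxid) {
        notCommitted.addLast(zxid);
    }

    /** Applies the oldest pending proposal only if the COMMIT names it. */
    public boolean onCommit(long zxid) {
        Long head = notCommitted.peekFirst();
        if (head == null || head != zxid) {
            return false; // out-of-order or unexpected commit: leave the queue untouched
        }
        applied.add(notCommitted.removeFirst());
        return true;
    }

    public List<Long> applied() {
        return applied;
    }

    public static void main(String[] args) {
        LearnerBufferSketch learner = new LearnerBufferSketch();
        learner.onProposal(0x600000001L);
        learner.onCommit(0x600000001L);
        learner.onProposal(0x600000002L);
        learner.onCommit(0x600000002L);
        learner.applied().forEach(z -> System.out.println("applied 0x" + Long.toHexString(z)));
    }
}
```

After NEWLEADER the real code switches modes: commits are collected in `packetsCommitted` and written through the transaction log instead of being applied directly.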
Log rollback operation
// org/apache/zookeeper/server/ZKDatabase.java
public boolean truncateLog(long zxid) throws IOException {
clear();
// truncate the log
boolean truncated = snapLog.truncateLog(zxid);
if (!truncated) {
return false;
}
loadDataBase();
return true;
}
Rolling back the log
// org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
public boolean truncateLog(long zxid) throws IOException {
// close the existing txnLog and snapLog
close();
// truncate it
FileTxnLog truncLog = new FileTxnLog(dataDir);
boolean truncated = truncLog.truncate(zxid);
truncLog.close();
txnLog = new FileTxnLog(dataDir);
snapLog = new FileSnap(snapDir);
return truncated;
}
Rollback
// org/apache/zookeeper/server/persistence/FileTxnLog.java
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
//
long pos = input.getPosition();
// now, truncate at the current position
RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
raf.setLength(pos);
raf.close();
// Move on to each later log file and delete it
while (itr.goToNextLog()) {
if (!itr.logFile.delete()) {
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
close(itr);
}
return true;
}
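The core of the truncate step is cutting a file at a byte position with `RandomAccessFile.setLength`. Here is a stand-alone sketch of just that step, using a temporary file in place of a real transaction log; `TruncateFileSketch`, `truncateAt` and `demo` are made-up names.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Files;

public class TruncateFileSketch {
    /** Cuts the file so that only its first keepBytes bytes remain. */
    public static void truncateAt(File file, long keepBytes) throws IOException {
        // try-with-resources closes the handle even if setLength throws.
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.setLength(keepBytes);
        }
    }

    /** Writes 8 bytes to a temp file, truncates it to 5, and returns the new length. */
    public static long demo() {
        try {
            File tmp = File.createTempFile("txnlog-sketch", ".bin");
            tmp.deleteOnExit();
            Files.write(tmp.toPath(), new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
            truncateAt(tmp, 5);
            return tmp.length();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("length after truncate: " + demo());
    }
}
```

In the ZooKeeper code the cut position comes from the iterator's input stream after it is opened at the target zxid, and every later log file is deleted afterwards.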