Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Dec 20, 2024
1 parent 4994026 commit 391e0fa
Showing 1 changed file with 31 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected void ackReceived(ClientCnx cnx, long seq, long highSeq, long ledgerId,
return;
}
// The replicator also sends markers; use the sequenceId to check the marker's send receipt.
if (isReplicationMarker(entryId)) {
if (isReplicationMarker(highSeq)) {
ackReceivedReplMarker(cnx, op, seq, highSeq, ledgerId, entryId);
return;
}
Expand All @@ -85,8 +85,8 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
if (sourceLId < lastPersistedSourceLedgerId
|| (sourceLId == lastPersistedSourceLedgerId && sourceEId < lastPersistedSourceEntryId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Dropped a repl marker SendReceipt. Got entry {}:{}, last persisted source entry:"
+ " {}:{}",
log.debug("[{}] [{}] Received an msg send receipt[repeated]: source entry {}:{}, latest persisted:"
+ " {}:{}",
topic, producerName, sourceLId, sourceEId,
lastPersistedSourceLedgerId, lastPersistedSourceEntryId);
}
Expand Down Expand Up @@ -130,66 +130,69 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
// Notice: if messages are sent out of order, messages may be lost.
// Conclusion: whether or not @param-ledgerId and @param-entry-id are "-1", we can remove the pending
// message.
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received an msg send receipt[expected]: source entry {}:{}, target entry:"
+ " {}:{}",
topic, producerName, sourceLId, sourceEId,
targetLId, targetEid);
}
lastPersistedSourceLedgerId = sourceLId;
lastPersistedSourceEntryId = sourceEId;
removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false);
return;
}

// Case-3: got null source cluster's entry position, which is unexpected.
if (pendingLId == null || pendingEId == null) {
log.error("[{}] [{}] can not found v2 sequence-id {}:{}, ackReceived: {}:{} {}:{} - queue-size: {}",
topic, producerName, pendingLId, pendingEId, sourceLId,
sourceEId, targetLId, targetEid, pendingMessages.messagesCount());
cnx.channel().close();
return;
}

// Case-4: unknown error, which is unexpected.
log.warn("[{}] [{}] Got ack for msg. expecting {}:{}, but got: {}:{} - queue-size: {}",
topic, producerName, pendingLId, pendingEId, sourceLId,
sourceEId, pendingMessages.messagesCount());
// Force connection closing so that messages can be re-transmitted in a new connection
log.error("[{}] [{}] Received an msg send receipt[error]: source entry {}:{}, target entry: {}:{},"
+ " pending send: {}:{}, queue-size: {}",
topic, producerName, sourceLId, sourceEId, targetLId, targetEid, pendingLId, pendingEId,
pendingMessages.messagesCount());
cnx.channel().close();
}

protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long req, long highReq,
protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long isSourceMarker,
long ledgerId, long entryId) {
// Case-1: repeatedly publish repl marker.
long lastSeqReceivedAck = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this);
if (req <= lastSeqReceivedAck) {
if (seq <= lastSeqReceivedAck) {
// Ignoring the ack since it's referring to a message that has already timed out.
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Dropped a repl marker SendReceipt. sequenceId: {},"
+ " sequenceIdPersisted: {},"
+ " highReq: {}, position: {}:{}",
topic, producerName, req, lastSeqReceivedAck, highReq, ledgerId, entryId);
log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {},"
+ " isSourceMarker: {}, target entry: {}:{}",
topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId);
}
return;
}

// Case-2, which is expected:
// 1. The broker responds with a SendReceipt that is a repl marker.
// 2. The current pending msg is also a marker.
if (isReplicationMarker(op) && req == op.sequenceId) {
boolean pendingMsgIsReplMarker = isReplicationMarker(op);
if (pendingMsgIsReplMarker && seq == op.sequenceId) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {},"
+ " isReplMarker: {}, target entry: {}:{}",
topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId);
}
long calculatedSeq = getHighestSequenceId(op);
LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, calculatedSeq));
removeAndApplyCallback(op, req, highReq, ledgerId, entryId, true);
removeAndApplyCallback(op, seq, isSourceMarker, ledgerId, entryId, true);
return;
}

// Case-3, which is unexpected.
// Case-3-1: expected a SendError if "seq <= lastInProgressSend".
// Case-3-2: something went wrong.
long lastInProgressSend = LAST_SEQ_ID_PUSHED_UPDATER.get(this);
String logText = String.format("[%s] [%s] Got ack for a repl marker msg. expecting %s, but got: %s."
String logText = String.format("[%s] [%s] Received an repl marker send receipt[error]. seq: %s, seqPending: %s."
+ " sequenceIdPersisted: %s, lastInProgressSend: %s,"
+ " HighReq: %s, position: %s:%s, queue-size: %s",
topic, producerName, lastSeqReceivedAck - 1, req,
+ " isSourceMarker: %s, target entry: %s:%s, queue-size: %s",
topic, producerName, seq, pendingMsgIsReplMarker ? op.sequenceId : "unknown",
lastSeqReceivedAck, lastInProgressSend,
highReq, ledgerId, entryId, pendingMessages.messagesCount()
isSourceMarker, ledgerId, entryId, pendingMessages.messagesCount()
);
if (req < lastInProgressSend) {
if (seq < lastInProgressSend) {
log.warn(logText);
} else {
log.error(logText);
Expand All @@ -200,10 +203,6 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long req, long

private void removeAndApplyCallback(OpSendMsg op, long lIdSent, long eIdSent, long ledgerId, long entryId,
boolean isMarker) {
if (log.isDebugEnabled()) {
log.debug("Got receipt for producer: [{}] -- source-message: {}:{} -- target-msg: {}:{} -- isMarker: {}",
getProducerName(), lIdSent, eIdSent, ledgerId, entryId, isMarker);
}
pendingMessages.remove();
releaseSemaphoreForSendOp(op);
// Since Geo-Replicator will not send batched message, skip to update the field
Expand Down

0 comments on commit 391e0fa

Please sign in to comment.