Skip to content

Commit

Permalink
fix bug: marker sequence id is start at 0
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Dec 20, 2024
1 parent 391e0fa commit cc44dc3
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -95,6 +95,7 @@ public class Producer {
AtomicReferenceFieldUpdater.newUpdater(Producer.class, Attributes.class, "attributes");

private final boolean isRemote;
private final boolean isRemoteOrShadow;
private final String remoteCluster;
private final boolean isNonPersistentTopic;
private final boolean isShadowTopic;
Expand Down Expand Up @@ -152,6 +153,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN

String replicatorPrefix = serviceConf.getReplicatorPrefix() + ".";
this.isRemote = producerName.startsWith(replicatorPrefix);
this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix());
this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix);

this.isEncrypted = isEncrypted;
Expand All @@ -163,6 +165,13 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
}

/**
* Difference with "isRemote" is whether the prefix string is end with a dot.
*/
public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) {
return producerName != null && producerName.startsWith(replicatorPrefix);
}

/**
* Producer name for replicator is in format.
* "replicatorPrefix.localCluster" (old)
Expand Down Expand Up @@ -552,7 +561,7 @@ public void run() {
// stats
producer.stats.recordMsgIn(batchSize, msgSize);
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
if (producer.isRemote() && producer.isSupportsDedupReplV2()) {
if (producer.isRemoteOrShadow && producer.isSupportsDedupReplV2()) {
sendSendReceiptResponseRepl();
} else {
// Repl V1 is the same as normal for this handling.
Expand Down Expand Up @@ -580,16 +589,16 @@ private void sendSendReceiptResponseRepl() {
return;
}
// Case-2: is a repl message.
String replSequenceLIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_LID));
String replSequenceEIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_EID));
String replSequenceLIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SOURCE_LID));
String replSequenceEIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SOURCE_EID));
if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired"
+ " messages props were are invalid. producer={}. supportsDedupReplV2: {},"
+ " sequence-id {}, prop-{}: {}, prop-{}: {}",
producer.topic.getName(), producer.producerName,
supportsDedupReplV2(), getSequenceId(),
MSG_PROP_REPL_SEQUENCE_LID, replSequenceLIdStr,
MSG_PROP_REPL_SEQUENCE_EID, replSequenceEIdStr);
MSG_PROP_REPL_SOURCE_LID, replSequenceLIdStr,
MSG_PROP_REPL_SOURCE_EID, replSequenceEIdStr);
producer.cnx.getCommandSender().sendSendError(producer.producerId,
Math.max(highestSequenceId, sequenceId),
ServerError.PersistenceError, "Message can not determine whether the message is"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -196,13 +196,10 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.setSchemaInfoForReplicator(schemaFuture.get());
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
// Why not use a generated sequence ID that initialized with "-1" when the replicator is starting?
// Because that we should persist the props to the value of the current value of sequence id for
// each acknowledge after publishing for guarantees the sequence id can be recovered after a cursor
// reset that will happen when getting new schema or publish fails, which cost much more.
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_LID)
// Add props for sequence checking.
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_LID)
.setValue(Long.valueOf(entry.getLedgerId()).toString());
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_EID)
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_EID)
.setValue(Long.valueOf(entry.getEntryId()).toString());
msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.Iterator;
Expand All @@ -43,6 +43,7 @@
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -331,7 +332,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
if (!isEnabled() || publishContext.isMarkerMessage()) {
return MessageDupStatus.NotDup;
}
if (publishContext.getProducerName() != null && publishContext.getProducerName().startsWith(replicatorPrefix)) {
if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) {
if (!publishContext.supportsDedupReplV2()){
return isDuplicateReplV1(publishContext, headersAndPayload);
} else {
Expand Down Expand Up @@ -373,7 +374,7 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header
}

// Case-2: is a replicated message.
if (publishContext.getProducerName() != null && publishContext.getProducerName().startsWith(replicatorPrefix)) {
if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) {
// Message is coming from replication, we need to use the replication's producer name, source cluster's
// ledger id and entry id for the purpose of deduplication.
int readerIndex = headersAndPayload.readerIndex();
Expand All @@ -384,18 +385,18 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header
Long replSequenceEId = null;
List<KeyValue> kvPairList = md.getPropertiesList();
for (KeyValue kvPair : kvPairList) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_LID)) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_LID)) {
if (StringUtils.isNumeric(kvPair.getValue())) {
replSequenceLId = Long.valueOf(kvPair.getValue());
publishContext.setProperty(MSG_PROP_REPL_SEQUENCE_LID, replSequenceLId);
publishContext.setProperty(MSG_PROP_REPL_SOURCE_LID, replSequenceLId);
} else {
break;
}
}
if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_EID)) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_EID)) {
if (StringUtils.isNumeric(kvPair.getValue())) {
replSequenceEId = Long.valueOf(kvPair.getValue());
publishContext.setProperty(MSG_PROP_REPL_SEQUENCE_EID, replSequenceEId);
publishContext.setProperty(MSG_PROP_REPL_SOURCE_EID, replSequenceEId);
} else {
break;
}
Expand All @@ -408,16 +409,16 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header
}

public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) {
Long replSequenceLId = (Long) publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_LID);
Long replSequenceEId = (Long) publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_EID);
Long replSequenceLId = (Long) publishContext.getProperty(MSG_PROP_REPL_SOURCE_LID);
Long replSequenceEId = (Long) publishContext.getProperty(MSG_PROP_REPL_SOURCE_EID);
if (replSequenceLId == null || replSequenceEId == null) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages"
+ " props were are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {},"
+ " prop-{}: {}, prop-{}: {}",
topic.getName(), publishContext.getProducerName(),
publishContext.supportsDedupReplV2(), publishContext.getSequenceId(),
MSG_PROP_REPL_SEQUENCE_LID, replSequenceLId,
MSG_PROP_REPL_SEQUENCE_EID, replSequenceEId);
MSG_PROP_REPL_SOURCE_LID, replSequenceLId,
MSG_PROP_REPL_SOURCE_EID, replSequenceEId);
return MessageDupStatus.Unknown;
}

Expand Down Expand Up @@ -545,16 +546,16 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit
}

public void recordMessagePersistedRepl(PublishContext publishContext, Position position) {
String replSequenceLIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_LID));
String replSequenceEIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_EID));
String replSequenceLIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SOURCE_LID));
String replSequenceEIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SOURCE_EID));
if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) {
log.error("[{}] Can not persist highest sequence-id due to the acquired messages"
+ " props are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {},"
+ " prop-{}: {}, prop-{}: {}",
topic.getName(), publishContext.getProducerName(),
publishContext.supportsDedupReplV2(), publishContext.getSequenceId(),
MSG_PROP_REPL_SEQUENCE_LID, replSequenceLIdStr,
MSG_PROP_REPL_SEQUENCE_EID, replSequenceEIdStr);
MSG_PROP_REPL_SOURCE_LID, replSequenceLIdStr,
MSG_PROP_REPL_SOURCE_EID, replSequenceEIdStr);
recordMessagePersistedNormal(publishContext, position);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.service.persistent;


import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID;
import io.netty.buffer.ByteBuf;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -98,6 +100,11 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.setReplicatedFrom(localCluster);

msg.setMessageId(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), -1));
// Add props for sequence checking.
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_LID)
.setValue(Long.valueOf(entry.getLedgerId()).toString());
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_EID)
.setValue(Long.valueOf(entry.getEntryId()).toString());

headersAndPayload.retain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
@Slf4j
public class GeoReplicationProducerImpl extends ProducerImpl{

public static final String MSG_PROP_REPL_SEQUENCE_LID = "__MSG_PROP_REPL_SEQUENCE_LID";
public static final String MSG_PROP_REPL_SEQUENCE_EID = "__MSG_PROP_REPL_SEQUENCE_EID";
public static final String MSG_PROP_REPL_SOURCE_LID = "__MSG_PROP_REPL_SOURCE_LID";
public static final String MSG_PROP_REPL_SOURCE_EID = "__MSG_PROP_REPL_SOURCE_EID";
public static final String MSG_PROP_IS_REPL_MARKER = "__MSG_PROP_IS_REPL_MARKER";

private long lastPersistedSourceLedgerId;
Expand Down Expand Up @@ -98,14 +98,14 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
Long pendingEId = null;
List<KeyValue> kvPairList = op.msg.getMessageBuilder().getPropertiesList();
for (KeyValue kvPair : kvPairList) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_LID)) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_LID)) {
if (StringUtils.isNumeric(kvPair.getValue())) {
pendingLId = Long.valueOf(kvPair.getValue());
} else {
break;
}
}
if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_EID)) {
if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_EID)) {
if (StringUtils.isNumeric(kvPair.getValue())) {
pendingEId = Long.valueOf(kvPair.getValue());
} else {
Expand Down Expand Up @@ -154,13 +154,13 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long isSourceMarker,
long ledgerId, long entryId) {
// Case-1: repeatedly publish repl marker.
long lastSeqReceivedAck = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this);
if (seq <= lastSeqReceivedAck) {
long lastSeqPersisted = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this);
if (lastSeqPersisted != 0 && seq <= lastSeqPersisted) {
// Ignoring the ack since it's referring to a message that has already timed out.
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {},"
+ " isSourceMarker: {}, target entry: {}:{}",
topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId);
topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId);
}
return;
}
Expand All @@ -173,7 +173,7 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {},"
+ " isReplMarker: {}, target entry: {}:{}",
topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId);
topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId);
}
long calculatedSeq = getHighestSequenceId(op);
LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, calculatedSeq));
Expand All @@ -189,7 +189,7 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long
+ " sequenceIdPersisted: %s, lastInProgressSend: %s,"
+ " isSourceMarker: %s, target entry: %s:%s, queue-size: %s",
topic, producerName, seq, pendingMsgIsReplMarker ? op.sequenceId : "unknown",
lastSeqReceivedAck, lastInProgressSend,
lastSeqPersisted, lastInProgressSend,
isSourceMarker, ledgerId, entryId, pendingMessages.messagesCount()
);
if (seq < lastInProgressSend) {
Expand Down

0 comments on commit cc44dc3

Please sign in to comment.