diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index cd1006f12803e..e216a2d7b20cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -21,8 +21,8 @@ import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER; import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; -import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID; -import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID; import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.protocol.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; @@ -95,6 +95,7 @@ public class Producer { AtomicReferenceFieldUpdater.newUpdater(Producer.class, Attributes.class, "attributes"); private final boolean isRemote; + private final boolean isRemoteOrShadow; private final String remoteCluster; private final boolean isNonPersistentTopic; private final boolean isShadowTopic; @@ -152,6 +153,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN String replicatorPrefix = serviceConf.getReplicatorPrefix() + "."; this.isRemote = producerName.startsWith(replicatorPrefix); + this.isRemoteOrShadow = isRemoteOrShadow(producerName, serviceConf.getReplicatorPrefix()); this.remoteCluster = parseRemoteClusterName(producerName, isRemote, replicatorPrefix); this.isEncrypted = 
isEncrypted; @@ -163,6 +165,13 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.brokerInterceptor = cnx.getBrokerService().getInterceptor(); } + /** + * Like "isRemote", but the replicator prefix is matched as given, without an appended trailing dot. + */ + public static boolean isRemoteOrShadow(String producerName, String replicatorPrefix) { + return producerName != null && producerName.startsWith(replicatorPrefix); + } + /** * Producer name for replicator is in format. * "replicatorPrefix.localCluster" (old) @@ -552,7 +561,7 @@ public void run() { // stats producer.stats.recordMsgIn(batchSize, msgSize); producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); - if (producer.isRemote() && producer.isSupportsDedupReplV2()) { + if (producer.isRemoteOrShadow && producer.isSupportsDedupReplV2()) { sendSendReceiptResponseRepl(); } else { // Repl V1 is the same as normal for this handling. @@ -580,16 +589,16 @@ private void sendSendReceiptResponseRepl() { return; } // Case-2: is a repl message. - String replSequenceLIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_LID)); - String replSequenceEIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_EID)); + String replSequenceLIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SOURCE_LID)); + String replSequenceEIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SOURCE_EID)); if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) { log.error("[{}] Message can not determine whether the message is duplicated due to the acquired" + " messages props were are invalid. producer={}. 
supportsDedupReplV2: {}," + " sequence-id {}, prop-{}: {}, prop-{}: {}", producer.topic.getName(), producer.producerName, supportsDedupReplV2(), getSequenceId(), - MSG_PROP_REPL_SEQUENCE_LID, replSequenceLIdStr, - MSG_PROP_REPL_SEQUENCE_EID, replSequenceEIdStr); + MSG_PROP_REPL_SOURCE_LID, replSequenceLIdStr, + MSG_PROP_REPL_SOURCE_EID, replSequenceEIdStr); producer.cnx.getCommandSender().sendSendError(producer.producerId, Math.max(highestSequenceId, sequenceId), ServerError.PersistenceError, "Message can not determine whether the message is" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index fca79551ac3a3..7c3afdd8e496a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID; -import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID; import io.netty.buffer.ByteBuf; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -196,13 +196,10 @@ protected boolean replicateEntries(List entries) { msg.setSchemaInfoForReplicator(schemaFuture.get()); msg.getMessageBuilder().clearTxnidMostBits(); msg.getMessageBuilder().clearTxnidLeastBits(); - // Why not use a generated sequence ID that initialized with "-1" when the replicator is starting? 
- // Because that we should persist the props to the value of the current value of sequence id for - // each acknowledge after publishing for guarantees the sequence id can be recovered after a cursor - // reset that will happen when getting new schema or publish fails, which cost much more. - msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_LID) + // Add props for sequence checking. + msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_LID) .setValue(Long.valueOf(entry.getLedgerId()).toString()); - msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_EID) + msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_EID) .setValue(Long.valueOf(entry.getEntryId()).toString()); msgOut.recordEvent(headersAndPayload.readableBytes()); stats.incrementMsgOutCounter(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index e97a3af3777c1..b2486449ec07f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -19,8 +19,8 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_IS_REPL_MARKER; -import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID; -import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.util.Iterator; @@ -43,6 +43,7 @@ import 
org.apache.bookkeeper.mledger.PositionFactory; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -331,7 +332,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade if (!isEnabled() || publishContext.isMarkerMessage()) { return MessageDupStatus.NotDup; } - if (publishContext.getProducerName() != null && publishContext.getProducerName().startsWith(replicatorPrefix)) { + if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { if (!publishContext.supportsDedupReplV2()){ return isDuplicateReplV1(publishContext, headersAndPayload); } else { @@ -373,7 +374,7 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header } // Case-2: is a replicated message. - if (publishContext.getProducerName() != null && publishContext.getProducerName().startsWith(replicatorPrefix)) { + if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { // Message is coming from replication, we need to use the replication's producer name, source cluster's // ledger id and entry id for the purpose of deduplication. 
int readerIndex = headersAndPayload.readerIndex(); @@ -384,18 +385,18 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header Long replSequenceEId = null; List kvPairList = md.getPropertiesList(); for (KeyValue kvPair : kvPairList) { - if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_LID)) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_LID)) { if (StringUtils.isNumeric(kvPair.getValue())) { replSequenceLId = Long.valueOf(kvPair.getValue()); - publishContext.setProperty(MSG_PROP_REPL_SEQUENCE_LID, replSequenceLId); + publishContext.setProperty(MSG_PROP_REPL_SOURCE_LID, replSequenceLId); } else { break; } } - if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_EID)) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_EID)) { if (StringUtils.isNumeric(kvPair.getValue())) { replSequenceEId = Long.valueOf(kvPair.getValue()); - publishContext.setProperty(MSG_PROP_REPL_SEQUENCE_EID, replSequenceEId); + publishContext.setProperty(MSG_PROP_REPL_SOURCE_EID, replSequenceEId); } else { break; } @@ -408,16 +409,16 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header } public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) { - Long replSequenceLId = (Long) publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_LID); - Long replSequenceEId = (Long) publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_EID); + Long replSequenceLId = (Long) publishContext.getProperty(MSG_PROP_REPL_SOURCE_LID); + Long replSequenceEId = (Long) publishContext.getProperty(MSG_PROP_REPL_SOURCE_EID); if (replSequenceLId == null || replSequenceEId == null) { log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages" + " props were are invalid. producer={}. 
supportsDedupReplV2: {}, sequence-id {}," + " prop-{}: {}, prop-{}: {}", topic.getName(), publishContext.getProducerName(), publishContext.supportsDedupReplV2(), publishContext.getSequenceId(), - MSG_PROP_REPL_SEQUENCE_LID, replSequenceLId, - MSG_PROP_REPL_SEQUENCE_EID, replSequenceEId); + MSG_PROP_REPL_SOURCE_LID, replSequenceLId, + MSG_PROP_REPL_SOURCE_EID, replSequenceEId); return MessageDupStatus.Unknown; } @@ -545,16 +546,16 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit } public void recordMessagePersistedRepl(PublishContext publishContext, Position position) { - String replSequenceLIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_LID)); - String replSequenceEIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_EID)); + String replSequenceLIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SOURCE_LID)); + String replSequenceEIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SOURCE_EID)); if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) { log.error("[{}] Can not persist highest sequence-id due to the acquired messages" + " props are invalid. producer={}. 
supportsDedupReplV2: {}, sequence-id {}," + " prop-{}: {}, prop-{}: {}", topic.getName(), publishContext.getProducerName(), publishContext.supportsDedupReplV2(), publishContext.getSequenceId(), - MSG_PROP_REPL_SEQUENCE_LID, replSequenceLIdStr, - MSG_PROP_REPL_SEQUENCE_EID, replSequenceEIdStr); + MSG_PROP_REPL_SOURCE_LID, replSequenceLIdStr, + MSG_PROP_REPL_SOURCE_EID, replSequenceEIdStr); recordMessagePersistedNormal(publishContext, position); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index 65bcbfd131f12..b89e03c1f60ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_EID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SOURCE_LID; import io.netty.buffer.ByteBuf; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -98,6 +100,11 @@ protected boolean replicateEntries(List entries) { msg.setReplicatedFrom(localCluster); msg.setMessageId(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), -1)); + // Add props for sequence checking. 
+ msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_LID) + .setValue(Long.valueOf(entry.getLedgerId()).toString()); + msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SOURCE_EID) + .setValue(Long.valueOf(entry.getEntryId()).toString()); headersAndPayload.retain(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java index 289941a761a46..0e3365b5561f0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java @@ -33,8 +33,8 @@ @Slf4j public class GeoReplicationProducerImpl extends ProducerImpl{ - public static final String MSG_PROP_REPL_SEQUENCE_LID = "__MSG_PROP_REPL_SEQUENCE_LID"; - public static final String MSG_PROP_REPL_SEQUENCE_EID = "__MSG_PROP_REPL_SEQUENCE_EID"; + public static final String MSG_PROP_REPL_SOURCE_LID = "__MSG_PROP_REPL_SOURCE_LID"; + public static final String MSG_PROP_REPL_SOURCE_EID = "__MSG_PROP_REPL_SOURCE_EID"; public static final String MSG_PROP_IS_REPL_MARKER = "__MSG_PROP_IS_REPL_MARKER"; private long lastPersistedSourceLedgerId; @@ -98,14 +98,14 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI Long pendingEId = null; List kvPairList = op.msg.getMessageBuilder().getPropertiesList(); for (KeyValue kvPair : kvPairList) { - if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_LID)) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_LID)) { if (StringUtils.isNumeric(kvPair.getValue())) { pendingLId = Long.valueOf(kvPair.getValue()); } else { break; } } - if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_EID)) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SOURCE_EID)) { if (StringUtils.isNumeric(kvPair.getValue())) { pendingEId = Long.valueOf(kvPair.getValue()); } else { @@ -154,13 +154,13 @@ private void 
ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long isSourceMarker, long ledgerId, long entryId) { // Case-1: repeatedly publish repl marker. - long lastSeqReceivedAck = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this); - if (seq <= lastSeqReceivedAck) { + long lastSeqPersisted = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this); + if (lastSeqPersisted != 0 && seq <= lastSeqPersisted) { // Ignoring the ack since it's referring to a message that has already timed out. if (log.isDebugEnabled()) { log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {}," + " isSourceMarker: {}, target entry: {}:{}", - topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId); + topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId); } return; } @@ -173,7 +173,7 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long if (log.isDebugEnabled()) { log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {}," + " isReplMarker: {}, target entry: {}:{}", - topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId); + topic, producerName, seq, lastSeqPersisted, isSourceMarker, ledgerId, entryId); } long calculatedSeq = getHighestSequenceId(op); LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, calculatedSeq)); @@ -189,7 +189,7 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long + " sequenceIdPersisted: %s, lastInProgressSend: %s," + " isSourceMarker: %s, target entry: %s:%s, queue-size: %s", topic, producerName, seq, pendingMsgIsReplMarker ? op.sequenceId : "unknown", - lastSeqReceivedAck, lastInProgressSend, + lastSeqPersisted, lastInProgressSend, isSourceMarker, ledgerId, entryId, pendingMessages.messagesCount() ); if (seq < lastInProgressSend) {