diff --git a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java index dfbfdec7..0af2d244 100644 --- a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java +++ b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/BrokerTracing.java @@ -64,6 +64,9 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; +import org.apache.pulsar.common.util.DateFormatter; import org.jetbrains.annotations.NotNull; @Slf4j @@ -351,7 +354,6 @@ public void producerCreated(ServerCnx cnx, Producer producer, Map traceDetails = new TreeMap<>(); - traceDetails.put("serverCnx", getConnectionDetails(cnx)); traceDetails.put("producer", getProducerDetails(producer, traceSchema)); traceDetails.put("metadata", metadata); @@ -365,10 +367,17 @@ public void producerClosed(ServerCnx cnx, Producer producer, Map if (traceLevel == TraceLevel.OFF) return; Map traceDetails = new TreeMap<>(); - traceDetails.put("serverCnx", getConnectionDetails(cnx)); traceDetails.put("producer", getProducerDetails(producer, traceSchema)); traceDetails.put("metadata", metadata); + PublisherStatsImpl stats = producer.getStats(); + traceDetails.put("connectedSince", stats.getConnectedSince()); + traceDetails.put("closedAt", DateFormatter.now()); + traceDetails.put("averageMsgSize", stats.getAverageMsgSize()); + traceDetails.put("msgRateIn", stats.getMsgRateIn()); + traceDetails.put("msgThroughputIn", stats.getMsgThroughputIn()); + // no message count in stats? stats.getCount() is not it + trace(EventReasons.ADMINISTRATIVE, "Producer closed", traceDetails); } @@ -379,7 +388,6 @@ public void consumerCreated(ServerCnx cnx, Consumer consumer, Map traceDetails = new TreeMap<>(); - traceDetails.put("serverCnx", getConnectionDetails(cnx)); traceDetails.put("consumer", getConsumerDetails(consumer)); traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); traceDetails.put("metadata", metadata); @@ -394,11 +402,22 @@ public void consumerClosed(ServerCnx cnx, Consumer consumer, Map if (traceLevel == TraceLevel.OFF) return; Map traceDetails = new TreeMap<>(); - traceDetails.put("serverCnx", getConnectionDetails(cnx)); traceDetails.put("consumer", getConsumerDetails(consumer)); traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); traceDetails.put("metadata", metadata); + ConsumerStatsImpl stats = consumer.getStats(); + + traceDetails.put("connectedSince", stats.getConnectedSince()); + traceDetails.put("closedAt", DateFormatter.now()); + traceDetails.put("averageMsgSize", stats.getAvgMessagesPerEntry()); + traceDetails.put("msgRateOut", stats.getMsgRateOut()); + traceDetails.put("msgThroughputOut", stats.getMsgThroughputOut()); + traceDetails.put("msgOutCounter", stats.getMsgOutCounter()); + traceDetails.put("bytesOutCounter", stats.getBytesOutCounter()); + traceDetails.put("unackedMessages", stats.getUnackedMessages()); + traceDetails.put("messageAckRate", stats.getMessageAckRate()); + trace(EventReasons.ADMINISTRATIVE, "Consumer closed", traceDetails); } @@ -452,7 +471,7 @@ public void beforeSendMessage( traceDetails.put("entry", getEntryDetails(entry, maxBinaryDataLength)); traceDetails.put("messageMetadata", getMessageMetadataDetails(msgMetadata)); - trace(EventReasons.MESSAGE, "Before sending message", traceDetails); + trace(EventReasons.MESSAGE, "Message read", traceDetails); } public void onMessagePublish( @@ -472,7 +491,7 @@ public void onMessagePublish( "headersAndPayload", headersAndPayload, headersAndPayloadDetails, maxBinaryDataLength); traceDetails.put("payload", headersAndPayloadDetails); - trace(EventReasons.MESSAGE, "Message publish", traceDetails); + trace(EventReasons.MESSAGE, "Message received", traceDetails); } public void messageProduced( @@ -493,7 +512,7 @@ public void messageProduced( traceDetails.put("publishContext", getPublishContextDetails(publishContext)); traceDetails.put("messageId", ledgerId + ":" + entryId); traceDetails.put("startTimeNs", startTimeNs); - trace(EventReasons.MESSAGE, "Message produced", traceDetails); + trace(EventReasons.MESSAGE, "Message stored", traceDetails); } public void messageDispatched( @@ -504,17 +523,18 @@ public void messageDispatched( if (level == TraceLevel.OFF) return; Map traceDetails = new TreeMap<>(); - traceDetails.put("serverCnx", getConnectionDetails(cnx)); traceDetails.put("consumer", getConsumerDetails(consumer)); - traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); + if (consumer != null) { + traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); + } traceDetails.put("messageId", ledgerId + ":" + entryId); Map headersAndPayloadDetails = new TreeMap<>(); traceByteBuf( - "headersAndPayload", headersAndPayload, headersAndPayloadDetails, maxBinaryDataLength); + "headersAndPayload", headersAndPayload, headersAndPayloadDetails, maxBinaryDataLength); traceDetails.put("payload", headersAndPayloadDetails); - trace(EventReasons.MESSAGE, "After dispatching message", traceDetails); + trace(EventReasons.MESSAGE, "Message dispatched", traceDetails); } public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) { @@ -524,9 +544,10 @@ public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) { if (level == TraceLevel.OFF) return; Map traceDetails = new TreeMap<>(); - traceDetails.put("serverCnx", getConnectionDetails(cnx)); traceDetails.put("consumer", getConsumerDetails(consumer)); - traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); + if (consumer != null) { + traceDetails.put("subscription", getSubscriptionDetails(consumer.getSubscription())); + } Map ackDetails = new TreeMap<>(); if (ackCmd.hasAckType()) { diff --git a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/TracingUtils.java b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/TracingUtils.java index 64e4aa92..25f933e1 100644 --- a/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/TracingUtils.java +++ b/pulsar-jms-tracing/src/main/java/com/datastax/oss/pulsar/jms/tracing/TracingUtils.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; @@ -100,7 +101,7 @@ public enum TraceLevel { private static final LoadingCache ipResolverCache = CacheBuilder.newBuilder() .maximumSize(10_000L) - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterWrite(4, TimeUnit.HOURS) .build( new CacheLoader() { public String load(String clientAddress) { @@ -464,18 +465,15 @@ private static void populateConnectionDetails(ServerCnx cnx, Map if (cnx == null) { return; } - - traceDetails.put("clientAddress", hostNameOf(cnx.clientSourceAddress())); - traceDetails.put("clientSocket", cnx.clientAddress()); + traceDetails.put("clientHost", hostNameOf(cnx.clientSourceAddress())); traceDetails.put("authRole", cnx.getAuthRole()); + traceDetails.put("principal", cnx.getPrincipal()); traceDetails.put("clientVersion", cnx.getClientVersion()); traceDetails.put("clientSourceAddressAndPort", cnx.clientSourceAddressAndPort()); traceDetails.put("authMethod", cnx.getAuthMethod()); - traceDetails.put( - "authMethodName", - cnx.getAuthenticationProvider() == null - ? "no provider" - : cnx.getAuthenticationProvider().getAuthMethodName()); + if (cnx.getAuthenticationProvider() != null) { + traceDetails.put("authMethodName", cnx.getAuthenticationProvider().getAuthMethodName()); + } AuthenticationDataSource authData = cnx.getAuthenticationData(); if (authData != null) { @@ -534,10 +532,10 @@ private static void populateSubscriptionDetails( if (sub.getConsumers() != null) { traceDetails.put("numberOfConsumers", sub.getConsumers().size()); - traceDetails.put( - "namesOfConsumers", - sub.getConsumers().stream().map(Consumer::consumerName).collect(Collectors.toList())); } + traceDetails.put("isReplicated", sub.isReplicated()); + traceDetails.put("numberOfEntriesDelayed", sub.getNumberOfEntriesDelayed()); + traceDetails.put("numberOfEntriesInBacklog", sub.getNumberOfEntriesInBacklog(false)); traceDetails.put("subscriptionProperties", sub.getSubscriptionProperties()); } @@ -567,9 +565,11 @@ private static void populateConsumerDetails(Consumer consumer, Map getProducerDetails(Producer producer, boolean traceSchema) { @@ -597,7 +597,7 @@ private static void populateProducerDetails( "topicName", TopicName.get(producer.getTopic().getName()).getPartitionedTopicName()); } - traceDetails.put("clientAddress", hostNameOf(producer.getClientAddress())); + traceDetails.put("clientHost", hostNameOf(producer.getClientAddress())); traceDetails.put("metadata", producer.getMetadata()); @@ -613,6 +613,8 @@ private static void populateProducerDetails( traceDetails.put("schemaVersion", schemaVersion); } traceDetails.put("remoteCluster", producer.getRemoteCluster()); + + traceDetails.put("authRole", producer.getCnx().getAuthRole()); } public static Map getMessageMetadataDetails(MessageMetadata msgMetadata) {