From e1e08f6dc4488e160fbd55544dd2be0fa7256c5a Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Thu, 19 Dec 2024 14:28:11 -0600 Subject: [PATCH] Allow custom internode outbound metrics --- .../config/CassandraRelevantProperties.java | 6 ++++ .../cassandra/net/OutboundConnections.java | 28 +++++++++++++++---- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 955429aa408b..27bfb81b5a96 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -328,6 +328,12 @@ public enum CassandraRelevantProperties */ CUSTOM_MESSAGING_METRICS_PROVIDER_PROPERTY("cassandra.custom_messaging_metrics_provider_class"), + /** + * Which class to use for internode metrics for {@link org.apache.cassandra.net.OutboundConnections}. + * The provided class name must point to an implementation of {@link org.apache.cassandra.metrics.InternodeOutboundMetrics}. + */ + CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY("cassandra.custom_internode_outbound_metrics_provider_class"), + /** * Which class to use for creating guardrails */ diff --git a/src/java/org/apache/cassandra/net/OutboundConnections.java b/src/java/org/apache/cassandra/net/OutboundConnections.java index 20378fa6d34f..0e5e84360980 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnections.java +++ b/src/java/org/apache/cassandra/net/OutboundConnections.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.net; +import java.lang.reflect.InvocationTargetException; import java.nio.channels.ClosedChannelException; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -32,14 +33,17 @@ import com.carrotsearch.hppc.ObjectObjectHashMap; import io.netty.util.concurrent.Future; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.Config; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.InternodeOutboundMetrics; import org.apache.cassandra.nodes.Nodes; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.SimpleCondition; +import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY; import static org.apache.cassandra.net.MessagingService.current_version; import static org.apache.cassandra.net.ConnectionType.URGENT_MESSAGES; import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES; @@ -95,11 +99,25 @@ static OutboundConnections tryRegister(ConcurrentMap if (existing == null) { - connections.metrics = new InternodeOutboundMetrics(settings.to, connections); - connections.metricsReady.signalAll(); - } - else - { + if (CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY.isPresent()) + { + Class klass = FBUtilities.classForName(CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY.getString(), "Internode Outbound Metrics Provider"); + InternodeOutboundMetrics obInstance; + try + { + obInstance = klass.getDeclaredConstructor(InetAddressAndPort.class, OutboundConnections.class).newInstance(settings.to, connections); + } + catch (NoSuchMethodException | InstantiationException | IllegalAccessException | + InvocationTargetException e) + { + throw new RuntimeException(e); + } + connections.metrics = obInstance; + } + else + { + connections.metrics = new InternodeOutboundMetrics(settings.to, connections); + } connections.metricsReady.signalAll(); connections.close(false); connections = existing;