Skip to content

Commit

Permalink
Allow custom internode outbound metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
driftx committed Dec 20, 2024
1 parent 315a0e1 commit e1e08f6
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ public enum CassandraRelevantProperties
*/
CUSTOM_MESSAGING_METRICS_PROVIDER_PROPERTY("cassandra.custom_messaging_metrics_provider_class"),

/**
* Which class to use for internode metrics for {@link org.apache.cassandra.net.OutboundConnections}.
* The provided class name must point to an implementation of {@link org.apache.cassandra.metrics.InternodeOutboundMetrics}.
*/
CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY("cassandra.custom_internode_outbound_metrics_provider_class"),

/**
* Which class to use for creating guardrails
*/
Expand Down
28 changes: 23 additions & 5 deletions src/java/org/apache/cassandra/net/OutboundConnections.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.cassandra.net;

import java.lang.reflect.InvocationTargetException;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -32,14 +33,17 @@

import com.carrotsearch.hppc.ObjectObjectHashMap;
import io.netty.util.concurrent.Future;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.InternodeOutboundMetrics;
import org.apache.cassandra.nodes.Nodes;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.SimpleCondition;

import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY;
import static org.apache.cassandra.net.MessagingService.current_version;
import static org.apache.cassandra.net.ConnectionType.URGENT_MESSAGES;
import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
Expand Down Expand Up @@ -95,11 +99,25 @@ static <K> OutboundConnections tryRegister(ConcurrentMap<K, OutboundConnections>

if (existing == null)
{
connections.metrics = new InternodeOutboundMetrics(settings.to, connections);
connections.metricsReady.signalAll();
}
else
{
if (CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY.isPresent())
{
Class<InternodeOutboundMetrics> klass = FBUtilities.classForName(CUSTOM_INTERNODE_OUTBOUND_METRICS_PROVIDER_PROPERTY.getString(), "Internode Outbound Metrics Provider");
InternodeOutboundMetrics obInstance;
try
{
obInstance = klass.getDeclaredConstructor(InetAddressAndPort.class, OutboundConnections.class).newInstance(settings.to, connections);
}
catch (NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException e)
{
throw new RuntimeException(e);
}
connections.metrics = obInstance;
}
else
{
connections.metrics = new InternodeOutboundMetrics(settings.to, connections);
}
connections.metricsReady.signalAll();
connections.close(false);
connections = existing;
Expand Down

0 comments on commit e1e08f6

Please sign in to comment.