Skip to content

Commit

Permalink
scylla-jmx: Use registration checker objects
Browse files Browse the repository at this point in the history
Fixes scylladb#134
Refs scylladb#135

Replaces previous refresh calls with ones bound to registration
check objects, which provides some sync between threads doing
refresh, and reduced redundant calls.

Also adds repeated reaping of dead objects, i.e. every 5 minutes
we try to remove dead CF:s (not adding new ones), to reduce
idle footprint.
  • Loading branch information
elcallio committed Sep 1, 2020
1 parent eb2b8a6 commit 82d0aa8
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 87 deletions.
57 changes: 31 additions & 26 deletions src/main/java/com/scylladb/jmx/metrics/APIMBean.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.scylladb.jmx.metrics;

import java.lang.reflect.Field;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -55,35 +56,39 @@ public APIMBean(String mbeanName, APIClient client) {
* @param generator
* {@link Function} to create a new MBean instance for a given
* {@link ObjectName}
*
* @return
* @throws MalformedObjectNameException
*/
public static boolean checkRegistration(JmxMBeanServer server, Set<ObjectName> all,
final Predicate<ObjectName> predicate, Function<ObjectName, Object> generator)
throws MalformedObjectNameException {
Set<ObjectName> registered = queryNames(server, predicate);
for (ObjectName name : registered) {
if (!all.contains(name)) {
try {
server.getMBeanServerInterceptor().unregisterMBean(name);
} catch (MBeanRegistrationException | InstanceNotFoundException e) {
}
}
}

int added = 0;
for (ObjectName name : all) {
if (!registered.contains(name)) {
try {
server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name);
added++;
} catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
}
}
}
return added > 0;
}
public static boolean checkRegistration(JmxMBeanServer server, Set<ObjectName> all,
EnumSet<RegistrationMode> mode, final Predicate<ObjectName> predicate,
Function<ObjectName, Object> generator) throws MalformedObjectNameException {
Set<ObjectName> registered = queryNames(server, predicate);
if (mode.contains(RegistrationMode.Remove)) {
for (ObjectName name : registered) {
if (!all.contains(name)) {
try {
server.getMBeanServerInterceptor().unregisterMBean(name);
} catch (MBeanRegistrationException | InstanceNotFoundException e) {
}
}
}
}

int added = 0;
if (mode.contains(RegistrationMode.Add)) {
for (ObjectName name : all) {
if (!registered.contains(name)) {
try {
server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name);
added++;
} catch (InstanceAlreadyExistsException | MBeanRegistrationException
| NotCompliantMBeanException e) {
}
}
}
}
return added > 0;
}

/**
* Helper method to query {@link ObjectName}s from an {@link MBeanServer}
Expand Down
39 changes: 28 additions & 11 deletions src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.scylladb.jmx.utils;

import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.MINUTES;

import java.io.ObjectInputStream;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -34,19 +39,34 @@
import org.apache.cassandra.metrics.StreamingMetrics;

import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.RegistrationChecker;
import com.sun.jmx.mbeanserver.JmxMBeanServer;

@SuppressWarnings("restriction")
public class APIMBeanServer implements MBeanServer {
@SuppressWarnings("unused")
private static final Logger logger = Logger.getLogger(APIMBeanServer.class.getName());
private static final ScheduledExecutorService executor = newScheduledThreadPool(1);

private final RegistrationChecker columnFamilyStoreChecker = ColumnFamilyStore.createRegistrationChecker();
private final RegistrationChecker streamingMetricsChecker = StreamingMetrics.createRegistrationChecker();

private final APIClient client;
private final JmxMBeanServer server;

public APIMBeanServer(APIClient client, JmxMBeanServer server) {
this.client = client;
this.server = server;

executor.scheduleWithFixedDelay(() -> {
for (RegistrationChecker c : asList(columnFamilyStoreChecker, streamingMetricsChecker)) {
try {
c.reap(client, server);
} catch (OperationsException | UnknownHostException e) {
// TODO: log?
}
}
}, 1, 5, MINUTES);
}

private static ObjectInstance prepareForRemote(final ObjectInstance i) {
Expand All @@ -65,7 +85,7 @@ private static ObjectName prepareForRemote(final ObjectName n) {
throw new IllegalArgumentException(n.toString());
}
}

@Override
public ObjectInstance createMBean(String className, ObjectName name) throws ReflectionException,
InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
Expand Down Expand Up @@ -286,25 +306,22 @@ public ClassLoaderRepository getClassLoaderRepository() {
}

static final Pattern tables = Pattern.compile("^\\*?((Index)?ColumnFamil(ies|y)|(Index)?(Table(s)?)?)$");

private boolean checkRegistrations(ObjectName name) {
private void checkRegistrations(ObjectName name) {
if (name != null && server.isRegistered(name)) {
return false;
return;
}

boolean result = false;


try {
String type = name != null ? name.getKeyProperty("type") : null;
if (type == null || tables.matcher(type).matches()) {
result |= ColumnFamilyStore.checkRegistration(client, server);
columnFamilyStoreChecker.check(client, server);
}
if (type == null || StreamingMetrics.TYPE_NAME.equals(type)) {
result |= StreamingMetrics.checkRegistration(client, server);
streamingMetricsChecker.check(client, server);
}
} catch (MalformedObjectNameException | UnknownHostException e) {
} catch (OperationsException | UnknownHostException e) {
// TODO: log
}
return result;
}
}
29 changes: 20 additions & 9 deletions src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.StringReader;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -50,6 +51,7 @@
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
Expand All @@ -65,6 +67,8 @@

import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.MetricsMBean;
import com.scylladb.jmx.metrics.RegistrationChecker;
import com.scylladb.jmx.metrics.RegistrationMode;
import com.sun.jmx.mbeanserver.JmxMBeanServer;
import com.google.common.base.Throwables;

Expand Down Expand Up @@ -182,15 +186,22 @@ private static ObjectName getName(String type, String keyspace, String name) thr
"org.apache.cassandra.db:type=" + type + ",keyspace=" + keyspace + ",columnfamily=" + name);
}

public static boolean checkRegistration(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException {
JsonArray mbeans = client.getJsonArray("/column_family/");
Set<ObjectName> all = new HashSet<ObjectName>();
for (int i = 0; i < mbeans.size(); i++) {
JsonObject mbean = mbeans.getJsonObject(i);
all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf")));
}
return checkRegistration(server, all, n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n));
}
public static RegistrationChecker createRegistrationChecker() {
return new RegistrationChecker() {
@Override
protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode)
throws OperationsException {
JsonArray mbeans = client.getJsonArray("/column_family/");
Set<ObjectName> all = new HashSet<ObjectName>();
for (int i = 0; i < mbeans.size(); i++) {
JsonObject mbean = mbeans.getJsonObject(i);
all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf")));
}
checkRegistration(server, all, mode,
n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n));
}
};
}

/**
* @return the name of the column family
Expand Down
84 changes: 43 additions & 41 deletions src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@
package org.apache.cassandra.metrics;

import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static org.apache.cassandra.metrics.DefaultNameFactory.createMetricName;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;

import javax.json.JsonArray;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.OperationsException;

import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.APIMBean;
import com.scylladb.jmx.metrics.RegistrationChecker;
import com.scylladb.jmx.metrics.RegistrationMode;
import com.sun.jmx.mbeanserver.JmxMBeanServer;

/**
Expand All @@ -64,46 +67,45 @@ private StreamingMetrics() {
private static boolean isStreamingName(ObjectName n) {
return TYPE_NAME.equals(n.getKeyProperty("type"));
}

public static RegistrationChecker createRegistrationChecker() {
return new RegistrationChecker() {
@Override
protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode) throws OperationsException, UnknownHostException {
Set<ObjectName> all = new HashSet<ObjectName>(globalNames);
JsonArray streams = client.getJsonArray("/stream_manager/");
for (int i = 0; i < streams.size(); i++) {
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
for (int j = 0; j < sessions.size(); j++) {
String peer = sessions.getJsonObject(j).getString("peer");
String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", ".");
all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope));
all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope));
}
}

public static void unregister(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException {
APIMBean.checkRegistration(server, emptySet(), StreamingMetrics::isStreamingName, (n) -> null);
}

public static boolean checkRegistration(APIClient client, JmxMBeanServer server)
throws MalformedObjectNameException, UnknownHostException {

Set<ObjectName> all = new HashSet<ObjectName>(globalNames);
JsonArray streams = client.getJsonArray("/stream_manager/");
for (int i = 0; i < streams.size(); i++) {
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
for (int j = 0; j < sessions.size(); j++) {
String peer = sessions.getJsonObject(j).getString("peer");
String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", ".");
all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope));
all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope));
}
}

MetricsRegistry registry = new MetricsRegistry(client, server);
return APIMBean.checkRegistration(server, all, StreamingMetrics::isStreamingName, n -> {
String scope = n.getKeyProperty("scope");
String name = n.getKeyProperty("name");
MetricsRegistry registry = new MetricsRegistry(client, server);
APIMBean.checkRegistration(server, all, mode, StreamingMetrics::isStreamingName, n -> {
String scope = n.getKeyProperty("scope");
String name = n.getKeyProperty("name");

String url = null;
if ("ActiveOutboundStreams".equals(name)) {
url = "/stream_manager/metrics/outbound";
} else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) {
url = "/stream_manager/metrics/incoming";
} else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) {
url = "/stream_manager/metrics/outgoing";
}
if (url == null) {
throw new IllegalArgumentException();
}
if (scope != null) {
url = url + "/" + scope;
}
return registry.counter(url);
});
}
String url = null;
if ("ActiveOutboundStreams".equals(name)) {
url = "/stream_manager/metrics/outbound";
} else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) {
url = "/stream_manager/metrics/incoming";
} else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) {
url = "/stream_manager/metrics/outgoing";
}
if (url == null) {
throw new IllegalArgumentException();
}
if (scope != null) {
url = url + "/" + scope;
}
return registry.counter(url);
});
}
};
}
}

0 comments on commit 82d0aa8

Please sign in to comment.