Skip to content

Commit

Permalink
Xinfra Monitor - Performance, Optimization, Refactoring, Upgrades, Removal of Deprecations
Browse files Browse the repository at this point in the history

Xinfra Monitor - Performance, Optimization, Refactoring, Upgrades, Removal of Deprecations






Signed-off-by: Andrew Choi <[email protected]>
  • Loading branch information
Andrew Choi authored Dec 8, 2020
1 parent 59fbcf6 commit 343f6fa
Show file tree
Hide file tree
Showing 13 changed files with 26 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map
tags.put("name", name);
TopicManagementServiceConfig config = new TopicManagementServiceConfig(props);
String topicFactoryClassName = config.getString(TopicManagementServiceConfig.TOPIC_FACTORY_CLASS_CONFIG);
@SuppressWarnings("rawtypes")
Map topicFactoryConfig =
props.containsKey(TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) ? (Map) props.get(
TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffs

} else if (index < nextIndex) {
_sensors._recordsDuplicated.record();
} else if (index > nextIndex) {
} else { // handles the remaining case, where index > nextIndex
nextIndexes.put(partition, index + 1);
long numLostRecords = index - nextIndex;
_sensors._recordsLost.record(numLostRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public DefaultMetricsReporterServiceFactory(Map properties, String serviceName)

@SuppressWarnings("unchecked")
@Override
public Service createService() throws Exception {
public Service createService() {
return new DefaultMetricsReporterService(_properties, _serviceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,9 @@ private static BrokerMetadata randomBroker(Set<BrokerMetadata> brokers) {
// Using a Set forces a linear scan, which is O(n).
// Since the list of brokers does not change in newPartitionAssignments,
// accepting a List argument instead of a Set would allow O(1) indexed access.
List<BrokerMetadata> brokerMetadataList = new ArrayList<>();

List<BrokerMetadata> brokerMetadataList = new ArrayList<>(brokers);
// convert to a list so there's no need to create an index and iterate through this set
brokerMetadataList.addAll(brokers);
// addAll() is replaced with a parameterized constructor call for better performance.

int brokerSetSize = brokers.size();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
/**
* Service that monitors the commit offset availability of a particular Consumer Group.
*/
@SuppressWarnings("NullableProblems")
public class OffsetCommitService implements Service {

public static final String METRIC_GRP_PREFIX = "xm-offset-commit-service";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void run() {
}
}

@SuppressWarnings("NullableProblems")

private class ProduceServiceThreadFactory implements ThreadFactory {

private final AtomicInteger _threadId = new AtomicInteger();
Expand All @@ -318,7 +318,7 @@ public Thread newThread(Runnable r) {
}

private class HandleNewPartitionsThreadFactory implements ThreadFactory {
public Thread newThread(@SuppressWarnings("NullableProblems") Runnable r) {
public Thread newThread(Runnable r) {
return new Thread(r, _name + "-produce-service-new-partition-handler");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public StatsdMetricsReporterServiceFactory(Map properties, String name) {
}

@Override
public Service createService() throws Exception {
public Service createService() {

//noinspection unchecked
return new StatsdMetricsReporterService(_properties, _name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,13 +37,13 @@ public CommitAvailabilityMetrics(final Metrics metrics, final Map<String, String
LOG.info("{} called.", this.getClass().getSimpleName());
_offsetsCommitted = metrics.sensor("offsets-committed");
_offsetsCommitted.add(new MetricName("offsets-committed-total", METRIC_GROUP_NAME,
"The total number of offsets per second that are committed.", tags), new Total());
"The total number of offsets per second that are committed.", tags), new CumulativeSum());

_failedCommitOffsets = metrics.sensor("failed-commit-offsets");
_failedCommitOffsets.add(new MetricName("failed-commit-offsets-avg", METRIC_GROUP_NAME,
"The average number of offsets per second that have failed.", tags), new Rate());
_failedCommitOffsets.add(new MetricName("failed-commit-offsets-total", METRIC_GROUP_NAME,
"The total number of offsets per second that have failed.", tags), new Total());
"The total number of offsets per second that have failed.", tags), new CumulativeSum());

metrics.addMetric(new MetricName("offsets-committed-avg", METRIC_GROUP_NAME, "The average offset commits availability.", tags),
(MetricConfig config, long now) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public CommitLatencyMetrics(Metrics metrics, Map<String, String> tags, int laten
* start the recording of consumer offset commit
* @throws Exception if the offset commit is already in progress.
*/
public void recordCommitStart() throws Exception {
public void recordCommitStart() {
if (!_inProgressCommit) {
this.setCommitStartTimeMs(System.currentTimeMillis());
_inProgressCommit = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,23 +43,23 @@ public ConsumeMetrics(final Metrics metrics, Map<String, String> tags, int laten

_consumeError = metrics.sensor("consume-error");
_consumeError.add(new MetricName("consume-error-rate", METRIC_GROUP_NAME, "The average number of errors per second", tags), new Rate());
_consumeError.add(new MetricName("consume-error-total", METRIC_GROUP_NAME, "The total number of errors", tags), new Total());
_consumeError.add(new MetricName("consume-error-total", METRIC_GROUP_NAME, "The total number of errors", tags), new CumulativeSum());

_recordsConsumed = metrics.sensor("records-consumed");
_recordsConsumed.add(new MetricName("records-consumed-rate", METRIC_GROUP_NAME, "The average number of records per second that are consumed", tags), new Rate());
_recordsConsumed.add(new MetricName("records-consumed-total", METRIC_GROUP_NAME, "The total number of records that are consumed", tags), new Total());
_recordsConsumed.add(new MetricName("records-consumed-total", METRIC_GROUP_NAME, "The total number of records that are consumed", tags), new CumulativeSum());

_recordsDuplicated = metrics.sensor("records-duplicated");
_recordsDuplicated.add(new MetricName("records-duplicated-rate", METRIC_GROUP_NAME, "The average number of records per second that are duplicated", tags), new Rate());
_recordsDuplicated.add(new MetricName("records-duplicated-total", METRIC_GROUP_NAME, "The total number of records that are duplicated", tags), new Total());
_recordsDuplicated.add(new MetricName("records-duplicated-total", METRIC_GROUP_NAME, "The total number of records that are duplicated", tags), new CumulativeSum());

_recordsLost = metrics.sensor("records-lost");
_recordsLost.add(new MetricName("records-lost-rate", METRIC_GROUP_NAME, "The average number of records per second that are lost", tags), new Rate());
_recordsLost.add(new MetricName("records-lost-total", METRIC_GROUP_NAME, "The total number of records that are lost", tags), new Total());
_recordsLost.add(new MetricName("records-lost-total", METRIC_GROUP_NAME, "The total number of records that are lost", tags), new CumulativeSum());

_recordsDelayed = metrics.sensor("records-delayed");
_recordsDelayed.add(new MetricName("records-delayed-rate", METRIC_GROUP_NAME, "The average number of records per second that are either lost or arrive after maximum allowed latency under SLA", tags), new Rate());
_recordsDelayed.add(new MetricName("records-delayed-total", METRIC_GROUP_NAME, "The total number of records that are either lost or arrive after maximum allowed latency under SLA", tags), new Total());
_recordsDelayed.add(new MetricName("records-delayed-total", METRIC_GROUP_NAME, "The total number of records that are either lost or arrive after maximum allowed latency under SLA", tags), new CumulativeSum());

_recordsDelay = metrics.sensor("records-delay");
_recordsDelay.add(new MetricName("records-delay-ms-avg", METRIC_GROUP_NAME, "The average latency of records from producer to consumer", tags), new Avg());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,7 +48,7 @@ public OffsetCommitServiceMetrics(final Metrics metrics, final Map<String, Strin
"The success rate of group coordinator accepting consumer offset commit requests.", tags), new Avg());
_offsetCommittedSensor.add(new MetricName(SUCCESS_METRIC_TOTAL, METRIC_GROUP_NAME,
"The total count of group coordinator successfully accepting consumer offset commit requests.", tags),
new Total());
new CumulativeSum());

_offsetCommitFailSensor = metrics.sensor(FAILURE_SENSOR_NAME);
/* NaN will persist as long as no record is submitted to the failure sensor.
Expand All @@ -57,7 +57,7 @@ public OffsetCommitServiceMetrics(final Metrics metrics, final Map<String, Strin
"The failure rate of group coordinator accepting consumer offset commit requests.", tags), new Avg());
_offsetCommitFailSensor.add(new MetricName(FAILURE_METRIC_TOTAL, METRIC_GROUP_NAME,
"The total count of group coordinator unsuccessfully receiving consumer offset commit requests.", tags),
new Total());
new CumulativeSum());

Measurable measurable = new Measurable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;


public class ProduceMetrics {
Expand Down Expand Up @@ -52,13 +52,13 @@ public ProduceMetrics(final Metrics metrics, final Map<String, String> tags, int
"The average number of records per second that are produced", tags), new Rate());
_recordsProduced.add(
new MetricName("records-produced-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The total number of records that are produced", tags), new Total());
"The total number of records that are produced", tags), new CumulativeSum());

_produceError = metrics.sensor("produce-error");
_produceError.add(new MetricName("produce-error-rate", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The average number of errors per second", tags), new Rate());
_produceError.add(new MetricName("produce-error-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
"The total number of errors", tags), new Total());
"The total number of errors", tags), new CumulativeSum());

_produceDelay = metrics.sensor("produce-delay");
_produceDelay.add(new MetricName("produce-delay-ms-avg", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
*/
class XinfraMonitorMetrics {

Metrics _metrics;
Map<String, String> _tags;
final Metrics _metrics;
final Map<String, String> _tags;

/**
*
Expand Down

0 comments on commit 343f6fa

Please sign in to comment.