From 343f6fa38a8a519b76003fee74b33e781627eced Mon Sep 17 00:00:00 2001 From: Andrew Choi Date: Tue, 8 Dec 2020 06:57:02 -0800 Subject: [PATCH] Xinfra Monitor - Performance, Optimization, Refactoring, Upgrades, Removal of Deprecations Xinfra Monitor - Performance, Optimization, Refactoring, Upgrades, Removal of Deprecations Signed-off-by: Andrew Choi --- .../services/ClusterTopicManipulationService.java | 1 + .../xinfra/monitor/services/ConsumeService.java | 2 +- .../DefaultMetricsReporterServiceFactory.java | 2 +- .../services/MultiClusterTopicManagementService.java | 5 ++--- .../xinfra/monitor/services/OffsetCommitService.java | 1 - .../xinfra/monitor/services/ProduceService.java | 4 ++-- .../StatsdMetricsReporterServiceFactory.java | 2 +- .../services/metrics/CommitAvailabilityMetrics.java | 6 +++--- .../services/metrics/CommitLatencyMetrics.java | 2 +- .../monitor/services/metrics/ConsumeMetrics.java | 12 ++++++------ .../services/metrics/OffsetCommitServiceMetrics.java | 6 +++--- .../monitor/services/metrics/ProduceMetrics.java | 6 +++--- .../services/metrics/XinfraMonitorMetrics.java | 4 ++-- 13 files changed, 26 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java index ded736db..067cd46f 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java @@ -90,6 +90,7 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map tags.put("name", name); TopicManagementServiceConfig config = new TopicManagementServiceConfig(props); String topicFactoryClassName = config.getString(TopicManagementServiceConfig.TOPIC_FACTORY_CLASS_CONFIG); + @SuppressWarnings("rawtypes") Map topicFactoryConfig = props.containsKey(TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) ? (Map) props.get( TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap(); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 8906e1ed..d251256a 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -204,7 +204,7 @@ public void onComplete(Map topicPartitionOffs } else if (index < nextIndex) { _sensors._recordsDuplicated.record(); - } else if (index > nextIndex) { + } else { // this will equate to the case where index > nextIndex... nextIndexes.put(partition, index + 1); long numLostRecords = index - nextIndex; _sensors._recordsLost.record(numLostRecords); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterServiceFactory.java b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterServiceFactory.java index e2919a7a..9d0acec0 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterServiceFactory.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterServiceFactory.java @@ -29,7 +29,7 @@ public DefaultMetricsReporterServiceFactory(Map properties, String serviceName) @SuppressWarnings("unchecked") @Override - public Service createService() throws Exception { + public Service createService() { return new DefaultMetricsReporterService(_properties, _serviceName); } } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java index c7f6eb94..6b91474c 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java @@ -383,10 +383,9 @@ private static BrokerMetadata randomBroker(Set brokers) { // Using Set enforces the usage of loop which is O(n). // As the list of brokers does not change in newPartitionAssignments, // the acceptance of a List argument instead of a Set will be faster which is (O(1)) - List brokerMetadataList = new ArrayList<>(); - + List brokerMetadataList = new ArrayList<>(brokers); // convert to a list so there's no need to create a index and iterate through this set - brokerMetadataList.addAll(brokers); + //addAll() is replaced with parameterized constructor call for better performance.. int brokerSetSize = brokers.size(); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java index a3a445dd..18c31803 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java @@ -58,7 +58,6 @@ /** * Service that monitors the commit offset availability of a particular Consumer Group. */ -@SuppressWarnings("NullableProblems") public class OffsetCommitService implements Service { public static final String METRIC_GRP_PREFIX = "xm-offset-commit-service"; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java index 46d1045c..c16da3d0 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java @@ -308,7 +308,7 @@ public void run() { } } - @SuppressWarnings("NullableProblems") + private class ProduceServiceThreadFactory implements ThreadFactory { private final AtomicInteger _threadId = new AtomicInteger(); @@ -318,7 +318,7 @@ public Thread newThread(Runnable r) { } private class HandleNewPartitionsThreadFactory implements ThreadFactory { - public Thread newThread(@SuppressWarnings("NullableProblems") Runnable r) { + public Thread newThread(Runnable r) { return new Thread(r, _name + "-produce-service-new-partition-handler"); } } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterServiceFactory.java b/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterServiceFactory.java index 75103153..046d0a2f 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterServiceFactory.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterServiceFactory.java @@ -28,7 +28,7 @@ public StatsdMetricsReporterServiceFactory(Map properties, String name) { } @Override - public Service createService() throws Exception { + public Service createService() { //noinspection unchecked return new StatsdMetricsReporterService(_properties, _name); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitAvailabilityMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitAvailabilityMetrics.java index eeeb1819..9643a8d0 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitAvailabilityMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitAvailabilityMetrics.java @@ -15,8 +15,8 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.Total; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,13 +37,13 @@ public CommitAvailabilityMetrics(final Metrics metrics, final Map { diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitLatencyMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitLatencyMetrics.java index 607b52f5..1970c18c 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitLatencyMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/CommitLatencyMetrics.java @@ -63,7 +63,7 @@ public CommitLatencyMetrics(Metrics metrics, Map tags, int laten * start the recording of consumer offset commit * @throws Exception if the offset commit is already in progress. */ - public void recordCommitStart() throws Exception { + public void recordCommitStart() { if (!_inProgressCommit) { this.setCommitStartTimeMs(System.currentTimeMillis()); _inProgressCommit = true; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/ConsumeMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/ConsumeMetrics.java index f6818102..82d902e9 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/ConsumeMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/ConsumeMetrics.java @@ -15,11 +15,11 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Percentile; import org.apache.kafka.common.metrics.stats.Percentiles; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.Total; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,23 +43,23 @@ public ConsumeMetrics(final Metrics metrics, Map tags, int laten _consumeError = metrics.sensor("consume-error"); _consumeError.add(new MetricName("consume-error-rate", METRIC_GROUP_NAME, "The average number of errors per second", tags), new Rate()); - _consumeError.add(new MetricName("consume-error-total", METRIC_GROUP_NAME, "The total number of errors", tags), new Total()); + _consumeError.add(new MetricName("consume-error-total", METRIC_GROUP_NAME, "The total number of errors", tags), new CumulativeSum()); _recordsConsumed = metrics.sensor("records-consumed"); _recordsConsumed.add(new MetricName("records-consumed-rate", METRIC_GROUP_NAME, "The average number of records per second that are consumed", tags), new Rate()); - _recordsConsumed.add(new MetricName("records-consumed-total", METRIC_GROUP_NAME, "The total number of records that are consumed", tags), new Total()); + _recordsConsumed.add(new MetricName("records-consumed-total", METRIC_GROUP_NAME, "The total number of records that are consumed", tags), new CumulativeSum()); _recordsDuplicated = metrics.sensor("records-duplicated"); _recordsDuplicated.add(new MetricName("records-duplicated-rate", METRIC_GROUP_NAME, "The average number of records per second that are duplicated", tags), new Rate()); - _recordsDuplicated.add(new MetricName("records-duplicated-total", METRIC_GROUP_NAME, "The total number of records that are duplicated", tags), new Total()); + _recordsDuplicated.add(new MetricName("records-duplicated-total", METRIC_GROUP_NAME, "The total number of records that are duplicated", tags), new CumulativeSum()); _recordsLost = metrics.sensor("records-lost"); _recordsLost.add(new MetricName("records-lost-rate", METRIC_GROUP_NAME, "The average number of records per second that are lost", tags), new Rate()); - _recordsLost.add(new MetricName("records-lost-total", METRIC_GROUP_NAME, "The total number of records that are lost", tags), new Total()); + _recordsLost.add(new MetricName("records-lost-total", METRIC_GROUP_NAME, "The total number of records that are lost", tags), new CumulativeSum()); _recordsDelayed = metrics.sensor("records-delayed"); _recordsDelayed.add(new MetricName("records-delayed-rate", METRIC_GROUP_NAME, "The average number of records per second that are either lost or arrive after maximum allowed latency under SLA", tags), new Rate()); - _recordsDelayed.add(new MetricName("records-delayed-total", METRIC_GROUP_NAME, "The total number of records that are either lost or arrive after maximum allowed latency under SLA", tags), new Total()); + _recordsDelayed.add(new MetricName("records-delayed-total", METRIC_GROUP_NAME, "The total number of records that are either lost or arrive after maximum allowed latency under SLA", tags), new CumulativeSum()); _recordsDelay = metrics.sensor("records-delay"); _recordsDelay.add(new MetricName("records-delay-ms-avg", METRIC_GROUP_NAME, "The average latency of records from producer to consumer", tags), new Avg()); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java index 1fad2177..b6a6e753 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java @@ -17,7 +17,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Total; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +48,7 @@ public OffsetCommitServiceMetrics(final Metrics metrics, final Map tags, int "The average number of records per second that are produced", tags), new Rate()); _recordsProduced.add( new MetricName("records-produced-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, - "The total number of records that are produced", tags), new Total()); + "The total number of records that are produced", tags), new CumulativeSum()); _produceError = metrics.sensor("produce-error"); _produceError.add(new MetricName("produce-error-rate", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, "The average number of errors per second", tags), new Rate()); _produceError.add(new MetricName("produce-error-total", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, - "The total number of errors", tags), new Total()); + "The total number of errors", tags), new CumulativeSum()); _produceDelay = metrics.sensor("produce-delay"); _produceDelay.add(new MetricName("produce-delay-ms-avg", XinfraMonitorConstants.METRIC_GROUP_NAME_PRODUCE_SERVICE, diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/XinfraMonitorMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/XinfraMonitorMetrics.java index ee433fc7..a6cc8cee 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/XinfraMonitorMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/XinfraMonitorMetrics.java @@ -19,8 +19,8 @@ */ class XinfraMonitorMetrics { - Metrics _metrics; - Map _tags; + final Metrics _metrics; + final Map _tags; /** *