diff --git a/docs/docs/06-operations/01-server-configuration.md b/docs/docs/06-operations/01-server-configuration.md index 3538b403a..16dae6bb8 100644 --- a/docs/docs/06-operations/01-server-configuration.md +++ b/docs/docs/06-operations/01-server-configuration.md @@ -698,3 +698,13 @@ The path upon which application liveness (the ability to ). - **Type:** string - **Default:** /metrics - **Importance:** low + +--- + +### `LHS_METRICS_LEVEL` + +The level of granularity to collect application instance metrics. + +- **Type:** string +- **Default:** INFO +- **Importance:** low \ No newline at end of file diff --git a/local-dev/configs/server-1.config b/local-dev/configs/server-1.config index f561a5877..773f7591d 100644 --- a/local-dev/configs/server-1.config +++ b/local-dev/configs/server-1.config @@ -9,7 +9,8 @@ LHS_STREAMS_NUM_STANDBY_REPLICAS=0 LHS_STREAMS_SESSION_TIMEOUT=10000 LHS_SHOULD_CREATE_TOPICS=true LHS_CORE_STREAM_THREADS=2 -LHS_STREAMS_METRICS_LEVEL=info +LHS_STREAMS_METRICS_LEVEL=debug +LHS_METRICS_LEVEL=info LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000 diff --git a/server/src/main/java/io/littlehorse/common/LHServerConfig.java b/server/src/main/java/io/littlehorse/common/LHServerConfig.java index cece0e29b..ff8835f02 100644 --- a/server/src/main/java/io/littlehorse/common/LHServerConfig.java +++ b/server/src/main/java/io/littlehorse/common/LHServerConfig.java @@ -56,13 +56,6 @@ @Slf4j public class LHServerConfig extends ConfigBase { - // Singletons for RocksConfigSetter - @Getter - private Cache globalRocksdbBlockCache; - - @Getter - private WriteBufferManager globalRocksdbWriteBufferManager; - private String instanceName; // Kafka Global Configs @@ -91,6 +84,7 @@ public class LHServerConfig extends ConfigBase { public static final String NUM_STANDBY_REPLICAS_KEY = "LHS_STREAMS_NUM_STANDBY_REPLICAS"; public static final String ROCKSDB_COMPACTION_THREADS_KEY = "LHS_ROCKSDB_COMPACTION_THREADS"; public static final String STREAMS_METRICS_LEVEL_KEY = "LHS_STREAMS_METRICS_LEVEL"; + public static final String LHS_METRICS_LEVEL_KEY = "LHS_METRICS_LEVEL"; public static final String LINGER_MS_KEY = "LHS_KAFKA_LINGER_MS"; public static final String TRANSACTION_TIMEOUT_MS_KEY = "LHS_STREAMS_TRANSACTION_TIMEOUT_MS"; public static final String CORE_KAFKA_STREAMS_OVERRIDE_PREFIX = "LHS_CORE_KS_CONFIG_"; @@ -151,6 +145,37 @@ public class LHServerConfig extends ConfigBase { public static final String X_LEAVE_GROUP_ON_SHUTDOWN_KEY = "LHS_X_LEAVE_GROUP_ON_SHUTDOWN"; public static final String X_USE_STATIC_MEMBERSHIP_KEY = "LHS_X_USE_STATIC_MEMBERSHIP"; + // Instance configs + private final String lhsMetricsLevel; + + // Singletons for RocksConfigSetter + @Getter + private Cache globalRocksdbBlockCache; + + @Getter + private WriteBufferManager globalRocksdbWriteBufferManager; + + public LHServerConfig() { + super(); + initKafkaAdmin(); + initRocksdbSingletons(); + lhsMetricsLevel = getServerMetricLevel(); + } + + public LHServerConfig(String propertiesPath) { + super(propertiesPath); + initKafkaAdmin(); + initRocksdbSingletons(); + lhsMetricsLevel = getServerMetricLevel(); + } + + public LHServerConfig(Properties props) { + super(props); + initKafkaAdmin(); + initRocksdbSingletons(); + lhsMetricsLevel = getServerMetricLevel(); + } + protected String[] getEnvKeyPrefixes() { return new String[] {"LHS_"}; } @@ -364,6 +389,18 @@ public String getInternalAdvertisedHost() { return getOrSetDefault(LHServerConfig.INTERNAL_ADVERTISED_HOST_KEY, "localhost"); } + public String getServerMetricLevel() { + if (lhsMetricsLevel != null) { + return lhsMetricsLevel; + } + String metricLevel = getOrSetDefault(LHS_METRICS_LEVEL_KEY, "INFO").toUpperCase(); + List allowedValues = List.of("INFO", "DEBUG", "TRACE"); + if (!allowedValues.contains(metricLevel)) { + throw new LHMisconfigurationException("Unrecognized metric level: " + metricLevel); + } + return metricLevel.toUpperCase(); + } + // If INTERNAL_ADVERTISED_PORT isn't set, we return INTERNAL_BIND_PORT. public int getInternalAdvertisedPort() { return Integer.valueOf( @@ -1017,24 +1054,6 @@ public boolean createKafkaTopic(NewTopic topic) throws InterruptedException, Exe } } - public LHServerConfig() { - super(); - initKafkaAdmin(); - initRocksdbSingletons(); - } - - public LHServerConfig(String propertiesPath) { - super(propertiesPath); - initKafkaAdmin(); - initRocksdbSingletons(); - } - - public LHServerConfig(Properties props) { - super(props); - initKafkaAdmin(); - initRocksdbSingletons(); - } - private void initRocksdbSingletons() { RocksDB.loadLibrary(); long cacheSize = Long.valueOf(getOrSetDefault(ROCKSDB_TOTAL_BLOCK_CACHE_BYTES_KEY, "-1")); diff --git a/server/src/main/java/io/littlehorse/server/LHServer.java b/server/src/main/java/io/littlehorse/server/LHServer.java index 9d450ddb4..a72af4adf 100644 --- a/server/src/main/java/io/littlehorse/server/LHServer.java +++ b/server/src/main/java/io/littlehorse/server/LHServer.java @@ -73,7 +73,7 @@ private RequestExecutionContext requestContext() { return contextKey.get(); } - public LHServer(LHServerConfig config) { + public LHServer(LHServerConfig config) throws LHMisconfigurationException { this.metadataCache = new MetadataCache(); this.config = config; this.taskQueueManager = new TaskQueueManager(this, LHConstants.MAX_TASKRUNS_IN_ONE_TASKQUEUE); @@ -87,6 +87,7 @@ public LHServer(LHServerConfig config) { ServerTopology.initCoreTopology(config, this, metadataCache, taskQueueManager), config.getCoreStreamsConfig()); this.timerStreams = new KafkaStreams(ServerTopology.initTimerTopology(config), config.getTimerStreamsConfig()); + coreStreams.setUncaughtExceptionHandler(throwable -> { log.error("Uncaught exception for " + throwable.getMessage()); return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; diff --git a/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java b/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java index ecfddfcd8..bf4c866fb 100644 --- a/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java +++ b/server/src/main/java/io/littlehorse/server/monitoring/metrics/PrometheusMetricExporter.java @@ -32,8 +32,8 @@ public PrometheusMetricExporter(LHServerConfig config) { this.config = config; this.prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); this.prometheusRegistry.config().commonTags("application_id", config.getLHClusterId()); - - new ServerMetricFilter(prometheusRegistry, ServerFilterRules.RULES).initialize(); + new ServerMetricFilter(prometheusRegistry, ServerFilterRules.fromLevel(config.getServerMetricLevel())) + .initialize(); } public MeterRegistry getMeterRegistry() { diff --git a/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerFilterRules.java b/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerFilterRules.java index f170bf538..68da1dc25 100644 --- a/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerFilterRules.java +++ b/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerFilterRules.java @@ -8,7 +8,7 @@ public final class ServerFilterRules { private ServerFilterRules() {} // relevance ordered - public static final List RULES = List.of( + public static final List INFO_RULES = List.of( // TODO: Wait for KIP-869 and gather state restoration metrics. accept("lh_in_memory_task_queue_size"), accept("lh_cache_size"), @@ -39,13 +39,56 @@ private ServerFilterRules() {} accept("kafka_producer_record_error"), // Filter metrics to reduce - deny("kafka_stream_state"), + deny("kafka_stream_state_"), + accept("kafka_stream_state"), deny("kafka_stream_task"), deny("kafka_stream_processor"), deny("kafka_producer"), deny("kafka_consumer"), deny("kafka_admin")); + // relevance ordered + public static final List DEBUG_RULES = List.of( + // TODO: Wait for KIP-869 and gather state restoration metrics. + accept("lh_in_memory_task_queue_size"), + accept("lh_cache_size"), + + // Kafka Streams State Stuff + accept("kafka_stream_state_compaction_pending"), + accept("kafka_stream_state_write_stall"), + accept("kafka_stream_state_bytes"), + accept("kafka_stream_state_total_sst"), + accept("kafka_stream_state_live_sst"), + accept("kafka_stream_state_put_latency"), + accept("kafka_stream_state_get_latency"), + + // Kafka Consumer Metrics + accept("kafka_consumer_coordinator_rebalance"), + accept("kafka_consumer_coordinator_last_rebalance_seconds_ago"), + accept("kafka_consumer_incoming_byte_rate"), + accept("kafka_consumer_fetch_manager_records_lag"), + accept("kafka_consumer_fetch_manager_fetch_throttle_time"), + accept("kafka_consumer_fetch_manager_fetch_latency_avg"), + accept("kafka_consumer_request"), + accept("kafka_consumer_time_between_poll"), + + // Producer Metrics + accept("kafka_producer_batch_size"), + accept("kafka_producer_request"), + accept("kafka_producer_outgoing_byte"), + accept("kafka_producer_record_error"), + + // Filter metrics to reduce + deny("kafka_stream_state"), + accept("kafka_stream_state_"), + accept("kafka_stream_task"), + deny("kafka_stream_processor"), + deny("kafka_producer"), + deny("kafka_consumer"), + deny("kafka_admin")); + + public static final List TRACE_RULES = List.of(); + public static ServerFilterRule accept(String prefix) { return new ServerFilterRule(prefix, MeterFilterReply.ACCEPT); } @@ -53,4 +96,14 @@ public static ServerFilterRule accept(String prefix) { public static ServerFilterRule deny(String prefix) { return new ServerFilterRule(prefix, MeterFilterReply.DENY); } + + public static List fromLevel(String level) { + if (level.equals("TRACE")) { + return TRACE_RULES; + } else if (level.equals("DEBUG")) { + return DEBUG_RULES; + } else { + return INFO_RULES; + } + } } diff --git a/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerMetricFilter.java b/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerMetricFilter.java index 8f093bda0..945364275 100644 --- a/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerMetricFilter.java +++ b/server/src/main/java/io/littlehorse/server/monitoring/metrics/ServerMetricFilter.java @@ -5,8 +5,8 @@ public class ServerMetricFilter { - private MeterRegistry meterRegistry; - private List rules; + private final MeterRegistry meterRegistry; + private final List rules; public ServerMetricFilter(MeterRegistry meterRegistry, List rules) { this.meterRegistry = meterRegistry;