Skip to content

Commit

Permalink
Configure server metrics level (#1050)
Browse files Browse the repository at this point in the history
Adds the ability to configure the level of granularity of the metrics exported via Prometheus.
  • Loading branch information
eduwercamacaro authored Nov 25, 2024
1 parent f4abe43 commit 72c9e57
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 33 deletions.
10 changes: 10 additions & 0 deletions docs/docs/06-operations/01-server-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,13 @@ The path upon which application liveness (the ability to ).
- **Type:** string
- **Default:** /metrics
- **Importance:** low

---

### `LHS_METRICS_LEVEL`

The level of granularity at which application instance metrics are collected.

- **Type:** string
- **Default:** INFO
- **Importance:** low
3 changes: 2 additions & 1 deletion local-dev/configs/server-1.config
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ LHS_STREAMS_NUM_STANDBY_REPLICAS=0
LHS_STREAMS_SESSION_TIMEOUT=10000
LHS_SHOULD_CREATE_TOPICS=true
LHS_CORE_STREAM_THREADS=2
LHS_STREAMS_METRICS_LEVEL=info
LHS_STREAMS_METRICS_LEVEL=debug
LHS_METRICS_LEVEL=info

LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000

Expand Down
69 changes: 44 additions & 25 deletions server/src/main/java/io/littlehorse/common/LHServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@
@Slf4j
public class LHServerConfig extends ConfigBase {

// Singletons for RocksConfigSetter
@Getter
private Cache globalRocksdbBlockCache;

@Getter
private WriteBufferManager globalRocksdbWriteBufferManager;

private String instanceName;

// Kafka Global Configs
Expand Down Expand Up @@ -91,6 +84,7 @@ public class LHServerConfig extends ConfigBase {
public static final String NUM_STANDBY_REPLICAS_KEY = "LHS_STREAMS_NUM_STANDBY_REPLICAS";
public static final String ROCKSDB_COMPACTION_THREADS_KEY = "LHS_ROCKSDB_COMPACTION_THREADS";
public static final String STREAMS_METRICS_LEVEL_KEY = "LHS_STREAMS_METRICS_LEVEL";
public static final String LHS_METRICS_LEVEL_KEY = "LHS_METRICS_LEVEL";
public static final String LINGER_MS_KEY = "LHS_KAFKA_LINGER_MS";
public static final String TRANSACTION_TIMEOUT_MS_KEY = "LHS_STREAMS_TRANSACTION_TIMEOUT_MS";
public static final String CORE_KAFKA_STREAMS_OVERRIDE_PREFIX = "LHS_CORE_KS_CONFIG_";
Expand Down Expand Up @@ -151,6 +145,37 @@ public class LHServerConfig extends ConfigBase {
public static final String X_LEAVE_GROUP_ON_SHUTDOWN_KEY = "LHS_X_LEAVE_GROUP_ON_SHUTDOWN";
public static final String X_USE_STATIC_MEMBERSHIP_KEY = "LHS_X_USE_STATIC_MEMBERSHIP";

// Instance configs
// Cached, validated metric granularity level (resolved from LHS_METRICS_LEVEL).
// Populated once per instance by the constructors via getServerMetricLevel().
private final String lhsMetricsLevel;

// Singletons for RocksConfigSetter
@Getter
private Cache globalRocksdbBlockCache;

@Getter
private WriteBufferManager globalRocksdbWriteBufferManager;

// NOTE: all three constructors follow the same sequence: initialize the Kafka
// admin client, set up the RocksDB singletons, then resolve and cache the
// metric level. Because getServerMetricLevel() validates the configured value,
// a bad LHS_METRICS_LEVEL fails fast at construction time with an
// LHMisconfigurationException rather than at first metrics export.

// Constructs a config from environment variables only.
public LHServerConfig() {
super();
initKafkaAdmin();
initRocksdbSingletons();
lhsMetricsLevel = getServerMetricLevel();
}

// Constructs a config from a properties file at the given path.
public LHServerConfig(String propertiesPath) {
super(propertiesPath);
initKafkaAdmin();
initRocksdbSingletons();
lhsMetricsLevel = getServerMetricLevel();
}

// Constructs a config from an in-memory Properties object.
public LHServerConfig(Properties props) {
super(props);
initKafkaAdmin();
initRocksdbSingletons();
lhsMetricsLevel = getServerMetricLevel();
}

protected String[] getEnvKeyPrefixes() {
return new String[] {"LHS_"};
}
Expand Down Expand Up @@ -364,6 +389,18 @@ public String getInternalAdvertisedHost() {
return getOrSetDefault(LHServerConfig.INTERNAL_ADVERTISED_HOST_KEY, "localhost");
}

/**
 * Resolves the metric granularity level for this server instance.
 *
 * <p>Reads {@code LHS_METRICS_LEVEL} (default {@code INFO}) and validates it
 * against the recognized levels. Constructors call this once and cache the
 * result in {@code lhsMetricsLevel}; later calls return the cached value.
 *
 * @return the validated, upper-cased metric level: INFO, DEBUG, or TRACE.
 * @throws LHMisconfigurationException if the configured value is not one of
 *     INFO, DEBUG, or TRACE.
 */
public String getServerMetricLevel() {
    // Reuse the value cached by the constructor so the config is parsed and
    // validated only once per instance.
    if (lhsMetricsLevel != null) {
        return lhsMetricsLevel;
    }
    // Locale.ROOT makes the upper-casing locale-independent; the default
    // locale-sensitive toUpperCase() would map "info" to "\u0130NFO" under a
    // Turkish locale and wrongly reject a valid setting.
    String metricLevel =
            getOrSetDefault(LHS_METRICS_LEVEL_KEY, "INFO").toUpperCase(java.util.Locale.ROOT);
    List<String> allowedValues = List.of("INFO", "DEBUG", "TRACE");
    if (!allowedValues.contains(metricLevel)) {
        // Include the allowed values so the operator can fix the config
        // without reading the source.
        throw new LHMisconfigurationException(
                "Unrecognized metric level: " + metricLevel + ". Allowed values: " + allowedValues);
    }
    // Already upper-cased above; the original redundantly re-applied
    // toUpperCase() here.
    return metricLevel;
}

// If INTERNAL_ADVERTISED_PORT isn't set, we return INTERNAL_BIND_PORT.
public int getInternalAdvertisedPort() {
return Integer.valueOf(
Expand Down Expand Up @@ -1017,24 +1054,6 @@ public boolean createKafkaTopic(NewTopic topic) throws InterruptedException, Exe
}
}

public LHServerConfig() {
super();
initKafkaAdmin();
initRocksdbSingletons();
}

public LHServerConfig(String propertiesPath) {
super(propertiesPath);
initKafkaAdmin();
initRocksdbSingletons();
}

public LHServerConfig(Properties props) {
super(props);
initKafkaAdmin();
initRocksdbSingletons();
}

private void initRocksdbSingletons() {
RocksDB.loadLibrary();
long cacheSize = Long.valueOf(getOrSetDefault(ROCKSDB_TOTAL_BLOCK_CACHE_BYTES_KEY, "-1"));
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/io/littlehorse/server/LHServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private RequestExecutionContext requestContext() {
return contextKey.get();
}

public LHServer(LHServerConfig config) {
public LHServer(LHServerConfig config) throws LHMisconfigurationException {
this.metadataCache = new MetadataCache();
this.config = config;
this.taskQueueManager = new TaskQueueManager(this, LHConstants.MAX_TASKRUNS_IN_ONE_TASKQUEUE);
Expand All @@ -87,6 +87,7 @@ public LHServer(LHServerConfig config) {
ServerTopology.initCoreTopology(config, this, metadataCache, taskQueueManager),
config.getCoreStreamsConfig());
this.timerStreams = new KafkaStreams(ServerTopology.initTimerTopology(config), config.getTimerStreamsConfig());

coreStreams.setUncaughtExceptionHandler(throwable -> {
log.error("Uncaught exception for " + throwable.getMessage());
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public PrometheusMetricExporter(LHServerConfig config) {
this.config = config;
this.prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
this.prometheusRegistry.config().commonTags("application_id", config.getLHClusterId());

new ServerMetricFilter(prometheusRegistry, ServerFilterRules.RULES).initialize();
new ServerMetricFilter(prometheusRegistry, ServerFilterRules.fromLevel(config.getServerMetricLevel()))
.initialize();
}

public MeterRegistry getMeterRegistry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public final class ServerFilterRules {
private ServerFilterRules() {}

// relevance ordered
public static final List<ServerFilterRule> RULES = List.of(
public static final List<ServerFilterRule> INFO_RULES = List.of(
// TODO: Wait for KIP-869 and gather state restoration metrics.
accept("lh_in_memory_task_queue_size"),
accept("lh_cache_size"),
Expand Down Expand Up @@ -39,18 +39,71 @@ private ServerFilterRules() {}
accept("kafka_producer_record_error"),

// Filter metrics to reduce
deny("kafka_stream_state"),
deny("kafka_stream_state_"),
accept("kafka_stream_state"),
deny("kafka_stream_task"),
deny("kafka_stream_processor"),
deny("kafka_producer"),
deny("kafka_consumer"),
deny("kafka_admin"));

// relevance ordered
// Rule set used when LHS_METRICS_LEVEL=DEBUG: a superset of the INFO-level
// metrics plus extra Kafka Streams state-store and task metrics.
public static final List<ServerFilterRule> DEBUG_RULES = List.of(
// TODO: Wait for KIP-869 and gather state restoration metrics.
accept("lh_in_memory_task_queue_size"),
accept("lh_cache_size"),

// Kafka Streams State Stuff
accept("kafka_stream_state_compaction_pending"),
accept("kafka_stream_state_write_stall"),
accept("kafka_stream_state_bytes"),
accept("kafka_stream_state_total_sst"),
accept("kafka_stream_state_live_sst"),
accept("kafka_stream_state_put_latency"),
accept("kafka_stream_state_get_latency"),

// Kafka Consumer Metrics
accept("kafka_consumer_coordinator_rebalance"),
accept("kafka_consumer_coordinator_last_rebalance_seconds_ago"),
accept("kafka_consumer_incoming_byte_rate"),
accept("kafka_consumer_fetch_manager_records_lag"),
accept("kafka_consumer_fetch_manager_fetch_throttle_time"),
accept("kafka_consumer_fetch_manager_fetch_latency_avg"),
accept("kafka_consumer_request"),
accept("kafka_consumer_time_between_poll"),

// Producer Metrics
accept("kafka_producer_batch_size"),
accept("kafka_producer_request"),
accept("kafka_producer_outgoing_byte"),
accept("kafka_producer_record_error"),

// Filter metrics to reduce
// NOTE(review): if ServerMetricFilter evaluates these rules first-match-wins
// in list order, then deny("kafka_stream_state") shadows the
// accept("kafka_stream_state_") immediately below it (every name starting
// with "kafka_stream_state_" also starts with "kafka_stream_state"), making
// that accept unreachable — confirm the matcher's semantics.
deny("kafka_stream_state"),
accept("kafka_stream_state_"),
// NOTE(review): accept("kafka_stream_task") sits under the "Filter metrics
// to reduce" heading even though it accepts — verify this is intentional
// (DEBUG keeps task metrics that INFO drops).
accept("kafka_stream_task"),
deny("kafka_stream_processor"),
deny("kafka_producer"),
deny("kafka_consumer"),
deny("kafka_admin"));

// Rule set for LHS_METRICS_LEVEL=TRACE: no rules at all — presumably an empty
// list means nothing is filtered and every meter is exported; confirm how
// ServerMetricFilter treats an empty rule list.
public static final List<ServerFilterRule> TRACE_RULES = List.of();

/**
 * Builds a rule that ACCEPTs matching meters. The argument is named
 * {@code prefix}, so it is presumably matched as a meter-name prefix —
 * confirm against ServerMetricFilter's matching logic.
 */
public static ServerFilterRule accept(String prefix) {
return new ServerFilterRule(prefix, MeterFilterReply.ACCEPT);
}

/**
 * Builds a rule that DENYs matching meters. The argument is named
 * {@code prefix}, so it is presumably matched as a meter-name prefix —
 * confirm against ServerMetricFilter's matching logic.
 */
public static ServerFilterRule deny(String prefix) {
return new ServerFilterRule(prefix, MeterFilterReply.DENY);
}

/**
 * Maps a metric-level name to its filter rule set.
 *
 * @param level upper-cased level name ("INFO", "DEBUG", or "TRACE").
 * @return the rule set for the given level; any unrecognized value falls back
 *     to {@code INFO_RULES}.
 */
public static List<ServerFilterRule> fromLevel(String level) {
    switch (level) {
        case "TRACE":
            return TRACE_RULES;
        case "DEBUG":
            return DEBUG_RULES;
        default:
            // INFO and any other value use the default rule set, exactly as
            // the if/else-if chain this switch replaces did.
            return INFO_RULES;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

public class ServerMetricFilter {

private MeterRegistry meterRegistry;
private List<ServerFilterRule> rules;
private final MeterRegistry meterRegistry;
private final List<ServerFilterRule> rules;

public ServerMetricFilter(MeterRegistry meterRegistry, List<ServerFilterRule> rules) {
this.meterRegistry = meterRegistry;
Expand Down

0 comments on commit 72c9e57

Please sign in to comment.