Skip to content

Commit

Permalink
validates server metrics level
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Nov 25, 2024
1 parent 5a18ea6 commit e941497
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 30 deletions.
3 changes: 2 additions & 1 deletion local-dev/configs/server-1.config
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ LHS_STREAMS_NUM_STANDBY_REPLICAS=0
LHS_STREAMS_SESSION_TIMEOUT=10000
LHS_SHOULD_CREATE_TOPICS=true
LHS_CORE_STREAM_THREADS=2
LHS_STREAMS_METRICS_LEVEL=info
LHS_STREAMS_METRICS_LEVEL=debug
LHS_METRICS_LEVEL=info

LHS_STREAMS_TRANSACTION_TIMEOUT_MS=10000

Expand Down
66 changes: 40 additions & 26 deletions server/src/main/java/io/littlehorse/common/LHServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@
@Slf4j
public class LHServerConfig extends ConfigBase {

// Singletons for RocksConfigSetter
@Getter
private Cache globalRocksdbBlockCache;

@Getter
private WriteBufferManager globalRocksdbWriteBufferManager;

private String instanceName;

// Kafka Global Configs
Expand Down Expand Up @@ -152,6 +145,37 @@ public class LHServerConfig extends ConfigBase {
public static final String X_LEAVE_GROUP_ON_SHUTDOWN_KEY = "LHS_X_LEAVE_GROUP_ON_SHUTDOWN";
public static final String X_USE_STATIC_MEMBERSHIP_KEY = "LHS_X_USE_STATIC_MEMBERSHIP";

// Instance configs
private final String lhsMetricsLevel;

// Singletons for RocksConfigSetter
@Getter
private Cache globalRocksdbBlockCache;

@Getter
private WriteBufferManager globalRocksdbWriteBufferManager;

public LHServerConfig() {
super();
initKafkaAdmin();
initRocksdbSingletons();
lhsMetricsLevel = getServerMetricLevel();
}

public LHServerConfig(String propertiesPath) {
super(propertiesPath);
initKafkaAdmin();
initRocksdbSingletons();
lhsMetricsLevel = getServerMetricLevel();
}

public LHServerConfig(Properties props) {
super(props);
initKafkaAdmin();
initRocksdbSingletons();
lhsMetricsLevel = getServerMetricLevel();
}

protected String[] getEnvKeyPrefixes() {
return new String[] {"LHS_"};
}
Expand Down Expand Up @@ -366,7 +390,15 @@ public String getInternalAdvertisedHost() {
}

public String getServerMetricLevel() {
return getOrSetDefault(LHS_METRICS_LEVEL_KEY, "INFO").toUpperCase();
if (lhsMetricsLevel != null) {
return lhsMetricsLevel;
}
String metricLevel = getOrSetDefault(LHS_METRICS_LEVEL_KEY, "INFO").toUpperCase();
List<String> allowedValues = List.of("INFO", "DEBUG", "TRACE");
if (!allowedValues.contains(metricLevel)) {
throw new LHMisconfigurationException("Unrecognized metric level: " + metricLevel);
}
return metricLevel.toUpperCase();
}

// If INTERNAL_ADVERTISED_PORT isn't set, we return INTERNAL_BIND_PORT.
Expand Down Expand Up @@ -1022,24 +1054,6 @@ public boolean createKafkaTopic(NewTopic topic) throws InterruptedException, Exe
}
}

public LHServerConfig() {
super();
initKafkaAdmin();
initRocksdbSingletons();
}

public LHServerConfig(String propertiesPath) {
super(propertiesPath);
initKafkaAdmin();
initRocksdbSingletons();
}

public LHServerConfig(Properties props) {
super(props);
initKafkaAdmin();
initRocksdbSingletons();
}

private void initRocksdbSingletons() {
RocksDB.loadLibrary();
long cacheSize = Long.valueOf(getOrSetDefault(ROCKSDB_TOTAL_BLOCK_CACHE_BYTES_KEY, "-1"));
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/io/littlehorse/server/LHServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private RequestExecutionContext requestContext() {
return contextKey.get();
}

public LHServer(LHServerConfig config) {
public LHServer(LHServerConfig config) throws LHMisconfigurationException {
this.metadataCache = new MetadataCache();
this.config = config;
this.taskQueueManager = new TaskQueueManager(this, LHConstants.MAX_TASKRUNS_IN_ONE_TASKQUEUE);
Expand All @@ -88,6 +88,7 @@ public LHServer(LHServerConfig config) {
ServerTopology.initCoreTopology(config, this, metadataCache, taskQueueManager),
config.getCoreStreamsConfig());
this.timerStreams = new KafkaStreams(ServerTopology.initTimerTopology(config), config.getTimerStreamsConfig());

coreStreams.setUncaughtExceptionHandler(throwable -> {
log.error("Uncaught exception for " + throwable.getMessage());
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ private ServerFilterRules() {}
accept("kafka_producer_record_error"),

// Filter metrics to reduce
deny("kafka_stream_state"),
deny("kafka_stream_state_"),
accept("kafka_stream_state"),
deny("kafka_stream_task"),
deny("kafka_stream_processor"),
deny("kafka_producer"),
Expand Down Expand Up @@ -78,7 +79,8 @@ private ServerFilterRules() {}
accept("kafka_producer_record_error"),

// Filter metrics to reduce
accept("kafka_stream_state"),
deny("kafka_stream_state"),
accept("kafka_stream_state_"),
accept("kafka_stream_task"),
deny("kafka_stream_processor"),
deny("kafka_producer"),
Expand Down

0 comments on commit e941497

Please sign in to comment.