Skip to content

Commit

Permalink
Merge pull request #39 from Netflix/remove_nonessential_metrics_and_tags
Browse files Browse the repository at this point in the history
Create a single instance of MetricGroupId for SSE
  • Loading branch information
neerajrj authored Mar 26, 2020
2 parents 78e8ac4 + 934b081 commit e0fcd3a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.mantisrx.client;

import static com.mantisrx.common.utils.MantisMetricStringConstants.DROP_OPERATOR_INCOMING_METRIC_GROUP;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -44,6 +46,7 @@ public class SseSinkConnectionFunction implements SinkConnectionFunc<MantisServe
private static final String DEFAULT_BUFFER_SIZE_STR = "0";
private static final Logger logger = LoggerFactory.getLogger(SseSinkConnectionFunction.class);
private static final CopyOnWriteArraySet<MetricGroupId> metricsSet = new CopyOnWriteArraySet<>();
private static final MetricGroupId metricGroupId;
private static final Action1<Throwable> defaultConxResetHandler = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Expand All @@ -59,7 +62,8 @@ public void call(Throwable throwable) {
// Use single netty thread
NettyUtils.setNettyThreads();


metricGroupId = new MetricGroupId(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_SseSinkConnectionFunction_withBuffer");
metricsSet.add(metricGroupId);
logger.info("SETTING UP METRICS PRINTER THREAD");
new ScheduledThreadPoolExecutor(1).scheduleWithFixedDelay(new Runnable() {
@Override
Expand Down Expand Up @@ -124,7 +128,7 @@ public SinkConnection<MantisServerSentEvent> call(final String hostname, final I
private final SseWorkerConnection workerConn =
new SseWorkerConnection("Sink", hostname, port, updateConxStatus, updateDataRecvngStatus,
connectionResetHandler, dataRecvTimeoutSecs, reconnectUponConnectionRest, metricsSet,
bufferSize, sinkParameters, disablePingFiltering);
bufferSize, sinkParameters, disablePingFiltering,metricGroupId);

@Override
public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ public SseWorkerConnection(final String connectionType,
final boolean reconnectUponConnectionReset,
final CopyOnWriteArraySet<MetricGroupId> metricsSet,
final int bufferSize,
final SinkParameters sinkParameters) {
final SinkParameters sinkParameters,
final MetricGroupId metricGroupId) {
this(connectionType, hostname, port, updateConxStatus, updateDataRecvngStatus, connectionResetHandler,
dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false);
dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false,
metricGroupId);
}
public SseWorkerConnection(final String connectionType,
final String hostname,
Expand All @@ -133,12 +135,13 @@ public SseWorkerConnection(final String connectionType,
final CopyOnWriteArraySet<MetricGroupId> metricsSet,
final int bufferSize,
final SinkParameters sinkParameters,
final boolean disablePingFiltering) {
final boolean disablePingFiltering,
final MetricGroupId metricGroupId) {
this.connectionType = connectionType;
this.hostname = hostname;
this.port = port;

this.metricGroupId = new MetricGroupId(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_Sse" + connectionType + "ConnectionFunction_withBuffer");
this.metricGroupId = metricGroupId;
final MetricGroupId connHealthMetricGroup = new MetricGroupId("ConnectionHealth");
Metrics m = new Metrics.Builder()
.id(connHealthMetricGroup)
Expand Down Expand Up @@ -174,7 +177,6 @@ public String getName() {
}

public synchronized void close() throws Exception {
metricsSet.remove(metricGroupId);
if (isShutdown)
return;
logger.info("Closing sse connection to " + hostname + ":" + port);
Expand All @@ -187,7 +189,6 @@ public synchronized void close() throws Exception {
public synchronized Observable<MantisServerSentEvent> call() {
if (isShutdown)
return Observable.empty();
metricsSet.add(metricGroupId);
client =
RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(hostname, port)
.pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.mantisrx.server.worker.client;

import static com.mantisrx.common.utils.MantisMetricStringConstants.DROP_OPERATOR_INCOMING_METRIC_GROUP;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
Expand All @@ -42,6 +44,7 @@ public class SseWorkerConnectionFunction implements WorkerConnectionFunc<MantisS
private static final String DEFAULT_BUFFER_SIZE_STR = "0";
private static final Logger logger = LoggerFactory.getLogger(SseWorkerConnectionFunction.class);
private static final CopyOnWriteArraySet<MetricGroupId> metricsSet = new CopyOnWriteArraySet<>();
private static final MetricGroupId metricGroupId;
private static final Action1<Throwable> defaultConxResetHandler = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Expand All @@ -56,7 +59,8 @@ public void call(Throwable throwable) {
// Use single netty thread
NettyUtils.setNettyThreads();


metricGroupId = new MetricGroupId(DROP_OPERATOR_INCOMING_METRIC_GROUP + "_SseWorkerMetricsConnectionFunction_withBuffer");
metricsSet.add(metricGroupId);
logger.info("SETTING UP METRICS PRINTER THREAD");
new ScheduledThreadPoolExecutor(1).scheduleWithFixedDelay(new Runnable() {
@Override
Expand Down Expand Up @@ -119,7 +123,7 @@ public WorkerConnection<MantisServerSentEvent> call(final String hostname, final
private final SseWorkerConnection workerConn =
new SseWorkerConnection("WorkerMetrics", hostname, port, updateConxStatus, updateDataRecvngStatus,
connectionResetHandler, dataRecvTimeoutSecs, reconnectUponConnectionRest, metricsSet,
bufferSize, sinkParameters);
bufferSize, sinkParameters,metricGroupId);

@Override
public String getName() {
Expand Down

0 comments on commit e0fcd3a

Please sign in to comment.