Skip to content

Commit

Permalink
Update ZipkinGraphiteSink to use Metrics with Graphite reporter and h…
Browse files Browse the repository at this point in the history
…istogram generation.
  • Loading branch information
kristofa committed Oct 10, 2013
1 parent 4880c7c commit c67f70a
Showing 1 changed file with 24 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.commons.codec.binary.Base64;
import org.apache.flume.Channel;
Expand All @@ -22,7 +23,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.twitter.zipkin.gen.Annotation;
import com.twitter.zipkin.gen.LogEntry;
import com.twitter.zipkin.gen.Span;
Expand All @@ -40,26 +45,27 @@ public class ZipkinGraphiteSink extends AbstractSink implements Configurable {
private int port;
private SinkCounter sinkCounter;
private int batchSize = DEFAULT_BATCH_SIZE;
private Graphite graphite;
private LifecycleState lifeCycleState;
private MetricRegistry metricRegistry;
private GraphiteReporter reporter;

@Override
public synchronized void start() {
super.start();
lifeCycleState = LifecycleState.START;
sinkCounter.start();
try {
connect();
} catch (final Exception e) {
lifeCycleState = LifecycleState.ERROR;
throw new IllegalStateException(e);
}
metricRegistry = new MetricRegistry();
final Graphite graphite = new Graphite(new InetSocketAddress(hostName, port));
reporter =
GraphiteReporter.forRegistry(metricRegistry).convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS).filter(MetricFilter.ALL).build(graphite);
reporter.start(1, TimeUnit.MINUTES);
}

@Override
public synchronized void stop() {
LOGGER.info("Stopping ZipkinGraphiteSink.");
close();
reporter.close();
lifeCycleState = LifecycleState.STOP;
sinkCounter.stop();
super.stop();
Expand Down Expand Up @@ -102,13 +108,7 @@ public Status process() throws EventDeliveryException {
txn.commit();
} catch (final IOException e) {
txn.rollback();
LOGGER.error("Got a IOException. Will close connection with Graphite and create new connection/client.");
try {
connect();
LOGGER.info("Reconnect succeeded.");
} catch (final IOException e1) {
LOGGER.error("Trying to reconnect failed.", e1);
}
LOGGER.error("IOException", e);
} catch (final Throwable e) {
txn.rollback();
throw new EventDeliveryException(e);
Expand All @@ -134,23 +134,6 @@ public void configure(final Context context) {

}

private void connect() throws IllegalStateException, IOException {
close();
graphite = new Graphite(new InetSocketAddress(hostName, port));
graphite.connect();
}

private void close() {
if (graphite != null) {
sinkCounter.incrementConnectionClosedCount();
try {
graphite.close();
} catch (final IOException e) {
LOGGER.error("Closing graphite connection failed.", e);
}
}
}

private LogEntry create(final Event event) {
final byte[] body = event.getBody();

Expand All @@ -174,14 +157,20 @@ private void process(final LogEntry logEntry) throws TException, IOException {
int duration = annotation.getDuration();
if (duration > 0) {
duration = duration / 1000; // Convert from micro- to milliseconds
final long timestamp = annotation.getTimestamp() / 1000000; // Convert from micro- to seoncds
final String value = annotation.getValue();
final int equalSignIndex = value.indexOf("=");
String metricName = null;
if (equalSignIndex > -1) {
graphite.send(annotation.getValue().substring(0, equalSignIndex), String.valueOf(duration), timestamp);
metricName = annotation.getValue().substring(0, equalSignIndex);
} else {
graphite.send(annotation.getValue(), String.valueOf(duration), timestamp);
metricName = annotation.getValue();
}
Histogram histogram = metricRegistry.getHistograms().get(metricName);
if (histogram == null) {
histogram = metricRegistry.histogram(metricName);
}
histogram.update(duration);

}
}

Expand Down

0 comments on commit c67f70a

Please sign in to comment.