Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MONIT-40224: ebpfSampler #861

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25,221 changes: 13,318 additions & 11,903 deletions open_source_licenses.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.wavefront</groupId>
<artifactId>proxy</artifactId>
<version>13.0</version>
<version>13.3-SNAPSHOT</version>

<name>Wavefront Proxy</name>
<description>Service for batching and relaying metric traffic to Wavefront</description>
Expand Down
4 changes: 4 additions & 0 deletions proxy/src/main/java/com/wavefront/agent/ProxyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,10 @@ public void setReceivedLogServerDetails(boolean receivedLogServerDetails) {
this.receivedLogServerDetails = receivedLogServerDetails;
}

public String getPreferedSampler() {
return preferedSampler;
}

@Override
public void verifyAndInit() {
throw new UnsupportedOperationException("not implemented");
Expand Down
6 changes: 6 additions & 0 deletions proxy/src/main/java/com/wavefront/agent/ProxyConfigDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,12 @@ public abstract class ProxyConfigDef extends Configuration {
@ProxyConfigOption(category = Categories.INPUT, subCategory = SubCategories.TRACES)
String traceDerivedCustomTagKeys;

@Parameter(
names = {"--tracePreferedSampler"},
description = "Prefered tracing sampler. e.g. ebpf")
@ProxyConfigOption(category = Categories.INPUT, subCategory = SubCategories.TRACES)
String preferedSampler;

@Parameter(
names = {"--backendSpanHeadSamplingPercentIgnored"},
description = "Ignore " + "spanHeadSamplingPercent config in backend CustomerSettings")
Expand Down
12 changes: 11 additions & 1 deletion proxy/src/main/java/com/wavefront/agent/PushAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import com.wavefront.agent.preprocessor.ReportPointTimestampInRangeFilter;
import com.wavefront.agent.preprocessor.SpanSanitizeTransformer;
import com.wavefront.agent.queueing.*;
import com.wavefront.agent.sampler.EbpfSampler;
import com.wavefront.agent.sampler.PreferredSampler;
import com.wavefront.agent.sampler.SpanSampler;
import com.wavefront.agent.sampler.SpanSamplerUtils;
import com.wavefront.api.agent.AgentConfiguration;
Expand Down Expand Up @@ -514,14 +516,22 @@ private SpanSampler createSpanSampler() {
Sampler durationSampler =
SpanSamplerUtils.getDurationSampler(proxyConfig.getTraceSamplingDuration());
List<Sampler> samplers = SpanSamplerUtils.fromSamplers(rateSampler, durationSampler);
PreferredSampler preferredSampler = null;
if ("ebpf".equals(proxyConfig.getPreferedSampler())) {
preferredSampler =
new EbpfSampler(
proxyConfig.getTraceSamplingRate(), proxyConfig.getTraceSamplingDuration());
logger.info("ebpfSampler is prefered");
}
SpanSampler spanSampler =
new SpanSampler(
new CompositeSampler(samplers),
() ->
entityPropertiesFactoryMap
.get(CENTRAL_TENANT_NAME)
.getGlobalProperties()
.getActiveSpanSamplingPolicies());
.getActiveSpanSamplingPolicies(),
preferredSampler);
return spanSampler;
}

Expand Down
241 changes: 241 additions & 0 deletions proxy/src/main/java/com/wavefront/agent/sampler/EbpfSampler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package com.wavefront.agent.sampler;

import static com.wavefront.sdk.common.Constants.*;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.wavefront.sdk.entities.tracing.sampling.RateSampler;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import wavefront.report.Annotation;
import wavefront.report.Span;

/**
* This Sampler is for Ebpf traffic focusing on not losing a single edge due to sampling in
* application topology. It uses a dynamic sampling rate and prioritizes less frequent edges, error
* spans and long spans
*/
public class EbpfSampler implements PreferredSampler {
private static final String OTEL_ORIGIN = "origin";
private static final String OTEL_ORIGIN_VALUE_EBPF = "opapps-auto";
static final String FROM_SERVICE_TAG_KEY = "from_service";
static final String FROM_SOURCE_TAG_KEY = "from_source";

private final int durationLimit;
private final double samplingRate;
protected static final Logger logger = Logger.getLogger("ebpfSampler");
private double samplingFactor = 0.5;
private long totalTypes = 0;

public EbpfSampler(double samplingRate, int durationLimit) {
this.samplingRate = samplingRate;
this.durationLimit = durationLimit;
}

class CacheKey {
String customer;
String application;
String fromService;
String fromSource;
String toService;
String toSource;

public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CacheKey that = (CacheKey) o;

if (!customer.equals(that.customer)) return false;
if (!application.equals(that.application)) return false;
if (!fromService.equals(that.fromService)) return false;
if (!toService.equals(that.toService)) return false;

if (fromSource != null ? !fromSource.equals(that.fromSource) : that.fromSource != null)
return false;
return toSource != null ? toSource.equals(that.toSource) : that.toSource == null;
}

@Override
public int hashCode() {
int result = application.hashCode();
result = 31 * result + fromService.hashCode();
result = 31 * result + toService.hashCode();
result = 31 * result + customer.hashCode();
result = 31 * result + (fromSource != null ? fromSource.hashCode() : 0);
result = 31 * result + (toSource != null ? toSource.hashCode() : 0);

return result;
}
}

static class EdgeStats {
static final AtomicInteger totalEdgeCount = new AtomicInteger(0);
static final AtomicInteger totalSampledCount = new AtomicInteger(0);
AtomicInteger edgeCount = new AtomicInteger(0);
AtomicInteger errorCount = new AtomicInteger(0);
AtomicInteger longDurationCount = new AtomicInteger(0);
LongAccumulator maxDuration = new LongAccumulator(Long::max, 0);

public double getTypeRatio() {
if (totalEdgeCount.get() == 0) return 0;
return ((double) edgeCount.get()) / totalEdgeCount.get();
}

public double getErrorRatio() {
if (totalEdgeCount.get() == 0) return 0;
return ((double) errorCount.get()) / totalEdgeCount.get();
}

public double getLongDurationRatio() {
if (totalEdgeCount.get() == 0) return 0;
return ((double) longDurationCount.get()) / totalEdgeCount.get();
}

public double getSampledRatio() {
if (totalEdgeCount.get() == 0) return 0.0;
double ratio = ((double) totalSampledCount.get()) / totalEdgeCount.get();
if (ratio > 1.0) ratio = 1.0;
return ratio;
}

public boolean isSignificant() {
return totalEdgeCount.get() >= 100;
}

@Override
public String toString() {
return String.format(
"errorCount %d, edgeCount %d, longDurationCount %d, maxDuration %d",
errorCount.intValue(),
edgeCount.intValue(),
longDurationCount.longValue(),
maxDuration.longValue());
}
}

private final LoadingCache<CacheKey, EdgeStats> edgeStats =
Caffeine.newBuilder().build(cacheKey -> new EdgeStats());

@Override
public boolean sample(@Nonnull Span span) {
Map<String, String> annotationMap = convertAnnotationToMap(span);

boolean sampled = false;
CacheKey key = extractCacheKey(span, annotationMap);
EdgeStats stats = edgeStats.get(key);
stats.totalEdgeCount.getAndIncrement();

stats.edgeCount.getAndIncrement();
if (!stats.isSignificant()) sampled = true;

if (annotationMap.containsKey(ERROR_TAG_KEY)
&& "true".equalsIgnoreCase(annotationMap.get(ERROR_TAG_KEY))) {
stats.errorCount.getAndIncrement();
if (!sampled && stats.getErrorRatio() < 0.5) sampled = true;
}
if (span.getDuration() > stats.maxDuration.longValue()) {
stats.maxDuration.accumulate(span.getDuration());
sampled = true;
}
if (durationLimit > 0 && span.getDuration() > durationLimit) {
stats.longDurationCount.getAndIncrement();
if (!sampled && stats.getLongDurationRatio() < 0.5) sampled = true;
}
logger.log(Level.FINE, stats.toString());
if (!sampled) {
// adjust sampling factor every 1000 spans
if (stats.totalEdgeCount.get() % 1000 == 0) {
long totalTypes = edgeStats.estimatedSize();
if (Math.abs(totalTypes - this.totalTypes) > 1) {
samplingFactor = estimateSamplingFactor();
this.totalTypes = totalTypes;
logger.info(
String.format(
"ebpf sampling factor adjusted to %f based on total edges of %d",
samplingFactor, totalTypes));
}
}
double samplingRate = 1 - Math.pow(stats.getTypeRatio(), samplingFactor);
logger.log(
Level.FINE,
String.format(
"typeRatio:%f, final sampling rate %f", stats.getTypeRatio(), samplingRate));
sampled =
new RateSampler(samplingRate)
.sample(null, UUID.fromString(span.getTraceId()).getLeastSignificantBits(), 0);
}
if (sampled) stats.totalSampledCount.getAndIncrement();
return sampled;
}

@Override
public boolean isApplicable(@Nonnull Span span) {
List<Annotation> annotations = span.getAnnotations();
for (Annotation annotation : annotations) {
if (OTEL_ORIGIN.equals(annotation.getKey())) {
if (OTEL_ORIGIN_VALUE_EBPF.equals(annotation.getValue())) return true;
}
}
return false;
}

private static Map<String, String> convertAnnotationToMap(Span span) {
return span.getAnnotations().stream()
.collect(Collectors.toMap(Annotation::getKey, Annotation::getValue));
}

private CacheKey extractCacheKey(@Nonnull Span span, @Nonnull Map<String, String> annotationMap) {
CacheKey key = new CacheKey();
key.customer = span.getCustomer();
key.application = annotationMap.getOrDefault(APPLICATION_TAG_KEY, "default_application");
key.toService = annotationMap.getOrDefault(SERVICE_TAG_KEY, "to_service");
key.fromService = annotationMap.getOrDefault(FROM_SERVICE_TAG_KEY, "from_service");
key.toSource = span.getSource();
key.fromSource = annotationMap.getOrDefault(FROM_SOURCE_TAG_KEY, "from_source");

// pixie side seems not honoring spanTopologyDimensions in the customer setting. so not adding
// tags for now

return key;
}

@VisibleForTesting
double estimateSamplingFactor() {
double estimate = 0.5;
long totalTypes = edgeStats.estimatedSize();
double lastDelta = 1;
int factor = 10;
if (totalTypes > 0) {
for (double power = 0.5; power > 0.005; ) {
// System.out.println(power);
double samplingRate = 1 - Math.pow(1.0 / totalTypes, power);
double delta = Math.abs(samplingRate - this.samplingRate);
if (delta < 0.1) break;
if (delta >= lastDelta) break;
else {
estimate = power;
lastDelta = delta;
}
if (Math.abs(power - 0.01) < 0.000001d) {
factor = 1000;
power = 0.009;
} else if (Math.abs(power - 0.1) < 0.000001d) {
factor = 100;
power = 0.09;
} else {
power = power - (1d / factor);
}
}
}
return estimate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.wavefront.agent.sampler;

import javax.annotation.Nonnull;
import wavefront.report.Span;

/** This interface is for preferred sampler which if not null will override other samplers */
public interface PreferredSampler {
boolean sample(@Nonnull Span span);

boolean isApplicable(@Nonnull Span span);
}
17 changes: 12 additions & 5 deletions proxy/src/main/java/com/wavefront/agent/sampler/SpanSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class SpanSampler {
private static final int POLICY_BASED_SAMPLING_MOD_FACTOR = 100;
private static final Logger logger = Logger.getLogger(SpanSampler.class.getCanonicalName());
private final Sampler delegate;
private final PreferredSampler preferredSampler;
private final LoadingCache<String, Predicate<Span>> spanPredicateCache =
Caffeine.newBuilder()
.expireAfterAccess(EXPIRE_AFTER_ACCESS_SECONDS, TimeUnit.SECONDS)
Expand All @@ -62,9 +63,11 @@ public Predicate<Span> load(@NonNull String key) {
*/
public SpanSampler(
Sampler delegate,
@Nonnull Supplier<List<SpanSamplingPolicy>> activeSpanSamplingPoliciesSupplier) {
@Nonnull Supplier<List<SpanSamplingPolicy>> activeSpanSamplingPoliciesSupplier,
PreferredSampler preferredSampler) {
this.delegate = delegate;
this.activeSpanSamplingPoliciesSupplier = activeSpanSamplingPoliciesSupplier;
this.preferredSampler = preferredSampler;
}

/**
Expand All @@ -89,6 +92,12 @@ public boolean sample(Span span, @Nullable Counter discarded) {
if (isForceSampled(span)) {
return true;
}

// Prefered sampling
if (preferredSampler != null && preferredSampler.isApplicable(span)) {
return preferredSampler.sample(span);
}

// Policy based span sampling
List<SpanSamplingPolicy> activeSpanSamplingPolicies = activeSpanSamplingPoliciesSupplier.get();
if (activeSpanSamplingPolicies != null) {
Expand Down Expand Up @@ -127,11 +136,9 @@ public boolean sample(Span span, @Nullable Counter discarded) {
}

/**
* Util method to determine if a span is force sampled. Currently force samples if any of the
* below conditions are met. 1. The span annotation debug=true is present 2.
* alwaysSampleErrors=true and the span annotation error=true is present.
* Util method to determine if a span is force sampled. force samples if The span annotation
* debug=true is present
*
* @param span The span to sample
* @return true if the span should be force sampled.
*/
private boolean isForceSampled(Span span) {
Expand Down
Loading