Skip to content

Commit

Permalink
Add AWS X-Ray Remote Sampler
Browse files Browse the repository at this point in the history
  • Loading branch information
jj22ee committed Aug 12, 2024
1 parent c1398e1 commit 036828d
Show file tree
Hide file tree
Showing 12 changed files with 982 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, Context, DiagLogger, Link, SpanKind, diag } from '@opentelemetry/api';
import { ParentBasedSampler, Sampler, SamplingResult } from '@opentelemetry/sdk-trace-base';
import { AwsXraySamplingClient } from './aws-xray-sampling-client';
import { FallbackSampler } from './fallback-sampler';
import {
AwsXRayRemoteSamplerConfig,
GetSamplingRulesResponse,
GetSamplingTargetsBody,
GetSamplingTargetsResponse,
SamplingRuleRecord,
SamplingTargetDocument,
} from './remote-sampler.types';
import { RuleCache, TargetMap } from './rule-cache';
import { SamplingRuleApplier } from './sampling-rule-applier';

// 5 minute default sampling rules polling interval
const DEFAULT_INTERVAL_SECONDS: number = 5 * 60;
// Default endpoint for awsproxy : https://aws-otel.github.io/docs/getting-started/remote-sampling#enable-awsproxy-extension
const DEFAULT_AWS_PROXY_ENDPOINT: string = 'http://localhost:2000';
// 10 second default sampling targets polling interval
const TARGET_POLLING_INTERAVAL_MILLIS: number = 10 * 1000;

export class AwsXRayRemoteSampler implements Sampler {
private pollingIntervalMillis: number;
private awsProxyEndpoint: string;
private ruleCache: RuleCache;
private fallbackSampler: ParentBasedSampler;
private samplerDiag: DiagLogger;
private rulePoller: NodeJS.Timer | undefined;
private targetPoller: NodeJS.Timer | undefined;
private clientId: string;
private rulePollingJitterMillis: number;
private targetPollingJitterMillis: number;
private samplingClient: AwsXraySamplingClient;

constructor(samplerConfig: AwsXRayRemoteSamplerConfig) {
this.samplerDiag = diag.createComponentLogger({
namespace: '@aws-observability/aws-xray-remote-sampler',
});

if (samplerConfig.pollingInterval == null || samplerConfig.pollingInterval < 10) {
this.samplerDiag.warn(`'pollingInterval' is undefined or too small. Defaulting to ${DEFAULT_INTERVAL_SECONDS}`);
this.pollingIntervalMillis = DEFAULT_INTERVAL_SECONDS * 1000;
} else {
this.pollingIntervalMillis = samplerConfig.pollingInterval * 1000;
}

this.rulePollingJitterMillis = Math.random() * 5 * 1000;
this.targetPollingJitterMillis = (Math.random() / 10) * 1000;

this.awsProxyEndpoint = samplerConfig.endpoint ? samplerConfig.endpoint : DEFAULT_AWS_PROXY_ENDPOINT;
this.fallbackSampler = new ParentBasedSampler({ root: new FallbackSampler() });
this.clientId = this.generateClientId();
this.ruleCache = new RuleCache(samplerConfig.resource);

this.samplingClient = new AwsXraySamplingClient(this.awsProxyEndpoint, this.samplerDiag);

// execute first Sampling Rules update and then start the respective poller
this.startSamplingRulesPoller();

// execute first Sampling Targets update and then start the respective poller
this.startSamplingTargetsPoller();
}

public shouldSample(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
): SamplingResult {
if (this.ruleCache.isExpired()) {
return this.fallbackSampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
}

const matchedRule: SamplingRuleApplier | undefined = this.ruleCache.getMatchedRule(attributes);
if (matchedRule) {
return matchedRule.shouldSample(context, traceId, spanName, spanKind, attributes, links);
}

this.samplerDiag.debug(
'Using fallback sampler as no rule match was found. This is likely due to a bug, since default rule should always match'
);
return this.fallbackSampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
}

public toString(): string {
return 'AwsXRayRemoteSampler{remote sampling with AWS X-Ray}';
}

private startSamplingRulesPoller(): void {
// Execute first update
this.getAndUpdateSamplingRules();
// Update sampling rules every 5 minutes (or user-defined polling interval)
this.rulePoller = setInterval(
() => this.getAndUpdateSamplingRules(),
this.pollingIntervalMillis + this.rulePollingJitterMillis
);
this.rulePoller.unref();
}

private startSamplingTargetsPoller(): void {
// Execute first update
this.getAndUpdateSamplingTargets();
// Update sampling targets every 10 seconds
this.targetPoller = setInterval(
() => this.getAndUpdateSamplingTargets(),
TARGET_POLLING_INTERAVAL_MILLIS + this.targetPollingJitterMillis
);
this.targetPoller.unref();
}

private getAndUpdateSamplingTargets(): void {
const requestBody: GetSamplingTargetsBody = {
SamplingStatisticsDocuments: this.ruleCache.createSamplingStatisticsDocuments(this.clientId),
};

this.samplingClient.fetchSamplingTargets(requestBody, this.updateSamplingTargets.bind(this));
}

private getAndUpdateSamplingRules(): void {
this.samplingClient.fetchSamplingRules(this.updateSamplingRules.bind(this));
}

private updateSamplingRules(responseObject: GetSamplingRulesResponse): void {
let samplingRules: SamplingRuleApplier[] = [];

samplingRules = [];
if (responseObject.SamplingRuleRecords) {
responseObject.SamplingRuleRecords.forEach((record: SamplingRuleRecord) => {
if (record.SamplingRule) {
samplingRules.push(new SamplingRuleApplier(record.SamplingRule, undefined));
}
});
this.ruleCache.updateRules(samplingRules);
} else {
this.samplerDiag.error('SamplingRuleRecords from GetSamplingRules request is not defined');
}
}

private updateSamplingTargets(responseObject: GetSamplingTargetsResponse): void {
try {
const targetDocuments: TargetMap = {};

// Create Target-Name-to-Target-Map from sampling targets response
responseObject.SamplingTargetDocuments.forEach((newTarget: SamplingTargetDocument) => {
targetDocuments[newTarget.RuleName] = newTarget;
});

// Update targets in the cache
const refreshSamplingRules: boolean = this.ruleCache.updateTargets(
targetDocuments,
responseObject.LastRuleModification
);

if (refreshSamplingRules) {
this.samplerDiag.debug('Performing out-of-band sampling rule polling to fetch updated rules.');
clearInterval(this.rulePoller);
this.startSamplingRulesPoller();
}
} catch (error: unknown) {
this.samplerDiag.debug('Error occurred when updating Sampling Targets');
}
}

private generateClientId(): string {
const hexChars: string[] = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'];
const clientIdArray: string[] = [];
for (let _: number = 0; _ < 24; _ += 1) {
clientIdArray.push(hexChars[Math.floor(Math.random() * hexChars.length)]);
}
return clientIdArray.join('');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { DiagLogFunction, DiagLogger } from '@opentelemetry/api';
import * as http from 'http';
import { GetSamplingRulesResponse, GetSamplingTargetsBody, GetSamplingTargetsResponse } from './remote-sampler.types';

export class AwsXraySamplingClient {
private getSamplingRulesEndpoint: string;
private samplingTargetsEndpoint: string;
private samplerDiag: DiagLogger;

constructor(endpoint: string, samplerDiag: DiagLogger) {
this.getSamplingRulesEndpoint = endpoint + '/GetSamplingRules';
this.samplingTargetsEndpoint = endpoint + '/SamplingTargets';
this.samplerDiag = samplerDiag;
}

public fetchSamplingTargets(
requestBody: GetSamplingTargetsBody,
callback: (responseObject: GetSamplingTargetsResponse) => void
) {
this.makeSamplingRequest<GetSamplingTargetsResponse>(
this.samplingTargetsEndpoint,
callback,
this.samplerDiag.debug,
JSON.stringify(requestBody)
);
}

public fetchSamplingRules(callback: (responseObject: GetSamplingRulesResponse) => void) {
this.makeSamplingRequest<GetSamplingRulesResponse>(this.getSamplingRulesEndpoint, callback, this.samplerDiag.error);
}

private makeSamplingRequest<T>(
url: string,
callback: (responseObject: T) => void,
logger: DiagLogFunction,
requestBodyJsonString?: string
): void {
const options = {
method: 'POST',
headers: {},
};

if (requestBodyJsonString) {
options.headers = {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(requestBodyJsonString),
};
}

const req = http
.request(url, options, response => {
response.setEncoding('utf-8');
let responseData = '';
response.on('data', dataChunk => (responseData += dataChunk));
response.on('end', () => {
if (response.statusCode === 200 && responseData.length > 0) {
let responseObject: T | undefined = undefined;
try {
responseObject = JSON.parse(responseData) as T;
} catch (e: unknown) {
logger(`Error occurred when parsing responseData from ${url}`);
}

if (responseObject) {
callback(responseObject);
}
} else {
this.samplerDiag.debug(`${url} Response Code is: ${response.statusCode}`);
this.samplerDiag.debug(`${url} responseData is: ${responseData}`);
}
});
})
.on('error', (error: unknown) => {
logger(`Error occurred when making an HTTP POST to ${url}: ${error}`);
});
if (requestBodyJsonString) {
req.end(requestBodyJsonString);
} else {
req.end();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api';
import { Sampler, SamplingDecision, SamplingResult, TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-base';
import { RateLimitingSampler } from './rate-limiting-sampler';

// FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler.
export class FallbackSampler implements Sampler {
private fixedRateSampler: TraceIdRatioBasedSampler;
private rateLimitingSampler: RateLimitingSampler;

constructor() {
this.fixedRateSampler = new TraceIdRatioBasedSampler(0.05);
this.rateLimitingSampler = new RateLimitingSampler(1);
}

shouldSample(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
): SamplingResult {
const samplingResult: SamplingResult = this.rateLimitingSampler.shouldSample(
context,
traceId,
spanName,
spanKind,
attributes,
links
);

if (samplingResult.decision !== SamplingDecision.NOT_RECORD) {
return samplingResult;
}

return this.fixedRateSampler.shouldSample(context, traceId);
}

public toString(): string {
return 'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
export * from './aws-xray-remote-sampler';
export { AwsXRayRemoteSamplerConfig } from './remote-sampler.types';
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

/*
* The RateLimiter keeps track of the current reservoir quota balance available (measured via available time)
* If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time)
* A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available.
*/
export class RateLimiter {
// Quota assigned to client to dictate maximum quota balance that can be consumed per second.
private quota: number;
private MAX_BALANCE_MILLIS: number;
// Used to measure current quota balance.
private walletFloorMillis: number;

constructor(quota: number, maxBalanceInSeconds: number = 1) {
this.MAX_BALANCE_MILLIS = maxBalanceInSeconds * 1000.0;
this.quota = quota;
this.walletFloorMillis = Date.now();
// current "balance" would be `ceiling - floor`
}

public take(cost: number = 1): boolean {
if (this.quota === 0) {
return false;
}

const quotaPerMillis: number = this.quota / 1000.0;

// assume divide by zero not possible
const costInMillis: number = cost / quotaPerMillis;

const walletCeilingMillis = Date.now();
let currentBalanceMillis: number = walletCeilingMillis - this.walletFloorMillis;
currentBalanceMillis = Math.min(currentBalanceMillis, this.MAX_BALANCE_MILLIS);
const pendingRemainingBalanceMillis = currentBalanceMillis - costInMillis;
if (pendingRemainingBalanceMillis >= 0) {
this.walletFloorMillis = walletCeilingMillis - pendingRemainingBalanceMillis;
return true;
}
// No changes to the wallet state
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

import { Attributes, Context, Link, SpanKind } from '@opentelemetry/api';
import { Sampler, SamplingDecision, SamplingResult } from '@opentelemetry/sdk-trace-base';
import { RateLimiter } from './rate-limiter';

export class RateLimitingSampler implements Sampler {
private quota: number;
private reservoir: RateLimiter;

constructor(quota: number) {
this.quota = quota;
this.reservoir = new RateLimiter(quota);
}

shouldSample(
context: Context,
traceId: string,
spanName: string,
spanKind: SpanKind,
attributes: Attributes,
links: Link[]
): SamplingResult {
if (this.reservoir.take(1)) {
return { decision: SamplingDecision.RECORD_AND_SAMPLED, attributes: attributes };
}
return { decision: SamplingDecision.NOT_RECORD, attributes: attributes };
}

public toString(): string {
return `RateLimitingSampler{rate limiting sampling with sampling config of ${this.quota} req/sec and 0% of additional requests}`;
}
}
Loading

0 comments on commit 036828d

Please sign in to comment.