Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: track code.function #10

Merged
merged 3 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ export class AckQueue extends MessageQueue {
protected async _sendBatch(batch: QueuedMessages): Promise<QueuedMessages> {
const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan(
batch.map(b => b.message.tracingSpan),
this._subscriber.name
this._subscriber.name,
'AckQueue._sendBatch'
);
const client = await this._subscriber.getClient();
const ackIds = batch.map(({message}) => message.ackId);
Expand Down Expand Up @@ -541,7 +542,8 @@ export class ModAckQueue extends MessageQueue {
protected async _sendBatch(batch: QueuedMessages): Promise<QueuedMessages> {
const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan(
batch.map(b => b.message.tracingSpan),
this._subscriber.name
this._subscriber.name,
'ModAckQueue._sendBatch'
);
const client = await this._subscriber.getClient();
const subscription = this._subscriber.name;
Expand Down
7 changes: 4 additions & 3 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export class Publisher {

// Ensure that there's a parent span for subsequent publishes
// to hang off of.
this.getParentSpan(message);
this.getParentSpan(message, 'Publisher.publishMessage');

if (!message.orderingKey) {
this.queue.add(message, callback!);
Expand Down Expand Up @@ -333,7 +333,7 @@ export class Publisher {
*
* @param {PubsubMessage} message The message to create a span for
*/
getParentSpan(message: PubsubMessage): Span | undefined {
getParentSpan(message: PubsubMessage, caller: string): Span | undefined {
const enabled = tracing.isEnabled(this.settings);
if (!enabled) {
return undefined;
Expand All @@ -345,7 +345,8 @@ export class Publisher {

const span = tracing.PubsubSpans.createPublisherSpan(
message,
this.topic.name
this.topic.name,
caller
);

// If the span's context is valid we should inject the propagation trace context.
Expand Down
3 changes: 2 additions & 1 deletion src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ export abstract class MessageQueue extends EventEmitter {

const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan(
spanMessages,
topic.name
topic.name,
'MessageQueue._publish'
);

const requestCallback = topic.request<google.pubsub.v1.IPublishResponse>;
Expand Down
42 changes: 31 additions & 11 deletions src/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ export function getTopicInfo(fullName: string): AttributeParams {
export class PubsubSpans {
static createAttributes(
params: AttributeParams,
message?: PubsubMessage
message?: PubsubMessage,
caller?: string
): SpanAttributes {
const destinationName = params.topicName ?? params.subName;
const destinationId = params.topicId ?? params.subId;
Expand All @@ -299,6 +300,7 @@ export class PubsubSpans {
['messaging.system']: 'gcp_pubsub',
['messaging.destination.name']: destinationId ?? destinationName,
['gcp.project_id']: projectId,
['code.function']: caller ?? 'unknown',
} as SpanAttributes;

if (message) {
Expand Down Expand Up @@ -327,11 +329,15 @@ export class PubsubSpans {
return spanAttributes;
}

static createPublisherSpan(message: PubsubMessage, topicName: string): Span {
static createPublisherSpan(
message: PubsubMessage,
topicName: string,
caller: string
): Span {
const topicInfo = getTopicInfo(topicName);
const span: Span = getTracer().startSpan(`${topicName} create`, {
kind: SpanKind.PRODUCER,
attributes: PubsubSpans.createAttributes(topicInfo, message),
attributes: PubsubSpans.createAttributes(topicInfo, message, caller),
});
if (topicInfo.topicId) {
span.updateName(`${topicInfo.topicId} create`);
Expand All @@ -357,11 +363,12 @@ export class PubsubSpans {
static createReceiveSpan(
message: PubsubMessage,
subName: string,
parent: Context | undefined
parent: Context | undefined,
caller: string
): Span {
const subInfo = getSubscriptionInfo(subName);
const name = `${subInfo.subId ?? subName} subscribe`;
const attributes = this.createAttributes(subInfo, message);
const attributes = this.createAttributes(subInfo, message, caller);
if (subInfo.subId) {
attributes['messaging.destination.name'] = subInfo.subId;
}
Expand Down Expand Up @@ -414,10 +421,13 @@ export class PubsubSpans {

static createPublishRpcSpan(
messages: MessageWithAttributes[],
topicName: string
topicName: string,
caller: string
): Span {
const spanAttributes = PubsubSpans.createAttributes(
getTopicInfo(topicName)
getTopicInfo(topicName),
undefined,
caller
);
const links: Link[] = messages
.map(m => ({context: m.parentSpan?.spanContext()}) as Link)
Expand Down Expand Up @@ -451,10 +461,13 @@ export class PubsubSpans {

static createReceiveResponseRpcSpan(
messageSpans: (Span | undefined)[],
subName: string
subName: string,
caller: string
): Span {
const spanAttributes = PubsubSpans.createAttributes(
getSubscriptionInfo(subName)
getSubscriptionInfo(subName),
undefined,
caller
);
const links: Link[] = messageSpans
.map(m => ({context: m?.spanContext()}) as Link)
Expand Down Expand Up @@ -520,14 +533,16 @@ export class PubsubSpans {
message: MessageWithAttributes,
subName: string,
type: 'modack' | 'ack' | 'nack',
caller: string,
deadline?: Duration,
isInitial?: boolean
): Span | undefined {
const subInfo = getSubscriptionInfo(subName);
const span = PubsubSpans.createReceiveSpan(
message,
`${subInfo.subId ?? subInfo.subName} ${type}`,
undefined
undefined,
caller
);

if (deadline) {
Expand Down Expand Up @@ -744,7 +759,12 @@ export function extractSpan(
}
}

const span = PubsubSpans.createReceiveSpan(message, subName, context);
const span = PubsubSpans.createReceiveSpan(
message,
subName,
context,
'extractSpan'
);
message.parentSpan = span;
return span;
}
Expand Down
3 changes: 2 additions & 1 deletion test/publisher/flow-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ describe('Flow control publisher', () => {
data: Buffer.from('foo'),
parentSpan: tracing.PubsubSpans.createPublisherSpan(
{},
'projects/foo/topics/topic'
'projects/foo/topics/topic',
'tests'
),
};
fcp.publish(message as unknown as PubsubMessage);
Expand Down
38 changes: 28 additions & 10 deletions test/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ describe('OpenTelemetryTracer', () => {
const message: PubsubMessage = {};
const span = otel.PubsubSpans.createPublisherSpan(
message,
'projects/test/topics/topicfoo'
'projects/test/topics/topicfoo',
'tests'
) as trace.Span;
span.end();

Expand All @@ -97,7 +98,8 @@ describe('OpenTelemetryTracer', () => {
};
const span = otel.PubsubSpans.createPublisherSpan(
message,
'projects/test/topics/topicfoo'
'projects/test/topics/topicfoo',
'tests'
) as trace.Span;

otel.injectSpan(span, message, otel.OpenTelemetryLevel.Modern);
Expand All @@ -118,7 +120,8 @@ describe('OpenTelemetryTracer', () => {
};
const span = otel.PubsubSpans.createPublisherSpan(
message,
'projects/test/topics/topicfoo'
'projects/test/topics/topicfoo',
'tests'
);

otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy);
Expand Down Expand Up @@ -146,7 +149,8 @@ describe('OpenTelemetryTracer', () => {
};
const span = otel.PubsubSpans.createPublisherSpan(
message,
'projects/test/topics/topicfoo'
'projects/test/topics/topicfoo',
'tests'
);

const warnSpy = sinon.spy(console, 'warn');
Expand Down Expand Up @@ -213,7 +217,11 @@ describe('OpenTelemetryTracer', () => {
ackId: 'ackack',
};

const topicAttrs = otel.PubsubSpans.createAttributes(topicInfo, message);
const topicAttrs = otel.PubsubSpans.createAttributes(
topicInfo,
message,
'tests'
);
assert.deepStrictEqual(topicAttrs, {
'messaging.system': 'gcp_pubsub',
'messaging.destination.name': topicInfo.topicId,
Expand All @@ -223,6 +231,7 @@ describe('OpenTelemetryTracer', () => {
'messaging.gcp_pubsub.message.exactly_once_delivery':
message.isExactlyOnceDelivery,
'messaging.gcp_pubsub.message.ack_id': message.ackId,
'code.function': 'tests',
});

// Check again with no calculated size and other parameters missing.
Expand All @@ -231,12 +240,17 @@ describe('OpenTelemetryTracer', () => {
delete message.isExactlyOnceDelivery;
delete message.ackId;

const topicAttrs2 = otel.PubsubSpans.createAttributes(topicInfo, message);
const topicAttrs2 = otel.PubsubSpans.createAttributes(
topicInfo,
message,
'tests'
);
assert.deepStrictEqual(topicAttrs2, {
'messaging.system': 'gcp_pubsub',
'messaging.destination.name': topicInfo.topicId,
'gcp.project_id': topicInfo.projectId,
'messaging.message.envelope.size': message.data?.length,
'code.function': 'tests',
});
});
});
Expand Down Expand Up @@ -266,7 +280,8 @@ describe('OpenTelemetryTracer', () => {
it('creates publisher spans', () => {
const span = otel.PubsubSpans.createPublisherSpan(
tests.message,
tests.topicInfo.topicName!
tests.topicInfo.topicName!,
'tests'
);
span.end();

Expand All @@ -289,7 +304,8 @@ describe('OpenTelemetryTracer', () => {
it('updates publisher topic names', () => {
const span = otel.PubsubSpans.createPublisherSpan(
tests.message,
tests.topicInfo.topicName!
tests.topicInfo.topicName!,
'tests'
);
otel.PubsubSpans.updatePublisherTopicName(
span,
Expand All @@ -313,12 +329,14 @@ describe('OpenTelemetryTracer', () => {
it('creates receive spans', () => {
const parentSpan = otel.PubsubSpans.createPublisherSpan(
tests.message,
tests.topicInfo.topicName!
tests.topicInfo.topicName!,
'tests'
);
const span = otel.PubsubSpans.createReceiveSpan(
tests.message,
tests.subInfo.subName!,
otel.spanContextToContext(parentSpan.spanContext())
otel.spanContextToContext(parentSpan.spanContext()),
'tests'
);
span.end();
parentSpan.end();
Expand Down
Loading