Skip to content

Commit

Permalink
Merge pull request #1901 from balena-io/loki
Browse files Browse the repository at this point in the history
Loki
  • Loading branch information
Page- authored Dec 13, 2024
2 parents a9da857 + 42b59c3 commit c3c0aa9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 33 deletions.
75 changes: 44 additions & 31 deletions src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function backoff<T extends (...args: any[]) => any>(
fn: T,
retryIf: (err: Error) => boolean,
) {
return async (...args: Parameters<T>): Promise<ReturnType<T> | undefined> => {
return async (...args: Parameters<T>): Promise<Awaited<ReturnType<T>>> => {
let nextBackoff = MIN_BACKOFF;
let prevBackoff = MIN_BACKOFF;
while (nextBackoff <= MAX_BACKOFF) {
Expand Down Expand Up @@ -100,8 +100,8 @@ function backoff<T extends (...args: any[]) => any>(
async function assertLokiLogContext(
ctx: LogContext & Partial<LokiLogContext>,
): Promise<LokiLogContext> {
if ('belongs_to__application' in ctx) {
return ctx as types.RequiredField<typeof ctx, 'belongs_to__application'>;
if ('appId' in ctx) {
return ctx as types.RequiredField<typeof ctx, 'appId'>;
}

const device = await sbvrUtils.api.resin.get({
Expand All @@ -113,11 +113,19 @@ async function assertLokiLogContext(
},
});

if (device == null) {
throw new Error(`Device '${ctx.id}' not found`);
}

// Mutate so that we don't have to repeatedly amend the same context and instead cache it
(ctx as Writable<typeof ctx>).belongs_to__application =
device?.belongs_to__application?.__id;
(ctx as Writable<typeof ctx>).appId =
`${device.belongs_to__application?.__id}`;

if (ctx.appId == null) {
throw new Error(`Device '${ctx.id}' app not found`);
}

return ctx as types.RequiredField<typeof ctx, 'belongs_to__application'>;
return ctx as types.RequiredField<typeof ctx, 'appId'>;
}

export class LokiBackend implements DeviceLogsBackend {
Expand Down Expand Up @@ -181,7 +189,7 @@ export class LokiBackend implements DeviceLogsBackend {
const [, body] = await requestAsync({
url: `http://${lokiQueryAddress}/loki/api/v1/query_range`,
headers: {
'X-Scope-OrgID': `${ctx.belongs_to__application}`,
'X-Scope-OrgID': ctx.appId,
},
qs: {
query: this.getDeviceQuery(ctx),
Expand Down Expand Up @@ -223,7 +231,7 @@ export class LokiBackend implements DeviceLogsBackend {
incrementPublishLogMessagesTotal(countLogs);
const stream = this.fromDeviceLogsToStream(ctx, logs);
try {
await this.push(ctx.belongs_to__application, stream);
await this.push(ctx, stream);
incrementPublishCallSuccessTotal();
} catch (err) {
incrementPublishCallFailedTotal();
Expand All @@ -242,29 +250,34 @@ export class LokiBackend implements DeviceLogsBackend {
}
}

private push(appId: number, stream: loki.StreamAdapter): Promise<any> {
private async push(
ctx: LokiLogContext,
stream: loki.StreamAdapter,
): Promise<void> {
incrementLokiPushTotal();
const pushRequest = new loki.PushRequest();
pushRequest.addStreams(stream);
const startAt = Date.now();
return new Promise<loki.PushResponse>((resolve, reject) => {
this.pusher.push(
pushRequest,
loki.createOrgIdMetadata(String(appId)),
{
deadline: startAt + PUSH_TIMEOUT,
},
(err, response) => {
if (err) {
reject(err);
} else {
resolve(response);
}
},
);
}).finally(() => {
try {
await new Promise<loki.PushResponse>((resolve, reject) => {
this.pusher.push(
pushRequest,
loki.createOrgIdMetadata(ctx.appId),
{
deadline: startAt + PUSH_TIMEOUT,
},
(err, response) => {
if (err) {
reject(err);
} else {
resolve(response);
}
},
);
});
} finally {
updateLokiPushDurationHistogram(Date.now() - startAt);
});
}
}

public async subscribe($ctx: LogContext, subscription: Subscription) {
Expand All @@ -277,7 +290,7 @@ export class LokiBackend implements DeviceLogsBackend {

const call = this.querier.tail(
request,
loki.createOrgIdMetadata(String(ctx.belongs_to__application)),
loki.createOrgIdMetadata(ctx.appId),
);
call.on('data', (response: loki.TailResponse) => {
const stream = response.getStream();
Expand Down Expand Up @@ -316,11 +329,11 @@ export class LokiBackend implements DeviceLogsBackend {
}

private getDeviceQuery(ctx: LokiLogContext) {
return `{application_id="${ctx.belongs_to__application}"} | device_id="${ctx.id}"`;
return `{application_id="${ctx.appId}"} | device_id="${ctx.id}"`;
}

private getKey(ctx: LokiLogContext, suffix = 'logs') {
return `app:${ctx.belongs_to__application}:device:${ctx.id}:${suffix}`;
private getKey(ctx: LokiLogContext) {
return `a${ctx.appId}:d${ctx.id}`;
}

private getStructuredMetadata(ctx: LogContext): loki.LabelPairAdapter[] {
Expand All @@ -330,7 +343,7 @@ export class LokiBackend implements DeviceLogsBackend {
}

private getLabels(ctx: LokiLogContext): string {
return `{application_id="${ctx.belongs_to__application}"}`;
return `{application_id="${ctx.appId}"}`;
}

private validateLog(log: DeviceLog): asserts log is DeviceLog {
Expand Down
2 changes: 1 addition & 1 deletion src/features/device-logs/lib/struct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export interface LogContext {
readonly retention_limit: number;
}
export interface LokiLogContext extends LogContext {
readonly belongs_to__application: number;
readonly appId: string;
}

// This is the format we store and that we output to consumers
Expand Down
2 changes: 1 addition & 1 deletion test/13_loki-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const createContext = (extra = {}): LokiLogContext => {
return {
id: 1,
uuid: '1',
belongs_to__application: 1,
appId: '1',
retention_limit: 100,
...extra,
};
Expand Down

0 comments on commit c3c0aa9

Please sign in to comment.