Skip to content

Commit

Permalink
Logs: differentiate between internal/output logs to reduce processing
Browse files Browse the repository at this point in the history
This avoids adding the `createdAt`/`nanoTimestamp` timestamps at points
when they are not actually necessary/expected

Change-type: patch
  • Loading branch information
Page- committed Dec 24, 2024
1 parent af5ba63 commit bea6440
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 75 deletions.
67 changes: 38 additions & 29 deletions src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import {
LOKI_PUSH_TIMEOUT,
} from '../../../../lib/config.js';
import type {
DeviceLog,
DeviceLogsBackend,
InternalDeviceLog,
LogContext,
LokiLogContext,
OutputDeviceLog,
Subscription,
} from '../struct.js';
import { captureException } from '../../../../infra/error-handling/index.js';
Expand All @@ -42,6 +43,11 @@ import { requestAsync } from '../../../../infra/request-promise/index.js';

const { BadRequestError } = errors;

interface LokiDeviceLog extends Omit<InternalDeviceLog, 'nanoTimestamp'> {
version?: number;
createdAt?: number;
}

// invert status object for quick lookup of status identifier using status code
const statusKeys = _.transform(
loki.status,
Expand All @@ -57,7 +63,6 @@ const lokiIngesterAddress = `${LOKI_INGESTER_HOST}:${LOKI_INGESTER_GRPC_PORT}`;
const MIN_BACKOFF = 100;
const MAX_BACKOFF = 10 * 1000;
const VERSION = 2;
const VERBOSE_ERROR_MESSAGE = false;

function createTimestampFromDate(date = new Date()) {
const timestamp = new loki.Timestamp();
Expand Down Expand Up @@ -188,7 +193,10 @@ export class LokiBackend implements DeviceLogsBackend {
* @param ctx
* @param count
*/
public async history($ctx: LogContext, count: number): Promise<DeviceLog[]> {
public async history(
$ctx: LogContext,
count: number,
): Promise<OutputDeviceLog[]> {
const ctx = await assertLokiLogContext($ctx);

const [, body] = await requestAsync({
Expand All @@ -205,30 +213,32 @@ export class LokiBackend implements DeviceLogsBackend {
gzip: LOKI_HISTORY_GZIP,
});

const logs = (
return _(
body.data.result as Array<{
values: Array<[timestamp: string, logLine: string]>;
}>
}>,
)
.flatMap(({ values }) => values)
.map(([timestamp, logLine]) => {
const log = JSON.parse(logLine);
log.nanoTimestamp = BigInt(timestamp);
.map(([timestamp, logLine]): [bigint, OutputDeviceLog] => {
const log: LokiDeviceLog = JSON.parse(logLine);
if (log.version !== VERSION) {
throw new Error(
`Invalid Loki serialization version: ${JSON.stringify(log)}`,
);
}
delete log.version;
return log as DeviceLog;
});

return _.orderBy(logs, 'nanoTimestamp', 'asc');
const nanoTimestamp = BigInt(timestamp);
log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n));
return [nanoTimestamp, log as OutputDeviceLog];
})
.sortBy(([timestamp]) => timestamp)
.map(([, log]) => log)
.value();
}

public async publish(
ctx: LogContext,
logs: Array<DeviceLog & { version?: number }>,
logs: Array<InternalDeviceLog & { version?: number }>,
): Promise<any> {
const logEntries = this.fromDeviceLogsToEntries(ctx, logs);

Expand All @@ -243,14 +253,10 @@ export class LokiBackend implements DeviceLogsBackend {
} catch (err) {
incrementPublishCallFailedTotal();
incrementPublishLogMessagesDropped(countLogs);
let message = `Failed to publish logs for device ${lokiCtx.uuid}`;
if (VERBOSE_ERROR_MESSAGE) {
message += JSON.stringify(logs, omitNanoTimestamp, '\t').substring(
0,
1000,
);
}
captureException(err, message);
captureException(
err,
`Failed to publish logs for device ${lokiCtx.uuid}`,
);
throw new BadRequestError(
`Failed to publish logs for device ${lokiCtx.uuid}`,
);
Expand Down Expand Up @@ -353,21 +359,24 @@ export class LokiBackend implements DeviceLogsBackend {
return `{fleet_id="${ctx.appId}"}`;
}

private fromStreamToDeviceLogs(stream: loki.StreamAdapter): DeviceLog[] {
private fromStreamToDeviceLogs(
stream: loki.StreamAdapter,
): OutputDeviceLog[] {
try {
return stream.getEntriesList().map((entry) => {
const log = JSON.parse(entry.getLine());
const timestamp = entry.getTimestamp()!;
log.nanoTimestamp =
BigInt(timestamp.getSeconds()) * 1000000000n +
BigInt(timestamp.getNanos());
const log: LokiDeviceLog = JSON.parse(entry.getLine());
if (log.version !== VERSION) {
throw new Error(
`Invalid Loki serialization version: ${JSON.stringify(log)}`,
);
}
delete log.version;
return log as DeviceLog;
const timestampEntry = entry.getTimestamp()!;
const nanoTimestamp =
BigInt(timestampEntry.getSeconds()) * 1000000000n +
BigInt(timestampEntry.getNanos());
log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n));
return log as OutputDeviceLog;
});
} catch (err) {
captureException(err, `Failed to convert stream to device log`);
Expand All @@ -377,7 +386,7 @@ export class LokiBackend implements DeviceLogsBackend {

private fromDeviceLogsToEntries(
ctx: LogContext,
logs: Array<DeviceLog & { version?: number }>,
logs: Array<InternalDeviceLog & { version?: number }>,
) {
const structuredMetadata = this.getStructuredMetadata(ctx);
return logs.map((log) => {
Expand Down
30 changes: 23 additions & 7 deletions src/features/device-logs/lib/backends/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import {
} from '../../../../lib/config.js';
import { DAYS } from '@balena/env-parsing';
import type {
DeviceLog,
DeviceLogsBackend,
InternalDeviceLog,
LogContext,
OutputDeviceLog,
Subscription,
} from '../struct.js';
import {
Expand Down Expand Up @@ -53,6 +54,10 @@ const schema = avro.Type.forSchema({
],
});

interface RedisDeviceLog extends OutputDeviceLog {
version: number;
}

declare module 'ioredis' {
interface RedisCommander<Context> {
publishLogs(
Expand Down Expand Up @@ -145,7 +150,10 @@ export class RedisBackend implements DeviceLogsBackend {
this.subscriptions = new EventEmitter();
}

public async history(ctx: LogContext, count: number): Promise<DeviceLog[]> {
public async history(
ctx: LogContext,
count: number,
): Promise<OutputDeviceLog[]> {
if (!this.connected) {
throw new ServiceUnavailableError();
}
Expand All @@ -163,7 +171,10 @@ export class RedisBackend implements DeviceLogsBackend {
return this.connected;
}

public async publish(ctx: LogContext, logs: DeviceLog[]): Promise<void> {
public async publish(
ctx: LogContext,
logs: InternalDeviceLog[],
): Promise<void> {
if (!this.connected) {
throw new ServiceUnavailableError();
}
Expand Down Expand Up @@ -246,7 +257,9 @@ export class RedisBackend implements DeviceLogsBackend {
}
};

private async fromRedisLog(payload: string): Promise<DeviceLog | undefined> {
private async fromRedisLog(
payload: string,
): Promise<RedisDeviceLog | undefined> {
try {
let decompressedBuffer = Buffer.from(payload, BUFFER_ENCODING);
const compression = await getCompressionLib();
Expand All @@ -268,15 +281,18 @@ export class RedisBackend implements DeviceLogsBackend {
delete log.serviceId;
}
delete log.version;
return log as DeviceLog;
return log as RedisDeviceLog;
} catch (err) {
captureException(err, `Failed to deserialize a Redis log: ${payload}`);
}
}

private async toRedisLog(log: DeviceLog): Promise<string> {
private async toRedisLog(log: InternalDeviceLog): Promise<string> {
try {
let compressedLog = schema.toBuffer(log);
let compressedLog = schema.toBuffer({
...log,
createdAt: Math.floor(Number(log.nanoTimestamp / 1000000n)),
});
const compression = await getCompressionLib();
if (compression != null) {
compressedLog = await compression.compress(compressedLog);
Expand Down
21 changes: 10 additions & 11 deletions src/features/device-logs/lib/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import {
handleHttpErrors,
} from '../../../infra/error-handling/index.js';

import type { DeviceLog, DeviceLogsBackend, LogContext } from './struct.js';
import type {
DeviceLogsBackend,
LogContext,
OutputDeviceLog,
} from './struct.js';
import { StreamState } from './struct.js';
import {
addRetentionLimit,
getPrimaryBackend,
getSecondaryBackend,
omitNanoTimestamp,
shouldReadFromSecondary,
} from './config.js';
import { getNanoTimestamp } from '../../../lib/utils.js';
import type { SetupOptions } from '../../../index.js';
import {
LOGS_DEFAULT_HISTORY_COUNT,
Expand Down Expand Up @@ -54,9 +56,7 @@ export const read =
LOGS_DEFAULT_HISTORY_COUNT,
);

res
.set('Content-Type', 'application/json')
.send(JSON.stringify(logs, omitNanoTimestamp));
res.json(logs);
}
} catch (err) {
if (handleHttpErrors(req, res, err)) {
Expand All @@ -75,7 +75,7 @@ async function handleStreamingRead(
const backend = await getReadBackend();
let state: StreamState = StreamState.Buffering;
let dropped = 0;
const buffer: DeviceLog[] = [];
const buffer: OutputDeviceLog[] = [];

res.setHeader('Content-Type', NDJSON_CTYPE);
res.setHeader('Cache-Control', 'no-cache');
Expand All @@ -93,14 +93,14 @@ async function handleStreamingRead(
return r;
}

function onLog(log: DeviceLog) {
function onLog(log: OutputDeviceLog) {
if (state === StreamState.Buffering) {
buffer.push(log);
} else if (state === StreamState.Saturated) {
dropped++;
} else if (state !== StreamState.Closed) {
if (
!write(JSON.stringify(log, omitNanoTimestamp) + '\n') &&
!write(JSON.stringify(log) + '\n') &&
state === StreamState.Writable
) {
state = StreamState.Saturated;
Expand All @@ -121,7 +121,6 @@ async function handleStreamingRead(
if (dropped) {
const now = Date.now();
onLog({
nanoTimestamp: getNanoTimestamp(),
createdAt: now,
timestamp: now,
isStdErr: true,
Expand Down Expand Up @@ -224,7 +223,7 @@ function getHistory(
ctx: LogContext,
{ query }: Request,
defaultCount: number,
): Resolvable<DeviceLog[]> {
): Resolvable<OutputDeviceLog[]> {
const count = getCount(query.count as string | undefined, defaultCount);

// Optimize the case where the caller doesn't need any history
Expand Down
9 changes: 4 additions & 5 deletions src/features/device-logs/lib/store.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Request, RequestHandler, Response } from 'express';
import type { DeviceLog, LogContext, SupervisorLog } from './struct.js';
import type { InternalDeviceLog, LogContext, SupervisorLog } from './struct.js';

import onFinished from 'on-finished';
import type { permissions } from '@balena/pinejs';
Expand Down Expand Up @@ -94,7 +94,7 @@ const getWriteContext = (() => {
export const store: RequestHandler = async (req: Request, res: Response) => {
try {
const body: SupervisorLog[] = req.body;
const logs: DeviceLog[] = supervisor.convertLogs(body);
const logs = supervisor.convertLogs(body);
if (logs.length) {
const ctx = await getWriteContext(req);
// start publishing to both backends
Expand Down Expand Up @@ -133,7 +133,7 @@ const secondaryBackend = LOGS_SECONDARY_BACKEND_ENABLED
? await getSecondaryBackend()
: undefined;

const publishBackend = async (ctx: LogContext, buffer: DeviceLog[]) => {
const publishBackend = async (ctx: LogContext, buffer: InternalDeviceLog[]) => {
const primaryBackendPromise = primaryBackend.publish(ctx, buffer);
const secondaryBackendPromise = shouldPublishToSecondary()
? secondaryBackend?.publish(ctx, buffer).catch((err) => {
Expand Down Expand Up @@ -162,7 +162,7 @@ function handleStreamingWrite(
}

const bufferLimit = Math.min(LOGS_WRITE_BUFFER_LIMIT, ctx.retention_limit);
const buffer: DeviceLog[] = [];
const buffer: InternalDeviceLog[] = [];
const parser = ndjson.parse();

function close(err?: Error | null) {
Expand Down Expand Up @@ -220,7 +220,6 @@ function handleStreamingWrite(
const deleteCount = buffer.length - bufferLimit;
buffer.splice(0, deleteCount, {
nanoTimestamp: buffer[0].nanoTimestamp,
createdAt: buffer[0].createdAt,
timestamp: buffer[0].timestamp,
isStdErr: true,
isSystem: true,
Expand Down
22 changes: 14 additions & 8 deletions src/features/device-logs/lib/struct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@ export interface LokiLogContext extends LogContext {
readonly appId: string;
}

// This is the format we store and that we output to consumers
export interface DeviceLog {
// This is the base format that we receive from consumers after validation
interface DeviceLog {
message: string;
nanoTimestamp: bigint;
// These 2 dates are timestamps including milliseconds
createdAt: number;
// This is a timestamp including milliseconds
timestamp: number;
isSystem: boolean;
isStdErr: boolean;
serviceId?: number;
}
// This is the format we use internally
export interface InternalDeviceLog extends DeviceLog {
nanoTimestamp: bigint;
}
// This is the format that we output to consumers
export interface OutputDeviceLog extends DeviceLog {
createdAt: number;
}

// This is the format we get from new supervisors
export interface SupervisorLog {
Expand All @@ -42,15 +48,15 @@ export interface OldSupervisorLog {
image_id?: number;
}

export type Subscription = (log: DeviceLog) => void;
export type Subscription = (log: OutputDeviceLog) => void;

export interface DeviceLogsBackend {
history(ctx: LogContext, count: number): Promise<DeviceLog[]>;
history(ctx: LogContext, count: number): Promise<OutputDeviceLog[]>;
available: boolean;
/**
* `logs` will be mutated to empty and so must be handled synchronously
*/
publish(ctx: LogContext, logs: DeviceLog[]): Promise<any>;
publish(ctx: LogContext, logs: InternalDeviceLog[]): Promise<any>;
subscribe(ctx: LogContext, subscription: Subscription): void;
unsubscribe(ctx: LogContext, subscription: Subscription): void;
}
Expand Down
Loading

0 comments on commit bea6440

Please sign in to comment.