Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logs: switch to an agnostic primary/secondary backend paradigm #1908

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/confd/templates/env.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ REDIS_LOGS_AUTH={{getenv "REDIS_LOGS_AUTH"}}
REDIS_LOGS_RO_HOST={{getenv "REDIS_LOGS_RO_HOST"}}
REDIS_LOGS_RO_AUTH={{getenv "REDIS_LOGS_RO_AUTH"}}
REDIS_LOGS_SHARDED_PUBSUB={{getenv "REDIS_LOGS_SHARDED_PUBSUB"}}
LOGS_REDIS_READ_PCT={{getenv "LOGS_REDIS_READ_PCT"}}
LOGS_REDIS_WRITE_PCT={{getenv "LOGS_REDIS_WRITE_PCT"}}
LOGS_REDIS_ENABLED={{getenv "LOGS_REDIS_ENABLED"}}

LOKI_HOST={{getenv "LOKI_HOST"}}
LOKI_QUERY_HOST={{getenv "LOKI_QUERY_HOST"}}
LOKI_INGESTER_HOST={{getenv "LOKI_INGESTER_HOST"}}
Expand All @@ -74,10 +78,16 @@ LOKI_GRPC_SEND_GZIP={{getenv "LOKI_GRPC_SEND_GZIP"}}
LOKI_GRPC_RECEIVE_COMPRESSION_LEVEL={{getenv "LOKI_GRPC_RECEIVE_COMPRESSION_LEVEL"}}
LOKI_READ_PCT={{getenv "LOKI_READ_PCT"}}
LOKI_WRITE_PCT={{getenv "LOKI_WRITE_PCT"}}
LOKI_RETRIES_ENABLED={{getenv "LOKI_RETRIES_ENABLED"}}
LOKI_PUSH_TIMEOUT={{getenv "LOKI_PUSH_TIMEOUT"}}
LOGS_LOKI_ENABLED={{getenv "LOGS_LOKI_ENABLED"}}

LOGS_PRIMARY_BACKEND={{getenv "LOGS_PRIMARY_BACKEND"}}
LOGS_READ_STREAM_FLUSH_INTERVAL={{getenv "LOGS_READ_STREAM_FLUSH_INTERVAL"}}
LOGS_STREAM_FLUSH_INTERVAL={{getenv "LOGS_STREAM_FLUSH_INTERVAL"}}
LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL={{getenv "LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL"}}
LOGS_WRITE_BUFFER_LIMIT={{getenv "LOGS_WRITE_BUFFER_LIMIT"}}

REGISTRY2_HOST={{getenv "REGISTRY2_HOST"}}
SENTRY_DSN={{getenv "SENTRY_DSN"}}
SUPERUSER_EMAIL={{getenv "SUPERUSER_EMAIL"}}
Expand Down
6 changes: 4 additions & 2 deletions src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,14 @@ export class LokiBackend implements DeviceLogsBackend {
) {
return logs.map((log) => {
this.validateLog(log);
log.version = VERSION;
const timestamp = new loki.Timestamp();
timestamp.setSeconds(Math.floor(Number(log.nanoTimestamp / 1000000000n)));
timestamp.setNanos(Number(log.nanoTimestamp % 1000000000n));
// store log line as JSON
const logJson = JSON.stringify(log, omitNanoTimestamp);
const logJson = JSON.stringify(
{ ...log, version: VERSION },
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change avoids mutating the log. Mutation isn't a problem when Loki is the secondary backend, but when Loki is the primary it can cause problems for the secondary backend, which would receive a Loki-versioned log that it doesn't expect.

omitNanoTimestamp,
);
const structuredMetadata = this.getStructuredMetadata(ctx);
// create entry with labels, line and timestamp
return new loki.EntryAdapter()
Expand Down
66 changes: 55 additions & 11 deletions src/features/device-logs/lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,44 @@
import type { DeviceLogsBackend, LogContext } from './struct.js';

import _ from 'lodash';
import { RedisBackend } from './backends/redis.js';
import {
LOGS_DEFAULT_RETENTION_LIMIT,
LOKI_QUERY_HOST,
LOKI_READ_PCT,
LOKI_INGESTER_HOST,
LOKI_WRITE_PCT,
LOGS_LOKI_ENABLED,
LOGS_REDIS_ENABLED,
LOGS_PRIMARY_BACKEND,
LOGS_REDIS_READ_PCT,
LOGS_REDIS_WRITE_PCT,
} from '../../../lib/config.js';

export const LOKI_ENABLED =
LOKI_QUERY_HOST && LOKI_INGESTER_HOST && LOKI_WRITE_PCT > 0;
export const shouldPublishToLoki = () =>
LOKI_ENABLED && LOKI_WRITE_PCT > Math.random() * 100;
export const shouldReadFromLoki = () =>
LOKI_ENABLED && LOKI_READ_PCT > Math.random() * 100;
const LOGS_SECONDARY_BACKEND =
LOGS_PRIMARY_BACKEND === 'loki' ? 'redis' : 'loki';

export const LOGS_SECONDARY_BACKEND_ENABLED =
(LOGS_SECONDARY_BACKEND === 'loki' && LOGS_LOKI_ENABLED) ||
(LOGS_SECONDARY_BACKEND === 'redis' && LOGS_REDIS_ENABLED);

export const shouldReadFromSecondary = (): boolean => {
if (LOGS_SECONDARY_BACKEND_ENABLED) {
if (LOGS_SECONDARY_BACKEND === 'loki') {
return LOKI_READ_PCT > Math.random() * 100;
} else if (LOGS_SECONDARY_BACKEND === 'redis') {
return LOGS_REDIS_READ_PCT > Math.random() * 100;
}
}
return false;
};
export const shouldPublishToSecondary = (): boolean => {
if (LOGS_SECONDARY_BACKEND_ENABLED) {
if (LOGS_SECONDARY_BACKEND === 'loki') {
return LOKI_WRITE_PCT > Math.random() * 100;
} else if (LOGS_SECONDARY_BACKEND === 'redis') {
return LOGS_REDIS_WRITE_PCT > Math.random() * 100;
}
}
return false;
};

export function addRetentionLimit(
ctx: Omit<LogContext, 'retention_limit'>,
Expand All @@ -26,9 +49,30 @@ export function addRetentionLimit(
};
}

export const getBackend = _.once((): DeviceLogsBackend => new RedisBackend());
export const getPrimaryBackend = _.once(
async (): Promise<DeviceLogsBackend> =>
LOGS_PRIMARY_BACKEND === 'redis'
? await getRedisBackend()
: await getLokiBackend(),
);

export const getSecondaryBackend = _.once(
async (): Promise<DeviceLogsBackend> => {
if (LOGS_SECONDARY_BACKEND_ENABLED === false) {
throw new Error('Secondary backend is not enabled');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT How about just returning undefined here when LOGS_SECONDARY_BACKEND_ENABLED is false, so that we don't need to do LOGS_SECONDARY_BACKEND_ENABLED ? getSecondaryBackend() : undefined? This way we encapsulate everything in here and we don't have to check LOGS_SECONDARY_BACKEND_ENABLED both right before calling this fn & inside it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing here helps avoid accidental mistakes, and it also keeps the read path simpler. For writes, however, we need both backends permanently available (we can't fetch them on demand because we can't introduce async boundaries), so that path has to be special-cased anyway. That's why I went with an explicit throw rather than treating an implicit `== null` as "disabled".

}
return LOGS_SECONDARY_BACKEND === 'redis'
? await getRedisBackend()
: await getLokiBackend();
},
);

const getRedisBackend = _.once(async (): Promise<DeviceLogsBackend> => {
const { RedisBackend } = await import('./backends/redis.js');
return new RedisBackend();
});

export const getLokiBackend = _.once(async (): Promise<DeviceLogsBackend> => {
const getLokiBackend = _.once(async (): Promise<DeviceLogsBackend> => {
const { LokiBackend } = await import('./backends/loki.js');
return new LokiBackend();
});
Expand Down
10 changes: 6 additions & 4 deletions src/features/device-logs/lib/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import type { DeviceLog, DeviceLogsBackend, LogContext } from './struct.js';
import { StreamState } from './struct.js';
import {
addRetentionLimit,
getBackend,
getLokiBackend,
getPrimaryBackend,
getSecondaryBackend,
omitNanoTimestamp,
shouldReadFromLoki,
shouldReadFromSecondary,
} from './config.js';
import { getNanoTimestamp } from '../../../lib/utils.js';
import type { SetupOptions } from '../../../index.js';
Expand All @@ -31,7 +31,9 @@ const { NotFoundError } = errors;
const { api } = sbvrUtils;

const getReadBackend = async () =>
shouldReadFromLoki() ? await getLokiBackend() : getBackend();
shouldReadFromSecondary()
? await getSecondaryBackend()
: await getPrimaryBackend();

export const read =
(
Expand Down
67 changes: 28 additions & 39 deletions src/features/device-logs/lib/store.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import type { Request, RequestHandler, Response } from 'express';
import type {
DeviceLog,
DeviceLogsBackend,
LogContext,
SupervisorLog,
} from './struct.js';
import type { DeviceLog, LogContext, SupervisorLog } from './struct.js';

import onFinished from 'on-finished';
import type { permissions } from '@balena/pinejs';
Expand All @@ -19,15 +14,16 @@ import {
} from '../../../infra/error-handling/index.js';
import {
addRetentionLimit,
getBackend,
getLokiBackend,
LOKI_ENABLED,
shouldPublishToLoki,
getPrimaryBackend,
getSecondaryBackend,
LOGS_SECONDARY_BACKEND_ENABLED,
shouldPublishToSecondary,
} from './config.js';
import type { SetupOptions } from '../../../index.js';
import {
DEVICE_LOGS_WRITE_AUTH_CACHE_TIMEOUT,
LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL,
LOGS_PRIMARY_BACKEND,
LOGS_STREAM_FLUSH_INTERVAL,
LOGS_WRITE_BUFFER_LIMIT,
NDJSON_CTYPE,
Expand Down Expand Up @@ -102,7 +98,7 @@ export const store: RequestHandler = async (req: Request, res: Response) => {
if (logs.length) {
const ctx = await getWriteContext(req);
// start publishing to both backends
await publishBackend(getBackend(), ctx, logs);
await publishBackend(ctx, logs);
}
res.status(201).end();
} catch (err) {
Expand Down Expand Up @@ -132,38 +128,31 @@ function handleStoreErrors(req: Request, res: Response, err: Error) {
res.status(500).end();
}

const lokiBackend = LOKI_ENABLED ? await getLokiBackend() : undefined;
const primaryBackend = await getPrimaryBackend();
const secondaryBackend = LOGS_SECONDARY_BACKEND_ENABLED
? await getSecondaryBackend()
: undefined;

const publishBackend = LOKI_ENABLED
? async (
backend: DeviceLogsBackend,
ctx: LogContext,
buffer: DeviceLog[],
) => {
const publishingToRedis = backend.publish(ctx, buffer);
const publishingToLoki = shouldPublishToLoki()
? lokiBackend?.publish(ctx, buffer).catch((err) => {
captureException(err, 'Failed to publish logs to Loki');
})
: undefined;
await Promise.all([publishingToRedis, publishingToLoki]);
}
: async (
backend: DeviceLogsBackend,
ctx: LogContext,
buffer: DeviceLog[],
) => {
await backend.publish(ctx, buffer);
};
const publishBackend = async (ctx: LogContext, buffer: DeviceLog[]) => {
const primaryBackendPromise = primaryBackend.publish(ctx, buffer);
const secondaryBackendPromise = shouldPublishToSecondary()
? secondaryBackend?.publish(ctx, buffer).catch((err) => {
captureException(
err,
`Failed to publish logs to ${LOGS_PRIMARY_BACKEND === 'loki' ? 'redis' : 'loki'}`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT

Suggested change
`Failed to publish logs to ${LOGS_PRIMARY_BACKEND === 'loki' ? 'redis' : 'loki'}`,
`Failed to publish logs to ${LOGS_SECONDARY_BACKEND}`,

);
})
: undefined;
await Promise.all([primaryBackendPromise, secondaryBackendPromise]);
};

function handleStreamingWrite(
ctx: LogContext,
req: Request,
res: Response,
): void {
const backend = getBackend();
// If the backend is down, reject right away, don't take in new connections
if (!backend.available) {
if (!primaryBackend.available) {
throw new ServiceUnavailableError('The logs storage is unavailable');
}
if (req.get('Content-Type') !== NDJSON_CTYPE) {
Expand Down Expand Up @@ -200,7 +189,7 @@ function handleStreamingWrite(
}
buffer.push(log);
// If we buffer too much or the backend goes down, pause it for back-pressure
if (buffer.length >= bufferLimit || !backend.available) {
if (buffer.length >= bufferLimit || !primaryBackend.available) {
req.pause();
}
});
Expand All @@ -220,7 +209,7 @@ function handleStreamingWrite(
async function tryPublish() {
try {
// Don't flush if the backend is reporting as unavailable
if (buffer.length && backend.available) {
if (buffer.length && primaryBackend.available) {
if (buffer.length > bufferLimit) {
// Ensure the buffer cannot be larger than the buffer limit, adding a warning message if we removed messages
const deleteCount = buffer.length - bufferLimit;
Expand All @@ -234,7 +223,7 @@ function handleStreamingWrite(
});
}
// Even if the connection was closed, still flush the buffer
const publishPromise = publishBackend(backend, ctx, buffer);
const publishPromise = publishBackend(ctx, buffer);
// Clear the buffer
buffer.length = 0;
// Resume in case it was paused due to buffering
Expand All @@ -259,7 +248,7 @@ function handleStreamingWrite(
return;
}
// If the backend goes down temporarily, ease down the polling
const delay = backend.available
const delay = primaryBackend.available
? LOGS_STREAM_FLUSH_INTERVAL
: LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL;
setTimeout(tryPublish, delay);
Expand Down
56 changes: 56 additions & 0 deletions src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,35 @@ if (LOKI_WRITE_PCT < 100 && LOKI_READ_PCT > 0) {
throw new Error('LOKI_READ_PCT can only be set if LOKI_WRITE_PCT is 100');
}

export const LOGS_LOKI_ENABLED = boolVar(
'LOGS_LOKI_ENABLED',
LOKI_INGESTER_HOST != null && LOKI_QUERY_HOST != null,
);
if (LOGS_LOKI_ENABLED && (!LOKI_INGESTER_HOST || !LOKI_QUERY_HOST)) {
throw new Error(
'If LOGS_LOKI_ENABLED is true then LOKI_INGESTER_HOST and LOKI_QUERY_HOST must be set',
);
}

// Retries disabled so that writes to Redis are not delayed on Loki error
export const LOKI_RETRIES_ENABLED = boolVar('LOKI_RETRIES_ENABLED', false);
// Timeout set to 1s so that writes to Redis are not delayed if Loki is slow
export const LOKI_PUSH_TIMEOUT = intVar('LOKI_PUSH_TIMEOUT', 1000);

// control the percent of logs written to redis when running as a secondary backend
export const LOGS_REDIS_WRITE_PCT = intVar('LOGS_REDIS_WRITE_PCT', 100);
/**
* This is the percent of log read requests that will go to redis, however the number of logs fetched from redis
* will vary based upon the type of those read requests, eg it could be a long streaming request or a one-off fetch
*/
export const LOGS_REDIS_READ_PCT = intVar('LOGS_REDIS_READ_PCT', 100);
if (LOGS_REDIS_WRITE_PCT < 100 && LOGS_REDIS_READ_PCT > 0) {
throw new Error(
'LOGS_REDIS_READ_PCT can only be set if LOGS_REDIS_WRITE_PCT is 100',
);
}
export const LOGS_REDIS_ENABLED = boolVar('LOGS_REDIS_ENABLED', true);

export const NDJSON_CTYPE = 'application/x-ndjson';

// Logs read config
Expand All @@ -342,6 +366,38 @@ export const LOGS_SUBSCRIPTION_EXPIRY_HEARTBEAT_SECONDS =

export const LOGS_DEFAULT_RETENTION_LIMIT = 1000;

export const LOGS_PRIMARY_BACKEND = optionalVar(
'LOGS_PRIMARY_BACKEND',
'redis',
);
if (LOGS_PRIMARY_BACKEND !== 'redis' && LOGS_PRIMARY_BACKEND !== 'loki') {
throw new Error('LOGS_PRIMARY_BACKEND must be either "redis" or "loki"');
}
if (LOGS_PRIMARY_BACKEND === 'loki' && !LOGS_LOKI_ENABLED) {
throw new Error(
'LOGS_PRIMARY_BACKEND cannot be "loki" if LOGS_LOKI_ENABLED is false',
);
} else if (LOGS_PRIMARY_BACKEND === 'redis' && !LOGS_REDIS_ENABLED) {
throw new Error(
'LOGS_PRIMARY_BACKEND cannot be "redis" if LOGS_REDIS_ENABLED is false',
);
}
if (
LOGS_PRIMARY_BACKEND === 'redis' &&
(LOGS_REDIS_READ_PCT < 100 || LOGS_REDIS_WRITE_PCT < 100)
) {
throw new Error(
'LOGS_REDIS_READ_PCT and LOGS_REDIS_WRITE_PCT must be 100 if using redis as the primary logs backend',
);
} else if (
LOGS_PRIMARY_BACKEND === 'loki' &&
(LOKI_READ_PCT < 100 || LOKI_WRITE_PCT < 100)
) {
throw new Error(
'LOKI_READ_PCT and LOKI_WRITE_PCT must be 100 if using loki as the primary logs backend',
);
}

// Logs read config
export const LOGS_READ_STREAM_FLUSH_INTERVAL = intVar(
'LOGS_READ_STREAM_FLUSH_INTERVAL',
Expand Down
Loading