diff --git a/config/confd/templates/env.tmpl b/config/confd/templates/env.tmpl index 7a514f1d5..a2083fb30 100644 --- a/config/confd/templates/env.tmpl +++ b/config/confd/templates/env.tmpl @@ -64,6 +64,10 @@ REDIS_LOGS_AUTH={{getenv "REDIS_LOGS_AUTH"}} REDIS_LOGS_RO_HOST={{getenv "REDIS_LOGS_RO_HOST"}} REDIS_LOGS_RO_AUTH={{getenv "REDIS_LOGS_RO_AUTH"}} REDIS_LOGS_SHARDED_PUBSUB={{getenv "REDIS_LOGS_SHARDED_PUBSUB"}} +LOGS_REDIS_READ_PCT={{getenv "LOGS_REDIS_READ_PCT"}} +LOGS_REDIS_WRITE_PCT={{getenv "LOGS_REDIS_WRITE_PCT"}} +LOGS_REDIS_ENABLED={{getenv "LOGS_REDIS_ENABLED"}} + LOKI_HOST={{getenv "LOKI_HOST"}} LOKI_QUERY_HOST={{getenv "LOKI_QUERY_HOST"}} LOKI_INGESTER_HOST={{getenv "LOKI_INGESTER_HOST"}} @@ -74,10 +78,16 @@ LOKI_GRPC_SEND_GZIP={{getenv "LOKI_GRPC_SEND_GZIP"}} LOKI_GRPC_RECEIVE_COMPRESSION_LEVEL={{getenv "LOKI_GRPC_RECEIVE_COMPRESSION_LEVEL"}} LOKI_READ_PCT={{getenv "LOKI_READ_PCT"}} LOKI_WRITE_PCT={{getenv "LOKI_WRITE_PCT"}} +LOKI_RETRIES_ENABLED={{getenv "LOKI_RETRIES_ENABLED"}} +LOKI_PUSH_TIMEOUT={{getenv "LOKI_PUSH_TIMEOUT"}} +LOGS_LOKI_ENABLED={{getenv "LOGS_LOKI_ENABLED"}} + +LOGS_PRIMARY_BACKEND={{getenv "LOGS_PRIMARY_BACKEND"}} LOGS_READ_STREAM_FLUSH_INTERVAL={{getenv "LOGS_READ_STREAM_FLUSH_INTERVAL"}} LOGS_STREAM_FLUSH_INTERVAL={{getenv "LOGS_STREAM_FLUSH_INTERVAL"}} LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL={{getenv "LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL"}} LOGS_WRITE_BUFFER_LIMIT={{getenv "LOGS_WRITE_BUFFER_LIMIT"}} + REGISTRY2_HOST={{getenv "REGISTRY2_HOST"}} SENTRY_DSN={{getenv "SENTRY_DSN"}} SUPERUSER_EMAIL={{getenv "SUPERUSER_EMAIL"}} diff --git a/src/features/device-logs/lib/config.ts b/src/features/device-logs/lib/config.ts index 564f788b5..22b9b285b 100644 --- a/src/features/device-logs/lib/config.ts +++ b/src/features/device-logs/lib/config.ts @@ -1,21 +1,44 @@ import type { DeviceLogsBackend, LogContext } from './struct.js'; import _ from 'lodash'; -import { RedisBackend } from './backends/redis.js'; import { 
LOGS_DEFAULT_RETENTION_LIMIT, - LOKI_QUERY_HOST, LOKI_READ_PCT, - LOKI_INGESTER_HOST, LOKI_WRITE_PCT, + LOGS_LOKI_ENABLED, + LOGS_REDIS_ENABLED, + LOGS_PRIMARY_BACKEND, + LOGS_REDIS_READ_PCT, + LOGS_REDIS_WRITE_PCT, } from '../../../lib/config.js'; -export const LOKI_ENABLED = - LOKI_QUERY_HOST && LOKI_INGESTER_HOST && LOKI_WRITE_PCT > 0; -export const shouldPublishToLoki = () => - LOKI_ENABLED && LOKI_WRITE_PCT > Math.random() * 100; -export const shouldReadFromLoki = () => - LOKI_ENABLED && LOKI_READ_PCT > Math.random() * 100; +export const LOGS_SECONDARY_BACKEND_ENABLED = + LOGS_PRIMARY_BACKEND === 'loki' + ? LOGS_REDIS_ENABLED + : LOGS_PRIMARY_BACKEND === 'redis' + ? LOGS_LOKI_ENABLED + : false; + +export const shouldReadFromSecondary = (): boolean => { + if (LOGS_SECONDARY_BACKEND_ENABLED) { + if (LOGS_PRIMARY_BACKEND === 'loki') { + return LOGS_REDIS_READ_PCT > Math.random() * 100; + } else if (LOGS_PRIMARY_BACKEND === 'redis') { + return LOKI_READ_PCT > Math.random() * 100; + } + } + return false; +}; +export const shouldPublishToSecondary = (): boolean => { + if (LOGS_SECONDARY_BACKEND_ENABLED) { + if (LOGS_PRIMARY_BACKEND === 'loki') { + return LOGS_REDIS_WRITE_PCT > Math.random() * 100; + } else if (LOGS_PRIMARY_BACKEND === 'redis') { + return LOKI_WRITE_PCT > Math.random() * 100; + } + } + return false; +}; export function addRetentionLimit( ctx: Omit, @@ -26,9 +49,30 @@ export function addRetentionLimit( }; } -export const getBackend = _.once((): DeviceLogsBackend => new RedisBackend()); +export const getPrimaryBackend = _.once( + async (): Promise => + LOGS_PRIMARY_BACKEND === 'redis' + ? await getRedisBackend() + : await getLokiBackend(), +); + +export const getSecondaryBackend = _.once( + async (): Promise => { + if (LOGS_SECONDARY_BACKEND_ENABLED === false) { + throw new Error('Secondary backend is not enabled'); + } + return LOGS_PRIMARY_BACKEND !== 'redis' + ? 
await getRedisBackend() + : await getLokiBackend(); + }, +); + +const getRedisBackend = _.once(async (): Promise => { + const { RedisBackend } = await import('./backends/redis.js'); + return new RedisBackend(); +}); -export const getLokiBackend = _.once(async (): Promise => { +const getLokiBackend = _.once(async (): Promise => { const { LokiBackend } = await import('./backends/loki.js'); return new LokiBackend(); }); diff --git a/src/features/device-logs/lib/read.ts b/src/features/device-logs/lib/read.ts index c9e00e086..fd2cd9084 100644 --- a/src/features/device-logs/lib/read.ts +++ b/src/features/device-logs/lib/read.ts @@ -12,10 +12,10 @@ import type { DeviceLog, DeviceLogsBackend, LogContext } from './struct.js'; import { StreamState } from './struct.js'; import { addRetentionLimit, - getBackend, - getLokiBackend, + getPrimaryBackend, + getSecondaryBackend, omitNanoTimestamp, - shouldReadFromLoki, + shouldReadFromSecondary, } from './config.js'; import { getNanoTimestamp } from '../../../lib/utils.js'; import type { SetupOptions } from '../../../index.js'; @@ -31,7 +31,9 @@ const { NotFoundError } = errors; const { api } = sbvrUtils; const getReadBackend = async () => - shouldReadFromLoki() ? await getLokiBackend() : getBackend(); + shouldReadFromSecondary() + ? 
await getSecondaryBackend() + : await getPrimaryBackend(); export const read = ( diff --git a/src/features/device-logs/lib/store.ts b/src/features/device-logs/lib/store.ts index 592b53870..3dec428f4 100644 --- a/src/features/device-logs/lib/store.ts +++ b/src/features/device-logs/lib/store.ts @@ -1,10 +1,5 @@ import type { Request, RequestHandler, Response } from 'express'; -import type { - DeviceLog, - DeviceLogsBackend, - LogContext, - SupervisorLog, -} from './struct.js'; +import type { DeviceLog, LogContext, SupervisorLog } from './struct.js'; import onFinished from 'on-finished'; import type { permissions } from '@balena/pinejs'; @@ -19,15 +14,16 @@ import { } from '../../../infra/error-handling/index.js'; import { addRetentionLimit, - getBackend, - getLokiBackend, - LOKI_ENABLED, - shouldPublishToLoki, + getPrimaryBackend, + getSecondaryBackend, + LOGS_SECONDARY_BACKEND_ENABLED, + shouldPublishToSecondary, } from './config.js'; import type { SetupOptions } from '../../../index.js'; import { DEVICE_LOGS_WRITE_AUTH_CACHE_TIMEOUT, LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL, + LOGS_PRIMARY_BACKEND, LOGS_STREAM_FLUSH_INTERVAL, LOGS_WRITE_BUFFER_LIMIT, NDJSON_CTYPE, @@ -102,7 +98,7 @@ export const store: RequestHandler = async (req: Request, res: Response) => { if (logs.length) { const ctx = await getWriteContext(req); // start publishing to both backends - await publishBackend(getBackend(), ctx, logs); + await publishBackend(ctx, logs); } res.status(201).end(); } catch (err) { @@ -132,38 +128,31 @@ function handleStoreErrors(req: Request, res: Response, err: Error) { res.status(500).end(); } -const lokiBackend = LOKI_ENABLED ? await getLokiBackend() : undefined; +const primaryBackend = await getPrimaryBackend(); +const secondaryBackend = LOGS_SECONDARY_BACKEND_ENABLED + ? await getSecondaryBackend() + : undefined; -const publishBackend = LOKI_ENABLED - ? 
async ( - backend: DeviceLogsBackend, - ctx: LogContext, - buffer: DeviceLog[], - ) => { - const publishingToRedis = backend.publish(ctx, buffer); - const publishingToLoki = shouldPublishToLoki() - ? lokiBackend?.publish(ctx, buffer).catch((err) => { - captureException(err, 'Failed to publish logs to Loki'); - }) - : undefined; - await Promise.all([publishingToRedis, publishingToLoki]); - } - : async ( - backend: DeviceLogsBackend, - ctx: LogContext, - buffer: DeviceLog[], - ) => { - await backend.publish(ctx, buffer); - }; +const publishBackend = async (ctx: LogContext, buffer: DeviceLog[]) => { + const primaryBackendPromise = primaryBackend.publish(ctx, buffer); + const secondaryBackendPromise = shouldPublishToSecondary() + ? secondaryBackend?.publish(ctx, buffer).catch((err) => { + captureException( + err, + `Failed to publish logs to ${LOGS_PRIMARY_BACKEND === 'loki' ? 'redis' : 'loki'}`, + ); + }) + : undefined; + await Promise.all([primaryBackendPromise, secondaryBackendPromise]); +}; function handleStreamingWrite( ctx: LogContext, req: Request, res: Response, ): void { - const backend = getBackend(); // If the backend is down, reject right away, don't take in new connections - if (!backend.available) { + if (!primaryBackend.available) { throw new ServiceUnavailableError('The logs storage is unavailable'); } if (req.get('Content-Type') !== NDJSON_CTYPE) { @@ -200,7 +189,7 @@ function handleStreamingWrite( } buffer.push(log); // If we buffer too much or the backend goes down, pause it for back-pressure - if (buffer.length >= bufferLimit || !backend.available) { + if (buffer.length >= bufferLimit || !primaryBackend.available) { req.pause(); } }); @@ -220,7 +209,7 @@ function handleStreamingWrite( async function tryPublish() { try { // Don't flush if the backend is reporting as unavailable - if (buffer.length && backend.available) { + if (buffer.length && primaryBackend.available) { if (buffer.length > bufferLimit) { // Ensure the buffer cannot be larger than 
the buffer limit, adding a warning message if we removed messages const deleteCount = buffer.length - bufferLimit; @@ -234,7 +223,7 @@ function handleStreamingWrite( }); } // Even if the connection was closed, still flush the buffer - const publishPromise = publishBackend(backend, ctx, buffer); + const publishPromise = publishBackend(ctx, buffer); // Clear the buffer buffer.length = 0; // Resume in case it was paused due to buffering @@ -259,7 +248,7 @@ function handleStreamingWrite( return; } // If the backend goes down temporarily, ease down the polling - const delay = backend.available + const delay = primaryBackend.available ? LOGS_STREAM_FLUSH_INTERVAL : LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL; setTimeout(tryPublish, delay); diff --git a/src/lib/config.ts b/src/lib/config.ts index 458198994..21ccd9426 100644 --- a/src/lib/config.ts +++ b/src/lib/config.ts @@ -325,11 +325,32 @@ if (LOKI_WRITE_PCT < 100 && LOKI_READ_PCT > 0) { throw new Error('LOKI_READ_PCT can only be set if LOKI_WRITE_PCT is 100'); } +export const LOGS_LOKI_ENABLED = boolVar('LOGS_LOKI_ENABLED', false); +if (LOGS_LOKI_ENABLED && (!LOKI_INGESTER_HOST || !LOKI_QUERY_HOST)) { + throw new Error( + 'If LOGS_LOKI_ENABLED is true then LOKI_INGESTER_HOST and LOKI_QUERY_HOST must be set', + ); +} + // Retries disabled so that writes to Redis are not delayed on Loki error export const LOKI_RETRIES_ENABLED = boolVar('LOKI_RETRIES_ENABLED', false); // Timeout set to 1s so that writes to Redis are not delayed if Loki is slow export const LOKI_PUSH_TIMEOUT = intVar('LOKI_PUSH_TIMEOUT', 1000); +// control the percent of logs written to redis when running as a secondary backend +export const LOGS_REDIS_WRITE_PCT = intVar('LOGS_REDIS_WRITE_PCT', 100); +/** + * This is the percent of log read requests that will go to redis, however the number of logs fetched from redis + * will vary based upon the type of those read requests, eg it could be a long streaming request or a one-off fetch + */ +export const 
LOGS_REDIS_READ_PCT = intVar('LOGS_REDIS_READ_PCT', 100); +if (LOGS_REDIS_WRITE_PCT < 100 && LOGS_REDIS_READ_PCT > 0) { + throw new Error( + 'LOGS_REDIS_READ_PCT can only be set if LOGS_REDIS_WRITE_PCT is 100', + ); +} +export const LOGS_REDIS_ENABLED = boolVar('LOGS_REDIS_ENABLED', true); + export const NDJSON_CTYPE = 'application/x-ndjson'; // Logs read config @@ -342,6 +363,30 @@ export const LOGS_SUBSCRIPTION_EXPIRY_HEARTBEAT_SECONDS = export const LOGS_DEFAULT_RETENTION_LIMIT = 1000; +export const LOGS_PRIMARY_BACKEND = optionalVar( + 'LOGS_PRIMARY_BACKEND', + 'redis', +); +if (LOGS_PRIMARY_BACKEND !== 'redis' && LOGS_PRIMARY_BACKEND !== 'loki') { + throw new Error('LOGS_PRIMARY_BACKEND must be either "redis" or "loki"'); +} +switch (LOGS_PRIMARY_BACKEND) { + case 'redis': + if (LOGS_REDIS_READ_PCT < 100 || LOGS_REDIS_WRITE_PCT < 100) { + throw new Error( + 'LOGS_REDIS_READ_PCT and LOGS_REDIS_WRITE_PCT must be 100 if using redis as the primary logs backend', + ); + } + break; + case 'loki': + if (LOKI_READ_PCT < 100 || LOKI_WRITE_PCT < 100) { + throw new Error( + 'LOKI_READ_PCT and LOKI_WRITE_PCT must be 100 if using loki as the primary logs backend', + ); + } + break; +} + // Logs read config export const LOGS_READ_STREAM_FLUSH_INTERVAL = intVar( 'LOGS_READ_STREAM_FLUSH_INTERVAL',