Skip to content

Commit

Permalink
Loki: allow configuring LOKI_RETRIES_ENABLED and LOKI_PUSH_TIMEOUT
Browse files Browse the repository at this point in the history
Change-type: minor
  • Loading branch information
Page- committed Dec 18, 2024
1 parent 9381fac commit c96184c
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 54 deletions.
10 changes: 10 additions & 0 deletions config/confd/templates/env.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ REDIS_LOGS_AUTH={{getenv "REDIS_LOGS_AUTH"}}
REDIS_LOGS_RO_HOST={{getenv "REDIS_LOGS_RO_HOST"}}
REDIS_LOGS_RO_AUTH={{getenv "REDIS_LOGS_RO_AUTH"}}
REDIS_LOGS_SHARDED_PUBSUB={{getenv "REDIS_LOGS_SHARDED_PUBSUB"}}
LOGS_REDIS_READ_PCT={{getenv "LOGS_REDIS_READ_PCT"}}
LOGS_REDIS_WRITE_PCT={{getenv "LOGS_REDIS_WRITE_PCT"}}
LOGS_REDIS_ENABLED={{getenv "LOGS_REDIS_ENABLED"}}

LOKI_HOST={{getenv "LOKI_HOST"}}
LOKI_QUERY_HOST={{getenv "LOKI_QUERY_HOST"}}
LOKI_INGESTER_HOST={{getenv "LOKI_INGESTER_HOST"}}
Expand All @@ -74,10 +78,16 @@ LOKI_GRPC_SEND_GZIP={{getenv "LOKI_GRPC_SEND_GZIP"}}
LOKI_GRPC_RECEIVE_COMPRESSION_LEVEL={{getenv "LOKI_GRPC_RECEIVE_COMPRESSION_LEVEL"}}
LOKI_READ_PCT={{getenv "LOKI_READ_PCT"}}
LOKI_WRITE_PCT={{getenv "LOKI_WRITE_PCT"}}
LOKI_RETRIES_ENABLED={{getenv "LOKI_RETRIES_ENABLED"}}
LOKI_PUSH_TIMEOUT={{getenv "LOKI_PUSH_TIMEOUT"}}
LOGS_LOKI_ENABLED={{getenv "LOGS_LOKI_ENABLED"}}

LOGS_PRIMARY_BACKEND={{getenv "LOGS_PRIMARY_BACKEND"}}
LOGS_READ_STREAM_FLUSH_INTERVAL={{getenv "LOGS_READ_STREAM_FLUSH_INTERVAL"}}
LOGS_STREAM_FLUSH_INTERVAL={{getenv "LOGS_STREAM_FLUSH_INTERVAL"}}
LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL={{getenv "LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL"}}
LOGS_WRITE_BUFFER_LIMIT={{getenv "LOGS_WRITE_BUFFER_LIMIT"}}

REGISTRY2_HOST={{getenv "REGISTRY2_HOST"}}
SENTRY_DSN={{getenv "SENTRY_DSN"}}
SUPERUSER_EMAIL={{getenv "SUPERUSER_EMAIL"}}
Expand Down
66 changes: 55 additions & 11 deletions src/features/device-logs/lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,44 @@
import type { DeviceLogsBackend, LogContext } from './struct.js';

import _ from 'lodash';
import { RedisBackend } from './backends/redis.js';
import {
LOGS_DEFAULT_RETENTION_LIMIT,
LOKI_QUERY_HOST,
LOKI_READ_PCT,
LOKI_INGESTER_HOST,
LOKI_WRITE_PCT,
LOGS_LOKI_ENABLED,
LOGS_REDIS_ENABLED,
LOGS_PRIMARY_BACKEND,
LOGS_REDIS_READ_PCT,
LOGS_REDIS_WRITE_PCT,
} from '../../../lib/config.js';

export const LOKI_ENABLED =
LOKI_QUERY_HOST && LOKI_INGESTER_HOST && LOKI_WRITE_PCT > 0;
export const shouldPublishToLoki = () =>
LOKI_ENABLED && LOKI_WRITE_PCT > Math.random() * 100;
export const shouldReadFromLoki = () =>
LOKI_ENABLED && LOKI_READ_PCT > Math.random() * 100;
export const LOGS_SECONDARY_BACKEND_ENABLED =
LOGS_PRIMARY_BACKEND === 'loki'
? LOGS_REDIS_ENABLED
: LOGS_PRIMARY_BACKEND === 'redis'
? LOGS_LOKI_ENABLED
: false;

export const shouldReadFromSecondary = (): boolean => {
if (LOGS_SECONDARY_BACKEND_ENABLED) {
if (LOGS_PRIMARY_BACKEND === 'loki') {
return LOKI_READ_PCT > Math.random() * 100;
} else if (LOGS_PRIMARY_BACKEND === 'redis') {
return LOGS_REDIS_READ_PCT > Math.random() * 100;
}
}
return false;
};
export const shouldPublishToSecondary = (): boolean => {
if (LOGS_SECONDARY_BACKEND_ENABLED) {
if (LOGS_PRIMARY_BACKEND === 'loki') {
return LOKI_WRITE_PCT > Math.random() * 100;
} else if (LOGS_PRIMARY_BACKEND === 'redis') {
return LOGS_REDIS_WRITE_PCT > Math.random() * 100;
}
}
return false;
};

export function addRetentionLimit(
ctx: Omit<LogContext, 'retention_limit'>,
Expand All @@ -26,9 +49,30 @@ export function addRetentionLimit(
};
}

export const getBackend = _.once((): DeviceLogsBackend => new RedisBackend());
export const getPrimaryBackend = _.once(
async (): Promise<DeviceLogsBackend> =>
LOGS_PRIMARY_BACKEND === 'redis'
? await getRedisBackend()
: await getLokiBackend(),
);

export const getSecondaryBackend = _.once(
async (): Promise<DeviceLogsBackend> => {
if (LOGS_SECONDARY_BACKEND_ENABLED === false) {
throw new Error('Secondary backend is not enabled');
}
return LOGS_PRIMARY_BACKEND !== 'redis'
? await getRedisBackend()
: await getLokiBackend();
},
);

const getRedisBackend = _.once(async (): Promise<DeviceLogsBackend> => {
const { RedisBackend } = await import('./backends/redis.js');
return new RedisBackend();
});

export const getLokiBackend = _.once(async (): Promise<DeviceLogsBackend> => {
const getLokiBackend = _.once(async (): Promise<DeviceLogsBackend> => {
const { LokiBackend } = await import('./backends/loki.js');
return new LokiBackend();
});
Expand Down
10 changes: 6 additions & 4 deletions src/features/device-logs/lib/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import type { DeviceLog, DeviceLogsBackend, LogContext } from './struct.js';
import { StreamState } from './struct.js';
import {
addRetentionLimit,
getBackend,
getLokiBackend,
getPrimaryBackend,
getSecondaryBackend,
omitNanoTimestamp,
shouldReadFromLoki,
shouldReadFromSecondary,
} from './config.js';
import { getNanoTimestamp } from '../../../lib/utils.js';
import type { SetupOptions } from '../../../index.js';
Expand All @@ -31,7 +31,9 @@ const { NotFoundError } = errors;
const { api } = sbvrUtils;

const getReadBackend = async () =>
shouldReadFromLoki() ? await getLokiBackend() : getBackend();
shouldReadFromSecondary()
? await getSecondaryBackend()
: await getPrimaryBackend();

export const read =
(
Expand Down
67 changes: 28 additions & 39 deletions src/features/device-logs/lib/store.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import type { Request, RequestHandler, Response } from 'express';
import type {
DeviceLog,
DeviceLogsBackend,
LogContext,
SupervisorLog,
} from './struct.js';
import type { DeviceLog, LogContext, SupervisorLog } from './struct.js';

import onFinished from 'on-finished';
import type { permissions } from '@balena/pinejs';
Expand All @@ -19,15 +14,16 @@ import {
} from '../../../infra/error-handling/index.js';
import {
addRetentionLimit,
getBackend,
getLokiBackend,
LOKI_ENABLED,
shouldPublishToLoki,
getPrimaryBackend,
getSecondaryBackend,
LOGS_SECONDARY_BACKEND_ENABLED,
shouldPublishToSecondary,
} from './config.js';
import type { SetupOptions } from '../../../index.js';
import {
DEVICE_LOGS_WRITE_AUTH_CACHE_TIMEOUT,
LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL,
LOGS_PRIMARY_BACKEND,
LOGS_STREAM_FLUSH_INTERVAL,
LOGS_WRITE_BUFFER_LIMIT,
NDJSON_CTYPE,
Expand Down Expand Up @@ -102,7 +98,7 @@ export const store: RequestHandler = async (req: Request, res: Response) => {
if (logs.length) {
const ctx = await getWriteContext(req);
// start publishing to both backends
await publishBackend(getBackend(), ctx, logs);
await publishBackend(ctx, logs);
}
res.status(201).end();
} catch (err) {
Expand Down Expand Up @@ -132,38 +128,31 @@ function handleStoreErrors(req: Request, res: Response, err: Error) {
res.status(500).end();
}

const lokiBackend = LOKI_ENABLED ? await getLokiBackend() : undefined;
const primaryBackend = await getPrimaryBackend();
const secondaryBackend = LOGS_SECONDARY_BACKEND_ENABLED
? await getSecondaryBackend()
: undefined;

const publishBackend = LOKI_ENABLED
? async (
backend: DeviceLogsBackend,
ctx: LogContext,
buffer: DeviceLog[],
) => {
const publishingToRedis = backend.publish(ctx, buffer);
const publishingToLoki = shouldPublishToLoki()
? lokiBackend?.publish(ctx, buffer).catch((err) => {
captureException(err, 'Failed to publish logs to Loki');
})
: undefined;
await Promise.all([publishingToRedis, publishingToLoki]);
}
: async (
backend: DeviceLogsBackend,
ctx: LogContext,
buffer: DeviceLog[],
) => {
await backend.publish(ctx, buffer);
};
const publishBackend = async (ctx: LogContext, buffer: DeviceLog[]) => {
const primaryBackendPromise = primaryBackend.publish(ctx, buffer);
const secondaryBackendPromise = shouldPublishToSecondary()
? secondaryBackend?.publish(ctx, buffer).catch((err) => {
captureException(
err,
`Failed to publish logs to ${LOGS_PRIMARY_BACKEND === 'loki' ? 'redis' : 'loki'}`,
);
})
: undefined;
await Promise.all([primaryBackendPromise, secondaryBackendPromise]);
};

function handleStreamingWrite(
ctx: LogContext,
req: Request,
res: Response,
): void {
const backend = getBackend();
// If the backend is down, reject right away, don't take in new connections
if (!backend.available) {
if (!primaryBackend.available) {
throw new ServiceUnavailableError('The logs storage is unavailable');
}
if (req.get('Content-Type') !== NDJSON_CTYPE) {
Expand Down Expand Up @@ -200,7 +189,7 @@ function handleStreamingWrite(
}
buffer.push(log);
// If we buffer too much or the backend goes down, pause it for back-pressure
if (buffer.length >= bufferLimit || !backend.available) {
if (buffer.length >= bufferLimit || !primaryBackend.available) {
req.pause();
}
});
Expand All @@ -220,7 +209,7 @@ function handleStreamingWrite(
async function tryPublish() {
try {
// Don't flush if the backend is reporting as unavailable
if (buffer.length && backend.available) {
if (buffer.length && primaryBackend.available) {
if (buffer.length > bufferLimit) {
// Ensure the buffer cannot be larger than the buffer limit, adding a warning message if we removed messages
const deleteCount = buffer.length - bufferLimit;
Expand All @@ -234,7 +223,7 @@ function handleStreamingWrite(
});
}
// Even if the connection was closed, still flush the buffer
const publishPromise = publishBackend(backend, ctx, buffer);
const publishPromise = publishBackend(ctx, buffer);
// Clear the buffer
buffer.length = 0;
// Resume in case it was paused due to buffering
Expand All @@ -259,7 +248,7 @@ function handleStreamingWrite(
return;
}
// If the backend goes down temporarily, ease down the polling
const delay = backend.available
const delay = primaryBackend.available
? LOGS_STREAM_FLUSH_INTERVAL
: LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL;
setTimeout(tryPublish, delay);
Expand Down
45 changes: 45 additions & 0 deletions src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,32 @@ if (LOKI_WRITE_PCT < 100 && LOKI_READ_PCT > 0) {
throw new Error('LOKI_READ_PCT can only be set if LOKI_WRITE_PCT is 100');
}

export const LOGS_LOKI_ENABLED = boolVar('LOGS_LOKI_ENABLED', false);
if (LOGS_LOKI_ENABLED && (!LOKI_INGESTER_HOST || !LOKI_QUERY_HOST)) {
throw new Error(
'If LOGS_LOKI_ENABLED is true then LOKI_INGESTER_HOST and LOKI_QUERY_HOST must be set',
);
}

// Retries disabled so that writes to Redis are not delayed on Loki error
export const LOKI_RETRIES_ENABLED = boolVar('LOKI_RETRIES_ENABLED', false);
// Timeout set to 1s so that writes to Redis are not delayed if Loki is slow
export const LOKI_PUSH_TIMEOUT = intVar('LOKI_PUSH_TIMEOUT', 1000);

// control the percent of logs written to redis when running as a secondary backend
export const LOGS_REDIS_WRITE_PCT = intVar('LOGS_REDIS_WRITE_PCT', 100);
/**
* This is the percent of log read requests that will go to redis, however the number of logs fetched from redis
* will vary based upon the type of those read requests, eg it could be a long streaming request or a one-off fetch
*/
export const LOGS_REDIS_READ_PCT = intVar('LOGS_REDIS_READ_PCT', 100);
if (LOGS_REDIS_WRITE_PCT < 100 && LOGS_REDIS_READ_PCT > 0) {
throw new Error(
'LOGS_REDIS_READ_PCT can only be set if LOGS_REDIS_WRITE_PCT is 100',
);
}
export const LOGS_REDIS_ENABLED = boolVar('LOGS_REDIS_ENABLED', true);

export const NDJSON_CTYPE = 'application/x-ndjson';

// Logs read config
Expand All @@ -342,6 +363,30 @@ export const LOGS_SUBSCRIPTION_EXPIRY_HEARTBEAT_SECONDS =

export const LOGS_DEFAULT_RETENTION_LIMIT = 1000;

export const LOGS_PRIMARY_BACKEND = optionalVar(
'LOGS_PRIMARY_BACKEND',
'redis',
);
if (LOGS_PRIMARY_BACKEND !== 'redis' && LOGS_PRIMARY_BACKEND !== 'loki') {
throw new Error('LOGS_PRIMARY_BACKEND must be either "redis" or "loki"');
}
switch (LOGS_PRIMARY_BACKEND) {
case 'redis':
if (LOGS_REDIS_READ_PCT < 100 || LOGS_REDIS_WRITE_PCT < 100) {
throw new Error(
'LOGS_REDIS_READ_PCT and LOGS_REDIS_WRITE_PCT must be 100 if using redis as the primary logs backend',
);
}
break;
case 'loki':
if (LOKI_READ_PCT < 100 || LOKI_WRITE_PCT < 100) {
throw new Error(
'LOKI_READ_PCT and LOKI_WRITE_PCT must be 100 if using loki as the primary logs backend',
);
}
break;
}

// Logs read config
export const LOGS_READ_STREAM_FLUSH_INTERVAL = intVar(
'LOGS_READ_STREAM_FLUSH_INTERVAL',
Expand Down

0 comments on commit c96184c

Please sign in to comment.