-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Logs: switch to an agnostic primary/secondary backend paradigm #1908
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,44 @@ | ||
import type { DeviceLogsBackend, LogContext } from './struct.js'; | ||
|
||
import _ from 'lodash'; | ||
import { RedisBackend } from './backends/redis.js'; | ||
import { | ||
LOGS_DEFAULT_RETENTION_LIMIT, | ||
LOKI_QUERY_HOST, | ||
LOKI_READ_PCT, | ||
LOKI_INGESTER_HOST, | ||
LOKI_WRITE_PCT, | ||
LOGS_LOKI_ENABLED, | ||
LOGS_REDIS_ENABLED, | ||
LOGS_PRIMARY_BACKEND, | ||
LOGS_REDIS_READ_PCT, | ||
LOGS_REDIS_WRITE_PCT, | ||
} from '../../../lib/config.js'; | ||
|
||
export const LOKI_ENABLED = | ||
LOKI_QUERY_HOST && LOKI_INGESTER_HOST && LOKI_WRITE_PCT > 0; | ||
export const shouldPublishToLoki = () => | ||
LOKI_ENABLED && LOKI_WRITE_PCT > Math.random() * 100; | ||
export const shouldReadFromLoki = () => | ||
LOKI_ENABLED && LOKI_READ_PCT > Math.random() * 100; | ||
const LOGS_SECONDARY_BACKEND = | ||
LOGS_PRIMARY_BACKEND === 'loki' ? 'redis' : 'loki'; | ||
|
||
export const LOGS_SECONDARY_BACKEND_ENABLED = | ||
(LOGS_SECONDARY_BACKEND === 'loki' && LOGS_LOKI_ENABLED) || | ||
(LOGS_SECONDARY_BACKEND === 'redis' && LOGS_REDIS_ENABLED); | ||
|
||
export const shouldReadFromSecondary = (): boolean => { | ||
if (LOGS_SECONDARY_BACKEND_ENABLED) { | ||
if (LOGS_SECONDARY_BACKEND === 'loki') { | ||
return LOKI_READ_PCT > Math.random() * 100; | ||
} else if (LOGS_SECONDARY_BACKEND === 'redis') { | ||
return LOGS_REDIS_READ_PCT > Math.random() * 100; | ||
} | ||
} | ||
return false; | ||
}; | ||
export const shouldPublishToSecondary = (): boolean => { | ||
if (LOGS_SECONDARY_BACKEND_ENABLED) { | ||
if (LOGS_SECONDARY_BACKEND === 'loki') { | ||
return LOKI_WRITE_PCT > Math.random() * 100; | ||
} else if (LOGS_SECONDARY_BACKEND === 'redis') { | ||
return LOGS_REDIS_WRITE_PCT > Math.random() * 100; | ||
} | ||
} | ||
return false; | ||
}; | ||
|
||
export function addRetentionLimit( | ||
ctx: Omit<LogContext, 'retention_limit'>, | ||
|
@@ -26,9 +49,30 @@ export function addRetentionLimit( | |
}; | ||
} | ||
|
||
export const getBackend = _.once((): DeviceLogsBackend => new RedisBackend()); | ||
export const getPrimaryBackend = _.once( | ||
async (): Promise<DeviceLogsBackend> => | ||
LOGS_PRIMARY_BACKEND === 'redis' | ||
? await getRedisBackend() | ||
: await getLokiBackend(), | ||
); | ||
|
||
export const getSecondaryBackend = _.once( | ||
async (): Promise<DeviceLogsBackend> => { | ||
if (LOGS_SECONDARY_BACKEND_ENABLED === false) { | ||
throw new Error('Secondary backend is not enabled'); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. NIT: How about just returning There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Doing the throw helps avoid accidental mistakes and also makes it simpler in the case of reading. In the case of writing, though, we need both backends permanently available (and cannot fetch them on demand because we can't have async boundaries), hence that one needing to be special-cased, and that's why I went with a throw rather than an implicit |
||
} | ||
return LOGS_SECONDARY_BACKEND === 'redis' | ||
? await getRedisBackend() | ||
: await getLokiBackend(); | ||
}, | ||
); | ||
|
||
const getRedisBackend = _.once(async (): Promise<DeviceLogsBackend> => { | ||
const { RedisBackend } = await import('./backends/redis.js'); | ||
return new RedisBackend(); | ||
}); | ||
|
||
export const getLokiBackend = _.once(async (): Promise<DeviceLogsBackend> => { | ||
const getLokiBackend = _.once(async (): Promise<DeviceLogsBackend> => { | ||
const { LokiBackend } = await import('./backends/loki.js'); | ||
return new LokiBackend(); | ||
}); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,10 +1,5 @@ | ||||||
import type { Request, RequestHandler, Response } from 'express'; | ||||||
import type { | ||||||
DeviceLog, | ||||||
DeviceLogsBackend, | ||||||
LogContext, | ||||||
SupervisorLog, | ||||||
} from './struct.js'; | ||||||
import type { DeviceLog, LogContext, SupervisorLog } from './struct.js'; | ||||||
|
||||||
import onFinished from 'on-finished'; | ||||||
import type { permissions } from '@balena/pinejs'; | ||||||
|
@@ -19,15 +14,16 @@ import { | |||||
} from '../../../infra/error-handling/index.js'; | ||||||
import { | ||||||
addRetentionLimit, | ||||||
getBackend, | ||||||
getLokiBackend, | ||||||
LOKI_ENABLED, | ||||||
shouldPublishToLoki, | ||||||
getPrimaryBackend, | ||||||
getSecondaryBackend, | ||||||
LOGS_SECONDARY_BACKEND_ENABLED, | ||||||
shouldPublishToSecondary, | ||||||
} from './config.js'; | ||||||
import type { SetupOptions } from '../../../index.js'; | ||||||
import { | ||||||
DEVICE_LOGS_WRITE_AUTH_CACHE_TIMEOUT, | ||||||
LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL, | ||||||
LOGS_PRIMARY_BACKEND, | ||||||
LOGS_STREAM_FLUSH_INTERVAL, | ||||||
LOGS_WRITE_BUFFER_LIMIT, | ||||||
NDJSON_CTYPE, | ||||||
|
@@ -102,7 +98,7 @@ export const store: RequestHandler = async (req: Request, res: Response) => { | |||||
if (logs.length) { | ||||||
const ctx = await getWriteContext(req); | ||||||
// start publishing to both backends | ||||||
await publishBackend(getBackend(), ctx, logs); | ||||||
await publishBackend(ctx, logs); | ||||||
} | ||||||
res.status(201).end(); | ||||||
} catch (err) { | ||||||
|
@@ -132,38 +128,31 @@ function handleStoreErrors(req: Request, res: Response, err: Error) { | |||||
res.status(500).end(); | ||||||
} | ||||||
|
||||||
const lokiBackend = LOKI_ENABLED ? await getLokiBackend() : undefined; | ||||||
const primaryBackend = await getPrimaryBackend(); | ||||||
const secondaryBackend = LOGS_SECONDARY_BACKEND_ENABLED | ||||||
? await getSecondaryBackend() | ||||||
: undefined; | ||||||
|
||||||
const publishBackend = LOKI_ENABLED | ||||||
? async ( | ||||||
backend: DeviceLogsBackend, | ||||||
ctx: LogContext, | ||||||
buffer: DeviceLog[], | ||||||
) => { | ||||||
const publishingToRedis = backend.publish(ctx, buffer); | ||||||
const publishingToLoki = shouldPublishToLoki() | ||||||
? lokiBackend?.publish(ctx, buffer).catch((err) => { | ||||||
captureException(err, 'Failed to publish logs to Loki'); | ||||||
}) | ||||||
: undefined; | ||||||
await Promise.all([publishingToRedis, publishingToLoki]); | ||||||
} | ||||||
: async ( | ||||||
backend: DeviceLogsBackend, | ||||||
ctx: LogContext, | ||||||
buffer: DeviceLog[], | ||||||
) => { | ||||||
await backend.publish(ctx, buffer); | ||||||
}; | ||||||
const publishBackend = async (ctx: LogContext, buffer: DeviceLog[]) => { | ||||||
const primaryBackendPromise = primaryBackend.publish(ctx, buffer); | ||||||
const secondaryBackendPromise = shouldPublishToSecondary() | ||||||
? secondaryBackend?.publish(ctx, buffer).catch((err) => { | ||||||
captureException( | ||||||
err, | ||||||
`Failed to publish logs to ${LOGS_PRIMARY_BACKEND === 'loki' ? 'redis' : 'loki'}`, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT
Suggested change
|
||||||
); | ||||||
}) | ||||||
: undefined; | ||||||
await Promise.all([primaryBackendPromise, secondaryBackendPromise]); | ||||||
}; | ||||||
|
||||||
function handleStreamingWrite( | ||||||
ctx: LogContext, | ||||||
req: Request, | ||||||
res: Response, | ||||||
): void { | ||||||
const backend = getBackend(); | ||||||
// If the backend is down, reject right away, don't take in new connections | ||||||
if (!backend.available) { | ||||||
if (!primaryBackend.available) { | ||||||
throw new ServiceUnavailableError('The logs storage is unavailable'); | ||||||
} | ||||||
if (req.get('Content-Type') !== NDJSON_CTYPE) { | ||||||
|
@@ -200,7 +189,7 @@ function handleStreamingWrite( | |||||
} | ||||||
buffer.push(log); | ||||||
// If we buffer too much or the backend goes down, pause it for back-pressure | ||||||
if (buffer.length >= bufferLimit || !backend.available) { | ||||||
if (buffer.length >= bufferLimit || !primaryBackend.available) { | ||||||
req.pause(); | ||||||
} | ||||||
}); | ||||||
|
@@ -220,7 +209,7 @@ function handleStreamingWrite( | |||||
async function tryPublish() { | ||||||
try { | ||||||
// Don't flush if the backend is reporting as unavailable | ||||||
if (buffer.length && backend.available) { | ||||||
if (buffer.length && primaryBackend.available) { | ||||||
if (buffer.length > bufferLimit) { | ||||||
// Ensure the buffer cannot be larger than the buffer limit, adding a warning message if we removed messages | ||||||
const deleteCount = buffer.length - bufferLimit; | ||||||
|
@@ -234,7 +223,7 @@ function handleStreamingWrite( | |||||
}); | ||||||
} | ||||||
// Even if the connection was closed, still flush the buffer | ||||||
const publishPromise = publishBackend(backend, ctx, buffer); | ||||||
const publishPromise = publishBackend(ctx, buffer); | ||||||
// Clear the buffer | ||||||
buffer.length = 0; | ||||||
// Resume in case it was paused due to buffering | ||||||
|
@@ -259,7 +248,7 @@ function handleStreamingWrite( | |||||
return; | ||||||
} | ||||||
// If the backend goes down temporarily, ease down the polling | ||||||
const delay = backend.available | ||||||
const delay = primaryBackend.available | ||||||
? LOGS_STREAM_FLUSH_INTERVAL | ||||||
: LOGS_BACKEND_UNAVAILABLE_FLUSH_INTERVAL; | ||||||
setTimeout(tryPublish, delay); | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is to avoid mutating the log. Mutation isn't a problem when Loki is the secondary backend, but when Loki is the primary it can cause problems for the secondary, which then receives a Loki-versioned log that it doesn't expect.