Skip to content

Commit

Permalink
Merge pull request #1408 from balena-io/snappy-device-logs
Browse files Browse the repository at this point in the history
Use snappy compression for device logs
  • Loading branch information
Page- authored Aug 29, 2023
2 parents 49d6faa + 7bb118f commit 0509161
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 8 deletions.
223 changes: 223 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
"request": "^2.88.2",
"rsmq": "^0.12.4",
"semver": "^7.5.0",
"snappy": "^7.2.2",
"strict-event-emitter-types": "^2.0.0",
"supervisor": "^0.12.0",
"tar": "^6.1.13",
Expand Down
28 changes: 20 additions & 8 deletions src/features/device-logs/lib/backends/redis.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import avro from 'avsc';
import * as snappy from 'snappy';
import { stripIndent } from 'common-tags';
import { EventEmitter } from 'events';
import _ from 'lodash';
Expand Down Expand Up @@ -140,7 +141,8 @@ export class RedisBackend implements DeviceLogsBackend {
count === Infinity ? 0 : -count,
-1,
);
return _(payloads).map(this.fromRedisLog).compact().value();
const parsedLogs = await Promise.all(payloads.map(this.fromRedisLog));
return _.compact(parsedLogs);
}

public get available(): boolean {
Expand All @@ -152,7 +154,7 @@ export class RedisBackend implements DeviceLogsBackend {
throw new ServiceUnavailableError();
}
// Immediately map the logs as they are synchronously cleared
const redisLogs = logs.map(this.toRedisLog, this);
const redisLogs = await Promise.all(logs.map(this.toRedisLog, this));

const limit = ctx.retention_limit;
const key = this.getKey(ctx);
Expand Down Expand Up @@ -229,16 +231,25 @@ export class RedisBackend implements DeviceLogsBackend {
return `{device:${ctx.id}}:${suffix}`;
}

private handleMessage(key: string, payload: string) {
const log = this.fromRedisLog(payload);
private async handleMessage(key: string, payload: string) {
const log = await this.fromRedisLog(payload);
if (log) {
this.subscriptions.emit(key, log);
}
}

private fromRedisLog(payload: string): DeviceLog | undefined {
private async fromRedisLog(payload: string): Promise<DeviceLog | undefined> {
try {
const log = schema.fromBuffer(Buffer.from(payload, BUFFER_ENCODING));
let decompressedBuffer = Buffer.from(payload, BUFFER_ENCODING);
try {
decompressedBuffer = (await snappy.uncompress(
decompressedBuffer,
)) as Buffer;
} catch {
// We ignore ones that fail to decompress as they are likely from before we added compression
// TODO: Stop ignoring these errors once we're sure all logs are compressed
}
const log = schema.fromBuffer(decompressedBuffer);
if (log.version !== VERSION) {
throw new Error(
`Invalid Redis serialization version: ${JSON.stringify(log)}`,
Expand All @@ -254,9 +265,10 @@ export class RedisBackend implements DeviceLogsBackend {
}
}

private toRedisLog(log: DeviceLog): string {
private async toRedisLog(log: DeviceLog): Promise<string> {
try {
return schema.toBuffer(log).toString(BUFFER_ENCODING);
const compressedLog = await snappy.compress(schema.toBuffer(log));
return compressedLog.toString(BUFFER_ENCODING);
} catch (err) {
captureException(err, 'Failed to convert log to redis buffer');
throw new BadRequestError();
Expand Down

0 comments on commit 0509161

Please sign in to comment.