diff --git a/package-lock.json b/package-lock.json index 65b4abde0..13c8e3730 100644 --- a/package-lock.json +++ b/package-lock.json @@ -85,6 +85,7 @@ "request": "^2.88.2", "rsmq": "^0.12.4", "semver": "^7.5.0", + "snappy": "^7.2.2", "strict-event-emitter-types": "^2.0.0", "supervisor": "^0.12.0", "tar": "^6.1.13", @@ -3399,6 +3400,201 @@ "url": "https://github.com/sponsors/isaacs" } }, + "node_modules/@napi-rs/snappy-android-arm-eabi": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-android-arm-eabi/-/snappy-android-arm-eabi-7.2.2.tgz", + "integrity": "sha512-H7DuVkPCK5BlAr1NfSU8bDEN7gYs+R78pSHhDng83QxRnCLmVIZk33ymmIwurmoA1HrdTxbkbuNl+lMvNqnytw==", + "cpu": [ + "arm" + ], + "optional": true, + "os": [ + "android" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-android-arm64": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-android-arm64/-/snappy-android-arm64-7.2.2.tgz", + "integrity": "sha512-2R/A3qok+nGtpVK8oUMcrIi5OMDckGYNoBLFyli3zp8w6IArPRfg1yOfVUcHvpUDTo9T7LOS1fXgMOoC796eQw==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "android" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-darwin-arm64": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-darwin-arm64/-/snappy-darwin-arm64-7.2.2.tgz", + "integrity": "sha512-USgArHbfrmdbuq33bD5ssbkPIoT7YCXCRLmZpDS6dMDrx+iM7eD2BecNbOOo7/v1eu6TRmQ0xOzeQ6I/9FIi5g==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-darwin-x64": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-darwin-x64/-/snappy-darwin-x64-7.2.2.tgz", + "integrity": "sha512-0APDu8iO5iT0IJKblk2lH0VpWSl9zOZndZKnBYIc+ei1npw2L5QvuErFOTeTdHBtzvUHASB+9bvgaWnQo4PvTQ==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-freebsd-x64": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-freebsd-x64/-/snappy-freebsd-x64-7.2.2.tgz", + "integrity": "sha512-mRTCJsuzy0o/B0Hnp9CwNB5V6cOJ4wedDTWEthsdKHSsQlO7WU9W1yP7H3Qv3Ccp/ZfMyrmG98Ad7u7lG58WXA==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "freebsd" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-linux-arm-gnueabihf": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-linux-arm-gnueabihf/-/snappy-linux-arm-gnueabihf-7.2.2.tgz", + "integrity": "sha512-v1uzm8+6uYjasBPcFkv90VLZ+WhLzr/tnfkZ/iD9mHYiULqkqpRuC8zvc3FZaJy5wLQE9zTDkTJN1IvUcZ+Vcg==", + "cpu": [ + "arm" + ], + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-linux-arm64-gnu": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-linux-arm64-gnu/-/snappy-linux-arm64-gnu-7.2.2.tgz", + "integrity": "sha512-LrEMa5pBScs4GXWOn6ZYXfQ72IzoolZw5txqUHVGs8eK4g1HR9HTHhb2oY5ySNaKakG5sOgMsb1rwaEnjhChmQ==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-linux-arm64-musl": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-linux-arm64-musl/-/snappy-linux-arm64-musl-7.2.2.tgz", + "integrity": "sha512-3orWZo9hUpGQcB+3aTLW7UFDqNCQfbr0+MvV67x8nMNYj5eAeUtMmUE/HxLznHO4eZ1qSqiTwLbVx05/Socdlw==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-linux-x64-gnu": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-linux-x64-gnu/-/snappy-linux-x64-gnu-7.2.2.tgz", + "integrity": "sha512-jZt8Jit/HHDcavt80zxEkDpH+R1Ic0ssiVCoueASzMXa7vwPJeF4ZxZyqUw4qeSy7n8UUExomu8G8ZbP6VKhgw==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-linux-x64-musl": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-linux-x64-musl/-/snappy-linux-x64-musl-7.2.2.tgz", + "integrity": "sha512-Dh96IXgcZrV39a+Tej/owcd9vr5ihiZ3KRix11rr1v0MWtVb61+H1GXXlz6+Zcx9y8jM1NmOuiIuJwkV4vZ4WA==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "linux" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-win32-arm64-msvc": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-win32-arm64-msvc/-/snappy-win32-arm64-msvc-7.2.2.tgz", + "integrity": "sha512-9No0b3xGbHSWv2wtLEn3MO76Yopn1U2TdemZpCaEgOGccz1V+a/1d16Piz3ofSmnA13HGFz3h9NwZH9EOaIgYA==", + "cpu": [ + "arm64" + ], + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-win32-ia32-msvc": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-win32-ia32-msvc/-/snappy-win32-ia32-msvc-7.2.2.tgz", + "integrity": "sha512-QiGe+0G86J74Qz1JcHtBwM3OYdTni1hX1PFyLRo3HhQUSpmi13Bzc1En7APn+6Pvo7gkrcy81dObGLDSxFAkQQ==", + "cpu": [ + "ia32" + ], + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">= 10" + } + }, + "node_modules/@napi-rs/snappy-win32-x64-msvc": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/@napi-rs/snappy-win32-x64-msvc/-/snappy-win32-x64-msvc-7.2.2.tgz", + "integrity": "sha512-a43cyx1nK0daw6BZxVcvDEXxKMFLSBSDTAhsFD0VqSKcC7MGUBMaqyoWUcMiI7LBSz4bxUmxDWKfCYzpEmeb3w==", + "cpu": [ + "x64" + ], + "optional": true, + "os": [ + "win32" + ], + "engines": { + "node": ">= 10" + } + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -11887,6 +12083,33 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, + "node_modules/snappy": { + "version": "7.2.2", + "resolved": "https://registry.npmjs.org/snappy/-/snappy-7.2.2.tgz", + "integrity": "sha512-iADMq1kY0v3vJmGTuKcFWSXt15qYUz7wFkArOrsSg0IFfI3nJqIJvK2/ZbEIndg7erIJLtAVX2nSOqPz7DcwbA==", + "engines": { + "node": ">= 10" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/Brooooooklyn" + }, + "optionalDependencies": { + "@napi-rs/snappy-android-arm-eabi": "7.2.2", + "@napi-rs/snappy-android-arm64": "7.2.2", + "@napi-rs/snappy-darwin-arm64": "7.2.2", + "@napi-rs/snappy-darwin-x64": "7.2.2", + "@napi-rs/snappy-freebsd-x64": "7.2.2", + "@napi-rs/snappy-linux-arm-gnueabihf": "7.2.2", + "@napi-rs/snappy-linux-arm64-gnu": "7.2.2", + "@napi-rs/snappy-linux-arm64-musl": "7.2.2", + "@napi-rs/snappy-linux-x64-gnu": "7.2.2", + "@napi-rs/snappy-linux-x64-musl": "7.2.2", + "@napi-rs/snappy-win32-arm64-msvc": "7.2.2", + "@napi-rs/snappy-win32-ia32-msvc": "7.2.2", + "@napi-rs/snappy-win32-x64-msvc": "7.2.2" + } + }, "node_modules/sorted-array-functions": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/sorted-array-functions/-/sorted-array-functions-1.3.0.tgz", diff --git a/package.json b/package.json index 123ef6f9d..1d70e2849 100644 --- a/package.json +++ b/package.json @@ -109,6 +109,7 @@ "request": "^2.88.2", "rsmq": "^0.12.4", "semver": "^7.5.0", + "snappy": "^7.2.2", "strict-event-emitter-types": "^2.0.0", "supervisor": "^0.12.0", "tar": "^6.1.13", diff --git a/src/features/device-logs/lib/backends/redis.ts b/src/features/device-logs/lib/backends/redis.ts index df5a4ae68..390c4ab5f 100644 --- a/src/features/device-logs/lib/backends/redis.ts +++ b/src/features/device-logs/lib/backends/redis.ts @@ -1,4 +1,5 @@ import avro from 'avsc'; +import * as snappy from 'snappy'; import { stripIndent } from 'common-tags'; import { EventEmitter } from 'events'; import _ from 'lodash'; @@ -140,7 +141,8 @@ export class RedisBackend implements DeviceLogsBackend { count === Infinity ? 0 : -count, -1, ); - return _(payloads).map(this.fromRedisLog).compact().value(); + const parsedLogs = await Promise.all(payloads.map(this.fromRedisLog)); + return _.compact(parsedLogs); } public get available(): boolean { @@ -152,7 +154,7 @@ export class RedisBackend implements DeviceLogsBackend { throw new ServiceUnavailableError(); } // Immediately map the logs as they are synchronously cleared - const redisLogs = logs.map(this.toRedisLog, this); + const redisLogs = await Promise.all(logs.map(this.toRedisLog, this)); const limit = ctx.retention_limit; const key = this.getKey(ctx); @@ -229,16 +231,25 @@ export class RedisBackend implements DeviceLogsBackend { return `{device:${ctx.id}}:${suffix}`; } - private handleMessage(key: string, payload: string) { - const log = this.fromRedisLog(payload); + private async handleMessage(key: string, payload: string) { + const log = await this.fromRedisLog(payload); if (log) { this.subscriptions.emit(key, log); } } - private fromRedisLog(payload: string): DeviceLog | undefined { + private async fromRedisLog(payload: string): Promise { try { - const log = schema.fromBuffer(Buffer.from(payload, BUFFER_ENCODING)); + let decompressedBuffer = Buffer.from(payload, BUFFER_ENCODING); + try { + decompressedBuffer = (await snappy.uncompress( + decompressedBuffer, + )) as Buffer; + } catch { + // We ignore ones that fail to decompress as they are likely from before we added compression + // TODO: Stop ignoring these errors once we're sure all logs are compressed + } + const log = schema.fromBuffer(decompressedBuffer); if (log.version !== VERSION) { throw new Error( `Invalid Redis serialization version: ${JSON.stringify(log)}`, @@ -254,9 +265,10 @@ export class RedisBackend implements DeviceLogsBackend { } } - private toRedisLog(log: DeviceLog): string { + private async toRedisLog(log: DeviceLog): Promise { try { - return schema.toBuffer(log).toString(BUFFER_ENCODING); + const compressedLog = await snappy.compress(schema.toBuffer(log)); + return compressedLog.toString(BUFFER_ENCODING); } catch (err) { captureException(err, 'Failed to convert log to redis buffer'); throw new BadRequestError();