diff --git a/package-lock.json b/package-lock.json index 157cfb198a..96ec3f1950 100644 --- a/package-lock.json +++ b/package-lock.json @@ -68,7 +68,7 @@ "ipaddr.js": "^2.2.0", "jsonwebtoken": "^9.0.2", "lodash": "^4.17.21", - "loki-grpc-client": "^2.0.0", + "loki-grpc-client": "^2.2.0", "memoizee": "^0.4.17", "morgan": "^1.10.0", "ndjson": "^2.0.0", @@ -11072,9 +11072,9 @@ } }, "node_modules/loki-grpc-client": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/loki-grpc-client/-/loki-grpc-client-2.1.0.tgz", - "integrity": "sha512-pp8JTFEdNocw1ZLqeXUsUIFIp6f7kI0CcmjMIRs2umKhAVd6a3VWuMeZHvti50EfOB6GFhC7d20r4Kxq8QItiQ==", + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/loki-grpc-client/-/loki-grpc-client-2.2.0.tgz", + "integrity": "sha512-OFHDYrU7Ur15ulvhKwOWZetUCwh/6igWVighLxiIouz3Jb3mREFKd95pneldNMO6wBajzui5wH2aUwV+lht4ZQ==", "license": "Apache-2.0", "dependencies": { "@grpc/grpc-js": "^1.12.2", diff --git a/package.json b/package.json index 9f7f27a861..de7b7958c6 100644 --- a/package.json +++ b/package.json @@ -103,7 +103,7 @@ "ipaddr.js": "^2.2.0", "jsonwebtoken": "^9.0.2", "lodash": "^4.17.21", - "loki-grpc-client": "^2.0.0", + "loki-grpc-client": "^2.2.0", "memoizee": "^0.4.17", "morgan": "^1.10.0", "ndjson": "^2.0.0", diff --git a/src/features/device-logs/lib/backends/loki.ts b/src/features/device-logs/lib/backends/loki.ts index 1ccc258c76..00a3a07f62 100644 --- a/src/features/device-logs/lib/backends/loki.ts +++ b/src/features/device-logs/lib/backends/loki.ts @@ -214,16 +214,16 @@ export class LokiBackend implements DeviceLogsBackend { } public async publish( - ctx: LogContext, + $ctx: LogContext, logs: Array, ): Promise { + const ctx = await assertLokiLogContext($ctx); const countLogs = logs.length; incrementPublishCallTotal(); incrementPublishLogMessagesTotal(countLogs); const stream = this.fromDeviceLogsToStream(ctx, logs); - const lokiCtx = await assertLokiLogContext(ctx); try { - await this.push(lokiCtx.belongs_to__application, stream); + await this.push(ctx.belongs_to__application, stream); incrementPublishCallSuccessTotal(); } catch (err) { incrementPublishCallFailedTotal(); @@ -315,16 +315,22 @@ export class LokiBackend implements DeviceLogsBackend { call?.cancel(); } - private getDeviceQuery(ctx: LogContext) { - return `{device_id="${ctx.id}"}`; + private getDeviceQuery(ctx: LokiLogContext) { + return `{application_id="${ctx.belongs_to__application}"} | device_id="${ctx.id}"`; } private getKey(ctx: LokiLogContext, suffix = 'logs') { return `app:${ctx.belongs_to__application}:device:${ctx.id}:${suffix}`; } - private getLabels(ctx: LogContext): string { - return `{device_id="${ctx.id}"}`; + private getStructuredMetadata(ctx: LogContext): loki.LabelPairAdapter[] { + return [ + new loki.LabelPairAdapter().setName('device_id').setValue(`${ctx.id}`), + ]; + } + + private getLabels(ctx: LokiLogContext): string { + return `{application_id="${ctx.belongs_to__application}"}`; } private validateLog(log: DeviceLog): asserts log is DeviceLog { @@ -348,7 +354,7 @@ export class LokiBackend implements DeviceLogsBackend { private fromStreamToDeviceLogs(stream: loki.StreamAdapter): DeviceLog[] { try { - return stream.getEntriesList().map((entry: loki.EntryAdapter) => { + return stream.getEntriesList().map((entry) => { const log = JSON.parse(entry.getLine()); const timestamp = entry.getTimestamp()!; log.nanoTimestamp = @@ -369,7 +375,7 @@ export class LokiBackend implements DeviceLogsBackend { } private fromDeviceLogsToStream( - ctx: LogContext, + ctx: LokiLogContext, logs: Array, ) { const labels = this.getLabels(ctx); @@ -383,10 +389,12 @@ export class LokiBackend implements DeviceLogsBackend { timestamp.setNanos(Number(log.nanoTimestamp % 1000000000n)); // store log line as JSON const logJson = JSON.stringify(log, omitNanoTimestamp); + const structuredMetadata = this.getStructuredMetadata(ctx); // create entry with labels, line and timestamp const entry = new loki.EntryAdapter() .setLine(logJson) - .setTimestamp(timestamp); + .setTimestamp(timestamp) + .setStructuredmetadataList(structuredMetadata); // append entry to stream stream.addEntries(entry); } diff --git a/test/13_loki-backend.ts b/test/13_loki-backend.ts index c43df6941d..8f55cff772 100644 --- a/test/13_loki-backend.ts +++ b/test/13_loki-backend.ts @@ -57,7 +57,7 @@ export default () => { createLog({ serviceId: 3 }), ]; // @ts-expect-error usage of private function - const streams = loki.fromDeviceLogsToStreams(ctx, _.cloneDeep(logs)); + const streams = loki.fromDeviceLogsToStream(ctx, _.cloneDeep(logs)); expect(streams.length).to.equal( 1, 'should be 1 stream since all logs share the same device id',