From 339f6a1dad9b1f4fd15130cfc9e8c46b4c70b4f6 Mon Sep 17 00:00:00 2001 From: Pagan Gazzard Date: Tue, 10 Dec 2024 18:59:25 +0000 Subject: [PATCH] Loki: simplify stream logic as we only use a single stream per device Change-type: patch --- src/features/device-logs/lib/backends/loki.ts | 26 +++++++------------ test/13_loki-backend.ts | 10 +++---- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/src/features/device-logs/lib/backends/loki.ts b/src/features/device-logs/lib/backends/loki.ts index 9a4bdd9fb..1ccc258c7 100644 --- a/src/features/device-logs/lib/backends/loki.ts +++ b/src/features/device-logs/lib/backends/loki.ts @@ -220,10 +220,10 @@ export class LokiBackend implements DeviceLogsBackend { const countLogs = logs.length; incrementPublishCallTotal(); incrementPublishLogMessagesTotal(countLogs); - const streams = this.fromDeviceLogsToStreams(ctx, logs); + const stream = this.fromDeviceLogsToStream(ctx, logs); const lokiCtx = await assertLokiLogContext(ctx); try { - await this.push(lokiCtx.belongs_to__application, streams); + await this.push(lokiCtx.belongs_to__application, stream); incrementPublishCallSuccessTotal(); } catch (err) { incrementPublishCallFailedTotal(); @@ -242,10 +242,10 @@ export class LokiBackend implements DeviceLogsBackend { } } - private push(appId: number, streams: loki.StreamAdapter[]): Promise { + private push(appId: number, stream: loki.StreamAdapter): Promise { incrementLokiPushTotal(); const pushRequest = new loki.PushRequest(); - pushRequest.setStreamsList(streams); + pushRequest.addStreams(stream); const startAt = Date.now(); return new Promise((resolve, reject) => { this.pusher.push( @@ -368,12 +368,13 @@ export class LokiBackend implements DeviceLogsBackend { } } - private fromDeviceLogsToStreams( + private fromDeviceLogsToStream( ctx: LogContext, logs: Array, ) { - const streams: loki.StreamAdapter[] = []; - const streamIndex: { [key: string]: loki.StreamAdapter } = {}; // index streams by labels for fast lookup + const labels = this.getLabels(ctx); + const stream = new loki.StreamAdapter(); + stream.setLabels(labels); for (const log of logs) { this.validateLog(log); log.version = VERSION; @@ -386,18 +387,9 @@ export class LokiBackend implements DeviceLogsBackend { const entry = new loki.EntryAdapter() .setLine(logJson) .setTimestamp(timestamp); - const labels = this.getLabels(ctx); // append entry to stream - let stream = streamIndex[labels]; - if (!stream) { - // new stream if none exist for labels - stream = new loki.StreamAdapter(); - stream.setLabels(labels); - streams.push(stream); - streamIndex[labels] = stream; - } stream.addEntries(entry); } - return streams; + return stream; } } diff --git a/test/13_loki-backend.ts b/test/13_loki-backend.ts index c43df6941..d5bad365b 100644 --- a/test/13_loki-backend.ts +++ b/test/13_loki-backend.ts @@ -57,14 +57,10 @@ export default () => { createLog({ serviceId: 3 }), ]; // @ts-expect-error usage of private function - const streams = loki.fromDeviceLogsToStreams(ctx, _.cloneDeep(logs)); - expect(streams.length).to.equal( - 1, - 'should be 1 stream since all logs share the same device id', - ); + const stream = loki.fromDeviceLogsToStream(ctx, _.cloneDeep(logs)); // @ts-expect-error usage of private function - const logsFromStreams = streams.flatMap(loki.fromStreamToDeviceLogs); - expect(logsFromStreams).to.deep.equal(logs); + const logsFromStream = loki.fromStreamToDeviceLogs(stream); + expect(logsFromStream).to.deep.equal(logs); }); it('should push multiple logs with different labels and return in order', async function () {