Skip to content

Commit

Permalink
Merge pull request #1894 from balena-io/loki-single-stream
Browse files Browse the repository at this point in the history
Loki: simplify stream logic as we only use a single stream per device
  • Loading branch information
Page- authored Dec 11, 2024
2 parents ffbc374 + 339f6a1 commit 156d747
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 24 deletions.
26 changes: 9 additions & 17 deletions src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ export class LokiBackend implements DeviceLogsBackend {
const countLogs = logs.length;
incrementPublishCallTotal();
incrementPublishLogMessagesTotal(countLogs);
const streams = this.fromDeviceLogsToStreams(ctx, logs);
const stream = this.fromDeviceLogsToStream(ctx, logs);
const lokiCtx = await assertLokiLogContext(ctx);
try {
await this.push(lokiCtx.belongs_to__application, streams);
await this.push(lokiCtx.belongs_to__application, stream);
incrementPublishCallSuccessTotal();
} catch (err) {
incrementPublishCallFailedTotal();
Expand All @@ -242,10 +242,10 @@ export class LokiBackend implements DeviceLogsBackend {
}
}

private push(appId: number, streams: loki.StreamAdapter[]): Promise<any> {
private push(appId: number, stream: loki.StreamAdapter): Promise<any> {
incrementLokiPushTotal();
const pushRequest = new loki.PushRequest();
pushRequest.setStreamsList(streams);
pushRequest.addStreams(stream);
const startAt = Date.now();
return new Promise<loki.PushResponse>((resolve, reject) => {
this.pusher.push(
Expand Down Expand Up @@ -368,12 +368,13 @@ export class LokiBackend implements DeviceLogsBackend {
}
}

private fromDeviceLogsToStreams(
private fromDeviceLogsToStream(
ctx: LogContext,
logs: Array<DeviceLog & { version?: number }>,
) {
const streams: loki.StreamAdapter[] = [];
const streamIndex: { [key: string]: loki.StreamAdapter } = {}; // index streams by labels for fast lookup
const labels = this.getLabels(ctx);
const stream = new loki.StreamAdapter();
stream.setLabels(labels);
for (const log of logs) {
this.validateLog(log);
log.version = VERSION;
Expand All @@ -386,18 +387,9 @@ export class LokiBackend implements DeviceLogsBackend {
const entry = new loki.EntryAdapter()
.setLine(logJson)
.setTimestamp(timestamp);
const labels = this.getLabels(ctx);
// append entry to stream
let stream = streamIndex[labels];
if (!stream) {
// new stream if none exist for labels
stream = new loki.StreamAdapter();
stream.setLabels(labels);
streams.push(stream);
streamIndex[labels] = stream;
}
stream.addEntries(entry);
}
return streams;
return stream;
}
}
10 changes: 3 additions & 7 deletions test/13_loki-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,10 @@ export default () => {
createLog({ serviceId: 3 }),
];
// @ts-expect-error usage of private function
const streams = loki.fromDeviceLogsToStreams(ctx, _.cloneDeep(logs));
expect(streams.length).to.equal(
1,
'should be 1 stream since all logs share the same device id',
);
const stream = loki.fromDeviceLogsToStream(ctx, _.cloneDeep(logs));
// @ts-expect-error usage of private function
const logsFromStreams = streams.flatMap(loki.fromStreamToDeviceLogs);
expect(logsFromStreams).to.deep.equal(logs);
const logsFromStream = loki.fromStreamToDeviceLogs(stream);
expect(logsFromStream).to.deep.equal(logs);
});

it('should push multiple logs with different labels and return in order', async function () {
Expand Down

0 comments on commit 156d747

Please sign in to comment.