Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otlp logs push #534

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions lib/handlers/otlp_log_push.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
const DATABASE = require('../db/clickhouse')
const { asyncLogError, logType, metricType, bothType, readonly } = require('../../common')
const UTILS = require('../utils')
const stringify = UTILS.stringify
const fingerPrint = UTILS.fingerPrint
const { bulk_labels, bulk, labels } = DATABASE.cache

async function handle (req, res) {
if (readonly) {
asyncLogError('Readonly! No push support.', req.log)
return res.code(500).send()
}
try {
const promises = []
const fingerprints = {}
for (const resourceLogsEntry of req.body.resourceLogs) {
const resAttrs = resource2Attrs(resourceLogsEntry.resource)
for (const scopeLogsEntry of resourceLogsEntry.scopeLogs) {
const scopeAttrs = {
...resAttrs,
...resource2Attrs(scopeLogsEntry.scope)
}
for (const logRecord of scopeLogsEntry.logRecords) {
const logAttrs = {
...scopeAttrs,
...resource2Attrs(logRecord)
}
if (logRecord.severityText) {
logAttrs.level = logRecord.severityText
}
const labels = stringify(logAttrs)
const fingerprint = fingerPrint(labels)
const ts = BigInt(logRecord.timeUnixNano)
promises.push(bulk.add([[
fingerprint,
ts,
null,
anyValueToString(logRecord.body),
logType
]]))
const date = new Date(Number(ts / BigInt(1000000))).toISOString().split('T')[0]
!fingerprints[fingerprint] && promises.push(bulk_labels.add([[
date,
fingerprint,
labels,
labels.name || '',
logType
]]))
fingerprints[fingerprint] = true
}
}
}
await Promise.all(promises)
} catch (error) {
await asyncLogError(error)
res.status(500).send({ error: 'Internal Server Error' })
}
}

function resource2Attrs (resource) {
if (!resource || !resource.attributes) {
return {}
}
const attrs = {}
for (const attribute of resource.attributes) {
attrs[normalizeAttrName(attribute.key)] = anyValueToString(attribute.value)
}
return attrs
}

function normalizeAttrName (name) {
return name.replaceAll(/[^a-zA-Z0-9_]/g, '_')
}

function anyValueToString (value) {
if (!value) {
return ''
}
if (value.stringValue) {
return value.stringValue
}
if (value.boolValue) {
return value.boolValue ? 'true' : 'false'
}
if (value.intValue) {
return value.intValue.toString()
}
if (value.doubleValue) {
return value.doubleValue.toString()
}
if (value.bytesValue) {
return Buffer.from(value.bytesValue).toString('base64')
}
if (value.arrayValue) {
return JSON.stringify(value.arrayValue.values.map(anyValueToString))
}
if (value.kvlistValue) {
return JSON.stringify(value.kvlistValue.values.reduce((agg, pair) => ({
...agg,
[pair.key]: anyValueToString(pair.value)
})))
}
return ''
}

module.exports = handle
177 changes: 177 additions & 0 deletions lib/otlp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,180 @@ message Status {
// The status code.
StatusCode code = 3;
}

// Recource logs definition

message LogsData {
// An array of ResourceLogs.
// For data coming from a single resource this array will typically contain
// one element. Intermediary nodes that receive data from multiple origins
// typically batch the data before forwarding further and in that case this
// array will contain multiple elements.
repeated ResourceLogs resource_logs = 1;
}

// A collection of ScopeLogs from a Resource.
message ResourceLogs {
reserved 1000;

// The resource for the logs in this message.
// If this field is not set then resource info is unknown.
Resource resource = 1;

// A list of ScopeLogs that originate from a resource.
repeated ScopeLogs scope_logs = 2;

// The Schema URL, if known. This is the identifier of the Schema that the resource data
// is recorded in. To learn more about Schema URL see
// https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
// This schema_url applies to the data in the "resource" field. It does not apply
// to the data in the "scope_logs" field which have their own schema_url field.
string schema_url = 3;
}

// A collection of Logs produced by a Scope.
message ScopeLogs {
// The instrumentation scope information for the logs in this message.
// Semantically when InstrumentationScope isn't set, it is equivalent with
// an empty instrumentation scope name (unknown).
InstrumentationScope scope = 1;

// A list of log records.
repeated LogRecord log_records = 2;

// The Schema URL, if known. This is the identifier of the Schema that the log data
// is recorded in. To learn more about Schema URL see
// https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
// This schema_url applies to all logs in the "logs" field.
string schema_url = 3;
}

// Possible values for LogRecord.SeverityNumber.
enum SeverityNumber {
// UNSPECIFIED is the default SeverityNumber, it MUST NOT be used.
SEVERITY_NUMBER_UNSPECIFIED = 0;
SEVERITY_NUMBER_TRACE = 1;
SEVERITY_NUMBER_TRACE2 = 2;
SEVERITY_NUMBER_TRACE3 = 3;
SEVERITY_NUMBER_TRACE4 = 4;
SEVERITY_NUMBER_DEBUG = 5;
SEVERITY_NUMBER_DEBUG2 = 6;
SEVERITY_NUMBER_DEBUG3 = 7;
SEVERITY_NUMBER_DEBUG4 = 8;
SEVERITY_NUMBER_INFO = 9;
SEVERITY_NUMBER_INFO2 = 10;
SEVERITY_NUMBER_INFO3 = 11;
SEVERITY_NUMBER_INFO4 = 12;
SEVERITY_NUMBER_WARN = 13;
SEVERITY_NUMBER_WARN2 = 14;
SEVERITY_NUMBER_WARN3 = 15;
SEVERITY_NUMBER_WARN4 = 16;
SEVERITY_NUMBER_ERROR = 17;
SEVERITY_NUMBER_ERROR2 = 18;
SEVERITY_NUMBER_ERROR3 = 19;
SEVERITY_NUMBER_ERROR4 = 20;
SEVERITY_NUMBER_FATAL = 21;
SEVERITY_NUMBER_FATAL2 = 22;
SEVERITY_NUMBER_FATAL3 = 23;
SEVERITY_NUMBER_FATAL4 = 24;
}

// LogRecordFlags represents constants used to interpret the
// LogRecord.flags field, which is protobuf 'fixed32' type and is to
// be used as bit-fields. Each non-zero value defined in this enum is
// a bit-mask. To extract the bit-field, for example, use an
// expression like:
//
// (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK)
//
enum LogRecordFlags {
// The zero value for the enum. Should not be used for comparisons.
// Instead use bitwise "and" with the appropriate mask as shown above.
LOG_RECORD_FLAGS_DO_NOT_USE = 0;

// Bits 0-7 are used for trace flags.
LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF;

// Bits 8-31 are reserved for future use.
}

// A log record according to OpenTelemetry Log Data Model:
// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
message LogRecord {
reserved 4;

// time_unix_nano is the time when the event occurred.
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
// Value of 0 indicates unknown or missing timestamp.
fixed64 time_unix_nano = 1;

// Time when the event was observed by the collection system.
// For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
// this timestamp is typically set at the generation time and is equal to Timestamp.
// For events originating externally and collected by OpenTelemetry (e.g. using
// Collector) this is the time when OpenTelemetry's code observed the event measured
// by the clock of the OpenTelemetry code. This field MUST be set once the event is
// observed by OpenTelemetry.
//
// For converting OpenTelemetry log data to formats that support only one timestamp or
// when receiving OpenTelemetry log data by recipients that support only one timestamp
// internally the following logic is recommended:
// - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
//
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
// Value of 0 indicates unknown or missing timestamp.
fixed64 observed_time_unix_nano = 11;

// Numerical value of the severity, normalized to values described in Log Data Model.
// [Optional].
SeverityNumber severity_number = 2;

// The severity text (also known as log level). The original string representation as
// it is known at the source. [Optional].
string severity_text = 3;

// A value containing the body of the log record. Can be for example a human-readable
// string message (including multi-line) describing the event in a free form or it can
// be a structured data composed of arrays and maps of other values. [Optional].
AnyValue body = 5;

// Additional attributes that describe the specific event occurrence. [Optional].
// Attribute keys MUST be unique (it is not allowed to have more than one
// attribute with the same key).
repeated KeyValue attributes = 6;
uint32 dropped_attributes_count = 7;

// Flags, a bit field. 8 least significant bits are the trace flags as
// defined in W3C Trace Context specification. 24 most significant bits are reserved
// and must be set to 0. Readers must not assume that 24 most significant bits
// will be zero and must correctly mask the bits when reading 8-bit trace flag (use
// flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional].
fixed32 flags = 8;

// A unique identifier for a trace. All logs from the same trace share
// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
// is zero-length and thus is also invalid).
//
// This field is optional.
//
// The receivers SHOULD assume that the log record is not associated with a
// trace if any of the following is true:
// - the field is not present,
// - the field contains an invalid value.
bytes trace_id = 9;

// A unique identifier for a span within a trace, assigned when the span
// is created. The ID is an 8-byte array. An ID with all zeroes OR of length
// other than 8 bytes is considered invalid (empty string in OTLP/JSON
// is zero-length and thus is also invalid).
//
// This field is optional. If the sender specifies a valid span_id then it SHOULD also
// specify a valid trace_id.
//
// The receivers SHOULD assume that the log record is not associated with a
// span if any of the following is true:
// - the field is not present,
// - the field contains an invalid value.
bytes span_id = 10;
}
27 changes: 26 additions & 1 deletion parsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const path = require('path')
const WriteRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'prompb.proto')).lookupType('WriteRequest')
const PushRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'loki.proto')).lookupType('PushRequest')
const OTLPTraceData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('TracesData')
const OTLPLogsData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('LogsData')
const { parse: queryParser } = require('fast-querystring')

/**
Expand Down Expand Up @@ -202,6 +203,29 @@ function tempoNDJsonParser (req, payload) {
return parser
}

/**
*
* @param req {FastifyRequest}
* @param payload {Stream}
* @returns {*}
*/
async function otlpLogsDataParser (req, payload) {
const length = getContentLength(req, 5e6)
await shaper.register(length)
let body = []
const otelStream = stream.Readable.from(payload)
otelStream.on('data', data => {
body.push(data)
})
await new Promise(resolve => otelStream.once('end', resolve))
body = Buffer.concat(body)
body = OTLPLogsData.toObject(OTLPLogsData.decode(body), {
longs: String,
bytes: String
})
return body
}

/**
*
* @param subparsers {function(FastifyRequest): Promise<*|undefined>}
Expand Down Expand Up @@ -363,5 +387,6 @@ module.exports = {
tempoNDJsonParser,
otlpPushProtoParser,
wwwFormParser,
parsers
parsers,
otlpLogsDataParser
}
7 changes: 6 additions & 1 deletion qryn_bun.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
combinedParser,
jsonParser,
lokiPushJSONParser,
lokiPushProtoParser, otlpPushProtoParser, prometheusPushProtoParser,
lokiPushProtoParser, otlpLogsDataParser, otlpPushProtoParser, prometheusPushProtoParser,
rawStringParser,
tempoPushNDJSONParser,
tempoPushParser, wwwFormParser, yamlParser
Expand Down Expand Up @@ -56,6 +56,7 @@ import handlerPromGetRules from './lib/handlers/alerts/prom_get_rules.js'
import handlerTail from './lib/handlers/tail.js'
import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js'
import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js'
import handlerOtlpLogsPush from './lib/handlers/otlp_log_push.js'
import {init as pyroscopeInit } from './pyroscope/pyroscope.js'

import { boolEnv, readonly, readerMode, writerMode } from './common.js'
Expand Down Expand Up @@ -332,6 +333,10 @@ export default async() => {

readerMode && pyroscopeInit(fastify)

writerMode && fastify.post('/v1/logs', handlerOtlpLogsPush, {
'*': otlpLogsDataParser
})

const serveView = fs.existsSync(path.join(__dirname, 'view/index.html'))
if (serveView) {
app.plug(group(path.join(__dirname, 'view')));
Expand Down
9 changes: 6 additions & 3 deletions qryn_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ const {
shaper,
parsers,
lokiPushJSONParser, lokiPushProtoParser, jsonParser, rawStringParser, tempoPushParser, tempoPushNDJSONParser,
yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser
yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser, otlpLogsDataParser
} = require('./parsers')

const fastifyPlugin = require('fastify-plugin')



let fastify = require('fastify')({
logger,
bodyLimit: parseInt(process.env.FASTIFY_BODYLIMIT) || 5242880,
Expand Down Expand Up @@ -460,6 +458,11 @@ let fastify = require('fastify')({

readerMode && require('./pyroscope/pyroscope').init(fastify)

const handleOTLPLogs = require('./lib/handlers/otlp_log_push').bind(this)
writerMode && fastify.post('/v1/logs', handleOTLPLogs, {
'*': otlpLogsDataParser
})

// Run API Service
fastify.listen(
{
Expand Down
Loading