diff --git a/lib/handlers/otlp_log_push.js b/lib/handlers/otlp_log_push.js new file mode 100644 index 00000000..43347149 --- /dev/null +++ b/lib/handlers/otlp_log_push.js @@ -0,0 +1,106 @@ +const DATABASE = require('../db/clickhouse') +const { asyncLogError, logType, metricType, bothType, readonly } = require('../../common') +const UTILS = require('../utils') +const stringify = UTILS.stringify +const fingerPrint = UTILS.fingerPrint +const { bulk_labels, bulk, labels } = DATABASE.cache + +async function handle (req, res) { + if (readonly) { + asyncLogError('Readonly! No push support.', req.log) + return res.code(500).send() + } + try { + const promises = [] + const fingerprints = {} + for (const resourceLogsEntry of req.body.resourceLogs) { + const resAttrs = resource2Attrs(resourceLogsEntry.resource) + for (const scopeLogsEntry of resourceLogsEntry.scopeLogs) { + const scopeAttrs = { + ...resAttrs, + ...resource2Attrs(scopeLogsEntry.scope) + } + for (const logRecord of scopeLogsEntry.logRecords) { + const logAttrs = { + ...scopeAttrs, + ...resource2Attrs(logRecord) + } + if (logRecord.severityText) { + logAttrs.level = logRecord.severityText + } + const labels = stringify(logAttrs) + const fingerprint = fingerPrint(labels) + const ts = BigInt(logRecord.timeUnixNano) + promises.push(bulk.add([[ + fingerprint, + ts, + null, + anyValueToString(logRecord.body), + logType + ]])) + const date = new Date(Number(ts / BigInt(1000000))).toISOString().split('T')[0] + !fingerprints[fingerprint] && promises.push(bulk_labels.add([[ + date, + fingerprint, + labels, + labels.name || '', + logType + ]])) + fingerprints[fingerprint] = true + } + } + } + await Promise.all(promises) + } catch (error) { + await asyncLogError(error) + res.status(500).send({ error: 'Internal Server Error' }) + } +} + +function resource2Attrs (resource) { + if (!resource || !resource.attributes) { + return {} + } + const attrs = {} + for (const attribute of resource.attributes) { + 
attrs[normalizeAttrName(attribute.key)] = anyValueToString(attribute.value) + } + return attrs +} + +function normalizeAttrName (name) { + return name.replaceAll(/[^a-zA-Z0-9_]/g, '_') +} + +function anyValueToString (value) { + if (!value) { + return '' + } + if (value.stringValue) { + return value.stringValue + } + if (value.boolValue) { + return value.boolValue ? 'true' : 'false' + } + if (value.intValue) { + return value.intValue.toString() + } + if (value.doubleValue) { + return value.doubleValue.toString() + } + if (value.bytesValue) { + return Buffer.from(value.bytesValue).toString('base64') + } + if (value.arrayValue) { + return JSON.stringify(value.arrayValue.values.map(anyValueToString)) + } + if (value.kvlistValue) { + return JSON.stringify(value.kvlistValue.values.reduce((agg, pair) => ({ + ...agg, + [pair.key]: anyValueToString(pair.value) + }))) + } + return '' +} + +module.exports = handle diff --git a/lib/otlp.proto b/lib/otlp.proto index 81a15fd3..cee0cf0f 100644 --- a/lib/otlp.proto +++ b/lib/otlp.proto @@ -340,3 +340,180 @@ message Status { // The status code. StatusCode code = 3; } + +// Recource logs definition + +message LogsData { + // An array of ResourceLogs. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceLogs resource_logs = 1; +} + +// A collection of ScopeLogs from a Resource. +message ResourceLogs { + reserved 1000; + + // The resource for the logs in this message. + // If this field is not set then resource info is unknown. + Resource resource = 1; + + // A list of ScopeLogs that originate from a resource. + repeated ScopeLogs scope_logs = 2; + + // The Schema URL, if known. This is the identifier of the Schema that the resource data + // is recorded in. 
To learn more about Schema URL see
  // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
  // This schema_url applies to the data in the "resource" field. It does not apply
  // to the data in the "scope_logs" field which have their own schema_url field.
  string schema_url = 3;
}

// (qryn: the messages below mirror the upstream OTLP logs proto; the handler
// only consumes LogsData via protobufjs' lookupType('LogsData').)

// A collection of Logs produced by a Scope.
message ScopeLogs {
  // The instrumentation scope information for the logs in this message.
  // Semantically when InstrumentationScope isn't set, it is equivalent with
  // an empty instrumentation scope name (unknown).
  InstrumentationScope scope = 1;

  // A list of log records.
  repeated LogRecord log_records = 2;

  // The Schema URL, if known. This is the identifier of the Schema that the log data
  // is recorded in. To learn more about Schema URL see
  // https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
  // This schema_url applies to all logs in the "logs" field.
  string schema_url = 3;
}

// Possible values for LogRecord.SeverityNumber.
enum SeverityNumber {
  // UNSPECIFIED is the default SeverityNumber, it MUST NOT be used.
  SEVERITY_NUMBER_UNSPECIFIED = 0;
  SEVERITY_NUMBER_TRACE = 1;
  SEVERITY_NUMBER_TRACE2 = 2;
  SEVERITY_NUMBER_TRACE3 = 3;
  SEVERITY_NUMBER_TRACE4 = 4;
  SEVERITY_NUMBER_DEBUG = 5;
  SEVERITY_NUMBER_DEBUG2 = 6;
  SEVERITY_NUMBER_DEBUG3 = 7;
  SEVERITY_NUMBER_DEBUG4 = 8;
  SEVERITY_NUMBER_INFO = 9;
  SEVERITY_NUMBER_INFO2 = 10;
  SEVERITY_NUMBER_INFO3 = 11;
  SEVERITY_NUMBER_INFO4 = 12;
  SEVERITY_NUMBER_WARN = 13;
  SEVERITY_NUMBER_WARN2 = 14;
  SEVERITY_NUMBER_WARN3 = 15;
  SEVERITY_NUMBER_WARN4 = 16;
  SEVERITY_NUMBER_ERROR = 17;
  SEVERITY_NUMBER_ERROR2 = 18;
  SEVERITY_NUMBER_ERROR3 = 19;
  SEVERITY_NUMBER_ERROR4 = 20;
  SEVERITY_NUMBER_FATAL = 21;
  SEVERITY_NUMBER_FATAL2 = 22;
  SEVERITY_NUMBER_FATAL3 = 23;
  SEVERITY_NUMBER_FATAL4 = 24;
}

// LogRecordFlags represents constants used to interpret the
// LogRecord.flags field, which is protobuf 'fixed32' type and is to
// be used as bit-fields. Each non-zero value defined in this enum is
// a bit-mask. To extract the bit-field, for example, use an
// expression like:
//
//   (logRecord.flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK)
//
enum LogRecordFlags {
  // The zero value for the enum. Should not be used for comparisons.
  // Instead use bitwise "and" with the appropriate mask as shown above.
  LOG_RECORD_FLAGS_DO_NOT_USE = 0;

  // Bits 0-7 are used for trace flags.
  LOG_RECORD_FLAGS_TRACE_FLAGS_MASK = 0x000000FF;

  // Bits 8-31 are reserved for future use.
}

// A log record according to OpenTelemetry Log Data Model:
// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
message LogRecord {
  reserved 4;

  // time_unix_nano is the time when the event occurred.
  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
  // Value of 0 indicates unknown or missing timestamp.
  fixed64 time_unix_nano = 1;

  // Time when the event was observed by the collection system.
  // For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
  // this timestamp is typically set at the generation time and is equal to Timestamp.
  // For events originating externally and collected by OpenTelemetry (e.g. using
  // Collector) this is the time when OpenTelemetry's code observed the event measured
  // by the clock of the OpenTelemetry code. This field MUST be set once the event is
  // observed by OpenTelemetry.
  //
  // For converting OpenTelemetry log data to formats that support only one timestamp or
  // when receiving OpenTelemetry log data by recipients that support only one timestamp
  // internally the following logic is recommended:
  //   - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
  //
  // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
  // Value of 0 indicates unknown or missing timestamp.
  fixed64 observed_time_unix_nano = 11;

  // Numerical value of the severity, normalized to values described in Log Data Model.
  // [Optional].
  SeverityNumber severity_number = 2;

  // The severity text (also known as log level). The original string representation as
  // it is known at the source. [Optional].
  string severity_text = 3;

  // A value containing the body of the log record. Can be for example a human-readable
  // string message (including multi-line) describing the event in a free form or it can
  // be a structured data composed of arrays and maps of other values. [Optional].
  AnyValue body = 5;

  // Additional attributes that describe the specific event occurrence. [Optional].
  // Attribute keys MUST be unique (it is not allowed to have more than one
  // attribute with the same key).
  repeated KeyValue attributes = 6;
  uint32 dropped_attributes_count = 7;

  // Flags, a bit field. 8 least significant bits are the trace flags as
  // defined in W3C Trace Context specification. 24 most significant bits are reserved
  // and must be set to 0. Readers must not assume that 24 most significant bits
  // will be zero and must correctly mask the bits when reading 8-bit trace flag (use
  // flags & LOG_RECORD_FLAGS_TRACE_FLAGS_MASK). [Optional].
  fixed32 flags = 8;

  // A unique identifier for a trace. All logs from the same trace share
  // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
  // of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
  // is zero-length and thus is also invalid).
  //
  // This field is optional.
  //
  // The receivers SHOULD assume that the log record is not associated with a
  // trace if any of the following is true:
  //   - the field is not present,
  //   - the field contains an invalid value.
+ bytes span_id = 10; +} diff --git a/parsers.js b/parsers.js index 6cd4da05..533f1675 100644 --- a/parsers.js +++ b/parsers.js @@ -13,6 +13,7 @@ const path = require('path') const WriteRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'prompb.proto')).lookupType('WriteRequest') const PushRequest = protobufjs.loadSync(path.join(__dirname, 'lib', 'loki.proto')).lookupType('PushRequest') const OTLPTraceData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('TracesData') +const OTLPLogsData = protobufjs.loadSync(path.join(__dirname, 'lib', 'otlp.proto')).lookupType('LogsData') const { parse: queryParser } = require('fast-querystring') /** @@ -202,6 +203,29 @@ function tempoNDJsonParser (req, payload) { return parser } +/** + * + * @param req {FastifyRequest} + * @param payload {Stream} + * @returns {*} + */ +async function otlpLogsDataParser (req, payload) { + const length = getContentLength(req, 5e6) + await shaper.register(length) + let body = [] + const otelStream = stream.Readable.from(payload) + otelStream.on('data', data => { + body.push(data) + }) + await new Promise(resolve => otelStream.once('end', resolve)) + body = Buffer.concat(body) + body = OTLPLogsData.toObject(OTLPLogsData.decode(body), { + longs: String, + bytes: String + }) + return body +} + /** * * @param subparsers {function(FastifyRequest): Promise<*|undefined>} @@ -363,5 +387,6 @@ module.exports = { tempoNDJsonParser, otlpPushProtoParser, wwwFormParser, - parsers + parsers, + otlpLogsDataParser } diff --git a/qryn_bun.mjs b/qryn_bun.mjs index 992ed4e7..b972e45e 100644 --- a/qryn_bun.mjs +++ b/qryn_bun.mjs @@ -12,7 +12,7 @@ import { combinedParser, jsonParser, lokiPushJSONParser, - lokiPushProtoParser, otlpPushProtoParser, prometheusPushProtoParser, + lokiPushProtoParser, otlpLogsDataParser, otlpPushProtoParser, prometheusPushProtoParser, rawStringParser, tempoPushNDJSONParser, tempoPushParser, wwwFormParser, yamlParser @@ -56,6 +56,7 @@ import 
handlerPromGetRules from './lib/handlers/alerts/prom_get_rules.js'
import handlerTail from './lib/handlers/tail.js'
import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js'
import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js'
import handlerOtlpLogsPush from './lib/handlers/otlp_log_push.js'
import {init as pyroscopeInit } from './pyroscope/pyroscope.js'
import { boolEnv, readonly, readerMode, writerMode } from './common.js'

@@ -332,6 +333,10 @@ export default async() => {
  readerMode && pyroscopeInit(fastify)

  // OTLP/HTTP log ingestion endpoint; registered only in writer mode.
  // The '*' entry maps every content type to the protobuf LogsData parser.
  writerMode && fastify.post('/v1/logs', handlerOtlpLogsPush, {
    '*': otlpLogsDataParser
  })

  const serveView = fs.existsSync(path.join(__dirname, 'view/index.html'))
  if (serveView) {
    app.plug(group(path.join(__dirname, 'view')));
diff --git a/qryn_node.js b/qryn_node.js
index 1b863b80..a42c1560 100755
--- a/qryn_node.js
+++ b/qryn_node.js
@@ -59,13 +59,11 @@ const {
  shaper,
  parsers, lokiPushJSONParser, lokiPushProtoParser, jsonParser, rawStringParser,
  tempoPushParser, tempoPushNDJSONParser,
  yamlParser, prometheusPushProtoParser, combinedParser, otlpPushProtoParser, wwwFormParser, otlpLogsDataParser
} = require('./parsers')
const fastifyPlugin = require('fastify-plugin')

let fastify = require('fastify')({
  logger,
  bodyLimit: parseInt(process.env.FASTIFY_BODYLIMIT) || 5242880,
@@ -460,6 +458,11 @@
  readerMode && require('./pyroscope/pyroscope').init(fastify)

  // NOTE(review): the handler module exports a plain function; .bind(this)
  // presumably matches the sibling handlers' convention — confirm, since the
  // handler does not appear to use `this`.
  const handleOTLPLogs = require('./lib/handlers/otlp_log_push').bind(this)
  // Same OTLP log route as qryn_bun.mjs; writer-mode only.
  writerMode && fastify.post('/v1/logs', handleOTLPLogs, {
    '*': otlpLogsDataParser
  })

  // Run API Service
  fastify.listen(
    {