diff --git a/Cargo.lock b/Cargo.lock index 7530aa0f4da5..2c5c7f2ab582 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4141,6 +4141,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -4692,6 +4698,11 @@ name = "hashbrown" version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "hashlink" @@ -10984,7 +10995,7 @@ dependencies = [ "derive_builder 0.12.0", "futures", "futures-util", - "hashbrown 0.14.5", + "hashbrown 0.15.0", "headers 0.3.9", "hostname", "http 0.2.12", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 33831ba639bb..a4508291993e 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -53,7 +53,7 @@ datatypes.workspace = true derive_builder.workspace = true futures = "0.3" futures-util.workspace = true -hashbrown = "0.14" +hashbrown = "0.15" headers = "0.3" hostname = "0.3" http.workspace = true diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 88a0ad21b623..31aa5342be57 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -514,6 +514,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid Loki JSON request: {}", msg))] + InvalidLokiPayload { + msg: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unsupported content type: {:?}", content_type))] UnsupportedContentType { content_type: ContentType, @@ -660,6 +667,7 @@ impl ErrorExt for Error { | MysqlValueConversion { .. } | ParseJson { .. } | ParseJson5 { .. } + | InvalidLokiPayload { .. } | UnsupportedContentType { .. } | TimestampOverflow { .. } | OpenTelemetryLog { .. } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index dd618f24a3f7..816bccc61e26 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -81,6 +81,7 @@ pub mod handler; pub mod header; pub mod influxdb; pub mod logs; +pub mod loki; pub mod mem_prof; pub mod opentsdb; pub mod otlp; @@ -742,7 +743,12 @@ impl HttpServer { fn route_loki(log_state: LogState) -> Router { Router::new() - .route("/api/v1/push", routing::post(event::loki_ingest)) + .route("/api/v1/push", routing::post(loki::loki_ingest)) + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_error)) + .layer(RequestDecompressionLayer::new()), + ) .with_state(log_state) } diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 14e8ad7dd5e4..951c796ac39e 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -12,16 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; use std::result::Result as StdResult; +use std::str::FromStr; use std::sync::Arc; use std::time::Instant; -use api::v1::value::ValueData; -use api::v1::{ - ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, - Value as GreptimeValue, -}; +use api::v1::{RowInsertRequest, RowInsertRequests, Rows}; use axum::body::HttpBody; use axum::extract::{FromRequest, Multipart, Path, Query, State}; use axum::headers::ContentType; @@ -29,28 +25,23 @@ use axum::http::header::CONTENT_TYPE; use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, Json, TypedHeader}; -use bytes::Bytes; use common_error::ext::ErrorExt; -use common_query::prelude::GREPTIME_TIMESTAMP; use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; use datatypes::value::column_data_to_json; use lazy_static::lazy_static; -use loki_api::prost_types::Timestamp; use pipeline::error::PipelineTransformSnafu; use pipeline::util::to_pipeline_version; use pipeline::{GreptimeTransformer, PipelineVersion}; -use prost::Message; use serde::{Deserialize, Serialize}; use serde_json::{json, Deserializer, Map, Value}; use session::context::{Channel, QueryContext, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ - status_code_to_http_status, CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, - ParseJson5Snafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, + status_code_to_http_status, CatalogSnafu, Error, InvalidParameterSnafu, ParseJsonSnafu, + PipelineSnafu, Result, UnsupportedContentTypeSnafu, }; -use crate::http::extractor::LogTableName; use crate::http::header::CONTENT_TYPE_PROTOBUF_STR; use crate::http::result::greptime_manage_resp::GreptimedbManageResponse; use crate::http::result::greptime_result_v1::GreptimedbV1Response; @@ -58,35 +49,19 @@ use crate::http::HttpResponse; use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use crate::metrics::{ METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED, - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_LOKI_LOGS_INGESTION_COUNTER, - METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE, + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE, }; -use crate::prom_store; use crate::query_handler::PipelineHandlerRef; const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_"; const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity"; -const LOKI_TABLE_NAME: &str = "loki_logs"; -const LOKI_LINE_COLUMN: &str = "line"; - lazy_static! { - static ref LOKI_INIT_SCHEMAS: Vec = vec![ - ColumnSchema { - column_name: GREPTIME_TIMESTAMP.to_string(), - datatype: ColumnDataType::TimestampNanosecond.into(), - semantic_type: SemanticType::Timestamp.into(), - datatype_extension: None, - options: None, - }, - ColumnSchema { - column_name: LOKI_LINE_COLUMN.to_string(), - datatype: ColumnDataType::String.into(), - semantic_type: SemanticType::Field.into(), - datatype_extension: None, - options: None, - }, - ]; + pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json(); + pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text(); + pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8(); + pub static ref PB_CONTENT_TYPE: ContentType = + ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap(); } #[derive(Debug, Default, Serialize, Deserialize)] @@ -484,146 +459,6 @@ pub async fn pipeline_dryrun( } } -#[axum_macros::debug_handler] -pub async fn loki_ingest( - State(log_state): State, - Extension(mut ctx): Extension, - TypedHeader(content_type): TypedHeader, - LogTableName(table_name): LogTableName, - bytes: Bytes, -) -> Result { - ctx.set_channel(Channel::Loki); - let ctx = Arc::new(ctx); - let db = ctx.get_db_string(); - let db_str = db.as_str(); - let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string()); - let exec_timer = Instant::now(); - - // decompress req - ensure!( - content_type.to_string() == CONTENT_TYPE_PROTOBUF_STR, - UnsupportedContentTypeSnafu { content_type } - ); - let decompressed = prom_store::snappy_decompress(&bytes).unwrap(); - let req = loki_api::logproto::PushRequest::decode(&decompressed[..]) - .context(DecodeOtlpRequestSnafu)?; - - // init schemas - let mut schemas = LOKI_INIT_SCHEMAS.clone(); - - let mut global_label_key_index: HashMap = HashMap::new(); - global_label_key_index.insert(GREPTIME_TIMESTAMP.to_string(), 0); - global_label_key_index.insert(LOKI_LINE_COLUMN.to_string(), 1); - - let mut rows = vec![]; - - for stream in req.streams { - // parse labels for each row - // encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145 - // use very dirty hack to parse labels - let labels = stream.labels.replace("=", ":"); - // use btreemap to keep order - let labels: BTreeMap = json5::from_str(&labels).context(ParseJson5Snafu)?; - - // process entries - for entry in stream.entries { - let ts = if let Some(ts) = entry.timestamp { - ts - } else { - continue; - }; - let line = entry.line; - - // create and init row - let mut row = Vec::with_capacity(schemas.len()); - for _ in 0..schemas.len() { - row.push(GreptimeValue { value_data: None }); - } - // insert ts and line - row[0] = GreptimeValue { - value_data: Some(ValueData::TimestampNanosecondValue(prost_ts_to_nano(&ts))), - }; - row[1] = GreptimeValue { - value_data: Some(ValueData::StringValue(line)), - }; - // insert labels - for (k, v) in labels.iter() { - if let Some(index) = global_label_key_index.get(k) { - // exist in schema - // insert value using index - row[*index as usize] = GreptimeValue { - value_data: Some(ValueData::StringValue(v.clone())), - }; - } else { - // not exist - // add schema and append to values - schemas.push(ColumnSchema { - column_name: k.clone(), - datatype: ColumnDataType::String.into(), - semantic_type: SemanticType::Tag.into(), - datatype_extension: None, - options: None, - }); - global_label_key_index.insert(k.clone(), (schemas.len() - 1) as i32); - - row.push(GreptimeValue { - value_data: Some(ValueData::StringValue(v.clone())), - }); - } - } - - rows.push(row); - } - } - - // fill Null for missing values - for row in rows.iter_mut() { - if row.len() < schemas.len() { - for _ in row.len()..schemas.len() { - row.push(GreptimeValue { value_data: None }); - } - } - } - - let rows = Rows { - rows: rows.into_iter().map(|values| Row { values }).collect(), - schema: schemas, - }; - - let ins_req = RowInsertRequest { - table_name, - rows: Some(rows), - }; - let ins_reqs = RowInsertRequests { - inserts: vec![ins_req], - }; - - let handler = log_state.log_handler; - let output = handler.insert(ins_reqs, ctx).await; - - if let Ok(Output { - data: OutputData::AffectedRows(rows), - meta: _, - }) = &output - { - METRIC_LOKI_LOGS_INGESTION_COUNTER - .with_label_values(&[db_str]) - .inc_by(*rows as u64); - METRIC_LOKI_LOGS_INGESTION_ELAPSED - .with_label_values(&[db_str, METRIC_SUCCESS_VALUE]) - .observe(exec_timer.elapsed().as_secs_f64()); - } else { - METRIC_LOKI_LOGS_INGESTION_ELAPSED - .with_label_values(&[db_str, METRIC_FAILURE_VALUE]) - .observe(exec_timer.elapsed().as_secs_f64()); - } - - let response = GreptimedbV1Response::from_output(vec![output]) - .await - .with_execution_time(exec_timer.elapsed().as_millis() as u64); - Ok(response) -} - #[axum_macros::debug_handler] pub async fn log_ingester( State(log_state): State, @@ -682,11 +517,11 @@ fn extract_pipeline_value_by_content_type( ignore_errors: bool, ) -> Result> { Ok(match content_type { - ct if ct == ContentType::json() => transform_ndjson_array_factory( + ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory( Deserializer::from_str(&payload).into_iter(), ignore_errors, )?, - ct if ct == ContentType::text() || ct == ContentType::text_utf8() => payload + ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload .lines() .filter(|line| !line.is_empty()) .map(|line| Value::String(line.to_string())) @@ -808,11 +643,6 @@ pub struct LogState { pub ingest_interceptor: Option>, } -#[inline] -fn prost_ts_to_nano(ts: &Timestamp) -> i64 { - ts.seconds * 1_000_000_000 + ts.nanos as i64 -} - #[cfg(test)] mod tests { use super::*; @@ -847,16 +677,4 @@ mod tests { .to_string(); assert_eq!(a, "[{\"a\":1},{\"b\":2}]"); } - - #[test] - fn test_ts_to_nano() { - // ts = 1731748568804293888 - // seconds = 1731748568 - // nano = 804293888 - let ts = Timestamp { - seconds: 1731748568, - nanos: 804293888, - }; - assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888); - } } diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs new file mode 100644 index 000000000000..b1014110613b --- /dev/null +++ b/src/servers/src/http/loki.rs @@ -0,0 +1,377 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::Instant; + +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, + Value as GreptimeValue, +}; +use axum::extract::State; +use axum::headers::ContentType; +use axum::{Extension, TypedHeader}; +use bytes::Bytes; +use common_query::prelude::GREPTIME_TIMESTAMP; +use common_query::{Output, OutputData}; +use common_telemetry::warn; +use hashbrown::HashMap; +use lazy_static::lazy_static; +use loki_api::prost_types::Timestamp; +use prost::Message; +use session::context::{Channel, QueryContext}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + DecodeOtlpRequestSnafu, InvalidLokiPayloadSnafu, ParseJson5Snafu, ParseJsonSnafu, Result, + UnsupportedContentTypeSnafu, +}; +use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE}; +use crate::http::extractor::LogTableName; +use crate::http::result::greptime_result_v1::GreptimedbV1Response; +use crate::http::HttpResponse; +use crate::metrics::{ + METRIC_FAILURE_VALUE, METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED, + METRIC_SUCCESS_VALUE, +}; +use crate::{prom_store, unwrap_or_warn_continue}; + +const LOKI_TABLE_NAME: &str = "loki_logs"; +const LOKI_LINE_COLUMN: &str = "line"; + +const STREAMS_KEY: &str = "streams"; +const LABEL_KEY: &str = "stream"; +const LINES_KEY: &str = "values"; + +lazy_static! { + static ref LOKI_INIT_SCHEMAS: Vec = vec![ + ColumnSchema { + column_name: GREPTIME_TIMESTAMP.to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + datatype_extension: None, + options: None, + }, + ColumnSchema { + column_name: LOKI_LINE_COLUMN.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: None, + options: None, + }, + ]; +} + +#[axum_macros::debug_handler] +pub async fn loki_ingest( + State(log_state): State, + Extension(mut ctx): Extension, + TypedHeader(content_type): TypedHeader, + LogTableName(table_name): LogTableName, + bytes: Bytes, +) -> Result { + ctx.set_channel(Channel::Loki); + let ctx = Arc::new(ctx); + let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string()); + let db = ctx.get_db_string(); + let db_str = db.as_str(); + let exec_timer = Instant::now(); + + // init schemas + let mut schemas = LOKI_INIT_SCHEMAS.clone(); + + let mut rows = match content_type { + x if x == *JSON_CONTENT_TYPE => handle_json_req(bytes, &mut schemas).await, + x if x == *PB_CONTENT_TYPE => handle_pb_req(bytes, &mut schemas).await, + _ => UnsupportedContentTypeSnafu { content_type }.fail(), + }?; + + // fill Null for missing values + for row in rows.iter_mut() { + if row.len() < schemas.len() { + for _ in row.len()..schemas.len() { + row.push(GreptimeValue { value_data: None }); + } + } + } + + let rows = Rows { + rows: rows.into_iter().map(|values| Row { values }).collect(), + schema: schemas, + }; + let ins_req = RowInsertRequest { + table_name, + rows: Some(rows), + }; + let ins_reqs = RowInsertRequests { + inserts: vec![ins_req], + }; + + let handler = log_state.log_handler; + let output = handler.insert(ins_reqs, ctx).await; + + if let Ok(Output { + data: OutputData::AffectedRows(rows), + meta: _, + }) = &output + { + METRIC_LOKI_LOGS_INGESTION_COUNTER + .with_label_values(&[db_str]) + .inc_by(*rows as u64); + METRIC_LOKI_LOGS_INGESTION_ELAPSED + .with_label_values(&[db_str, METRIC_SUCCESS_VALUE]) + .observe(exec_timer.elapsed().as_secs_f64()); + } else { + METRIC_LOKI_LOGS_INGESTION_ELAPSED + .with_label_values(&[db_str, METRIC_FAILURE_VALUE]) + .observe(exec_timer.elapsed().as_secs_f64()); + } + + let response = GreptimedbV1Response::from_output(vec![output]) + .await + .with_execution_time(exec_timer.elapsed().as_millis() as u64); + Ok(response) +} + +async fn handle_json_req( + bytes: Bytes, + schemas: &mut Vec, +) -> Result>> { + let mut column_indexer: HashMap = HashMap::new(); + column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0); + column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1); + + let payload: serde_json::Value = + serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?; + + let streams = payload + .get(STREAMS_KEY) + .context(InvalidLokiPayloadSnafu { + msg: "missing streams", + })? + .as_array() + .context(InvalidLokiPayloadSnafu { + msg: "streams is not an array", + })?; + + let mut rows = Vec::with_capacity(1000); + + for (stream_index, stream) in streams.iter().enumerate() { + // parse lines first + // do not use `?` in case there are multiple streams + let lines = unwrap_or_warn_continue!( + stream.get(LINES_KEY), + "missing values on stream {}", + stream_index + ); + let lines = unwrap_or_warn_continue!( + lines.as_array(), + "values is not an array on stream {}", + stream_index + ); + + // get labels + let labels = stream + .get(LABEL_KEY) + .and_then(|label| label.as_object()) + .map(|l| { + l.iter() + .filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string()))) + .collect::>() + }) + .unwrap_or_default(); + + // process each line + for (line_index, line) in lines.iter().enumerate() { + let line = unwrap_or_warn_continue!( + line.as_array(), + "missing line on stream {} index {}", + stream_index, + line_index + ); + if line.len() < 2 { + warn!( + "line on stream {} index {} is too short", + stream_index, line_index + ); + continue; + } + // get ts + let ts = unwrap_or_warn_continue!( + line.first() + .and_then(|ts| ts.as_str()) + .and_then(|ts| ts.parse::().ok()), + "missing or invalid timestamp on stream {} index {}", + stream_index, + line_index + ); + // get line + let line_text = unwrap_or_warn_continue!( + line.get(1) + .and_then(|line| line.as_str()) + .map(|line| line.to_string()), + "missing or invalid line on stream {} index {}", + stream_index, + line_index + ); + // TODO(shuiyisong): we'll ignore structured metadata for now + + let mut row = init_row(schemas.len(), ts, line_text); + process_labels(&mut column_indexer, schemas, &mut row, labels.iter()); + + rows.push(row); + } + } + + Ok(rows) +} + +async fn handle_pb_req( + bytes: Bytes, + schemas: &mut Vec, +) -> Result>> { + let decompressed = prom_store::snappy_decompress(&bytes).unwrap(); + let req = loki_api::logproto::PushRequest::decode(&decompressed[..]) + .context(DecodeOtlpRequestSnafu)?; + + let mut column_indexer: HashMap = HashMap::new(); + column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0); + column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1); + + let cnt = req.streams.iter().map(|s| s.entries.len()).sum::(); + let mut rows = Vec::with_capacity(cnt); + + for stream in req.streams { + // parse labels for each row + // encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145 + // use very dirty hack to parse labels + // TODO(shuiyisong): remove json5 and parse the string directly + let labels = stream.labels.replace("=", ":"); + // use btreemap to keep order + let labels: BTreeMap = json5::from_str(&labels).context(ParseJson5Snafu)?; + + // process entries + for entry in stream.entries { + let ts = if let Some(ts) = entry.timestamp { + ts + } else { + continue; + }; + let line = entry.line; + + let mut row = init_row(schemas.len(), prost_ts_to_nano(&ts), line); + process_labels(&mut column_indexer, schemas, &mut row, labels.iter()); + + rows.push(row); + } + } + + Ok(rows) +} + +#[inline] +fn prost_ts_to_nano(ts: &Timestamp) -> i64 { + ts.seconds * 1_000_000_000 + ts.nanos as i64 +} + +fn init_row(schema_len: usize, ts: i64, line: String) -> Vec { + // create and init row + let mut row = Vec::with_capacity(schema_len); + // set ts and line + row.push(GreptimeValue { + value_data: Some(ValueData::TimestampNanosecondValue(ts)), + }); + row.push(GreptimeValue { + value_data: Some(ValueData::StringValue(line)), + }); + for _ in 0..(schema_len - 2) { + row.push(GreptimeValue { value_data: None }); + } + row +} + +fn process_labels<'a>( + column_indexer: &mut HashMap, + schemas: &mut Vec, + row: &mut Vec, + labels: impl Iterator, +) { + // insert labels + for (k, v) in labels { + if let Some(index) = column_indexer.get(k) { + // exist in schema + // insert value using index + row[*index as usize] = GreptimeValue { + value_data: Some(ValueData::StringValue(v.clone())), + }; + } else { + // not exist + // add schema and append to values + schemas.push(ColumnSchema { + column_name: k.clone(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + datatype_extension: None, + options: None, + }); + column_indexer.insert(k.clone(), (schemas.len() - 1) as u16); + + row.push(GreptimeValue { + value_data: Some(ValueData::StringValue(v.clone())), + }); + } + } +} + +#[macro_export] +macro_rules! unwrap_or_warn_continue { + ($expr:expr, $msg:expr) => { + if let Some(value) = $expr { + value + } else { + warn!($msg); + continue; + } + }; + + ($expr:expr, $fmt:expr, $($arg:tt)*) => { + if let Some(value) = $expr { + value + } else { + warn!($fmt, $($arg)*); + continue; + } + }; +} + +#[cfg(test)] +mod tests { + use loki_api::prost_types::Timestamp; + + use crate::http::loki::prost_ts_to_nano; + + #[test] + fn test_ts_to_nano() { + // ts = 1731748568804293888 + // seconds = 1731748568 + // nano = 804293888 + let ts = Timestamp { + seconds: 1731748568, + nanos: 804293888, + }; + assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888); + } +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7793c26d19fd..4245527cf707 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -96,7 +96,8 @@ macro_rules! http_tests { test_otlp_metrics, test_otlp_traces, test_otlp_logs, - test_loki_logs, + test_loki_pb_logs, + test_loki_json_logs, ); )* }; @@ -1670,7 +1671,17 @@ pub async fn test_otlp_metrics(store_type: StorageType) { let client = TestClient::new(app); // write metrics data - let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), false).await; + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + )], + "/v1/otlp/v1/metrics", + body.clone(), + false, + ) + .await; assert_eq!(StatusCode::OK, res.status()); // select metrics data @@ -1682,7 +1693,17 @@ pub async fn test_otlp_metrics(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write metrics data with gzip - let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), true).await; + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + )], + "/v1/otlp/v1/metrics", + body.clone(), + true, + ) + .await; assert_eq!(StatusCode::OK, res.status()); // select metrics data again @@ -1713,7 +1734,17 @@ pub async fn test_otlp_traces(store_type: StorageType) { let client = TestClient::new(app); // write traces data - let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), false).await; + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + )], + "/v1/otlp/v1/traces", + body.clone(), + false, + ) + .await; assert_eq!(StatusCode::OK, res.status()); // select traces data @@ -1734,7 +1765,17 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write traces data with gzip - let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), true).await; + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + )], + "/v1/otlp/v1/traces", + body.clone(), + true, + ) + .await; assert_eq!(StatusCode::OK, res.status()); // select traces data again @@ -1765,10 +1806,16 @@ pub async fn test_otlp_logs(store_type: StorageType) { // write log data let res = send_req( &client, - vec![( - HeaderName::from_static("x-greptime-log-table-name"), - HeaderValue::from_static("logs1"), - )], + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-log-table-name"), + HeaderValue::from_static("logs1"), + ), + ], "/v1/otlp/v1/logs?db=public", body.clone(), false, @@ -1784,6 +1831,10 @@ pub async fn test_otlp_logs(store_type: StorageType) { let res = send_req( &client, vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), ( HeaderName::from_static("x-greptime-log-table-name"), HeaderValue::from_static("logs"), @@ -1813,9 +1864,9 @@ pub async fn test_otlp_logs(store_type: StorageType) { guard.remove_all().await; } -pub async fn test_loki_logs(store_type: StorageType) { +pub async fn test_loki_pb_logs(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_loke_logs").await; + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_loki_pb_logs").await; let client = TestClient::new(app); @@ -1862,7 +1913,7 @@ pub async fn test_loki_logs(store_type: StorageType) { // test schema let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; validate_data( - "loki_schema", + "loki_pb_schema", &client, "show create table loki_table_name;", expected, @@ -1872,7 +1923,75 @@ pub async fn test_loki_logs(store_type: StorageType) { // test content let expected = r#"[[1730976830000000000,"this is a log message","test","integration","do anything"],[1730976830000000000,"this is a log message","test","integration","do anything"]]"#; validate_data( - "loki_content", + "loki_pb_content", + &client, + "select * from loki_table_name;", + expected, + ) + .await; + + guard.remove_all().await; +} + +pub async fn test_loki_json_logs(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_loki_json_logs").await; + + let client = TestClient::new(app); + + let body = r#" +{ + "streams": [ + { + "stream": { + "source": "test" + }, + "values": [ + [ "1735901380059465984", "this is line one" ], + [ "1735901398478897920", "this is line two" ] + ] + } + ] +} + "#; + + let body = body.as_bytes().to_vec(); + + // write plain to loki + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/json"), + ), + ( + HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME), + HeaderValue::from_static("loki_table_name"), + ), + ], + "/v1/loki/api/v1/push", + body, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // test schema + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + validate_data( + "loki_json_schema", + &client, + "show create table loki_table_name;", + expected, + ) + .await; + + // test content + let expected = "[[1735901380059465984,\"this is line one\",\"test\"],[1735901398478897920,\"this is line two\",\"test\"]]"; + validate_data( + "loki_json_content", &client, "select * from loki_table_name;", expected, @@ -1901,9 +2020,7 @@ async fn send_req( body: Vec, with_gzip: bool, ) -> TestResponse { - let mut req = client - .post(path) - .header("content-type", "application/x-protobuf"); + let mut req = client.post(path); for (k, v) in headers { req = req.header(k, v);