feat: support Loki JSON write (#5288)
* perf: small updates

* refactor: move loki to a separate file

* chore: extract content_type pattern matching

* chore: minor update

* feat: loki json write (see the payload sketch below)

* chore: add decompression http layer

* fix: label string value instead of to_string

* chore: add test

* fix: typo

* fix: license header

* chore: rename
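
For context, the Loki push API JSON body that this commit adds support for looks like the sketch below. This is a minimal illustration built with serde_json — the streams/stream/values layout follows the upstream Loki push API, while the label names and values here are purely illustrative:

use serde_json::json;

fn main() {
    // Each stream carries a label map plus a list of
    // [unix-epoch-nanoseconds-as-string, log-line] pairs,
    // per the upstream Loki push API.
    let body = json!({
        "streams": [{
            "stream": { "job": "demo", "source": "stdin" },
            "values": [
                ["1731748568804293888", "hello from loki"]
            ]
        }]
    });
    // Sent with `Content-Type: application/json` to the push route
    // registered in src/servers/src/http.rs below.
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}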
shuiyisong authored Jan 6, 2025
1 parent b229c94 commit bbbba29
Showing 7 changed files with 550 additions and 213 deletions.
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/servers/Cargo.toml
@@ -53,7 +53,7 @@ datatypes.workspace = true
derive_builder.workspace = true
futures = "0.3"
futures-util.workspace = true
hashbrown = "0.14"
hashbrown = "0.15"
headers = "0.3"
hostname = "0.3"
http.workspace = true
8 changes: 8 additions & 0 deletions src/servers/src/error.rs
@@ -514,6 +514,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid Loki JSON request: {}", msg))]
InvalidLokiPayload {
msg: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unsupported content type: {:?}", content_type))]
UnsupportedContentType {
content_type: ContentType,
@@ -660,6 +667,7 @@ impl ErrorExt for Error {
| MysqlValueConversion { .. }
| ParseJson { .. }
| ParseJson5 { .. }
| InvalidLokiPayload { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. }
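For illustration, the new InvalidLokiPayload variant gives the JSON ingestion path a structured error for malformed bodies. A hypothetical call site (the real ones live in the new src/servers/src/http/loki.rs, which this page does not render) might look like:

use snafu::OptionExt;

// Hypothetical helper: reject a Loki JSON body that lacks a `streams` array.
fn get_streams(payload: &serde_json::Value) -> Result<&Vec<serde_json::Value>> {
    payload
        .get("streams")
        .and_then(serde_json::Value::as_array)
        .context(InvalidLokiPayloadSnafu {
            msg: "missing streams",
        })
}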
8 changes: 7 additions & 1 deletion src/servers/src/http.rs
@@ -81,6 +81,7 @@ pub mod handler;
pub mod header;
pub mod influxdb;
pub mod logs;
pub mod loki;
pub mod mem_prof;
pub mod opentsdb;
pub mod otlp;
@@ -742,7 +743,12 @@ impl HttpServer {

fn route_loki<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/api/v1/push", routing::post(event::loki_ingest))
.route("/api/v1/push", routing::post(loki::loki_ingest))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(RequestDecompressionLayer::new()),
)
.with_state(log_state)
}

206 changes: 12 additions & 194 deletions src/servers/src/http/event.rs
@@ -12,81 +12,56 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::result::Result as StdResult;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value as GreptimeValue,
};
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use axum::body::HttpBody;
use axum::extract::{FromRequest, Multipart, Path, Query, State};
use axum::headers::ContentType;
use axum::http::header::CONTENT_TYPE;
use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, Json, TypedHeader};
use bytes::Bytes;
use common_error::ext::ErrorExt;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimeTransformer, PipelineVersion};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
status_code_to_http_status, CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu,
ParseJson5Snafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
status_code_to_http_status, CatalogSnafu, Error, InvalidParameterSnafu, ParseJsonSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::extractor::LogTableName;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_LOKI_LOGS_INGESTION_COUNTER,
METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::prom_store;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";

const LOKI_TABLE_NAME: &str = "loki_logs";
const LOKI_LINE_COLUMN: &str = "line";

lazy_static! {
static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: LOKI_LINE_COLUMN.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
];
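// Shared ContentType values, extracted once so the content-type pattern
// matching below can compare against them instead of re-constructing
// them on every request.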
pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
pub static ref PB_CONTENT_TYPE: ContentType =
ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
}

#[derive(Debug, Default, Serialize, Deserialize)]
@@ -484,146 +459,6 @@ pub async fn pipeline_dryrun(
}
}

#[axum_macros::debug_handler]
pub async fn loki_ingest(
State(log_state): State<LogState>,
Extension(mut ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
LogTableName(table_name): LogTableName,
bytes: Bytes,
) -> Result<HttpResponse> {
ctx.set_channel(Channel::Loki);
let ctx = Arc::new(ctx);
let db = ctx.get_db_string();
let db_str = db.as_str();
let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
let exec_timer = Instant::now();

// decompress req
ensure!(
content_type.to_string() == CONTENT_TYPE_PROTOBUF_STR,
UnsupportedContentTypeSnafu { content_type }
);
let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
let req = loki_api::logproto::PushRequest::decode(&decompressed[..])
.context(DecodeOtlpRequestSnafu)?;

// init schemas
let mut schemas = LOKI_INIT_SCHEMAS.clone();

let mut global_label_key_index: HashMap<String, i32> = HashMap::new();
global_label_key_index.insert(GREPTIME_TIMESTAMP.to_string(), 0);
global_label_key_index.insert(LOKI_LINE_COLUMN.to_string(), 1);

let mut rows = vec![];

for stream in req.streams {
// parse labels for each row
// encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
// use very dirty hack to parse labels
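// e.g. `{job="foo", instance="bar"}` becomes `{job:"foo", instance:"bar"}`,
// which json5 accepts as an object with unquoted keys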
let labels = stream.labels.replace("=", ":");
// use btreemap to keep order
let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;

// process entries
for entry in stream.entries {
let ts = if let Some(ts) = entry.timestamp {
ts
} else {
continue;
};
let line = entry.line;

// create and init row
let mut row = Vec::with_capacity(schemas.len());
for _ in 0..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
// insert ts and line
row[0] = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(prost_ts_to_nano(&ts))),
};
row[1] = GreptimeValue {
value_data: Some(ValueData::StringValue(line)),
};
// insert labels
for (k, v) in labels.iter() {
if let Some(index) = global_label_key_index.get(k) {
// exist in schema
// insert value using index
row[*index as usize] = GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
};
} else {
// not exist
// add schema and append to values
schemas.push(ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
});
global_label_key_index.insert(k.clone(), (schemas.len() - 1) as i32);

row.push(GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
});
}
}

rows.push(row);
}
}

// fill Null for missing values
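// (rows created before a label was first seen are shorter than the final schema)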
for row in rows.iter_mut() {
if row.len() < schemas.len() {
for _ in row.len()..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
}
}

let rows = Rows {
rows: rows.into_iter().map(|values| Row { values }).collect(),
schema: schemas,
};

let ins_req = RowInsertRequest {
table_name,
rows: Some(rows),
};
let ins_reqs = RowInsertRequests {
inserts: vec![ins_req],
};

let handler = log_state.log_handler;
let output = handler.insert(ins_reqs, ctx).await;

if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_LOKI_LOGS_INGESTION_COUNTER
.with_label_values(&[db_str])
.inc_by(*rows as u64);
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}

let response = GreptimedbV1Response::from_output(vec![output])
.await
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
Ok(response)
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
State(log_state): State<LogState>,
@@ -682,11 +517,11 @@ fn extract_pipeline_value_by_content_type(
ignore_errors: bool,
) -> Result<Vec<Value>> {
Ok(match content_type {
ct if ct == ContentType::json() => transform_ndjson_array_factory(
ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory(
Deserializer::from_str(&payload).into_iter(),
ignore_errors,
)?,
ct if ct == ContentType::text() || ct == ContentType::text_utf8() => payload
ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
.lines()
.filter(|line| !line.is_empty())
.map(|line| Value::String(line.to_string()))
@@ -808,11 +643,6 @@ pub struct LogState {
pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[inline]
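// Converts a protobuf Timestamp into Unix-epoch nanoseconds.
// Note: i64 nanoseconds overflow for timestamps beyond ~year 2262.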
fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
ts.seconds * 1_000_000_000 + ts.nanos as i64
}

#[cfg(test)]
mod tests {
use super::*;
@@ -847,16 +677,4 @@ mod tests {
.to_string();
assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
}

#[test]
fn test_ts_to_nano() {
// ts = 1731748568804293888
// seconds = 1731748568
// nano = 804293888
let ts = Timestamp {
seconds: 1731748568,
nanos: 804293888,
};
assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
}
}
