feat: support Loki JSON write #5288
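
For context (not shown in the diff below, which is mostly the refactor around it): Loki's push API accepts, besides snappy-compressed protobuf, a JSON body where each stream carries a label map and a list of `[unix-nanosecond-timestamp, log line]` pairs. A minimal sketch of such a payload, built with serde_json purely for illustration:

```rust
use serde_json::json;

fn main() {
    // Representative Loki JSON push body (per Grafana Loki's push API):
    // "stream" holds the label set, "values" holds [ts-in-ns, line] pairs,
    // with the timestamp encoded as a string.
    let payload = json!({
        "streams": [{
            "stream": { "job": "demo", "env": "dev" },
            "values": [
                ["1731748568804293888", "hello from loki"]
            ]
        }]
    });
    println!("{payload}");
}
```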

Merged · 11 commits · Jan 6, 2025
13 changes: 12 additions & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion src/servers/Cargo.toml
@@ -53,7 +53,7 @@ datatypes.workspace = true
derive_builder.workspace = true
futures = "0.3"
futures-util.workspace = true
hashbrown = "0.14"
hashbrown = "0.15"
headers = "0.3"
hostname = "0.3"
http.workspace = true
8 changes: 8 additions & 0 deletions src/servers/src/error.rs
@@ -514,6 +514,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid Loki JSON request: {}", msg))]
InvalidLokiPayload {
msg: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unsupported content type: {:?}", content_type))]
UnsupportedContentType {
content_type: ContentType,
@@ -660,6 +667,7 @@ impl ErrorExt for Error {
| MysqlValueConversion { .. }
| ParseJson { .. }
| ParseJson5 { .. }
| InvalidLokiPayload { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. }
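
The new variant plugs into snafu's generated context-selector machinery: the derive produces an `InvalidLokiPayloadSnafu` selector, and `location` is captured implicitly. A minimal sketch of how a call site might raise it — `check_streams` and its message are hypothetical, and `Result` is assumed to be the alias from src/servers/src/error.rs:

```rust
use snafu::ensure;

// Hypothetical validation: reject an empty push request with the
// new error variant; snafu fills `location` automatically.
fn check_streams(stream_count: usize) -> Result<()> {
    ensure!(
        stream_count > 0,
        InvalidLokiPayloadSnafu {
            msg: "streams is empty",
        }
    );
    Ok(())
}
```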
8 changes: 7 additions & 1 deletion src/servers/src/http.rs
@@ -81,6 +81,7 @@ pub mod handler;
pub mod header;
pub mod influxdb;
pub mod logs;
pub mod loki;
pub mod mem_prof;
pub mod opentsdb;
pub mod otlp;
@@ -742,7 +743,12 @@ impl HttpServer {

fn route_loki<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/api/v1/push", routing::post(event::loki_ingest))
.route("/api/v1/push", routing::post(loki::loki_ingest))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(RequestDecompressionLayer::new()),
)
.with_state(log_state)
}

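
The added `RequestDecompressionLayer` (tower-http) inflates compressed request bodies according to `Content-Encoding` before `loki_ingest` runs, with `HandleErrorLayer` turning decompression failures into HTTP responses. A client-side sketch of what this enables — reqwest and flate2 are illustrative assumptions, not part of this diff, and the full endpoint path depends on where `route_loki` is nested:

```rust
use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let body = br#"{"streams":[{"stream":{"job":"demo"},"values":[["1731748568804293888","hi"]]}]}"#;

    // Gzip the payload; the server's decompression layer inflates it
    // transparently before the handler sees the bytes.
    let mut enc = GzEncoder::new(Vec::new(), Compression::default());
    enc.write_all(body)?;
    let compressed = enc.finish()?;

    let resp = reqwest::blocking::Client::new()
        .post("http://127.0.0.1:4000/v1/loki/api/v1/push") // path prefix is an assumption
        .header("Content-Type", "application/json")
        .header("Content-Encoding", "gzip")
        .body(compressed)
        .send()?;
    println!("status: {}", resp.status());
    Ok(())
}
```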
206 changes: 12 additions & 194 deletions src/servers/src/http/event.rs
@@ -12,81 +12,56 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::result::Result as StdResult;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value as GreptimeValue,
};
use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use axum::body::HttpBody;
use axum::extract::{FromRequest, Multipart, Path, Query, State};
use axum::headers::ContentType;
use axum::http::header::CONTENT_TYPE;
use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, Json, TypedHeader};
use bytes::Bytes;
use common_error::ext::ErrorExt;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimeTransformer, PipelineVersion};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
status_code_to_http_status, CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu,
ParseJson5Snafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
status_code_to_http_status, CatalogSnafu, Error, InvalidParameterSnafu, ParseJsonSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::extractor::LogTableName;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_LOKI_LOGS_INGESTION_COUNTER,
METRIC_LOKI_LOGS_INGESTION_ELAPSED, METRIC_SUCCESS_VALUE,
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
use crate::prom_store;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";

const LOKI_TABLE_NAME: &str = "loki_logs";
const LOKI_LINE_COLUMN: &str = "line";

lazy_static! {
static ref LOKI_INIT_SCHEMAS: Vec<ColumnSchema> = vec![
ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: LOKI_LINE_COLUMN.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
];
pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
pub static ref PB_CONTENT_TYPE: ContentType =
ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
}
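
These shared constants let handlers compare an incoming `ContentType` against pre-parsed values instead of re-parsing MIME strings per request. A small usage sketch (the `describe` helper is hypothetical) mirroring the dispatch in `extract_pipeline_value_by_content_type` further down:

```rust
// Hypothetical dispatch over the shared ContentType constants.
fn describe(ct: &ContentType) -> &'static str {
    if *ct == *JSON_CONTENT_TYPE {
        "JSON / NDJSON payload"
    } else if *ct == *TEXT_CONTENT_TYPE || *ct == *TEXT_UTF8_CONTENT_TYPE {
        "plain text, one log per line"
    } else if *ct == *PB_CONTENT_TYPE {
        "protobuf"
    } else {
        "unsupported"
    }
}
```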

#[derive(Debug, Default, Serialize, Deserialize)]
@@ -484,146 +459,6 @@ pub async fn pipeline_dryrun(
}
}

#[axum_macros::debug_handler]
pub async fn loki_ingest(
State(log_state): State<LogState>,
Extension(mut ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
LogTableName(table_name): LogTableName,
bytes: Bytes,
) -> Result<HttpResponse> {
ctx.set_channel(Channel::Loki);
let ctx = Arc::new(ctx);
let db = ctx.get_db_string();
let db_str = db.as_str();
let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
let exec_timer = Instant::now();

// decompress req
ensure!(
content_type.to_string() == CONTENT_TYPE_PROTOBUF_STR,
UnsupportedContentTypeSnafu { content_type }
);
let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
let req = loki_api::logproto::PushRequest::decode(&decompressed[..])
.context(DecodeOtlpRequestSnafu)?;

// init schemas
let mut schemas = LOKI_INIT_SCHEMAS.clone();

let mut global_label_key_index: HashMap<String, i32> = HashMap::new();
global_label_key_index.insert(GREPTIME_TIMESTAMP.to_string(), 0);
global_label_key_index.insert(LOKI_LINE_COLUMN.to_string(), 1);

let mut rows = vec![];

for stream in req.streams {
// parse labels for each row
// encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
// use very dirty hack to parse labels
let labels = stream.labels.replace("=", ":");
// use btreemap to keep order
let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;

// process entries
for entry in stream.entries {
let ts = if let Some(ts) = entry.timestamp {
ts
} else {
continue;
};
let line = entry.line;

// create and init row
let mut row = Vec::with_capacity(schemas.len());
for _ in 0..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
// insert ts and line
row[0] = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(prost_ts_to_nano(&ts))),
};
row[1] = GreptimeValue {
value_data: Some(ValueData::StringValue(line)),
};
// insert labels
for (k, v) in labels.iter() {
if let Some(index) = global_label_key_index.get(k) {
// exist in schema
// insert value using index
row[*index as usize] = GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
};
} else {
// not exist
// add schema and append to values
schemas.push(ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
});
global_label_key_index.insert(k.clone(), (schemas.len() - 1) as i32);

row.push(GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
});
}
}

rows.push(row);
}
}

// fill Null for missing values
for row in rows.iter_mut() {
if row.len() < schemas.len() {
for _ in row.len()..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
}
}

let rows = Rows {
rows: rows.into_iter().map(|values| Row { values }).collect(),
schema: schemas,
};

let ins_req = RowInsertRequest {
table_name,
rows: Some(rows),
};
let ins_reqs = RowInsertRequests {
inserts: vec![ins_req],
};

let handler = log_state.log_handler;
let output = handler.insert(ins_reqs, ctx).await;

if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_LOKI_LOGS_INGESTION_COUNTER
.with_label_values(&[db_str])
.inc_by(*rows as u64);
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}

let response = GreptimedbV1Response::from_output(vec![output])
.await
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
Ok(response)
}
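
The label-parsing "dirty hack" in the removed handler exploits Loki's label serialization, `{key="value", ...}`: once `=` is swapped for `:`, the string is valid JSON5 (unquoted keys are legal). A standalone demonstration with the same json5 crate — note, as the original comment concedes, this breaks if a label value itself contains `=`:

```rust
use std::collections::BTreeMap;

fn main() {
    // Loki-style label string -> JSON5 -> ordered map, as the
    // removed loki_ingest did with a BTreeMap to keep key order.
    let raw = r#"{job="loki", env="dev"}"#;
    let as_json5 = raw.replace('=', ":");
    let labels: BTreeMap<String, String> = json5::from_str(&as_json5).unwrap();
    assert_eq!(labels.get("job").map(String::as_str), Some("loki"));
    assert_eq!(labels.get("env").map(String::as_str), Some("dev"));
}
```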

#[axum_macros::debug_handler]
pub async fn log_ingester(
State(log_state): State<LogState>,
@@ -682,11 +517,11 @@ fn extract_pipeline_value_by_content_type(
ignore_errors: bool,
) -> Result<Vec<Value>> {
Ok(match content_type {
ct if ct == ContentType::json() => transform_ndjson_array_factory(
ct if ct == *JSON_CONTENT_TYPE => transform_ndjson_array_factory(
Deserializer::from_str(&payload).into_iter(),
ignore_errors,
)?,
ct if ct == ContentType::text() || ct == ContentType::text_utf8() => payload
ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
.lines()
.filter(|line| !line.is_empty())
.map(|line| Value::String(line.to_string()))
@@ -808,11 +643,6 @@ pub struct LogState {
pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[inline]
fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
ts.seconds * 1_000_000_000 + ts.nanos as i64
}

#[cfg(test)]
mod tests {
use super::*;
@@ -847,16 +677,4 @@ mod tests {
.to_string();
assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
}

#[test]
fn test_ts_to_nano() {
// ts = 1731748568804293888
// seconds = 1731748568
// nano = 804293888
let ts = Timestamp {
seconds: 1731748568,
nanos: 804293888,
};
assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
}
}