diff --git a/Cargo.lock b/Cargo.lock
index 374ac9d0560f..30a12d77255d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4143,9 +4143,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
 [[package]]
 name = "foldhash"
-version = "0.1.3"
+version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
+checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f"
 
 [[package]]
 name = "form_urlencoded"
diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs
new file mode 100644
index 000000000000..3206cda7c774
--- /dev/null
+++ b/src/servers/src/elasticsearch.rs
@@ -0,0 +1,363 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Instant;
+
+use axum::extract::{Query, State};
+use axum::headers::ContentType;
+use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
+use axum::response::IntoResponse;
+use axum::{Extension, TypedHeader};
+use common_error::ext::ErrorExt;
+use common_telemetry::{debug, warn};
+use serde_json::{json, Deserializer, Value};
+use session::context::{Channel, QueryContext};
+use snafu::{ensure, ResultExt};
+
+use crate::error::{InvalidElasticsearchInputSnafu, ParseJsonSnafu, Result as ServersResult};
+use crate::http::event::{
+    ingest_logs_inner, LogIngesterQueryParams, LogState, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
+};
+
+// The fake Elasticsearch version reported by the version API.
+const ELASTICSEARCH_VERSION: &str = "8.16.0";
+
+// Return a fake response for Elasticsearch ping requests.
+#[axum_macros::debug_handler]
+pub async fn handle_get_version() -> impl IntoResponse {
+    let body = serde_json::json!({
+        "version": {
+            "number": ELASTICSEARCH_VERSION
+        }
+    });
+    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
+}
+
+// Return a fake response for Elasticsearch license requests.
+// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-license.html.
+#[axum_macros::debug_handler]
+pub async fn handle_get_license() -> impl IntoResponse {
+    let body = serde_json::json!({
+        "license": {
+            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
+            "type": "oss",
+            "status": "active",
+            "expiry_date_in_millis": 4891198687000_i64,
+        }
+    });
+    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
+}
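Both handlers exist because Elasticsearch-compatible shippers (Logstash, Filebeat, and friends) probe the server before sending any data. Assuming a local GreptimeDB listening on the default HTTP port 4000, with these routes nested under `/v1/elasticsearch` as in the `http.rs` changes below, the ping exchange looks roughly like this:

```
GET /v1/elasticsearch HTTP/1.1
Host: localhost:4000

HTTP/1.1 200 OK
content-type: application/json
x-elastic-product: Elasticsearch

{"version":{"number":"8.16.0"}}
```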
+// Process `_bulk` API requests. Only creating logs is supported.
+// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
+#[axum_macros::debug_handler]
+pub async fn handle_bulk_api(
+    State(log_state): State<LogState>,
+    Query(params): Query<LogIngesterQueryParams>,
+    Extension(mut query_ctx): Extension<QueryContext>,
+    TypedHeader(_content_type): TypedHeader<ContentType>,
+    payload: String,
+) -> impl IntoResponse {
+    let start = Instant::now();
+    debug!(
+        "Received bulk request, params: {:?}, payload: {:?}",
+        params, payload
+    );
+
+    // The `schema` is already set in the query_ctx during the auth process.
+    query_ctx.set_channel(Channel::Elasticsearch);
+
+    // Record the ingestion time histogram.
+    let _timer = crate::metrics::METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
+        .with_label_values(&[params.db.unwrap_or("public".to_string()).as_str()])
+        .start_timer();
+
+    let table = if let Some(table) = params.table {
+        table
+    } else {
+        return (
+            StatusCode::BAD_REQUEST,
+            elasticsearch_headers(),
+            axum::Json(write_bulk_response(
+                start.elapsed().as_millis() as i64,
+                0,
+                StatusCode::BAD_REQUEST.as_u16() as u32,
+                "require parameter 'table'",
+            )),
+        );
+    };
+
+    // If pipeline_name is not provided, use the internal identity pipeline.
+    let pipeline = if let Some(pipeline) = params.pipeline_name {
+        pipeline
+    } else {
+        GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
+    };
+
+    // Read the NDJSON payload and convert it to a vector of Value.
+    let log_values = match convert_es_input_to_log_values(&payload, &params.msg_field) {
+        Ok(log_values) => log_values,
+        Err(e) => {
+            return (
+                StatusCode::BAD_REQUEST,
+                elasticsearch_headers(),
+                axum::Json(write_bulk_response(
+                    start.elapsed().as_millis() as i64,
+                    0,
+                    StatusCode::BAD_REQUEST.as_u16() as u32,
+                    e.to_string().as_str(),
+                )),
+            );
+        }
+    };
+    let log_num = log_values.len();
+
+    if let Err(e) = ingest_logs_inner(
+        log_state.log_handler,
+        pipeline,
+        None,
+        table,
+        log_values,
+        Arc::new(query_ctx),
+    )
+    .await
+    {
+        warn!("Failed to ingest logs: {:?}", e);
+        return (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            elasticsearch_headers(),
+            axum::Json(write_bulk_response(
+                start.elapsed().as_millis() as i64,
+                0,
+                e.status_code() as u32,
+                e.to_string().as_str(),
+            )),
+        );
+    }
+
+    (
+        StatusCode::OK,
+        elasticsearch_headers(),
+        axum::Json(write_bulk_response(
+            start.elapsed().as_millis() as i64,
+            log_num,
+            StatusCode::CREATED.as_u16() as u32,
+            "",
+        )),
+    )
+}
+
+// When a `_bulk` request is written to GreptimeDB successfully, it generates the following response:
+// {
+//     "took": 1000,
+//     "errors": false,
+//     "items": [
+//         { "create": { "status": 201 } },
+//         { "create": { "status": 201 } },
+//         ...
+//     ]
+// }
+// If the status code is not 201, it generates the following response:
+// {
+//     "took": 1000,
+//     "errors": true,
+//     "items": [
+//         { "create": { "status": 400, "error": { "type": "illegal_argument_exception", "reason": "<error_reason>" } } }
+//     ]
+// }
+fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
+    if error_reason.is_empty() {
+        let items: Vec<Value> = (0..n)
+            .map(|_| {
+                json!({
+                    "create": {
+                        "status": status_code
+                    }
+                })
+            })
+            .collect();
+        json!({
+            "took": took_ms,
+            "errors": false,
+            "items": items,
+        })
+    } else {
+        json!({
+            "took": took_ms,
+            "errors": true,
+            "items": [
+                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
+            ]
+        })
+    }
+}
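For reference, a minimal `_bulk` exchange against this handler might look like the following (the host, the `test` table name, and the `took` value are illustrative; `table` is the required query parameter checked above):

```
POST /v1/elasticsearch/_bulk?table=test HTTP/1.1
Host: localhost:4000
content-type: application/json

{"create":{"_index":"test","_id":"1"}}
{"message":"hello"}
{"create":{"_index":"test","_id":"2"}}
{"message":"world"}

HTTP/1.1 200 OK
content-type: application/json
x-elastic-product: Elasticsearch

{"took":5,"errors":false,"items":[{"create":{"status":201}},{"create":{"status":201}}]}
```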
+/// Returns the headers attached to every response of the Elasticsearch API.
+pub fn elasticsearch_headers() -> HeaderMap {
+    HeaderMap::from_iter([
+        (
+            axum::http::header::CONTENT_TYPE,
+            HeaderValue::from_static("application/json"),
+        ),
+        // Logstash needs this header to identify the product.
+        (
+            HeaderName::from_static("x-elastic-product"),
+            HeaderValue::from_static("Elasticsearch"),
+        ),
+    ])
+}
+
+// The input is an Elasticsearch bulk request in NDJSON format.
+// For example, the input will look like this:
+// { "index" : { "_index" : "test", "_id" : "1" } }
+// { "field1" : "value1" }
+// { "index" : { "_index" : "test", "_id" : "2" } }
+// { "field2" : "value2" }
+fn convert_es_input_to_log_values(
+    input: &str,
+    msg_field: &Option<String>,
+) -> ServersResult<Vec<Value>> {
+    // Read the NDJSON payload and convert it to a `Vec<Value>`. Return an error if the input is not valid JSON.
+    let values: Vec<Value> = Deserializer::from_str(input)
+        .into_iter::<Value>()
+        .collect::<Result<_, _>>()
+        .context(ParseJsonSnafu)?;
+
+    // Check if the input is empty.
+    ensure!(
+        !values.is_empty(),
+        InvalidElasticsearchInputSnafu {
+            reason: "empty bulk request".to_string(),
+        }
+    );
+
+    let mut log_values: Vec<Value> = Vec::new();
+
+    // For the Elasticsearch `_bulk` API, each chunk contains two objects:
+    //   1. The first object is the command; it should be `create` or `index`. `create` is used for insert, `index` is used for upsert.
+    //   2. The second object is the document data.
+    let mut is_document = false;
+    for v in values {
+        if !is_document {
+            // Read the first object to get the command; it should be `create` or `index`.
+            ensure!(
+                v.get("create").is_some() || v.get("index").is_some(),
+                InvalidElasticsearchInputSnafu {
+                    reason: format!(
+                        "invalid bulk request, expected 'create' or 'index' but got {:?}",
+                        v
+                    ),
+                }
+            );
+            is_document = true;
+            continue;
+        }
+
+        // The second object is the document data.
+        if is_document {
+            // If msg_field is provided, fetch the value of that field from the document data.
+            if let Some(msg_field) = msg_field {
+                if let Some(Value::String(message)) = v.get(msg_field) {
+                    let value = match serde_json::from_str::<Value>(message) {
+                        Ok(value) => value,
+                        // If the message is not valid JSON, use the original message as the log value.
+                        Err(_) => Value::String(message.to_string()),
+                    };
+                    log_values.push(value);
+                }
+            } else {
+                log_values.push(v);
+            }
+            is_document = false;
+        }
+    }
+
+    debug!("Received log data: {:?}", log_values);
+
+    Ok(log_values)
+}
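The parsing above leans on `serde_json::Deserializer`'s stream deserializer, which yields one `Value` per whitespace-separated JSON document and stops at the first malformed one. A minimal standalone sketch of the same technique, independent of the server types used here:

```rust
use serde_json::{Deserializer, Value};

/// Parse an NDJSON payload into a vector of JSON values.
fn parse_ndjson(input: &str) -> Result<Vec<Value>, serde_json::Error> {
    // `into_iter::<Value>()` returns a StreamDeserializer producing
    // `Result<Value, Error>` items; collecting into `Result<Vec<_>, _>`
    // surfaces the first parse error, as in `convert_es_input_to_log_values`.
    Deserializer::from_str(input).into_iter::<Value>().collect()
}

fn main() {
    let input = r#"
        {"create":{"_index":"test"}}
        {"message":"hello"}
    "#;
    let values = parse_ndjson(input).expect("valid NDJSON");
    assert_eq!(values.len(), 2);
    assert_eq!(values[1]["message"], "hello");
}
```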
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_convert_es_input_to_log_values() {
+        let test_cases = vec![
+            // Normal case.
+            (
+                r#"
+                {"create":{"_index":"test","_id":"1"}}
+                {"foo1":"foo1_value", "bar1":"bar1_value"}
+                {"create":{"_index":"test","_id":"2"}}
+                {"foo2":"foo2_value","bar2":"bar2_value"}
+                "#,
+                None,
+                Ok(vec![
+                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
+                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
+                ]),
+            ),
+            // Specify the `data` field as the message field; its value is a JSON string.
+            (
+                r#"
+                {"create":{"_index":"test","_id":"1"}}
+                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
+                {"create":{"_index":"test","_id":"2"}}
+                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
+                "#,
+                Some("data".to_string()),
+                Ok(vec![
+                    json!({"foo1": "foo1_value", "bar1": "bar1_value"}),
+                    json!({"foo2": "foo2_value", "bar2": "bar2_value"}),
+                ]),
+            ),
+            // Simulate the log data from Logstash.
+            (
+                r#"
+                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
+                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
+                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
+                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
+                "#,
+                Some("message".to_string()),
+                Ok(vec![
+                    json!("172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""),
+                    json!("10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""),
+                ]),
+            ),
+            // An invalid bulk request.
+            (
+                r#"
+                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
+                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
+                "#,
+                None,
+                Err(InvalidElasticsearchInputSnafu {
+                    reason: "it's an invalid bulk request".to_string(),
+                }),
+            ),
+        ];
+
+        for (input, msg_field, expected) in test_cases {
+            let log_values = convert_es_input_to_log_values(input, &msg_field);
+            if expected.is_ok() {
+                assert_eq!(log_values.unwrap(), expected.unwrap());
+            } else {
+                assert!(log_values.is_err());
+            }
+        }
+    }
+}
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 31aa5342be57..f8c35cb25200 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -603,6 +603,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Invalid elasticsearch input, reason: {}", reason))]
+    InvalidElasticsearchInput {
+        reason: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -674,7 +681,8 @@ impl ErrorExt for Error {
             | UnsupportedJsonDataTypeForTag { .. }
             | InvalidTableName { .. }
             | PrepareStatementNotFound { .. }
-            | FailedToParseQuery { .. } => StatusCode::InvalidArguments,
+            | FailedToParseQuery { .. }
+            | InvalidElasticsearchInput { .. } => StatusCode::InvalidArguments,
 
             Catalog { source, .. } => source.status_code(),
             RowWriter { source, .. } => source.status_code(),
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index eba588fdbdf4..45b7f0b620b4 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -22,6 +22,7 @@ use async_trait::async_trait;
 use auth::UserProviderRef;
 use axum::error_handling::HandleErrorLayer;
 use axum::extract::DefaultBodyLimit;
+use axum::http::StatusCode as HttpStatusCode;
 use axum::response::{IntoResponse, Json, Response};
 use axum::{middleware, routing, BoxError, Router};
 use common_base::readable_size::ReadableSize;
@@ -48,6 +49,7 @@ use tower_http::trace::TraceLayer;
 use self::authorize::AuthState;
 use self::result::table_result::TableResponse;
 use crate::configurator::ConfiguratorRef;
+use crate::elasticsearch;
 use crate::error::{AddressBindSnafu, AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
 use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
 use crate::http::prometheus::{
@@ -606,7 +608,19 @@ impl HttpServerBuilder {
 
         let router = router.nest(
             &format!("/{HTTP_API_VERSION}/loki"),
-            HttpServer::route_loki(log_state),
+            HttpServer::route_loki(log_state.clone()),
+        );
+
+        let router = router.nest(
+            &format!("/{HTTP_API_VERSION}/elasticsearch"),
+            HttpServer::route_elasticsearch(log_state.clone()),
+        );
+
+        let router = router.nest(
+            &format!("/{HTTP_API_VERSION}/elasticsearch/"),
+            Router::new()
+                .route("/", routing::get(elasticsearch::handle_get_version))
+                .with_state(log_state),
         );
 
         Self { router, ..self }
@@ -752,6 +766,82 @@ impl HttpServer {
             .with_state(log_state)
     }
 
+    fn route_elasticsearch(log_state: LogState) -> Router {
+        Router::new()
+            // Return a fake response for HEAD '/' requests.
+            .route(
+                "/",
+                routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
+            )
+            // Return a fake response for Elasticsearch version requests.
+            .route("/", routing::get(elasticsearch::handle_get_version))
+            // Return a fake response for Elasticsearch license requests.
+            .route("/_license", routing::get(elasticsearch::handle_get_license))
+            .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
+            // Return a fake response for Elasticsearch ILM requests.
+            .route(
+                "/_ilm/policy/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Elasticsearch index template requests.
+            .route(
+                "/_index_template/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Elasticsearch ingest pipeline requests.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/put-pipeline-api.html.
+            .route(
+                "/_ingest/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Elasticsearch nodes discovery requests.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/cluster.html.
+            .route(
+                "/_nodes/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            // Return a fake response for Logstash API requests.
+            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/logstash-apis.html.
+            .route(
+                "/logstash/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            .route(
+                "/_logstash/*path",
+                routing::any((
+                    HttpStatusCode::OK,
+                    elasticsearch::elasticsearch_headers(),
+                    axum::Json(serde_json::json!({})),
+                )),
+            )
+            .layer(
+                ServiceBuilder::new()
+                    .layer(HandleErrorLayer::new(handle_error))
+                    .layer(RequestDecompressionLayer::new()),
+            )
+            .with_state(log_state)
+    }
+
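The wildcard stubs exist purely so that clients which try to manage ILM policies, index templates, ingest pipelines, or node discovery do not fail on startup; every such request is answered with `200` and an empty JSON object. For example (path and host illustrative):

```
PUT /v1/elasticsearch/_ilm/policy/logstash-policy HTTP/1.1
Host: localhost:4000

HTTP/1.1 200 OK
content-type: application/json
x-elastic-product: Elasticsearch

{}
```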
     fn route_log(log_state: LogState) -> Router {
         Router::new()
             .route("/logs", routing::post(event::log_ingester))
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 951c796ac39e..150b703a4ec1 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -53,8 +53,8 @@ use crate::metrics::{
 };
 use crate::query_handler::PipelineHandlerRef;
 
+pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
 const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
-const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
 
 lazy_static! {
     pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
@@ -64,15 +64,30 @@ lazy_static! {
         ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
 }
 
+/// LogIngesterQueryParams holds the query parameters of the log ingester API.
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct LogIngesterQueryParams {
-    pub table: Option<String>,
+    /// The database where log data will be written.
     pub db: Option<String>,
+
+    /// The table where log data will be written.
+    pub table: Option<String>,
+
+    /// The pipeline that will be used for log ingestion.
     pub pipeline_name: Option<String>,
 
-    pub ignore_errors: Option<bool>,
+    /// The version of the pipeline to be used for log ingestion.
     pub version: Option<String>,
+
+    /// Whether to ignore errors during log ingestion.
+    pub ignore_errors: Option<bool>,
+
+    /// The source of the log data.
     pub source: Option<String>,
+
+    /// The JSON field name of the log message. If not provided, the whole log is taken as the message.
+    /// The field must be at the top level of the JSON structure.
+    pub msg_field: Option<String>,
 }
 
 pub struct PipelineContent(String);
@@ -530,7 +545,7 @@ fn extract_pipeline_value_by_content_type(
     })
 }
 
-async fn ingest_logs_inner(
+pub(crate) async fn ingest_logs_inner(
     state: PipelineHandlerRef,
     pipeline_name: String,
     version: PipelineVersion,
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index 92f2b8b9d0ba..417d2646513b 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -23,6 +23,7 @@ use datatypes::schema::Schema;
 
 pub mod addrs;
 pub mod configurator;
+pub(crate) mod elasticsearch;
 pub mod error;
 pub mod export_metrics;
 pub mod grpc;
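Putting the new `msg_field` parameter together with the existing `LogIngesterQueryParams` fields, a bulk ingestion URL that writes to the `logs` table and extracts only the `message` field from each document might look like this (all values illustrative):

```
POST /v1/elasticsearch/_bulk?db=public&table=logs&pipeline_name=greptime_identity&msg_field=message
```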
diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs
index fe81fed6ced5..8eefe8217d44 100644
--- a/src/servers/src/metrics.rs
+++ b/src/servers/src/metrics.rs
@@ -182,6 +182,13 @@ lazy_static! {
         &[METRIC_DB_LABEL, METRIC_RESULT_LABEL]
     )
     .unwrap();
+    pub static ref METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED: HistogramVec =
+        register_histogram_vec!(
+            "greptime_servers_elasticsearch_logs_ingestion_elapsed",
+            "servers elasticsearch logs ingestion elapsed",
+            &[METRIC_DB_LABEL]
+        )
+        .unwrap();
     pub static ref METRIC_HTTP_LOGS_TRANSFORM_ELAPSED: HistogramVec =
         register_histogram_vec!(
             "greptime_servers_http_logs_transform_elapsed",
diff --git a/src/session/src/context.rs b/src/session/src/context.rs
index 1c621b3ab711..3544228afed4 100644
--- a/src/session/src/context.rs
+++ b/src/session/src/context.rs
@@ -398,6 +398,7 @@ pub enum Channel {
     Influx = 7,
     Opentsdb = 8,
     Loki = 9,
+    Elasticsearch = 10,
 }
 
 impl From<u8> for Channel {
@@ -412,7 +413,7 @@ impl From<u8> for Channel {
             7 => Self::Influx,
             8 => Self::Opentsdb,
             9 => Self::Loki,
-
+            10 => Self::Elasticsearch,
             _ => Self::Unknown,
         }
     }
@@ -440,6 +441,7 @@ impl Display for Channel {
             Channel::Influx => write!(f, "influx"),
             Channel::Opentsdb => write!(f, "opentsdb"),
             Channel::Loki => write!(f, "loki"),
+            Channel::Elasticsearch => write!(f, "elasticsearch"),
             Channel::Unknown => write!(f, "unknown"),
         }
     }
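With the `Elasticsearch` channel wired through, a Logstash pipeline can point its stock `elasticsearch` output at GreptimeDB. A minimal sketch, assuming GreptimeDB's HTTP server on `localhost:4000`; template and ILM management are disabled because the corresponding endpoints are stubs, and the required `table` query parameter would still need to be attached to the request URL (for example via the plugin's `parameters` option, if your plugin version supports it):

```
output {
    elasticsearch {
        hosts => ["http://localhost:4000/v1/elasticsearch"]
        manage_template => false
        ilm_enabled => false
    }
}
```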