chore: add json write #4744

Merged · 6 commits · Oct 8, 2024

Changes from 3 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/pipeline/Cargo.toml
@@ -40,6 +40,7 @@ enum_dispatch = "0.3"
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["sync"] }
once_cell.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/pipeline/src/etl/transform/transformer.rs
@@ -13,3 +13,4 @@
// limitations under the License.

pub mod greptime;
pub use greptime::identity_pipeline;
229 changes: 229 additions & 0 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -16,9 +16,15 @@ pub mod coerce

use std::collections::HashSet;

use ahash::HashMap;
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use serde_json::Map;

use crate::etl::error::{
Result, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
@@ -120,6 +126,7 @@ impl Transformer for GreptimeTransformer {
if let Some(idx) = transform.index {
if idx == Index::Time {
match transform.real_fields.len() {
// Safety: the `1 =>` arm guarantees real_fields has exactly one element.
1 => timestamp_columns
.push(transform.real_fields.first().unwrap().input_name()),
_ => {
@@ -194,3 +201,225 @@ impl Transformer for GreptimeTransformer {
&mut self.transforms
}
}

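/// Schema state accumulated while converting a batch: the ordered column
/// schemas plus a column-name -> position index used to merge rows.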
#[derive(Debug, Default)]
struct SchemaInfo {
schema: Vec<ColumnSchema>,
index: HashMap<String, usize>,
}

fn resolve_schema(
index: Option<usize>,
value_data: ValueData,
column_schema: ColumnSchema,
row: &mut Vec<GreptimeValue>,
schema_info: &mut SchemaInfo,
) {
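// Two paths: a column seen before (index hit) is written in place, with
// type-conflicting values degraded to null; a brand-new column is appended
// to both the schema and the current row.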
if let Some(index) = index {
let api_value = GreptimeValue {
value_data: Some(value_data),
};
let value_column_data_type = proto_value_type(&api_value);
// Safety: `index` is always a valid position in `schema_info.schema`.
let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
if value_column_data_type.is_some_and(|t| t != schema_column_data_type) {
row[index] = GreptimeValue { value_data: None };
} else {
row[index] = api_value;
}
} else {
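// First occurrence of this column: append its schema, remember its
// position, and extend the current row with the value.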
let key = column_schema.column_name.clone();
schema_info.schema.push(column_schema);
schema_info.index.insert(key, schema_info.schema.len() - 1);
let api_value = GreptimeValue {
value_data: Some(value_data),
};
row.push(api_value);
}
}

fn json_value_to_row(schema_info: &mut SchemaInfo, map: Map<String, serde_json::Value>) -> Row {
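// Start from a row of nulls matching every column seen so far; keys this
// object lacks simply stay null.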
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
for _ in 0..schema_info.schema.len() {
row.push(GreptimeValue { value_data: None });
}
for (key, value) in map {
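// The reserved timestamp column is skipped here; identity_pipeline appends
// it once for the whole batch.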
if key == DEFAULT_GREPTIME_TIMESTAMP_COLUMN {
continue;
}
let index = schema_info.index.get(&key).copied();
match value {
serde_json::Value::Null => {
// do nothing
}
serde_json::Value::String(s) => {
resolve_schema(
index,
ValueData::StringValue(s),
ColumnSchema {
column_name: key,
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
);
}
serde_json::Value::Bool(b) => {
resolve_schema(
index,
ValueData::BoolValue(b),
ColumnSchema {
column_name: key,
datatype: ColumnDataType::Boolean as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
);
}
serde_json::Value::Number(n) => {
if n.is_i64() {
resolve_schema(
index,
// safety unwrap is fine here because we have checked the number type
ValueData::I64Value(n.as_i64().unwrap()),
ColumnSchema {
column_name: key,
datatype: ColumnDataType::Int64 as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
);
} else if n.is_u64() {
resolve_schema(
index,
// safety unwrap is fine here because we have checked the number type
ValueData::U64Value(n.as_u64().unwrap()),
ColumnSchema {
column_name: key,
datatype: ColumnDataType::Uint64 as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
);
} else if n.is_f64() {
resolve_schema(
index,
// safety unwrap is fine here because we have checked the number type
ValueData::F64Value(n.as_f64().unwrap()),
ColumnSchema {
column_name: key,
datatype: ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
);
} else {
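// serde_json numbers are always representable as i64, u64, or f64,
// so this arm cannot be reached.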
unreachable!("unexpected number type");
}
}
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
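// Nested arrays/objects are encoded as JSONB binary and tagged with the
// JSON type extension so they land in a JSON-typed column.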
resolve_schema(
index,
ValueData::BinaryValue(jsonb::Value::from(value).to_vec()),
ColumnSchema {
column_name: key,
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
options: None,
},
&mut row,
schema_info,
);
}
}
}
Row { values: row }
}

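/// Schema-less ("identity") transform: every JSON object becomes one row,
/// the output schema is the union of all keys seen across the batch, and a
/// greptime_timestamp column holding the ingestion time is appended.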
pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
let mut rows = Vec::with_capacity(array.len());

let mut schema = SchemaInfo::default();
for value in array {
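// Non-object entries are skipped rather than rejected.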
if let serde_json::Value::Object(map) = value {
let row = json_value_to_row(&mut schema, map);
rows.push(row);
}
}
let greptime_timestamp_schema = ColumnSchema {
column_name: DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
datatype: ColumnDataType::TimestampNanosecond as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
};
let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let ts = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(ns)),
};
let column_count = schema.schema.len();
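// Rows created before later columns first appeared are shorter than the
// final schema; pad them with nulls, then append the shared timestamp.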
for row in rows.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}
row.values.push(ts.clone());
}
schema.schema.push(greptime_timestamp_schema);
Ok(Rows {
schema: schema.schema,
rows,
})
}

#[cfg(test)]
mod tests {
use crate::identity_pipeline;

#[test]
fn test_identity_pipeline() {
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": "88.5",
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array).unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
}
}
1 change: 1 addition & 0 deletions src/pipeline/src/lib.rs
@@ -18,6 +18,7 @@ mod metrics

pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{parse, Content, Pipeline};
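With identity_pipeline now re-exported at the crate root, callers can invoke the schema-less path directly. A minimal sketch of the intended usage, based on the unit test above; the field names, values, and expect message are illustrative, not part of the diff:

    use pipeline::identity_pipeline;

    // Two objects with overlapping but not identical keys: the resulting
    // schema is the union of all keys plus the auto-appended timestamp column.
    let logs = vec![
        serde_json::json!({"host": "web-1", "latency_ms": 12}),
        serde_json::json!({"host": "web-2", "status": "ok"}),
    ];
    let rows = identity_pipeline(logs).expect("identity transform failed");
    assert_eq!(rows.schema.len(), 4); // host, latency_ms, status, greptime_timestamp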
84 changes: 45 additions & 39 deletions src/servers/src/http/event.rs
@@ -363,9 +363,7 @@ pub async fn log_ingester(
 
     let handler = log_state.log_handler;
 
-    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
-        reason: "pipeline_name is required",
-    })?;
+    let pipeline_name = query_params.pipeline_name;
     let table_name = query_params.table.context(InvalidParameterSnafu {
         reason: "table is required",
     })?;
@@ -416,7 +414,7 @@ fn extract_pipeline_value_by_content_type(
 
 async fn ingest_logs_inner(
     state: LogHandlerRef,
-    pipeline_name: String,
+    pipeline_name: Option<String>,
     version: PipelineVersion,
     table_name: String,
     pipeline_data: Vec<Value>,
@@ -425,47 +423,55 @@ async fn ingest_logs_inner(
     let db = query_ctx.get_db_string();
     let exec_timer = std::time::Instant::now();
 
-    let pipeline = state
-        .get_pipeline(&pipeline_name, version, query_ctx.clone())
-        .await?;
-
-    let transform_timer = std::time::Instant::now();
-    let mut intermediate_state = pipeline.init_intermediate_state();
-
     let mut results = Vec::with_capacity(pipeline_data.len());
+    let transformed_data: Rows;
+
+    if let Some(pipeline_name) = pipeline_name {
+        let pipeline = state
+            .get_pipeline(&pipeline_name, version, query_ctx.clone())
+            .await?;
+
+        let transform_timer = std::time::Instant::now();
+        let mut intermediate_state = pipeline.init_intermediate_state();
 
-    for v in pipeline_data {
-        pipeline
-            .prepare(v, &mut intermediate_state)
-            .inspect_err(|_| {
-                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
-                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
-                    .observe(transform_timer.elapsed().as_secs_f64());
-            })
-            .context(PipelineTransformSnafu)
-            .context(PipelineSnafu)?;
-        let r = pipeline
-            .exec_mut(&mut intermediate_state)
-            .inspect_err(|_| {
-                METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
-                    .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
-                    .observe(transform_timer.elapsed().as_secs_f64());
-            })
-            .context(PipelineTransformSnafu)
-            .context(PipelineSnafu)?;
-        results.push(r);
-        pipeline.reset_intermediate_state(&mut intermediate_state);
-    }
+        for v in pipeline_data {
+            pipeline
+                .prepare(v, &mut intermediate_state)
+                .inspect_err(|_| {
+                    METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
+                        .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
+                        .observe(transform_timer.elapsed().as_secs_f64());
+                })
+                .context(PipelineTransformSnafu)
+                .context(PipelineSnafu)?;
+            let r = pipeline
+                .exec_mut(&mut intermediate_state)
+                .inspect_err(|_| {
+                    METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
+                        .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
+                        .observe(transform_timer.elapsed().as_secs_f64());
+                })
+                .context(PipelineTransformSnafu)
+                .context(PipelineSnafu)?;
+            results.push(r);
+            pipeline.reset_intermediate_state(&mut intermediate_state);
+        }
 
-    METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
-        .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
-        .observe(transform_timer.elapsed().as_secs_f64());
+        METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
+            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
+            .observe(transform_timer.elapsed().as_secs_f64());
 
-    let transformed_data: Rows = Rows {
-        rows: results,
-        schema: pipeline.schemas().clone(),
-    };
+        transformed_data = Rows {
+            rows: results,
+            schema: pipeline.schemas().clone(),
+        };
+    } else {
+        let rows = pipeline::identity_pipeline(pipeline_data)
+            .context(PipelineTransformSnafu)
+            .context(PipelineSnafu)?;
+        transformed_data = rows;
+    }
 
     let insert_request = RowInsertRequest {
         rows: Some(transformed_data),
         table_name: table_name.clone(),