Skip to content

Commit

Permalink
chore: rename
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiyisong committed Jan 6, 2025
1 parent 500ae9e commit e8611ca
Showing 1 changed file with 11 additions and 21 deletions.
32 changes: 11 additions & 21 deletions src/servers/src/http/loki.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ async fn handle_json_req(
bytes: Bytes,
schemas: &mut Vec<ColumnSchema>,
) -> Result<Vec<Vec<GreptimeValue>>> {
let mut global_label_key_index: HashMap<String, u16> = HashMap::new();
global_label_key_index.insert(GREPTIME_TIMESTAMP.to_string(), 0);
global_label_key_index.insert(LOKI_LINE_COLUMN.to_string(), 1);
let mut column_indexer: HashMap<String, u16> = HashMap::new();
column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);

let payload: serde_json::Value =
serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
Expand Down Expand Up @@ -230,12 +230,7 @@ async fn handle_json_req(
// TODO(shuiyisong): we'll ignore structured metadata for now

let mut row = init_row(schemas.len(), ts, line_text);
process_labels(
&mut global_label_key_index,
schemas,
&mut row,
labels.iter(),
);
process_labels(&mut column_indexer, schemas, &mut row, labels.iter());

rows.push(row);
}
Expand All @@ -252,9 +247,9 @@ async fn handle_pb_req(
let req = loki_api::logproto::PushRequest::decode(&decompressed[..])
.context(DecodeOtlpRequestSnafu)?;

let mut global_label_key_index: HashMap<String, u16> = HashMap::new();
global_label_key_index.insert(GREPTIME_TIMESTAMP.to_string(), 0);
global_label_key_index.insert(LOKI_LINE_COLUMN.to_string(), 1);
let mut column_indexer: HashMap<String, u16> = HashMap::new();
column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);

let cnt = req.streams.iter().map(|s| s.entries.len()).sum::<usize>();
let mut rows = Vec::with_capacity(cnt);
Expand All @@ -278,12 +273,7 @@ async fn handle_pb_req(
let line = entry.line;

let mut row = init_row(schemas.len(), prost_ts_to_nano(&ts), line);
process_labels(
&mut global_label_key_index,
schemas,
&mut row,
labels.iter(),
);
process_labels(&mut column_indexer, schemas, &mut row, labels.iter());

rows.push(row);
}
Expand Down Expand Up @@ -314,14 +304,14 @@ fn init_row(schema_len: usize, ts: i64, line: String) -> Vec<GreptimeValue> {
}

fn process_labels<'a>(
global_label_key_index: &mut HashMap<String, u16>,
column_indexer: &mut HashMap<String, u16>,
schemas: &mut Vec<ColumnSchema>,
row: &mut Vec<GreptimeValue>,
labels: impl Iterator<Item = (&'a String, &'a String)>,
) {
// insert labels
for (k, v) in labels {
if let Some(index) = global_label_key_index.get(k) {
if let Some(index) = column_indexer.get(k) {
// exist in schema
// insert value using index
row[*index as usize] = GreptimeValue {
Expand All @@ -337,7 +327,7 @@ fn process_labels<'a>(
datatype_extension: None,
options: None,
});
global_label_key_index.insert(k.clone(), (schemas.len() - 1) as u16);
column_indexer.insert(k.clone(), (schemas.len() - 1) as u16);

row.push(GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
Expand Down

0 comments on commit e8611ca

Please sign in to comment.