Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into zhongzc/bloom-filter-…
Browse files Browse the repository at this point in the history
…config
  • Loading branch information
zhongzc committed Dec 25, 2024
2 parents 009cbe5 + a9f2191 commit 7fc1db2
Show file tree
Hide file tree
Showing 38 changed files with 1,738 additions and 216 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
hex = "0.4"
http = "0.2"
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/common/error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license.workspace = true
workspace = true

[dependencies]
http.workspace = true
snafu.workspace = true
strum.workspace = true
tonic.workspace = true
21 changes: 21 additions & 0 deletions src/common/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,30 @@ pub mod ext;
pub mod mock;
pub mod status_code;

use http::{HeaderMap, HeaderValue};
pub use snafu;

// HACK - these headers are here for shared in gRPC services. For common HTTP headers,
// please define in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";

/// Create a http header map from error code and message.
/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys.
pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
let mut header = HeaderMap::new();

let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| {
HeaderValue::from_bytes(
&msg.as_bytes()
.iter()
.flat_map(|b| std::ascii::escape_default(*b))
.collect::<Vec<u8>>(),
)
.expect("Already escaped string should be valid ascii")
});

header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into());
header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg);
header
}
4 changes: 4 additions & 0 deletions src/common/function/src/scalars/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod distance;
pub(crate) mod impl_conv;
mod scalar_add;
mod scalar_mul;
mod vector_mul;

use std::sync::Arc;

Expand All @@ -38,5 +39,8 @@ impl VectorFunction {
// scalar calculation
registry.register(Arc::new(scalar_add::ScalarAddFunction));
registry.register(Arc::new(scalar_mul::ScalarMulFunction));

// vector calculation
registry.register(Arc::new(vector_mul::VectorMulFunction));
}
}
205 changes: 205 additions & 0 deletions src/common/function/src/scalars/vector/vector_mul.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::fmt::Display;

use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use nalgebra::DVectorView;
use snafu::ensure;

use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};

const NAME: &str = "vec_mul";

/// Multiplies corresponding elements of two vectors.
///
/// # Example
///
/// ```sql
/// SELECT vec_to_string(vec_mul("[1, 2, 3]", "[1, 2, 3]")) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | [1,4,9] |
/// +---------+
///
/// ```
#[derive(Debug, Clone, Default)]
pub struct VectorMulFunction;

impl Function for VectorMulFunction {
fn name(&self) -> &str {
NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}

fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
)
}

fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);

let arg0 = &columns[0];
let arg1 = &columns[1];

let len = arg0.len();
let mut result = BinaryVectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}

let arg0_const = as_veclit_if_const(arg0)?;
let arg1_const = as_veclit_if_const(arg1)?;

for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};

let arg1 = match arg1_const.as_ref() {
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
None => as_veclit(arg1.get_ref(i))?,
};

if let (Some(arg0), Some(arg1)) = (arg0, arg1) {
ensure!(
arg0.len() == arg1.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the vectors must match for multiplying, have: {} vs {}",
arg0.len(),
arg1.len()
),
}
);
let vec0 = DVectorView::from_slice(&arg0, arg0.len());
let vec1 = DVectorView::from_slice(&arg1, arg1.len());
let vec_res = vec1.component_mul(&vec0);

let veclit = vec_res.as_slice();
let binlit = veclit_to_binlit(veclit);
result.push(Some(&binlit));
} else {
result.push_null();
}
}

Ok(result.to_vector())
}
}

impl Display for VectorMulFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use common_query::error;
use datatypes::vectors::StringVector;

use super::*;

#[test]
fn test_vector_mul() {
let func = VectorMulFunction;

let vec0 = vec![1.0, 2.0, 3.0];
let vec1 = vec![1.0, 1.0];
let (len0, len1) = (vec0.len(), vec1.len());
let input0 = Arc::new(StringVector::from(vec![Some(format!("{vec0:?}"))]));
let input1 = Arc::new(StringVector::from(vec![Some(format!("{vec1:?}"))]));

let err = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap_err();

match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!(
"The length of the vectors must match for multiplying, have: {} vs {}",
len0, len1
)
)
}
_ => unreachable!(),
}

let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[8.0,10.0,12.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));

let input1 = Arc::new(StringVector::from(vec![
Some("[1.0,1.0,1.0]".to_string()),
Some("[2.0,2.0,2.0]".to_string()),
None,
Some("[3.0,3.0,3.0]".to_string()),
]));

let result = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap();

let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[1.0, 2.0, 3.0]).as_slice())
);
assert_eq!(
result.get_ref(1).as_binary().unwrap(),
Some(veclit_to_binlit(&[16.0, 20.0, 24.0]).as_slice())
);
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
}
}
4 changes: 2 additions & 2 deletions src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use snafu::{ensure, ResultExt};
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
use crate::prelude::ConcreteDataType;
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkipIndexType, SkippingIndexOptions,
COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions,
SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
Expand Down
12 changes: 6 additions & 6 deletions src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ pub struct SkippingIndexOptions {
pub granularity: u32,
/// The type of the skip index.
#[serde(default)]
pub index_type: SkipIndexType,
pub index_type: SkippingIndexType,
}

impl fmt::Display for SkippingIndexOptions {
Expand All @@ -556,15 +556,15 @@ impl fmt::Display for SkippingIndexOptions {

/// Skip index types.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum SkipIndexType {
pub enum SkippingIndexType {
#[default]
BloomFilter,
}

impl fmt::Display for SkipIndexType {
impl fmt::Display for SkippingIndexType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SkipIndexType::BloomFilter => write!(f, "BLOOM"),
SkippingIndexType::BloomFilter => write!(f, "BLOOM"),
}
}
}
Expand All @@ -587,15 +587,15 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
// Parse index type with default value BloomFilter
let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
Some(typ) => match typ.to_ascii_uppercase().as_str() {
"BLOOM" => SkipIndexType::BloomFilter,
"BLOOM" => SkippingIndexType::BloomFilter,
_ => {
return error::InvalidSkippingIndexOptionSnafu {
msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"),
}
.fail();
}
},
None => SkipIndexType::default(),
None => SkippingIndexType::default(),
};

Ok(SkippingIndexOptions {
Expand Down
1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ get-size2 = "0.1.2"
greptime-proto.workspace = true
# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
# otherwise it is the same with upstream repo
http.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
lazy_static.workspace = true
Expand Down
Loading

0 comments on commit 7fc1db2

Please sign in to comment.