Skip to content

Commit

Permalink
feat: Migrate Map Functions (#13047)
Browse files Browse the repository at this point in the history
* add page

* small fixes

* delete md

* Migrate map functions
  • Loading branch information
jonathanc-n authored Oct 24, 2024
1 parent f2da32b commit ac827ab
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 156 deletions.
69 changes: 67 additions & 2 deletions datafusion/functions-nested/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::any::Any;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arrow::array::ArrayData;
use arrow_array::{Array, ArrayRef, MapArray, OffsetSizeTrait, StructArray};
Expand All @@ -27,7 +27,10 @@ use arrow_schema::{DataType, Field, SchemaBuilder};
use datafusion_common::utils::{fixed_size_list_to_arrays, list_to_arrays};
use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MAP;
use datafusion_expr::{
ColumnarValue, Documentation, Expr, ScalarUDFImpl, Signature, Volatility,
};

use crate::make_array::make_array;

Expand Down Expand Up @@ -238,7 +241,69 @@ impl ScalarUDFImpl for MapFunc {
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
make_map_batch(args)
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_map_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_map_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_MAP)
.with_description(
"Returns an Arrow map with the specified key-value pairs.\n\n\
The `make_map` function creates a map from two lists: one for keys and one for values. Each key must be unique and non-null."
)
.with_syntax_example(
"map(key, value)\nmap(key: value)\nmake_map(['key1', 'key2'], ['value1', 'value2'])"
)
.with_sql_example(
r#"```sql
-- Using map function
SELECT MAP('type', 'test');
----
{type: test}
SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]);
----
{POST: 41, HEAD: 33, PATCH: }
SELECT MAP([[1,2], [3,4]], ['a', 'b']);
----
{[1, 2]: a, [3, 4]: b}
SELECT MAP { 'a': 1, 'b': 2 };
----
{a: 1, b: 2}
-- Using make_map function
SELECT MAKE_MAP(['POST', 'HEAD'], [41, 33]);
----
{POST: 41, HEAD: 33}
SELECT MAKE_MAP(['key1', 'key2'], ['value1', null]);
----
{key1: value1, key2: }
```"#
)
.with_argument(
"key",
"For `map`: Expression to be used for key. Can be a constant, column, function, or any combination of arithmetic or string operators.\n\
For `make_map`: The list of keys to be used in the map. Each key must be unique and non-null."
)
.with_argument(
"value",
"For `map`: Expression to be used for value. Can be a constant, column, function, or any combination of arithmetic or string operators.\n\
For `make_map`: The list of values to be mapped to the corresponding keys."
)
.build()
.unwrap()
})
}

fn get_element_type(data_type: &DataType) -> Result<&DataType> {
match data_type {
DataType::List(element) => Ok(element.data_type()),
Expand Down
49 changes: 47 additions & 2 deletions datafusion/functions-nested/src/map_extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ use arrow_buffer::OffsetBuffer;
use arrow_schema::Field;

use datafusion_common::{cast::as_map_array, exec_err, Result};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MAP;
use datafusion_expr::{
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::vec;

use crate::utils::{get_map_entry_field, make_scalar_function};
Expand Down Expand Up @@ -101,6 +104,48 @@ impl ScalarUDFImpl for MapExtract {
field.first().unwrap().data_type().clone(),
])
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_map_extract_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_map_extract_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_MAP)
.with_description(
"Returns a list containing the value for the given key or an empty list if the key is not present in the map.",
)
.with_syntax_example("map_extract(map, key)")
.with_sql_example(
r#"```sql
SELECT map_extract(MAP {'a': 1, 'b': NULL, 'c': 3}, 'a');
----
[1]
SELECT map_extract(MAP {1: 'one', 2: 'two'}, 2);
----
['two']
SELECT map_extract(MAP {'x': 10, 'y': NULL, 'z': 30}, 'y');
----
[]
```"#,
)
.with_argument(
"map",
"Map expression. Can be a constant, column, or function, and any combination of map operators.",
)
.with_argument(
"key",
"Key to extract from the map. Can be a constant, column, or function, any combination of arithmetic or string operators, or a named expression of the previously listed.",
)
.build()
.unwrap()
})
}

fn general_map_extract_inner(
Expand Down
41 changes: 38 additions & 3 deletions datafusion/functions-nested/src/map_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use crate::utils::{get_map_entry_field, make_scalar_function};
use arrow_array::{Array, ArrayRef, ListArray};
use arrow_schema::{DataType, Field};
use datafusion_common::{cast::as_map_array, exec_err, Result};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MAP;
use datafusion_expr::{
ArrayFunctionSignature, ColumnarValue, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
};
use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

make_udf_expr_and_func!(
MapKeysFunc,
Expand Down Expand Up @@ -81,6 +82,40 @@ impl ScalarUDFImpl for MapKeysFunc {
fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
make_scalar_function(map_keys_inner)(args)
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_map_keys_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_map_keys_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_MAP)
.with_description(
"Returns a list of all keys in the map."
)
.with_syntax_example("map_keys(map)")
.with_sql_example(
r#"```sql
SELECT map_keys(MAP {'a': 1, 'b': NULL, 'c': 3});
----
[a, b, c]
SELECT map_keys(map([100, 5], [42, 43]));
----
[100, 5]
```"#,
)
.with_argument(
"map",
"Map expression. Can be a constant, column, or function, and any combination of map operators."
)
.build()
.unwrap()
})
}

fn map_keys_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Expand Down
41 changes: 38 additions & 3 deletions datafusion/functions-nested/src/map_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use crate::utils::{get_map_entry_field, make_scalar_function};
use arrow_array::{Array, ArrayRef, ListArray};
use arrow_schema::{DataType, Field};
use datafusion_common::{cast::as_map_array, exec_err, Result};
use datafusion_expr::scalar_doc_sections::DOC_SECTION_MAP;
use datafusion_expr::{
ArrayFunctionSignature, ColumnarValue, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature,
TypeSignature, Volatility,
};
use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

make_udf_expr_and_func!(
MapValuesFunc,
Expand Down Expand Up @@ -81,6 +82,40 @@ impl ScalarUDFImpl for MapValuesFunc {
fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
make_scalar_function(map_values_inner)(args)
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_map_values_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_map_values_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_MAP)
.with_description(
"Returns a list of all values in the map."
)
.with_syntax_example("map_values(map)")
.with_sql_example(
r#"```sql
SELECT map_values(MAP {'a': 1, 'b': NULL, 'c': 3});
----
[1, , 3]
SELECT map_values(map([100, 5], [42, 43]));
----
[42, 43]
```"#,
)
.with_argument(
"map",
"Map expression. Can be a constant, column, or function, and any combination of map operators."
)
.build()
.unwrap()
})
}

fn map_values_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
Expand Down
1 change: 0 additions & 1 deletion dev/update_function_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,3 @@ echo "Running prettier"
npx [email protected] --write "$TARGET_FILE"

echo "'$TARGET_FILE' successfully updated!"

Loading

0 comments on commit ac827ab

Please sign in to comment.