From 6a9a92931d0502c65004d2e9f4ffba10b825b28f Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov <ev.tolbakov@gmail.com>
Date: Sat, 18 May 2024 09:04:41 +0100
Subject: [PATCH] chore: change binary array type from LargeBinaryArray to
 BinaryArray (#3924)

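Switch the default binary representation from Arrow's LargeBinary
(64-bit offsets) to Binary (32-bit offsets): BinaryVector is now backed
by arrow::array::BinaryArray, Value::Binary converts to
ScalarValue::Binary, and Helper::try_into_vector casts LargeBinary and
FixedSizeBinary arrays to Binary so previously written data stays
readable. This also fixes LargeUtf8 arrays being converted into a
BinaryVector instead of a StringVector.
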
* chore: change binary array type from LargeBinaryArray to BinaryArray

* fix: adjust try_into_vector logic

* fix: apply CR suggestions, add tests

* chore: fix failing test

* chore: fix integration test

* chore: adjust the assertions according to the changed implementation

* chore: add a test with LargeBinary type

* chore: apply CR suggestions

* chore: simplify tests
---
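For reference: a minimal, self-contained sketch of the read-path
normalization this patch relies on (assuming the `datatypes` crate
re-exports used elsewhere in this diff):

    use std::sync::Arc;

    use datatypes::arrow::array::{ArrayRef, LargeBinaryArray};
    use datatypes::arrow::datatypes::DataType;
    use datatypes::vectors::Helper;

    fn main() {
        // A LargeBinary array, e.g. decoded from an SST written before
        // this change.
        let large: ArrayRef = Arc::new(LargeBinaryArray::from(vec![
            "hello".as_bytes(),
            "world".as_bytes(),
        ]));
        // Helper::try_into_vector casts LargeBinary (and FixedSizeBinary)
        // arrays to Binary before building a BinaryVector.
        let vector = Helper::try_into_vector(large).unwrap();
        assert_eq!(&DataType::Binary, vector.to_arrow_array().data_type());
    }
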
 src/datatypes/src/arrow_array.rs              |   4 +-
 src/datatypes/src/types/binary_type.rs        |   2 +-
 src/datatypes/src/value.rs                    |   8 +-
 src/datatypes/src/vectors/binary.rs           |  10 +-
 src/datatypes/src/vectors/helper.rs           |  62 +++++++++--
 src/mito2/src/memtable/partition_tree/dict.rs |   4 -
 src/mito2/src/sst/parquet.rs                  | 101 +++++++++++++++++-
 src/mito2/src/test_util/sst_util.rs           |  70 +++++++++++-
 src/servers/src/mysql/helper.rs               |   2 +-
 9 files changed, 237 insertions(+), 26 deletions(-)

diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs
index d9b231bdb41e..40b7d46d1d02 100644
--- a/src/datatypes/src/arrow_array.rs
+++ b/src/datatypes/src/arrow_array.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub type BinaryArray = arrow::array::LargeBinaryArray;
-pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder;
+pub type BinaryArray = arrow::array::BinaryArray;
+pub type MutableBinaryArray = arrow::array::BinaryBuilder;
 pub type StringArray = arrow::array::StringArray;
 pub type MutableStringArray = arrow::array::StringBuilder;
diff --git a/src/datatypes/src/types/binary_type.rs b/src/datatypes/src/types/binary_type.rs
index 7213489da102..fa6aa134fd69 100644
--- a/src/datatypes/src/types/binary_type.rs
+++ b/src/datatypes/src/types/binary_type.rs
@@ -47,7 +47,7 @@ impl DataType for BinaryType {
     }
 
     fn as_arrow_type(&self) -> ArrowDataType {
-        ArrowDataType::LargeBinary
+        ArrowDataType::Binary
     }
 
     fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index 9c9b79a4a6a9..cfeff152e2b3 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -342,7 +342,7 @@ impl Value {
             Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
             Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
             Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
-            Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
+            Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
             Value::Date(v) => ScalarValue::Date32(Some(v.val())),
             Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
             Value::Null => to_null_scalar_value(output_type)?,
@@ -413,7 +413,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValu
         ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
         ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
         ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
-        ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
+        ConcreteDataType::Binary(_) => ScalarValue::Binary(None),
         ConcreteDataType::String(_) => ScalarValue::Utf8(None),
         ConcreteDataType::Date(_) => ScalarValue::Date32(None),
         ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
@@ -2105,7 +2105,7 @@ mod tests {
                 .unwrap()
         );
         assert_eq!(
-            ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
+            ScalarValue::Binary(Some("world".as_bytes().to_vec())),
             Value::Binary(Bytes::from("world".as_bytes()))
                 .try_to_scalar_value(&ConcreteDataType::binary_datatype())
                 .unwrap()
@@ -2187,7 +2187,7 @@ mod tests {
                 .unwrap()
         );
         assert_eq!(
-            ScalarValue::LargeBinary(None),
+            ScalarValue::Binary(None),
             Value::Null
                 .try_to_scalar_value(&ConcreteDataType::binary_datatype())
                 .unwrap()
diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs
index 36187bd5af7f..e2074f949c2b 100644
--- a/src/datatypes/src/vectors/binary.rs
+++ b/src/datatypes/src/vectors/binary.rs
@@ -52,6 +52,14 @@ impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
     }
 }
 
+impl From<Vec<&[u8]>> for BinaryVector {
+    fn from(data: Vec<&[u8]>) -> Self {
+        Self {
+            array: BinaryArray::from_iter_values(data),
+        }
+    }
+}
+
 impl Vector for BinaryVector {
     fn data_type(&self) -> ConcreteDataType {
         ConcreteDataType::binary_datatype()
@@ -257,7 +265,7 @@ mod tests {
 
         let arrow_arr = v.to_arrow_array();
         assert_eq!(2, arrow_arr.len());
-        assert_eq!(&ArrowDataType::LargeBinary, arrow_arr.data_type());
+        assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
     }
 
     #[test]
diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs
index 21c37ec07742..b583b20697fd 100644
--- a/src/datatypes/src/vectors/helper.rs
+++ b/src/datatypes/src/vectors/helper.rs
@@ -258,9 +258,9 @@ impl Helper {
         Ok(match array.as_ref().data_type() {
             ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
             ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
-            ArrowDataType::LargeBinary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
-            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Binary => {
-                let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::LargeBinary)
+            ArrowDataType::Binary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
+            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
+                let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
                     .context(crate::error::ArrowComputeSnafu)?;
                 Arc::new(BinaryVector::try_from_arrow_array(array)?)
             }
@@ -278,7 +278,7 @@ impl Helper {
             ArrowDataType::LargeUtf8 => {
                 let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
                     .context(crate::error::ArrowComputeSnafu)?;
-                Arc::new(BinaryVector::try_from_arrow_array(array)?)
+                Arc::new(StringVector::try_from_arrow_array(array)?)
             }
             ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),
             ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
@@ -402,8 +402,10 @@ mod tests {
         TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
         TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
     };
+    use arrow::buffer::Buffer;
     use arrow::datatypes::Int32Type;
-    use arrow_array::DictionaryArray;
+    use arrow_array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeStringArray};
+    use arrow_schema::DataType;
     use common_decimal::Decimal128;
     use common_time::time::Time;
     use common_time::timestamp::TimeUnit;
@@ -576,10 +578,6 @@ mod tests {
     fn test_try_into_vector() {
         check_try_into_vector(NullArray::new(2));
         check_try_into_vector(BooleanArray::from(vec![true, false]));
-        check_try_into_vector(LargeBinaryArray::from(vec![
-            "hello".as_bytes(),
-            "world".as_bytes(),
-        ]));
         check_try_into_vector(Int8Array::from(vec![1, 2, 3]));
         check_try_into_vector(Int16Array::from(vec![1, 2, 3]));
         check_try_into_vector(Int32Array::from(vec![1, 2, 3]));
@@ -611,6 +609,52 @@ mod tests {
         Helper::try_into_vector(array).unwrap_err();
     }
 
+    #[test]
+    fn test_try_binary_array_into_vector() {
+        let input_vec: Vec<&[u8]> = vec!["hello".as_bytes(), "world".as_bytes()];
+        let assertion_vector = BinaryVector::from(input_vec.clone());
+
+        let input_arrays: Vec<ArrayRef> = vec![
+            Arc::new(LargeBinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(BinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(FixedSizeBinaryArray::new(
+                5,
+                Buffer::from_vec("helloworld".as_bytes().to_vec()),
+                None,
+            )) as ArrayRef,
+        ];
+
+        for input_array in input_arrays {
+            let vector = Helper::try_into_vector(input_array).unwrap();
+
+            assert_eq!(2, vector.len());
+            assert_eq!(0, vector.null_count());
+
+            let output_arrow_array: ArrayRef = vector.to_arrow_array();
+            assert_eq!(&DataType::Binary, output_arrow_array.data_type());
+            assert_eq!(&assertion_vector.to_arrow_array(), &output_arrow_array);
+        }
+    }
+
+    #[test]
+    fn test_large_string_array_into_vector() {
+        let input_vec = vec!["a", "b"];
+        let assertion_array = StringArray::from(input_vec.clone());
+
+        let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec));
+        let vector = Helper::try_into_vector(large_string_array).unwrap();
+        assert_eq!(2, vector.len());
+        assert_eq!(0, vector.null_count());
+
+        let output_arrow_array: StringArray = vector
+            .to_arrow_array()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .clone();
+        assert_eq!(&assertion_array, &output_arrow_array);
+    }
+
     #[test]
     fn test_try_from_scalar_time_value() {
         let vector = Helper::try_from_scalar_value(ScalarValue::Time32Second(Some(42)), 3).unwrap();
diff --git a/src/mito2/src/memtable/partition_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs
index 52217dc94bc2..b841abb9cb73 100644
--- a/src/mito2/src/memtable/partition_tree/dict.rs
+++ b/src/mito2/src/memtable/partition_tree/dict.rs
@@ -281,10 +281,6 @@ impl Drop for KeyDict {
 
 /// Buffer to store unsorted primary keys.
 struct KeyBuffer {
-    // We use arrow's binary builder as out default binary builder
-    // is LargeBinaryBuilder
-    // TODO(yingwen): Change the type binary vector to Binary instead of LargeBinary.
-    /// Builder for binary key array.
     key_builder: BinaryBuilder,
     next_pk_index: usize,
 }
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index de723cae1e3d..3a49d84a2d8e 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -79,19 +79,28 @@ pub struct SstInfo {
 mod tests {
     use std::sync::Arc;
 
+    use common_datasource::file_format::parquet::BufferedWriter;
     use common_time::Timestamp;
     use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::{BinaryExpr, Expr, Operator};
+    use datatypes::arrow;
+    use datatypes::arrow::array::RecordBatch;
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
+    use parquet::basic::{Compression, Encoding, ZstdLevel};
+    use parquet::file::metadata::KeyValue;
+    use parquet::file::properties::WriterProperties;
     use table::predicate::Predicate;
 
     use super::*;
     use crate::cache::{CacheManager, PageKey};
     use crate::sst::index::Indexer;
+    use crate::sst::parquet::format::WriteFormat;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::sst::parquet::writer::ParquetWriter;
+    use crate::sst::DEFAULT_WRITE_CONCURRENCY;
     use crate::test_util::sst_util::{
-        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
-        sst_region_metadata,
+        assert_parquet_metadata_eq, build_test_binary_region_metadata, new_batch_by_range,
+        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
 
@@ -399,4 +408,92 @@ mod tests {
         let mut reader = builder.build().await.unwrap();
         check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
     }
+
+    #[tokio::test]
+    async fn test_read_large_binary() {
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+
+        let metadata = build_test_binary_region_metadata();
+        let json = metadata.to_json().unwrap();
+        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
+
+        let props_builder = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![key_value_meta]))
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_encoding(Encoding::PLAIN)
+            .set_max_row_group_size(write_opts.row_group_size);
+
+        let writer_props = props_builder.build();
+
+        let write_format = WriteFormat::new(metadata);
+        let fields: Vec<_> = write_format
+            .arrow_schema()
+            .fields()
+            .into_iter()
+            .map(|field| {
+                let data_type = field.data_type().clone();
+                if data_type == DataType::Binary {
+                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
+                } else {
+                    Field::new(field.name(), data_type, field.is_nullable())
+                }
+            })
+            .collect();
+
+        let arrow_schema = Arc::new(Schema::new(fields));
+
+        // Ensures field_0 has LargeBinary type.
+        assert_eq!(
+            &DataType::LargeBinary,
+            arrow_schema.field_with_name("field_0").unwrap().data_type()
+        );
+        let mut buffered_writer = BufferedWriter::try_new(
+            file_path.clone(),
+            object_store.clone(),
+            arrow_schema.clone(),
+            Some(writer_props),
+            write_opts.write_buffer_size.as_bytes() as usize,
+            DEFAULT_WRITE_CONCURRENCY,
+        )
+        .await
+        .unwrap();
+
+        let batch = new_batch_with_binary(&["a"], 0, 60);
+        let arrow_batch = write_format.convert_batch(&batch).unwrap();
+        let arrays: Vec<_> = arrow_batch
+            .columns()
+            .iter()
+            .map(|array| {
+                let data_type = array.data_type().clone();
+                if data_type == DataType::Binary {
+                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
+                } else {
+                    array.clone()
+                }
+            })
+            .collect();
+        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
+
+        buffered_writer.write(&result).await.unwrap();
+        buffered_writer.close().await.unwrap();
+
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
+        let mut reader = builder.build().await.unwrap();
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch_with_binary(&["a"], 0, 50),
+                new_batch_with_binary(&["a"], 50, 60),
+            ],
+        )
+        .await;
+    }
 }
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 82308fa1b54f..e2c627a1816c 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -18,14 +18,17 @@ use std::sync::Arc;
 
 use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
+use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
 use parquet::file::metadata::ParquetMetaData;
-use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
+use store_api::metadata::{
+    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
+};
 use store_api::storage::RegionId;
 
-use crate::read::{Batch, Source};
+use crate::read::{Batch, BatchBuilder, Source};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
 use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
@@ -128,6 +131,36 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .unwrap()
 }
 
+pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
+    assert!(end >= start);
+    let pk = new_primary_key(tags);
+    let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
+    let sequences = vec![1000; end - start];
+    let op_types = vec![OpType::Put; end - start];
+
+    let field: Vec<_> = (start..end)
+        .map(|_| "some data".as_bytes().to_vec())
+        .collect();
+
+    let mut builder = BatchBuilder::new(pk);
+    builder
+        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+            timestamps.iter().copied(),
+        )))
+        .unwrap()
+        .sequences_array(Arc::new(UInt64Array::from_iter_values(
+            sequences.iter().copied(),
+        )))
+        .unwrap()
+        .op_types_array(Arc::new(UInt8Array::from_iter_values(
+            op_types.iter().map(|v| *v as u8),
+        )))
+        .unwrap()
+        .push_field_array(1, Arc::new(BinaryArray::from_iter_values(field)))
+        .unwrap();
+    builder.build().unwrap()
+}
+
 /// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
 pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
     macro_rules! assert_metadata {
@@ -151,3 +184,36 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaDat
 
     assert_metadata!(a, b, row_groups, column_index, offset_index,);
 }
+
+/// Creates region metadata for testing SSTs with a binary field.
+///
+/// Schema: tag_0(string), field_0(binary), ts(timestamp_millisecond)
+pub fn build_test_binary_region_metadata() -> RegionMetadataRef {
+    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
+    builder
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "tag_0".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Tag,
+            column_id: 0,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new("field_0", ConcreteDataType::binary_datatype(), true),
+            semantic_type: SemanticType::Field,
+            column_id: 1,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "ts",
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            ),
+            semantic_type: SemanticType::Timestamp,
+            column_id: 2,
+        })
+        .primary_key(vec![0]);
+    Arc::new(builder.build().unwrap())
+}
diff --git a/src/servers/src/mysql/helper.rs b/src/servers/src/mysql/helper.rs
index c5d509016da4..f1aede0b5d5f 100644
--- a/src/servers/src/mysql/helper.rs
+++ b/src/servers/src/mysql/helper.rs
@@ -159,7 +159,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
             ConcreteDataType::String(_) => Ok(ScalarValue::Utf8(Some(
                 String::from_utf8_lossy(b).to_string(),
             ))),
-            ConcreteDataType::Binary(_) => Ok(ScalarValue::LargeBinary(Some(b.to_vec()))),
+            ConcreteDataType::Binary(_) => Ok(ScalarValue::Binary(Some(b.to_vec()))),
 
             _ => error::PreparedStmtTypeMismatchSnafu {
                 expected: t,