From 4c11b3bff5e25feceedab1b1a1629de724a9c6c8 Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Mon, 11 Sep 2023 18:52:01 +0530 Subject: [PATCH] Use regex instead of hash-maps --- arrow-csv/src/reader/mod.rs | 139 ++++++++++++----------- arrow-csv/test/data/custom_null_test.csv | 2 +- 2 files changed, 75 insertions(+), 66 deletions(-) diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs index faaf8d3aed9..9b2c503adad 100644 --- a/arrow-csv/src/reader/mod.rs +++ b/arrow-csv/src/reader/mod.rs @@ -133,8 +133,7 @@ use arrow_schema::*; use chrono::{TimeZone, Utc}; use csv::StringRecord; use lazy_static::lazy_static; -use regex::RegexSet; -use std::collections::HashSet; +use regex::{Regex, RegexSet}; use std::fmt::{self, Debug}; use std::fs::File; use std::io::{BufRead, BufReader as StdBufReader, Read, Seek, SeekFrom}; @@ -214,7 +213,7 @@ pub struct Format { escape: Option, quote: Option, terminator: Option, - nulls: HashSet, + null_regex: Option, } impl Format { @@ -243,8 +242,8 @@ impl Format { self } - pub fn with_nulls(mut self, nulls: HashSet) -> Self { - self.nulls = nulls; + pub fn with_null_regex(mut self, null_regex: Regex) -> Self { + self.null_regex = Some(null_regex); self } @@ -326,6 +325,7 @@ impl Format { if let Some(t) = self.terminator { builder.terminator(csv::Terminator::Any(t)); } + // TODO: Null regex builder.from_reader(reader) } @@ -343,6 +343,7 @@ impl Format { if let Some(t) = self.terminator { builder.terminator(csv_core::Terminator::Any(t)); } + // TODO: Null regex builder.build() } } @@ -564,8 +565,8 @@ pub struct Decoder { /// A decoder for [`StringRecords`] record_decoder: RecordDecoder, - /// Check for if the string is `NULL` value or not. - is_null: Box bool>, + /// Check if the string matches this pattern for `NULL`. + null_regex: Option, } impl Debug for Decoder { @@ -626,7 +627,7 @@ impl Decoder { Some(self.schema.metadata.clone()), self.projection.as_ref(), self.line_number, - &self.is_null, + self.null_regex.as_ref(), )?; self.line_number += rows.len(); Ok(Some(batch)) @@ -645,7 +646,7 @@ fn parse( metadata: Option>, projection: Option<&Vec>, line_number: usize, - is_null: &dyn Fn(&str) -> bool, + null_regex: Option<&Regex>, ) -> Result { let projection: Vec = match projection { Some(v) => v.clone(), @@ -658,7 +659,9 @@ fn parse( let i = *i; let field = &fields[i]; match field.data_type() { - DataType::Boolean => build_boolean_array(line_number, rows, i, is_null), + DataType::Boolean => { + build_boolean_array(line_number, rows, i, null_regex) + } DataType::Decimal128(precision, scale) => { build_decimal_array::( line_number, @@ -666,7 +669,7 @@ fn parse( i, *precision, *scale, - is_null, + null_regex, ) } DataType::Decimal256(precision, scale) => { @@ -676,56 +679,56 @@ fn parse( i, *precision, *scale, - is_null, + null_regex, ) } DataType::Int8 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Int16 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Int32 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Int64 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::UInt8 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::UInt16 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::UInt32 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::UInt64 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Float32 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Float64 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Date32 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Date64 => { - build_primitive_array::(line_number, rows, i, is_null) + build_primitive_array::(line_number, rows, i, null_regex) } DataType::Time32(TimeUnit::Second) => build_primitive_array::< Time32SecondType, >( - line_number, rows, i, is_null + line_number, rows, i, null_regex ), DataType::Time32(TimeUnit::Millisecond) => { build_primitive_array::( line_number, rows, i, - is_null, + null_regex, ) } DataType::Time64(TimeUnit::Microsecond) => { @@ -733,21 +736,24 @@ fn parse( line_number, rows, i, - is_null, + null_regex, + ) + } + DataType::Time64(TimeUnit::Nanosecond) => { + build_primitive_array::( + line_number, + rows, + i, + null_regex, ) } - DataType::Time64(TimeUnit::Nanosecond) => build_primitive_array::< - Time64NanosecondType, - >( - line_number, rows, i, is_null - ), DataType::Timestamp(TimeUnit::Second, tz) => { build_timestamp_array::( line_number, rows, i, tz.as_deref(), - is_null, + null_regex, ) } DataType::Timestamp(TimeUnit::Millisecond, tz) => { @@ -756,7 +762,7 @@ fn parse( rows, i, tz.as_deref(), - is_null, + null_regex, ) } DataType::Timestamp(TimeUnit::Microsecond, tz) => { @@ -765,7 +771,7 @@ fn parse( rows, i, tz.as_deref(), - is_null, + null_regex, ) } DataType::Timestamp(TimeUnit::Nanosecond, tz) => { @@ -774,7 +780,7 @@ fn parse( rows, i, tz.as_deref(), - is_null, + null_regex, ) } DataType::Utf8 => Ok(Arc::new( @@ -874,12 +880,12 @@ fn build_decimal_array( col_idx: usize, precision: u8, scale: i8, - is_null: &dyn Fn(&str) -> bool, + null_regex: Option<&Regex>, ) -> Result { let mut decimal_builder = PrimitiveBuilder::::with_capacity(rows.len()); for row in rows.iter() { let s = row.get(col_idx); - if is_null(s) { + if s.is_empty() || null_regex.is_some_and(|r| r.is_match(s)) { // append null decimal_builder.append_null(); } else { @@ -907,13 +913,13 @@ fn build_primitive_array( line_number: usize, rows: &StringRecords<'_>, col_idx: usize, - is_null: &dyn Fn(&str) -> bool, + null_regex: Option<&Regex>, ) -> Result { rows.iter() .enumerate() .map(|(row_index, row)| { let s = row.get(col_idx); - if is_null(s) { + if s.is_empty() || null_regex.is_some_and(|r| r.is_match(s)) { return Ok(None); } @@ -937,17 +943,27 @@ fn build_timestamp_array( rows: &StringRecords<'_>, col_idx: usize, timezone: Option<&str>, - is_null: &dyn Fn(&str) -> bool, + null_regex: Option<&Regex>, ) -> Result { Ok(Arc::new(match timezone { Some(timezone) => { let tz: Tz = timezone.parse()?; - build_timestamp_array_impl::(line_number, rows, col_idx, &tz, is_null)? - .with_timezone(timezone) - } - None => { - build_timestamp_array_impl::(line_number, rows, col_idx, &Utc, is_null)? + build_timestamp_array_impl::( + line_number, + rows, + col_idx, + &tz, + null_regex, + )? + .with_timezone(timezone) } + None => build_timestamp_array_impl::( + line_number, + rows, + col_idx, + &Utc, + null_regex, + )?, })) } @@ -956,13 +972,13 @@ fn build_timestamp_array_impl( rows: &StringRecords<'_>, col_idx: usize, timezone: &Tz, - is_null: &dyn Fn(&str) -> bool, + null_regex: Option<&Regex>, ) -> Result, ArrowError> { rows.iter() .enumerate() .map(|(row_index, row)| { let s = row.get(col_idx); - if is_null(s) { + if s.is_empty() || null_regex.is_some_and(|r| r.is_match(s)) { return Ok(None); } @@ -989,13 +1005,13 @@ fn build_boolean_array( line_number: usize, rows: &StringRecords<'_>, col_idx: usize, - is_null: &dyn Fn(&str) -> bool, + null_regex: Option<&Regex>, ) -> Result { rows.iter() .enumerate() .map(|(row_index, row)| { let s = row.get(col_idx); - if is_null(s) { + if s.is_empty() || null_regex.is_some_and(|r| r.is_match(s)) { return Ok(None); } let parsed = parse_bool(s); @@ -1029,8 +1045,8 @@ pub struct ReaderBuilder { bounds: Bounds, /// Optional projection for which columns to load (zero-based column indices) projection: Option>, - /// Strings to consider as `NULL` when parsing. - nulls: HashSet, + /// Pattern to consider as `NULL` when parsing. + null_regex: Option, } impl ReaderBuilder { @@ -1062,7 +1078,7 @@ impl ReaderBuilder { batch_size: 1024, bounds: None, projection: None, - nulls: HashSet::new(), + null_regex: None, } } @@ -1099,8 +1115,8 @@ impl ReaderBuilder { self } - pub fn with_nulls(mut self, nulls: HashSet) -> Self { - self.nulls = nulls; + pub fn with_null_regex(mut self, null_regex: Regex) -> Self { + self.null_regex = Some(null_regex); self } @@ -1154,13 +1170,6 @@ impl ReaderBuilder { None => (header, usize::MAX), }; - let is_null: Box bool> = if self.nulls.is_empty() { - Box::new(|s| s.is_empty()) - } else { - let nulls = self.nulls; - Box::new(move |s| s.is_empty() || nulls.contains(s)) - }; - Decoder { schema: self.schema, to_skip: start, @@ -1169,7 +1178,7 @@ impl ReaderBuilder { end, projection: self.projection, batch_size: self.batch_size, - is_null, + null_regex: self.null_regex, } } } @@ -1507,11 +1516,11 @@ mod tests { let file = File::open("test/data/custom_null_test.csv").unwrap(); - let nulls: HashSet = ["nil"].into_iter().map(|s| s.to_string()).collect(); + let null_regex = Regex::new("^nil$").unwrap(); let mut csv = ReaderBuilder::new(schema) .has_header(true) - .with_nulls(nulls) + .with_null_regex(null_regex) .build(file) .unwrap(); diff --git a/arrow-csv/test/data/custom_null_test.csv b/arrow-csv/test/data/custom_null_test.csv index 30d7b7f2a1b..747cd25f51e 100644 --- a/arrow-csv/test/data/custom_null_test.csv +++ b/arrow-csv/test/data/custom_null_test.csv @@ -3,4 +3,4 @@ c_int,c_float,c_string,c_bool nil,2.2,"2.22",TRUE 3,nil,"3.33",true 4,4.4,nil,False -5,6.6,"",nil \ No newline at end of file +5,6.6,"",nil