From 61a96ca98ae78fd79a4309e7f24534d03cf47d38 Mon Sep 17 00:00:00 2001
From: Miles Granger
Date: Mon, 26 Sep 2022 06:32:37 +0200
Subject: [PATCH] Improve temporal types (#28)

Remove all use of Utf8Array when handling temporal types, in favor of
proper DataType::Timestamp and the like.
---
 Cargo.toml                    |   4 +-
 benchmarks/test_benchmarks.py |  12 +--
 src/lib.rs                    | 160 +++++++++++++++-------
 3 files changed, 76 insertions(+), 100 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d29243e..d1789ab 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "flaco"
-version = "0.6.0-rc2"
+version = "0.6.0-rc3"
 edition = "2018"
 license = "Unlicense/MIT"
 
@@ -18,7 +18,7 @@ serde_json = "^1"
 numpy = "0.17"
 arrow2 = { version = "^0.13", features = ["io_ipc", "io_parquet"] }
 rust_decimal = { version = "1.16.0", features = ["db-postgres"] }
-time = { version = "0.3.3", features = ["formatting"] }
+time = { version = "0.3.3", features = ["formatting", "parsing"] }
 postgres = { version = "0.19.1", features = ["with-time-0_3", "with-serde_json-1", "with-uuid-0_8"] }
 postgres-protocol = "0.6.2"
 pyo3 = { version = "0.17", default-features = false, features = ["macros"] }
diff --git a/benchmarks/test_benchmarks.py b/benchmarks/test_benchmarks.py
index 943dc37..4daf3b1 100644
--- a/benchmarks/test_benchmarks.py
+++ b/benchmarks/test_benchmarks.py
@@ -106,7 +106,7 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
     df["col8"] = pd.to_datetime(df.col7)
     df["col9"] = pd.to_datetime(df.col7, utc=True)
     df["col10"] = df.col9.dt.time
-    df.to_sql(table, index=False, con=engine, chunksize=10_000, if_exists="append")
+    df.to_sql(table, index=False, con=engine, chunksize=50_000, if_exists="replace")
 
     if include_nulls:
         df = df[:20]
@@ -118,8 +118,6 @@ def _table_setup(n_rows: int = 1_000_000, include_nulls: bool = False):
 def memory_profile():
     stmt = "select * from test_table"
     flaco.read_sql_to_file(DB_URI, stmt, 'result.feather', flaco.FileFormat.Feather)
-    data = flaco.read_sql_to_numpy(DB_URI, stmt)
-    df = pd.DataFrame(data, copy=False).convert_dtypes()
     import duckdb
     import pyarrow as pa
     import pyarrow.parquet as pq
@@ -127,17 +125,13 @@ def memory_profile():
     import pyarrow.dataset as ds
     with pa.memory_map('result.feather', 'rb') as source:
         mytable = pa.ipc.open_file(source).read_all()
-        table = mytable.rename_columns([f"col_{i}" for i in range(10)])
-        table_df = table.to_pandas()
+        table_df = mytable.to_pandas()
     #print(v)
-    print(type(mytable), len(mytable))
-    print(type(table), len(table))
     print(pa.total_allocated_bytes() >> 20)
-    breakpoint()
     engine = create_engine(DB_URI)
     _pandas_df = pd.read_sql(stmt, engine)
 
 
 if __name__ == "__main__":
-    _table_setup(n_rows=10_000, include_nulls=False)
+    #_table_setup(n_rows=1_000_000, include_nulls=False)
     memory_profile()
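The substance of the change is the src/lib.rs diff below: instead of formatting temporal values into strings held in a MutableUtf8Array, each temporal Postgres type now lands in a primitive i32/i64 buffer tagged with the matching logical Arrow type (Timestamp, Date32, Time64). A minimal sketch of the target representation, assuming arrow2 ^0.13 as pinned above (column contents are illustrative only):

    use arrow2::array::MutablePrimitiveArray;
    use arrow2::datatypes::{DataType, TimeUnit};

    fn main() {
        // A TIMESTAMP column becomes an i64 buffer of microseconds since the
        // Unix epoch, tagged with a microsecond Timestamp logical type.
        let mut col = MutablePrimitiveArray::<i64>::new()
            .to(DataType::Timestamp(TimeUnit::Microsecond, None));
        col.push(Some(1_664_166_757_000_000i64)); // 2022-09-26T04:32:37Z in µs
        col.push(None); // NULL stays NULL rather than becoming a string
    }

This is also why the feather file read back in the benchmark above yields real datetime64 columns in pandas rather than object/string columns.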
m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_function(wrap_pyfunction!(read_sql_to_file, m)?)?; - m.add_function(wrap_pyfunction!(read_sql_to_numpy, m)?)?; m.add_class::()?; m.add("FlacoException", py.get_type::())?; Ok(()) @@ -55,23 +48,6 @@ pub fn read_sql_to_file(uri: &str, stmt: &str, path: &str, format: FileFormat) - Ok(()) } -/// Read SQL to a dict of numpy arrays, where keys are column names. -/// NOTE: This is not very efficient currently, likely should not use it. -#[pyfunction] -pub fn read_sql_to_numpy<'py>( - py: Python<'py>, - uri: &str, - stmt: &str, -) -> PyResult> { - let mut client = postgres::Client::connect(uri, postgres::NoTls).map_err(to_py_err)?; - let table = postgresql::read_sql(&mut client, stmt).map_err(to_py_err)?; - let mut result = BTreeMap::new(); - for (name, column) in table { - result.insert(name, column.into_pyarray(py)); - } - Ok(result) -} - pub type Table = BTreeMap; pub struct Column { @@ -99,43 +75,6 @@ impl Column { self.inner_mut::().try_push(value)?; Ok(()) } - pub fn into_pyarray(mut self, py: Python) -> PyObject { - macro_rules! to_pyarray { - ($mut_arr:ty, $arr:ty) => {{ - self.inner_mut::<$mut_arr>() - .as_arc() - .as_ref() - .as_any() - .downcast_ref::<$arr>() - .unwrap() - .iter() - .map(|v| v.to_object(py)) - .collect::>() - .into_pyarray(py) - .to_object(py) - }}; - } - match self.dtype { - DataType::Boolean => to_pyarray!(MutableBooleanArray, BooleanArray), - DataType::Binary => to_pyarray!(MutableBinaryArray, BinaryArray), - DataType::Utf8 => to_pyarray!(MutableUtf8Array, Utf8Array), - DataType::Int8 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::Int16 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::Int32 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::UInt32 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::Int64 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::UInt64 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::Float32 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::Float64 => to_pyarray!(MutablePrimitiveArray, PrimitiveArray), - DataType::FixedSizeBinary(_) => { - to_pyarray!(MutableFixedSizeBinaryArray, FixedSizeBinaryArray) - } - _ => unimplemented!( - "Dtype: {:?} not implemented for conversion to numpy", - &self.dtype - ), - } - } } fn write_table_to_parquet(table: Table, path: &str) -> Result<()> { @@ -198,6 +137,7 @@ pub mod postgresql { MutableBinaryArray, MutableBooleanArray, MutableFixedSizeBinaryArray, MutablePrimitiveArray, MutableUtf8Array, }; + use arrow2::datatypes::{DataType, TimeUnit}; use postgres as pg; use postgres::fallible_iterator::FallibleIterator; @@ -205,7 +145,9 @@ pub mod postgresql { use rust_decimal::{prelude::ToPrimitive, Decimal}; use std::collections::BTreeMap; use std::{iter::Iterator, net::IpAddr}; - use time; + use time::{self, format_description}; + + const UNIX_EPOCH: time::OffsetDateTime = time::OffsetDateTime::UNIX_EPOCH; pub fn read_sql(client: &mut pg::Client, sql: &str) -> Result { let mut row_iter = client.query_raw::<_, &i32, _>(sql, &[])?; @@ -305,49 +247,89 @@ pub mod postgresql { &Type::TIMESTAMP => { table .entry(column_name) - .or_insert_with(|| Column::new(MutableUtf8Array::::new())) - .push::<_, MutableUtf8Array>( - row.get::<_, Option>(idx) - .map(|v| v.to_string()), + .or_insert_with(|| { + Column::new( + MutablePrimitiveArray::::new() + .to(DataType::Timestamp(TimeUnit::Microsecond, None)), + ) + }) + .push::<_, MutablePrimitiveArray>( + 
+                        row.get::<_, Option<time::PrimitiveDateTime>>(idx).map(|v| {
+                            let diff =
+                                (UNIX_EPOCH - v.assume_utc()).whole_microseconds() as i64;
+                            if v.assume_utc() > UNIX_EPOCH {
+                                diff.abs()
+                            } else {
+                                diff
+                            }
+                        }),
                     )?;
             }
             &Type::TIMESTAMPTZ => {
-                let format = time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] [offset_hour sign:mandatory]:[offset_minute]").unwrap();
+                let value = row.get::<_, Option<time::OffsetDateTime>>(idx);
+                let format =
+                    format_description::parse("[offset_hour sign:mandatory]:[offset_minute]")
+                        .unwrap();
                 table
                     .entry(column_name)
-                    .or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
-                    .push::<_, MutableUtf8Array<i32>>(
-                        row.get::<_, Option<time::OffsetDateTime>>(idx)
-                            .map(|v| v.format(&format).unwrap()),
-                    )?;
+                    .or_insert_with(|| {
+                        if value.is_none() {
+                            unimplemented!(
+                                "Handle case where first row of TZ aware timestamp is null."
+                            )
+                        }
+                        Column::new(MutablePrimitiveArray::<i64>::new().to(
+                            DataType::Timestamp(
+                                TimeUnit::Microsecond,
+                                value.map(|v| v.offset().format(&format).unwrap()),
+                            ),
+                        ))
+                    })
+                    .push::<_, MutablePrimitiveArray<i64>>(value.map(|v| {
+                        let diff = (UNIX_EPOCH - v).whole_microseconds() as i64;
+                        if v > UNIX_EPOCH {
+                            diff.abs()
+                        } else {
+                            diff
+                        }
+                    }))?;
             }
             &Type::DATE => {
                 table
                     .entry(column_name)
-                    .or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
-                    .push::<_, MutableUtf8Array<i32>>(
-                        row.get::<_, Option<time::Date>>(idx).map(|v| v.to_string()),
+                    .or_insert_with(|| {
+                        Column::new(MutablePrimitiveArray::<i32>::new().to(DataType::Date32))
+                    })
+                    .push::<_, MutablePrimitiveArray<i32>>(
+                        row.get::<_, Option<time::Date>>(idx).map(|v| {
+                            let days = (UNIX_EPOCH.date() - v).whole_days() as i32;
+                            if v > UNIX_EPOCH.date() {
+                                days.abs()
+                            } else {
+                                days
+                            }
+                        }),
                     )?;
             }
-            &Type::TIME => {
+            &Type::TIME | &Type::TIMETZ => {
                 table
                     .entry(column_name)
-                    .or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
-                    .push::<_, MutableUtf8Array<i32>>(
-                        row.get::<_, Option<time::Time>>(idx).map(|v| v.to_string()),
-                    )?;
-            }
-            &Type::TIMETZ => {
-                // TIMETZ is 12 bytes; Fixed size binary array then since no DataType matches
-                table
-                    .entry(column_name)
-                    .or_insert_with(|| Column::new(MutableUtf8Array::<i32>::new()))
-                    .push::<_, MutableUtf8Array<i32>>(
-                        row.get::<_, Option<time::Time>>(idx).map(|v| v.to_string()),
+                    .or_insert_with(|| {
+                        Column::new(
+                            MutablePrimitiveArray::<i64>::new()
+                                .to(DataType::Time64(TimeUnit::Microsecond)),
+                        )
+                    })
+                    .push::<_, MutablePrimitiveArray<i64>>(
+                        row.get::<_, Option<time::Time>>(idx).map(|v| {
+                            let (h, m, s, micro) = v.as_hms_micro();
+                            let seconds = (h as i64 * 60 * 60) + (m as i64 * 60) + s as i64;
+                            micro as i64 + seconds * 1_000_000
+                        }),
                     )?;
             }
             &Type::INTERVAL => {
-                // INTERVAL is 16 bytes; Fixed size binary array then sinece i128 not impl FromSql
+                // INTERVAL is 16 bytes; Fixed size binary array then since i128 not impl FromSql
                 table
                     .entry(column_name)
                     .or_insert_with(|| Column::new(MutableFixedSizeBinaryArray::new(16)))
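A note on the epoch arithmetic above: UNIX_EPOCH - v is the negation of the offset Arrow stores, hence the abs() branch once v is past the epoch; for values before the epoch the same branching also comes out positive. Subtracting in the opposite order yields the signed offset directly, negative before the epoch. A sketch under time 0.3, with a hypothetical helper that is not part of this patch:

    use time::OffsetDateTime;

    // Signed microseconds since the Unix epoch; the subtraction order alone
    // encodes whether v falls before or after 1970-01-01T00:00:00Z.
    fn epoch_micros(v: OffsetDateTime) -> i64 {
        (v - OffsetDateTime::UNIX_EPOCH).whole_microseconds() as i64
    }

    fn main() {
        let after = OffsetDateTime::from_unix_timestamp(1_664_166_757).unwrap();
        assert_eq!(epoch_micros(after), 1_664_166_757_000_000);
        let before = OffsetDateTime::from_unix_timestamp(-1).unwrap();
        assert_eq!(epoch_micros(before), -1_000_000); // negative pre-epoch
    }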