From b41c46eabed31333af44253494cee6bdebbb538e Mon Sep 17 00:00:00 2001
From: krzysiek-pathway <117256032+krzysiek-pathway@users.noreply.github.com>
Date: Fri, 26 Jan 2024 14:25:29 +0100
Subject: [PATCH] use schema in table from markdown and pandas (#5485)

bugfix, now we use schema to properly type columns from markdown and pandas

GitOrigin-RevId: 4653d4ae6a88edff9298ca004bbc2cd5af892971
---
 CHANGELOG.md                             |   4 +
 python/pathway/internals/api.py          |  66 +++++++-
 python/pathway/internals/dtype.py        |   7 +-
 .../graph_runner/operator_handler.py     |   5 +-
 python/pathway/internals/schema.py       |   1 -
 python/pathway/tests/test_types.py       | 143 +++++++++++++++++-
 6 files changed, 217 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3fcb1ef4..e7982195 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Wrappers for OpenAI Chat and Embedding services are now added to Pathway xpack for LLMs.
 - A vector indexing pipeline that allows querying for the most similar documents. It is available as class `VectorStore` as part of Pathway xpack for LLMs.
 
+### Fixed
+
+- `pw.debug.table_from_markdown` now uses the `schema` parameter (when set) to properly assign _simple types_ (`int, bool, float, str, bytes`) and optional _simple types_ to columns.
+
 ## [0.7.9] - 2024-01-18
 
 ### Changed
diff --git a/python/pathway/internals/api.py b/python/pathway/internals/api.py
index b19ee294..0a2e56cf 100644
--- a/python/pathway/internals/api.py
+++ b/python/pathway/internals/api.py
@@ -10,6 +10,7 @@
 
 from pathway.engine import *
 from pathway.internals import dtype as dt, json
+from pathway.internals.schema import Schema
 
 if TYPE_CHECKING:
     _Value: TypeAlias = "Value"
@@ -45,7 +46,7 @@ def __call__(
         ...
 
 
-def denumpify(x):
+def denumpify(x, type_from_schema: dt.DType | None = None):
     def denumpify_inner(x):
         if pd.api.types.is_scalar(x) and pd.isna(x):
             return None
@@ -53,7 +54,53 @@ def denumpify_inner(x):
             return x.item()
         return x
 
+    def _is_instance_of_simple_type(x):
+        return (
+            dt.INT.is_value_compatible(x)
+            or dt.BOOL.is_value_compatible(x)
+            or dt.STR.is_value_compatible(x)
+            or dt.BYTES.is_value_compatible(x)
+            or dt.FLOAT.is_value_compatible(x)
+        )
+
+    def fix_possibly_misassigned_type(entry, type_from_schema):
+        assert (
+            (type_from_schema.is_value_compatible(entry))
+            # the only exception for str should be conversion to bytes; however,
+            # some places use schema_from_pandas, which considers some complex types
+            # as str, which means we enter here, as it looks like simple type STR even
+            # though it's not, below the exception that should be here
+            # or (isinstance(v, str) and type_from_schema.wrapped == bytes)
+            or type_from_schema.wrapped == str
+        )
+
+        if type_from_schema == dt.STR and _is_instance_of_simple_type(entry):
+            return str(entry)
+
+        if type_from_schema == dt.FLOAT:
+            return float(entry)
+
+        if isinstance(entry, str) and type_from_schema == dt.BYTES:
+            return entry.encode("utf-8")
+
+        return entry
+
     v = denumpify_inner(x)
+
+    if isinstance(type_from_schema, dt._SimpleDType):
+        v = fix_possibly_misassigned_type(v, type_from_schema)
+    elif (
+        isinstance(type_from_schema, dt.Optional)
+        and isinstance(type_from_schema.wrapped, dt._SimpleDType)
+        and not dt.NONE.is_value_compatible(v)
+    ):
+        # pandas stores optional ints as floats
+        if isinstance(v, float) and type_from_schema.wrapped == dt.INT:
+            assert v.is_integer()
+            v = fix_possibly_misassigned_type(int(v), type_from_schema.wrapped)
+        else:
+            v = fix_possibly_misassigned_type(v, type_from_schema.wrapped)
+
     if isinstance(v, str):
         return v.encode("utf-8", "ignore").decode("utf-8")
     else:
@@ -85,12 +132,25 @@ def static_table_from_pandas(
     df: pd.DataFrame,
     connector_properties: ConnectorProperties | None = None,
     id_from: list[str] | None = None,
+    schema: type[Schema] | None = None,
 ) -> Table:
+    if schema is not None and id_from is not None:
+        assert schema.primary_key_columns() == id_from
+
+    if id_from is None and schema is not None:
+        id_from = schema.primary_key_columns()
+
     ids = ids_from_pandas(df, connector_properties, id_from)
+    column_types: dict[str, dt.DType] | None = None
+    if schema is not None:
+        column_types = dict(schema.__dtypes__)
+        for column in PANDAS_PSEUDOCOLUMNS:
+            column_types[column] = dt.INT
     data = {}
     for c in df.columns:
-        data[c] = [denumpify(v) for _, v in df[c].items()]
+        type_from_schema = None if column_types is None else column_types[c]
+        data[c] = [denumpify(v, type_from_schema) for _, v in df[c].items()]
         # df[c].items() is used because df[c].values is a numpy array
     ordinary_columns = [
         column for column in df.columns if column not in PANDAS_PSEUDOCOLUMNS
     ]
@@ -111,7 +171,7 @@
 
     assert len(connector_properties.column_properties) == len(
         ordinary_columns
-    ), "prrovided connector properties do not match the dataframe"
+    ), "provided connector properties do not match the dataframe"
 
     input_data: CapturedStream = []
     for i, index in enumerate(df.index):
diff --git a/python/pathway/internals/dtype.py b/python/pathway/internals/dtype.py
index 2e523cec..640491b0 100644
--- a/python/pathway/internals/dtype.py
+++ b/python/pathway/internals/dtype.py
@@ -12,6 +12,7 @@
 
 import numpy as np
 import numpy.typing as npt
+import pandas as pd
 
 from pathway.engine import PathwayType
 from pathway.internals import api, datetime_types, json as js
@@ -88,6 +89,10 @@ def is_value_compatible(self, arg):
             )
         elif self.wrapped == int:
             return np.issubdtype(type(arg), np.integer)
+        elif self.wrapped == bool:
+            return isinstance(arg, (bool, np.bool_))
+        elif self.wrapped == bytes:
+            return isinstance(arg, (bytes, str))
         else:
             return isinstance(arg, self.wrapped)
 
@@ -123,7 +128,7 @@ def __new__(cls) -> _NoneDType:
         return super().__new__(cls)
 
     def is_value_compatible(self, arg):
-        return arg is None
+        return arg is None or isinstance(arg, pd._libs.missing.NAType)
 
     @property
     def typehint(self) -> None:
diff --git a/python/pathway/internals/graph_runner/operator_handler.py b/python/pathway/internals/graph_runner/operator_handler.py
index 641198cf..d27a1260 100644
--- a/python/pathway/internals/graph_runner/operator_handler.py
+++ b/python/pathway/internals/graph_runner/operator_handler.py
@@ -100,7 +100,6 @@ def _run(
         output_storages: dict[Table, Storage],
     ):
         datasource = operator.datasource
-
         if self.graph_builder.debug and operator.debug_datasource is not None:
             if (
                 datasource.schema._dtypes()
@@ -113,7 +112,7 @@
                     scope=self.scope,
                     df=operator.debug_datasource.data,
                     connector_properties=operator.debug_datasource.connector_properties,
-                    id_from=operator.debug_datasource.schema.primary_key_columns(),
+                    schema=operator.debug_datasource.schema,
                 )
                 self.state.set_table(output_storages[table], materialized_table)
         elif isinstance(datasource, PandasDataSource):
@@ -123,7 +122,7 @@
                     scope=self.scope,
                     df=datasource.data,
                     connector_properties=datasource.connector_properties,
-                    id_from=datasource.schema.primary_key_columns(),
+                    schema=datasource.schema,
                 )
                 self.state.set_table(output_storages[table], materialized_table)
         elif isinstance(datasource, GenericDataSource):
diff --git a/python/pathway/internals/schema.py b/python/pathway/internals/schema.py
index 138e0130..069d07fb 100644
--- a/python/pathway/internals/schema.py
+++ b/python/pathway/internals/schema.py
@@ -652,7 +652,6 @@ def column_definition(
     >>> NewSchema
     <pathway.Schema types={'key': <class 'int'>, '@timestamp': <class 'str'>, 'data': <class 'str'>}>
     """
-    from pathway.internals import dtype as dt
 
     return ColumnDefinition(
         dtype=dt.wrap(dtype) if dtype is not None else None,
diff --git a/python/pathway/tests/test_types.py b/python/pathway/tests/test_types.py
index 5beff0db..9be85a84 100644
--- a/python/pathway/tests/test_types.py
+++ b/python/pathway/tests/test_types.py
@@ -2,7 +2,7 @@
 
 import pathway as pw
 import pathway.internals.dtype as dt
-from pathway.tests.utils import T
+from pathway.tests.utils import T, assert_table_equality_wo_index
 
 
 def test_date_time_naive_schema():
@@ -45,3 +45,144 @@ def test_date_time_utc_schema():
         "t2": dt.DATE_TIME_UTC,
         "diff": dt.DURATION,
     }
+
+
+def test_markdown_type_float():
+    class TestInputSchema(pw.Schema):
+        float_num: float
+        should_be_float_num: float
+
+    class TestOutputSchema(pw.Schema):
+        float_num: float
+        should_be_float_num: float
+        test1: float
+        test2: float
+
+    t = pw.debug.table_from_markdown(
+        """
+        | float_num | should_be_float_num
+      1 | 2.7       | 1
+      2 | 3.1       | 2
+
+        """,
+        schema=TestInputSchema,
+    )
+
+    t = t.with_columns(test1=2 * t.float_num, test2=2 * t.should_be_float_num)
+
+    expected = pw.debug.table_from_markdown(
+        """
+        float_num | should_be_float_num | test1 | test2
+        2.7       | 1.0                 | 5.4   | 2.0
+        3.1       | 2.0                 | 6.2   | 4.0
+        """,
+        schema=TestOutputSchema,
+    )
+
+    assert_table_equality_wo_index(t, expected)
+
+
+def test_markdown_type_optional_float():
+    class TestInputSchema(pw.Schema):
+        float_num: float
+        should_be_float_num: float | None
+
+    class TestOutputSchema(pw.Schema):
+        float_num: float
+        should_be_float_num: float
+        test1: float
+        test2: float
+
+    t = pw.debug.table_from_markdown(
+        """
+        | float_num | should_be_float_num
+      1 | 2.7       | 1
+      2 | 3.1       | 2
+      2 | 3.1       | None
+
+        """,
+        schema=TestInputSchema,
+    )
+
+    t = t.filter(t.should_be_float_num.is_not_none())
+    t = t.with_columns(test1=2 * t.float_num, test2=2 * t.should_be_float_num)
+
+    expected = pw.debug.table_from_markdown(
+        """
+        float_num | should_be_float_num | test1 | test2
+        2.7       | 1.0                 | 5.4   | 2.0
+        3.1       | 2.0                 | 6.2   | 4.0
+        """,
+        schema=TestOutputSchema,
+    )
+
+    assert_table_equality_wo_index(t, expected)
+
+
+def test_markdown_type_bytes():
+    class TestInputSchema(pw.Schema):
+        text: str
+        text_bytes: bytes
+
+    class TestOutputSchema(pw.Schema):
+        text: str
+        text_bytes: bytes
+        bytes_as_text: str
+
+    t = pw.debug.table_from_markdown(
+        """
+        | text | text_bytes
+      1 | aa   | aa
+      2 | bb   | bb
+
+        """,
+        schema=TestInputSchema,
+    )
+
+    t = t.with_columns(
+        bytes_as_text=pw.apply_with_type(lambda x: x.decode("utf-8"), str, t.text_bytes)
+    )
+
+    expected = pw.debug.table_from_markdown(
+        """
+        text | text_bytes | bytes_as_text
+        aa   | aa         | aa
+        bb   | bb         | bb
+        """,
+        schema=TestOutputSchema,
+    )
+
+    assert_table_equality_wo_index(t, expected)
+
+
+def test_markdown_type_str():
+    class InputNumbersAsString(pw.Schema):
+        number_as_string: str
+
+    class OutputNumbersAsString(pw.Schema):
+        number_as_string: str
+        ext_str: str
+        converted_to_int: int
+
+    t = pw.debug.table_from_markdown(
+        """
+        | number_as_string
+      1 | 2
+      2 | 3
+        """,
+        schema=InputNumbersAsString,
+    )
+
+    t = t.with_columns(ext_str=t.number_as_string + "a")
+    t = t.with_columns(converted_to_int=pw.cast(int, t.number_as_string) + 1)
+
+    expected = pw.debug.table_from_markdown(
+        """
+        number_as_string | ext_str | converted_to_int
+        2                | 2a      | 3
+        3                | 3a      | 4
+        """,
+        schema=OutputNumbersAsString,
+    )
+
+    assert_table_equality_wo_index(t, expected)
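
A minimal usage sketch of the fixed behavior (illustrative only, not part of the commit; the schema class, column names, and values below are made up): with a schema passed to `pw.debug.table_from_markdown`, the declared simple types drive the column typing, so an integer-looking literal in a `float` column is stored as a float, numeric-looking text in a `str` column stays a string, and a `bytes` column is populated with UTF-8 encoded bytes.

import pathway as pw


class PriceSchema(pw.Schema):
    # illustrative schema: the names and values are made up for this sketch
    name: str      # numeric-looking text would stay a string
    price: float   # "1" in the markdown is typed as 1.0, not as an int
    tag: bytes     # markdown text is UTF-8 encoded into bytes


orders = pw.debug.table_from_markdown(
    """
    name | price | tag
    foo  | 1     | aa
    bar  | 2.5   | bb
    """,
    schema=PriceSchema,
)

# prints the table with columns typed according to PriceSchema
pw.debug.compute_and_print(orders)

Before this fix, values parsed from markdown (or taken from a pandas frame) could keep the type inferred from the literal itself rather than the type declared in the schema.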