Skip to content

Commit

Permalink
use schema in table from markdown and pandas (#5485)
Browse files Browse the repository at this point in the history
Bugfix: the schema (when provided) is now used to properly type columns created from markdown and pandas.

GitOrigin-RevId: 4653d4ae6a88edff9298ca004bbc2cd5af892971
  • Loading branch information
krzysiek-pathway authored and Manul from Pathway committed Jan 26, 2024
1 parent 3dc1077 commit b41c46e
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Wrappers for OpenAI Chat and Embedding services are now added to Pathway xpack for LLMs.
- A vector indexing pipeline that allows querying for the most similar documents. It is available as class `VectorStore` as part of Pathway xpack for LLMs.

### Fixed

- `pw.debug.table_from_markdown` now uses schema parameter (when set) to properly assign _simple types_ (`int, bool, float, str, bytes`) and optional _simple types_ to columns.

## [0.7.9] - 2024-01-18

### Changed
Expand Down
66 changes: 63 additions & 3 deletions python/pathway/internals/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from pathway.engine import *
from pathway.internals import dtype as dt, json
from pathway.internals.schema import Schema

if TYPE_CHECKING:
_Value: TypeAlias = "Value"
Expand Down Expand Up @@ -45,15 +46,61 @@ def __call__(
...


def denumpify(x):
def denumpify(x, type_from_schema: dt.DType | None = None):
def denumpify_inner(x):
    """Convert a numpy scalar to its native Python value; NA-like scalars become None."""
    # Scalar missing values (NaN, NaT, pd.NA, ...) are normalized to None.
    if pd.api.types.is_scalar(x):
        if pd.isna(x):
            return None
    # Unwrap numpy scalar types (np.int64, np.float64, ...) to Python builtins;
    # everything else passes through unchanged.
    return x.item() if isinstance(x, np.generic) else x

def _is_instance_of_simple_type(x):
    """Return True if ``x`` is compatible with any of the simple dtypes.

    Simple dtypes are int, bool, str, bytes and float; checks are made in
    that order and stop at the first match.
    """
    simple_dtypes = (dt.INT, dt.BOOL, dt.STR, dt.BYTES, dt.FLOAT)
    return any(dtype.is_value_compatible(x) for dtype in simple_dtypes)

def fix_possibly_misassigned_type(entry, type_from_schema):
    """Coerce ``entry`` to the simple dtype declared by the schema.

    Pandas may have stored the value with a looser type than the schema
    declares (e.g. ints in a float column, or text destined for a bytes
    column).  Returns ``entry`` converted to match ``type_from_schema``;
    raises ``AssertionError`` when the value cannot belong to that dtype.
    """
    assert (
        (type_from_schema.is_value_compatible(entry))
        # Ideally the only mismatch tolerated here would be str -> bytes
        # (the commented-out condition below).  However some call sites use
        # schema_from_pandas, which maps certain complex column types to STR,
        # so values that merely *look* like simple STR reach this assert;
        # the broader `wrapped == str` escape hatch keeps them passing.
        # or (isinstance(v, str) and type_from_schema.wrapped == bytes)
        or type_from_schema.wrapped == str
    )

    # Any simple value can be rendered as its string representation.
    if type_from_schema == dt.STR and _is_instance_of_simple_type(entry):
        return str(entry)

    # Pandas often stores ints where the schema wants floats.
    if type_from_schema == dt.FLOAT:
        return float(entry)

    # Markdown/pandas deliver text; encode when the schema wants bytes.
    if isinstance(entry, str) and type_from_schema == dt.BYTES:
        return entry.encode("utf-8")

    # Already compatible - pass through unchanged.
    return entry

v = denumpify_inner(x)

if isinstance(type_from_schema, dt._SimpleDType):
v = fix_possibly_misassigned_type(v, type_from_schema)
elif (
isinstance(type_from_schema, dt.Optional)
and isinstance(type_from_schema.wrapped, dt._SimpleDType)
and not dt.NONE.is_value_compatible(v)
):
# pandas stores optional ints as floats
if isinstance(v, float) and type_from_schema.wrapped == dt.INT:
assert v.is_integer()
v = fix_possibly_misassigned_type(int(v), type_from_schema.wrapped)
else:
v = fix_possibly_misassigned_type(v, type_from_schema.wrapped)

if isinstance(v, str):
return v.encode("utf-8", "ignore").decode("utf-8")
else:
Expand Down Expand Up @@ -85,12 +132,25 @@ def static_table_from_pandas(
df: pd.DataFrame,
connector_properties: ConnectorProperties | None = None,
id_from: list[str] | None = None,
schema: type[Schema] | None = None,
) -> Table:
if schema is not None and id_from is not None:
assert schema.primary_key_columns() == id_from

if id_from is None and schema is not None:
id_from = schema.primary_key_columns()

ids = ids_from_pandas(df, connector_properties, id_from)
column_types: dict[str, dt.DType] | None = None
if schema is not None:
column_types = dict(schema.__dtypes__)
for column in PANDAS_PSEUDOCOLUMNS:
column_types[column] = dt.INT

data = {}
for c in df.columns:
data[c] = [denumpify(v) for _, v in df[c].items()]
type_from_schema = None if column_types is None else column_types[c]
data[c] = [denumpify(v, type_from_schema) for _, v in df[c].items()]
# df[c].items() is used because df[c].values is a numpy array
ordinary_columns = [
column for column in df.columns if column not in PANDAS_PSEUDOCOLUMNS
Expand All @@ -111,7 +171,7 @@ def static_table_from_pandas(

assert len(connector_properties.column_properties) == len(
ordinary_columns
), "prrovided connector properties do not match the dataframe"
), "provided connector properties do not match the dataframe"

input_data: CapturedStream = []
for i, index in enumerate(df.index):
Expand Down
7 changes: 6 additions & 1 deletion python/pathway/internals/dtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import numpy as np
import numpy.typing as npt
import pandas as pd

from pathway.engine import PathwayType
from pathway.internals import api, datetime_types, json as js
Expand Down Expand Up @@ -88,6 +89,10 @@ def is_value_compatible(self, arg):
)
elif self.wrapped == int:
return np.issubdtype(type(arg), np.integer)
elif self.wrapped == bool:
return isinstance(arg, (bool, np.bool_))
elif self.wrapped == bytes:
return isinstance(arg, (bytes, str))
else:
return isinstance(arg, self.wrapped)

Expand Down Expand Up @@ -123,7 +128,7 @@ def __new__(cls) -> _NoneDType:
return super().__new__(cls)

def is_value_compatible(self, arg):
    """Check whether ``arg`` is a valid value of the None dtype.

    Accepts ``None`` itself and pandas' missing-value marker ``pd.NA``.
    ``pd.NA`` is documented as a singleton, so an identity check is
    equivalent to the previous ``isinstance(arg, pd._libs.missing.NAType)``
    test while avoiding the private ``pd._libs`` API.
    """
    return arg is None or arg is pd.NA

@property
def typehint(self) -> None:
Expand Down
5 changes: 2 additions & 3 deletions python/pathway/internals/graph_runner/operator_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def _run(
output_storages: dict[Table, Storage],
):
datasource = operator.datasource

if self.graph_builder.debug and operator.debug_datasource is not None:
if (
datasource.schema._dtypes()
Expand All @@ -113,7 +112,7 @@ def _run(
scope=self.scope,
df=operator.debug_datasource.data,
connector_properties=operator.debug_datasource.connector_properties,
id_from=operator.debug_datasource.schema.primary_key_columns(),
schema=operator.debug_datasource.schema,
)
self.state.set_table(output_storages[table], materialized_table)
elif isinstance(datasource, PandasDataSource):
Expand All @@ -123,7 +122,7 @@ def _run(
scope=self.scope,
df=datasource.data,
connector_properties=datasource.connector_properties,
id_from=datasource.schema.primary_key_columns(),
schema=datasource.schema,
)
self.state.set_table(output_storages[table], materialized_table)
elif isinstance(datasource, GenericDataSource):
Expand Down
1 change: 0 additions & 1 deletion python/pathway/internals/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,6 @@ def column_definition(
>>> NewSchema
<pathway.Schema types={'key': <class 'int'>, '@timestamp': <class 'str'>, 'data': <class 'str'>}>
"""
from pathway.internals import dtype as dt

return ColumnDefinition(
dtype=dt.wrap(dtype) if dtype is not None else None,
Expand Down
143 changes: 142 additions & 1 deletion python/pathway/tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pathway as pw
import pathway.internals.dtype as dt
from pathway.tests.utils import T
from pathway.tests.utils import T, assert_table_equality_wo_index


def test_date_time_naive_schema():
Expand Down Expand Up @@ -45,3 +45,144 @@ def test_date_time_utc_schema():
"t2": dt.DATE_TIME_UTC,
"diff": dt.DURATION,
}


def test_markdown_type_float():
    """Columns declared as float in the schema come out as floats,
    even when the markdown cell looks like an integer."""

    class TestInputSchema(pw.Schema):
        float_num: float
        should_be_float_num: float

    class TestOutputSchema(pw.Schema):
        float_num: float
        should_be_float_num: float
        test1: float
        test2: float

    input_table = pw.debug.table_from_markdown(
        """
| float_num | should_be_float_num
1 | 2.7 | 1
2 | 3.1 | 2
""",
        schema=TestInputSchema,
    )

    # Doubling produces float results only if the columns really are floats.
    result = input_table.with_columns(
        test1=2 * input_table.float_num,
        test2=2 * input_table.should_be_float_num,
    )

    expected = pw.debug.table_from_markdown(
        """
float_num | should_be_float_num | test1 | test2
2.7 | 1.0 | 5.4 | 2.0
3.1 | 2.0 | 6.2 | 4.0
""",
        schema=TestOutputSchema,
    )

    assert_table_equality_wo_index(result, expected)


def test_markdown_type_optional_float():
    """An Optional[float] column accepts 'None' cells and, after the Nones
    are filtered out, behaves like a plain float column."""

    class TestInputSchema(pw.Schema):
        float_num: float
        should_be_float_num: float | None

    class TestOutputSchema(pw.Schema):
        float_num: float
        should_be_float_num: float
        test1: float
        test2: float

    input_table = pw.debug.table_from_markdown(
        """
| float_num | should_be_float_num
1 | 2.7 | 1
2 | 3.1 | 2
2 | 3.1 | None
""",
        schema=TestInputSchema,
    )

    # Drop the row holding None, then arithmetic works on both columns.
    non_null = input_table.filter(input_table.should_be_float_num.is_not_none())
    result = non_null.with_columns(
        test1=2 * non_null.float_num,
        test2=2 * non_null.should_be_float_num,
    )

    expected = pw.debug.table_from_markdown(
        """
float_num | should_be_float_num | test1 | test2
2.7 | 1.0 | 5.4 | 2.0
3.1 | 2.0 | 6.2 | 4.0
""",
        schema=TestOutputSchema,
    )

    assert_table_equality_wo_index(result, expected)


def test_markdown_type_bytes():
    """A bytes column built from markdown text holds real bytes objects."""

    class TestInputSchema(pw.Schema):
        text: str
        text_bytes: bytes

    class TestOutputSchema(pw.Schema):
        text: str
        text_bytes: bytes
        bytes_as_text: str

    input_table = pw.debug.table_from_markdown(
        """
| text | text_bytes
1 | aa | aa
2 | bb | bb
""",
        schema=TestInputSchema,
    )

    # .decode exists only on bytes, so this fails if the column held str.
    result = input_table.with_columns(
        bytes_as_text=pw.apply_with_type(
            lambda x: x.decode("utf-8"), str, input_table.text_bytes
        )
    )

    expected = pw.debug.table_from_markdown(
        """
text | text_bytes | bytes_as_text
aa | aa | aa
bb | bb | bb
""",
        schema=TestOutputSchema,
    )

    assert_table_equality_wo_index(result, expected)


def test_markdown_type_str():
    """Numeric-looking cells stay strings when the schema says str."""

    class InputNumbersAsString(pw.Schema):
        number_as_string: str

    class OutputNumbersAsString(pw.Schema):
        number_as_string: str
        ext_str: str
        converted_to_int: int

    input_table = pw.debug.table_from_markdown(
        """
| number_as_string
1 | 2
2 | 3
""",
        schema=InputNumbersAsString,
    )

    # String concatenation works only on str values ...
    with_suffix = input_table.with_columns(
        ext_str=input_table.number_as_string + "a"
    )
    # ... and arithmetic requires an explicit cast back to int.
    result = with_suffix.with_columns(
        converted_to_int=pw.cast(int, with_suffix.number_as_string) + 1
    )

    expected = pw.debug.table_from_markdown(
        """
number_as_string | ext_str | converted_to_int
2 | 2a | 3
3 | 3a | 4
""",
        schema=OutputNumbersAsString,
    )

    assert_table_equality_wo_index(result, expected)

0 comments on commit b41c46e

Please sign in to comment.