Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Missing Data #778

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ def __getitem__(
)
elif isinstance(item, list):
# Make a projection and filter keys from the dict
return self.apply(lambda value: {k: value[k] for k in item})
return self.apply(lambda value: {k: value.get(k) for k in item})
elif isinstance(item, str):
# Create a StreamingSeries based on a column name
return StreamingSeries(name=item, sdf_id=id(self))
Expand Down
66 changes: 47 additions & 19 deletions quixstreams/dataframe/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,30 +235,48 @@ def _operation(
[Any, _O],
Union[bool, Self],
],
missing_column_value: Any = None,
missing_data_value: Any = None,
) -> Self:
self._validate_other_series(other)

self_composed = self.compose_returning()
if isinstance(other, self.__class__):
other_composed = other.compose_returning()

def f(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
return operator_(
self_composed(value, key, timestamp, headers)[0],
other_composed(value, key, timestamp, headers)[0],
)

return self._from_apply_callback(func=f)
def func(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
try:
# These may raise ColumnDoesNotExist
self_result = self_composed(value, key, timestamp, headers)[0]
other_result = other_composed(value, key, timestamp, headers)[0]

# This may raise TypeError
return operator_(self_result, other_result)
except ColumnDoesNotExist:
return missing_column_value
except TypeError:
if self_result is None or other_result is None:
return missing_data_value
raise
else:

def f(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
return operator_(
self_composed(value, key, timestamp, headers)[0], other
)
def func(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
try:
# This may raise ColumnDoesNotExist
self_result = self_composed(value, key, timestamp, headers)[0]

return self._from_apply_callback(func=f)
# This may raise TypeError
return operator_(self_result, other)
except ColumnDoesNotExist:
return missing_column_value
except TypeError:
if self_result is None or other is None:
return missing_data_value
raise

def isin(self, other: Container) -> Self:
return self._from_apply_callback(func=func)

def isin(self, other: Union[Container, Self]) -> Self:
"""
Check if series value is in "other".
Same as "StreamingSeries in other".
Expand Down Expand Up @@ -287,7 +305,12 @@ def isin(self, other: Container) -> Self:
def f(a, b):
return contains(b, a)

return self._operation(other, f)
return self._operation(
other,
f,
missing_column_value=False,
missing_data_value=False,
)

def contains(self, other: Union[Self, object]) -> Self:
"""
Expand All @@ -312,7 +335,12 @@ def contains(self, other: Union[Self, object]) -> Self:
:param other: object to check
:return: new StreamingSeries
"""
return self._operation(other, operator.contains)
return self._operation(
other,
operator.contains,
missing_column_value=False,
missing_data_value=False,
)

def is_(self, other: Union[Self, object]) -> Self:
"""
Expand All @@ -335,7 +363,7 @@ def is_(self, other: Union[Self, object]) -> Self:
:param other: object to check for "is"
:return: new StreamingSeries
"""
return self._operation(other, operator.is_)
return self._operation(other, operator.is_, missing_column_value=False)

def isnot(self, other: Union[Self, object]) -> Self:
"""
Expand All @@ -359,7 +387,7 @@ def isnot(self, other: Union[Self, object]) -> Self:
:param other: object to check for "is_not"
:return: new StreamingSeries
"""
return self._operation(other, operator.is_not)
return self._operation(other, operator.is_not, missing_column_value=False)

def isnull(self) -> Self:
"""
Expand All @@ -382,7 +410,7 @@ def isnull(self) -> Self:

:return: new StreamingSeries
"""
return self._operation(None, operator.is_)
return self._operation(None, operator.is_, missing_column_value=True)

def notnull(self) -> Self:
"""
Expand All @@ -405,7 +433,7 @@ def notnull(self) -> Self:

:return: new StreamingSeries
"""
return self._operation(None, operator.is_not)
return self._operation(None, operator.is_not, missing_column_value=False)

def abs(self) -> Self:
"""
Expand Down
31 changes: 22 additions & 9 deletions quixstreams/dataframe/windows/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@ def initialize(self) -> int:
return 0

def agg(self, old: V, new: Any) -> V:
if self.column is ROOT:
new = new if self.column is ROOT else new.get(self.column)
try:
return old + new
return old + new[self.column]
except TypeError:
if new is None:
return old
raise

def result(self, value: V) -> V:
return value
Expand All @@ -96,13 +100,20 @@ def initialize(self) -> tuple[float, int]:

def agg(self, old: tuple[V, int], new: Any) -> tuple[V, int]:
old_sum, old_count = old
if self.column is ROOT:
new = new if self.column is ROOT else new.get(self.column)
try:
return old_sum + new, old_count + 1
return old_sum + new[self.column], old_count + 1
except TypeError:
if new is None:
return old
raise

def result(self, value: tuple[Union[int, float], int]) -> float:
sum_, count_ = value
return sum_ / count_
try:
return sum_ / count_
except ZeroDivisionError:
return 0.0


R = TypeVar("R", int, float)
Expand Down Expand Up @@ -135,10 +146,11 @@ def initialize(self) -> None:
return None

def agg(self, old: Optional[V], new: Any) -> V:
if self.column is not ROOT:
new = new[self.column]
new = new if self.column is ROOT else new.get(self.column)
if old is None:
return new
elif new is None:
return old
return max(old, new)

def result(self, value: V) -> V:
Expand All @@ -153,10 +165,11 @@ def initialize(self) -> None:
return None

def agg(self, old: Optional[V], new: Any) -> V:
if self.column is not ROOT:
new = new[self.column]
new = new if self.column is ROOT else new.get(self.column)
if old is None:
return new
elif new is None:
return old
return min(old, new)

def result(self, value: V) -> V:
Expand Down