Tolerate missing columns and None values: list projections read keys with
dict.get, StreamingSeries operations return configurable fallback values, and
window aggregations skip None inputs.

diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py
index 7622b0fcd..1051f1117 100644
--- a/quixstreams/dataframe/dataframe.py
+++ b/quixstreams/dataframe/dataframe.py
@@ -1592,7 +1592,7 @@ def __getitem__(
             )
         elif isinstance(item, list):
             # Make a projection and filter keys from the dict
-            return self.apply(lambda value: {k: value[k] for k in item})
+            return self.apply(lambda value: {k: value.get(k) for k in item})
         elif isinstance(item, str):
             # Create a StreamingSeries based on a column name
             return StreamingSeries(name=item, sdf_id=id(self))
diff --git a/quixstreams/dataframe/series.py b/quixstreams/dataframe/series.py
index 68c7ea97d..201b5d0ec 100644
--- a/quixstreams/dataframe/series.py
+++ b/quixstreams/dataframe/series.py
@@ -235,6 +235,8 @@ def _operation(
             [Any, _O],
             Union[bool, Self],
         ],
+        missing_column_value: Any = None,
+        missing_data_value: Any = None,
     ) -> Self:
         self._validate_other_series(other)
 
@@ -242,23 +244,43 @@ def _operation(
         if isinstance(other, self.__class__):
             other_composed = other.compose_returning()
 
-            def f(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
-                return operator_(
-                    self_composed(value, key, timestamp, headers)[0],
-                    other_composed(value, key, timestamp, headers)[0],
-                )
-
-            return self._from_apply_callback(func=f)
+            def func(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
+                try:
+                    # These may raise ColumnDoesNotExist
+                    self_result = self_composed(value, key, timestamp, headers)[0]
+                    other_result = other_composed(value, key, timestamp, headers)[0]
+                except ColumnDoesNotExist:
+                    return missing_column_value
+
+                try:
+                    # This may raise TypeError
+                    return operator_(self_result, other_result)
+                except TypeError:
+                    # Both operands are bound here, so inspecting them is safe
+                    if self_result is None or other_result is None:
+                        return missing_data_value
+                    raise
         else:
 
-            def f(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
-                return operator_(
-                    self_composed(value, key, timestamp, headers)[0], other
-                )
+            def func(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
+                try:
+                    # This may raise ColumnDoesNotExist
+                    self_result = self_composed(value, key, timestamp, headers)[0]
+                except ColumnDoesNotExist:
+                    return missing_column_value
 
-            return self._from_apply_callback(func=f)
+                try:
+                    # This may raise TypeError
+                    return operator_(self_result, other)
+                except TypeError:
+                    # self_result is bound here, so inspecting it is safe
+                    if self_result is None or other is None:
+                        return missing_data_value
+                    raise
 
-    def isin(self, other: Container) -> Self:
+        return self._from_apply_callback(func=func)
+
+    def isin(self, other: Union[Container, Self]) -> Self:
         """
         Check if series value is in "other".
         Same as "StreamingSeries in other".
@@ -287,7 +309,12 @@ def isin(self, other: Container) -> Self:
         def f(a, b):
             return contains(b, a)
 
-        return self._operation(other, f)
+        return self._operation(
+            other,
+            f,
+            missing_column_value=False,
+            missing_data_value=False,
+        )
 
     def contains(self, other: Union[Self, object]) -> Self:
         """
@@ -312,7 +339,12 @@ def contains(self, other: Union[Self, object]) -> Self:
         :param other: object to check
         :return: new StreamingSeries
         """
-        return self._operation(other, operator.contains)
+        return self._operation(
+            other,
+            operator.contains,
+            missing_column_value=False,
+            missing_data_value=False,
+        )
 
     def is_(self, other: Union[Self, object]) -> Self:
         """
@@ -335,7 +367,7 @@ def is_(self, other: Union[Self, object]) -> Self:
         :param other: object to check for "is"
         :return: new StreamingSeries
         """
-        return self._operation(other, operator.is_)
+        return self._operation(other, operator.is_, missing_column_value=False)
 
     def isnot(self, other: Union[Self, object]) -> Self:
         """
@@ -359,7 +391,7 @@ def isnot(self, other: Union[Self, object]) -> Self:
         :param other: object to check for "is_not"
         :return: new StreamingSeries
         """
-        return self._operation(other, operator.is_not)
+        return self._operation(other, operator.is_not, missing_column_value=False)
 
     def isnull(self) -> Self:
         """
@@ -382,7 +414,7 @@ def isnull(self) -> Self:
 
         :return: new StreamingSeries
         """
-        return self._operation(None, operator.is_)
+        return self._operation(None, operator.is_, missing_column_value=True)
 
     def notnull(self) -> Self:
         """
@@ -405,7 +437,7 @@ def notnull(self) -> Self:
 
         :return: new StreamingSeries
         """
-        return self._operation(None, operator.is_not)
+        return self._operation(None, operator.is_not, missing_column_value=False)
 
     def abs(self) -> Self:
         """
diff --git a/quixstreams/dataframe/windows/aggregations.py b/quixstreams/dataframe/windows/aggregations.py
index e147724d4..50a66d65f 100644
--- a/quixstreams/dataframe/windows/aggregations.py
+++ b/quixstreams/dataframe/windows/aggregations.py
@@ -68,9 +68,13 @@ def initialize(self) -> int:
         return 0
 
     def agg(self, old: V, new: Any) -> V:
-        if self.column is ROOT:
+        new = new if self.column is ROOT else new.get(self.column)
+        try:
             return old + new
-        return old + new[self.column]
+        except TypeError:
+            if new is None:
+                return old
+            raise
 
     def result(self, value: V) -> V:
         return value
@@ -96,13 +100,20 @@ def initialize(self) -> tuple[float, int]:
 
     def agg(self, old: tuple[V, int], new: Any) -> tuple[V, int]:
         old_sum, old_count = old
-        if self.column is ROOT:
+        new = new if self.column is ROOT else new.get(self.column)
+        try:
             return old_sum + new, old_count + 1
-        return old_sum + new[self.column], old_count + 1
+        except TypeError:
+            if new is None:
+                return old
+            raise
 
     def result(self, value: tuple[Union[int, float], int]) -> float:
         sum_, count_ = value
-        return sum_ / count_
+        try:
+            return sum_ / count_
+        except ZeroDivisionError:
+            return 0.0
 
 
 R = TypeVar("R", int, float)
@@ -135,10 +146,11 @@ def initialize(self) -> None:
         return None
 
     def agg(self, old: Optional[V], new: Any) -> V:
-        if self.column is not ROOT:
-            new = new[self.column]
+        new = new if self.column is ROOT else new.get(self.column)
         if old is None:
             return new
+        elif new is None:
+            return old
         return max(old, new)
 
     def result(self, value: V) -> V:
@@ -153,10 +165,11 @@ def initialize(self) -> None:
         return None
 
     def agg(self, old: Optional[V], new: Any) -> V:
-        if self.column is not ROOT:
-            new = new[self.column]
+        new = new if self.column is ROOT else new.get(self.column)
         if old is None:
             return new
+        elif new is None:
+            return old
         return min(old, new)
 
     def result(self, value: V) -> V: