diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py
index 7622b0fcd..1051f1117 100644
--- a/quixstreams/dataframe/dataframe.py
+++ b/quixstreams/dataframe/dataframe.py
@@ -1592,7 +1592,7 @@ def __getitem__(
             )
         elif isinstance(item, list):
             # Make a projection and filter keys from the dict
-            return self.apply(lambda value: {k: value[k] for k in item})
+            return self.apply(lambda value: {k: value.get(k) for k in item})
         elif isinstance(item, str):
             # Create a StreamingSeries based on a column name
             return StreamingSeries(name=item, sdf_id=id(self))
diff --git a/quixstreams/dataframe/series.py b/quixstreams/dataframe/series.py
index 68c7ea97d..201b5d0ec 100644
--- a/quixstreams/dataframe/series.py
+++ b/quixstreams/dataframe/series.py
@@ -235,6 +235,8 @@ def _operation(
             [Any, _O],
             Union[bool, Self],
         ],
+        missing_column_value: Any = None,
+        missing_data_value: Any = None,
     ) -> Self:
         self._validate_other_series(other)
 
@@ -242,23 +244,39 @@ def _operation(
         if isinstance(other, self.__class__):
             other_composed = other.compose_returning()
 
-            def f(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
-                return operator_(
-                    self_composed(value, key, timestamp, headers)[0],
-                    other_composed(value, key, timestamp, headers)[0],
-                )
-
-            return self._from_apply_callback(func=f)
+            def func(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
+                try:
+                    # These may raise ColumnDoesNotExist
+                    self_result = self_composed(value, key, timestamp, headers)[0]
+                    other_result = other_composed(value, key, timestamp, headers)[0]
+
+                    # This may raise TypeError
+                    return operator_(self_result, other_result)
+                except ColumnDoesNotExist:
+                    return missing_column_value
+                except TypeError:
+                    if self_result is None or other_result is None:
+                        return missing_data_value
+                    raise
         else:
 
-            def f(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
-                return operator_(
-                    self_composed(value, key, timestamp, headers)[0], other
-                )
+            def func(value: Any, key: Any, timestamp: int, headers: Any) -> Any:
+                try:
+                    # This may raise ColumnDoesNotExist
+                    self_result = self_composed(value, key, timestamp, headers)[0]
 
-            return self._from_apply_callback(func=f)
+                    # This may raise TypeError
+                    return operator_(self_result, other)
+                except ColumnDoesNotExist:
+                    return missing_column_value
+                except TypeError:
+                    if self_result is None or other is None:
+                        return missing_data_value
+                    raise
 
-    def isin(self, other: Container) -> Self:
+        return self._from_apply_callback(func=func)
+
+    def isin(self, other: Union[Container, Self]) -> Self:
         """
         Check if series value is in "other".
         Same as "StreamingSeries in other".
@@ -287,7 +305,12 @@ def isin(self, other: Container) -> Self:
         def f(a, b):
             return contains(b, a)
 
-        return self._operation(other, f)
+        return self._operation(
+            other,
+            f,
+            missing_column_value=False,
+            missing_data_value=False,
+        )
 
     def contains(self, other: Union[Self, object]) -> Self:
         """
@@ -312,7 +335,12 @@ def contains(self, other: Union[Self, object]) -> Self:
         :param other: object to check
         :return: new StreamingSeries
         """
-        return self._operation(other, operator.contains)
+        return self._operation(
+            other,
+            operator.contains,
+            missing_column_value=False,
+            missing_data_value=False,
+        )
 
     def is_(self, other: Union[Self, object]) -> Self:
         """
@@ -335,7 +363,7 @@ def is_(self, other: Union[Self, object]) -> Self:
         :param other: object to check for "is"
         :return: new StreamingSeries
         """
-        return self._operation(other, operator.is_)
+        return self._operation(other, operator.is_, missing_column_value=False)
 
     def isnot(self, other: Union[Self, object]) -> Self:
         """
@@ -359,7 +387,7 @@ def isnot(self, other: Union[Self, object]) -> Self:
         :param other: object to check for "is_not"
         :return: new StreamingSeries
         """
-        return self._operation(other, operator.is_not)
+        return self._operation(other, operator.is_not, missing_column_value=False)
 
     def isnull(self) -> Self:
         """
@@ -382,7 +410,7 @@ def isnull(self) -> Self:
 
         :return: new StreamingSeries
         """
-        return self._operation(None, operator.is_)
+        return self._operation(None, operator.is_, missing_column_value=True)
 
     def notnull(self) -> Self:
         """
@@ -405,7 +433,7 @@ def notnull(self) -> Self:
 
         :return: new StreamingSeries
         """
-        return self._operation(None, operator.is_not)
+        return self._operation(None, operator.is_not, missing_column_value=False)
 
     def abs(self) -> Self:
         """
diff --git a/quixstreams/dataframe/windows/aggregations.py b/quixstreams/dataframe/windows/aggregations.py
index e147724d4..50a66d65f 100644
--- a/quixstreams/dataframe/windows/aggregations.py
+++ b/quixstreams/dataframe/windows/aggregations.py
@@ -68,9 +68,13 @@ def initialize(self) -> int:
         return 0
 
     def agg(self, old: V, new: Any) -> V:
-        if self.column is ROOT:
+        new = new if self.column is ROOT else new.get(self.column)
+        try:
             return old + new
-        return old + new[self.column]
+        except TypeError:
+            if new is None:
+                return old
+            raise
 
     def result(self, value: V) -> V:
         return value
@@ -96,13 +100,20 @@ def initialize(self) -> tuple[float, int]:
 
     def agg(self, old: tuple[V, int], new: Any) -> tuple[V, int]:
         old_sum, old_count = old
-        if self.column is ROOT:
+        new = new if self.column is ROOT else new.get(self.column)
+        try:
             return old_sum + new, old_count + 1
-        return old_sum + new[self.column], old_count + 1
+        except TypeError:
+            if new is None:
+                return old
+            raise
 
     def result(self, value: tuple[Union[int, float], int]) -> float:
         sum_, count_ = value
-        return sum_ / count_
+        try:
+            return sum_ / count_
+        except ZeroDivisionError:
+            return 0.0
 
 
 R = TypeVar("R", int, float)
@@ -135,10 +146,11 @@ def initialize(self) -> None:
         return None
 
     def agg(self, old: Optional[V], new: Any) -> V:
-        if self.column is not ROOT:
-            new = new[self.column]
+        new = new if self.column is ROOT else new.get(self.column)
         if old is None:
             return new
+        elif new is None:
+            return old
         return max(old, new)
 
     def result(self, value: V) -> V:
@@ -153,10 +165,11 @@ def initialize(self) -> None:
         return None
 
     def agg(self, old: Optional[V], new: Any) -> V:
-        if self.column is not ROOT:
-            new = new[self.column]
+        new = new if self.column is ROOT else new.get(self.column)
         if old is None:
             return new
+        elif new is None:
+            return old
         return min(old, new)
 
     def result(self, value: V) -> V: