Polars Lazyframe Support #775

Merged on Mar 28, 2024

30 commits
51b875a
temp updates
Mar 21, 2024
ec37b32
initial prototype migration to shared polars for lazyframe support
Mar 22, 2024
ef90eb3
clean up log lines
Mar 22, 2024
604d136
fix function refactor
Mar 22, 2024
ae70b41
Implement materializer so that it returns a dataframe after processing
Mar 22, 2024
760fe21
fix linting
Mar 22, 2024
ec8a11d
fix linting
Mar 22, 2024
bf34df2
fix linting
Mar 22, 2024
1eb3929
update linting
Mar 22, 2024
3eaa10a
update linting
Mar 22, 2024
a32c04a
update linting
Mar 22, 2024
5cb77d4
fix linting
Mar 22, 2024
3ff84b0
fix linting
Mar 22, 2024
cfbc088
update for example
Mar 25, 2024
7a9eb83
update PR prototype code
Mar 26, 2024
7513060
update tests
Mar 27, 2024
25662b2
update tests
Mar 27, 2024
0abb830
update tests
Mar 27, 2024
bed412a
finish tests for other parsers
Mar 27, 2024
f302b14
Merge branch 'main' of github.com:buggtb/hamilton
Mar 27, 2024
de9cd80
Merge branch 'DAGWorks-Inc:main' into main
buggtb Mar 27, 2024
5d48c6e
Add lazyframe implementation
buggtb Mar 27, 2024
3ccdde2
Extended applicable types for Polars writers
buggtb Mar 27, 2024
d65bace
Updated PolarsLazyFrameResult and data writers
buggtb Mar 27, 2024
8aa9695
Extended support for LazyFrame in Polars extensions
buggtb Mar 27, 2024
fd1c011
fix test
buggtb Mar 27, 2024
2e7ff6c
Added Polars LazyFrame example
buggtb Mar 28, 2024
557ec54
Updated data loading method in tests
buggtb Mar 28, 2024
4a8c27b
Merge branch 'DAGWorks-Inc:main' into main
buggtb Mar 28, 2024
2c32470
update to force new build cause why not
buggtb Mar 28, 2024
37 changes: 37 additions & 0 deletions examples/polars/lazyframe/README.md
@@ -0,0 +1,37 @@
# Classic Hamilton Hello World

In this example we show you how to create a simple hello world dataflow that
creates a polars lazyframe as a result. It performs a series of transforms on the
input to create columns that appear in the output.

File organization:

* `my_functions.py` houses the logic that we want to compute.
  Note how the functions are named, and what input
  parameters they require. That is how we create a DAG modeling the dataflow we want to happen.
* `my_script.py` houses how to get Hamilton to create the DAG, specifying that we want a Polars LazyFrame as the result, and
  execute it. The input data is loaded from `sample_data.csv`.
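
The naming convention can be illustrated with a toy resolver. This is not Hamilton's implementation, only a sketch of the idea using the standard library: each function name is a node, and each parameter name refers to another node. The `resolve` helper and the list-based node functions are hypothetical.

```python
import inspect

# Hypothetical node functions: the function name is the node name, and each
# parameter name refers to another node that must be computed first.
def spend() -> list:
    return [10, 10, 20]

def signups() -> list:
    return [1, 10, 50]

def spend_per_signup(spend: list, signups: list) -> list:
    return [s / n for s, n in zip(spend, signups)]

def resolve(name, functions, cache=None):
    """Compute node `name` by first computing the nodes its parameters name."""
    cache = {} if cache is None else cache
    if name not in cache:
        fn = functions[name]
        params = inspect.signature(fn).parameters
        kwargs = {p: resolve(p, functions, cache) for p in params}
        cache[name] = fn(**kwargs)
    return cache[name]

functions = {f.__name__: f for f in (spend, signups, spend_per_signup)}
print(resolve("spend_per_signup", functions))
```

Requesting `spend_per_signup` pulls in `spend` and `signups` only because its parameters carry those names, which is the same reason `spend_per_signup(raw_data)` in `my_functions.py` depends on `raw_data`.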

To run things:
```bash
> python my_script.py
```

# Visualizing Execution
Here is the graph of execution - which should look the same as the pandas example:

![polars](polars.png)

# Caveat with Polars
There is one major caveat with Polars to be aware of: unlike pandas, Polars has no index.

What this means is that when you tell Hamilton to execute and return a Polars result using a
[provided results builder](https://github.com/dagworks-inc/hamilton/blob/sf-hamilton-1.14.1/hamilton/plugins/h_polars.py#L8), e.g. `hamilton.plugins.h_polars.PolarsDataFrameResult`, you have to
ensure that the row order of every output you request matches the order you expect. E.g. if a function does a filter, a sort,
a join, or a group-by, its rows may no longer line up with the other outputs that Hamilton combines into the
result.

If you have questions, or need help with this example,
join us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg), and we'll try to help!

Otherwise if you have ideas on how to better make Hamilton work with Polars, please open an issue or start a discussion!
15 changes: 15 additions & 0 deletions examples/polars/lazyframe/my_functions.py
@@ -0,0 +1,15 @@
import polars as pl

from hamilton.function_modifiers import load_from, value


@load_from.csv(file=value("./sample_data.csv"))
def raw_data(data: pl.LazyFrame) -> pl.LazyFrame:
    return data


def spend_per_signup(raw_data: pl.LazyFrame) -> pl.LazyFrame:
    """Computes cost per signup in relation to spend."""
    return raw_data.select("spend", "signups").with_columns(
        [(pl.col("spend") / pl.col("signups")).alias("spend_per_signup")]
    )
33 changes: 33 additions & 0 deletions examples/polars/lazyframe/my_script.py
@@ -0,0 +1,33 @@
import logging
import sys

from hamilton import base, driver
from hamilton.plugins import h_polars_lazyframe

logging.basicConfig(stream=sys.stdout)

# Create a driver instance. If you want to run the compute in the final node you can also use
# h_polars.PolarsDataFrameResult() and you don't need to run collect at the end. Which you use
# probably depends on whether you want to use the LazyFrame in more nodes in another DAG before
# computing the result.
adapter = base.SimplePythonGraphAdapter(result_builder=h_polars_lazyframe.PolarsLazyFrameResult())
import my_functions # where our functions are defined

dr = driver.Driver({}, my_functions, adapter=adapter)
output_columns = [
    "spend_per_signup",
]
# let's create the lazyframe!
df = dr.execute(output_columns)
# Here we just print the Lazyframe plan
print(df)

# Now we run the query
df = df.collect()

# And print the table.
print(df)

# To visualize do `pip install "sf-hamilton[visualization]"` if you want these to work
# dr.visualize_execution(output_columns, './polars', {"format": "png"})
# dr.display_all_functions('./my_full_dag.dot')
2 changes: 2 additions & 0 deletions examples/polars/lazyframe/requirements.txt
@@ -0,0 +1,2 @@
polars
sf-hamilton
7 changes: 7 additions & 0 deletions examples/polars/lazyframe/sample_data.csv
@@ -0,0 +1,7 @@
signups,spend
1,10
10,10
50,20
100,40
200,40
400,50
1 change: 1 addition & 0 deletions hamilton/function_modifiers/base.py
@@ -26,6 +26,7 @@
     "pandas",
     "plotly",
     "polars",
+    "polars_lazyframe",
     "pyspark_pandas",
     "spark",
     "dask",
29 changes: 21 additions & 8 deletions hamilton/io/utils.py
@@ -62,14 +62,27 @@ def get_dataframe_metadata(df: pd.DataFrame) -> Dict[str, Any]:
     - the column names
     - the data types
     """
-    return {
-        DATAFRAME_METADATA: {
-            "rows": len(df),
-            "columns": len(df.columns),
-            "column_names": list(df.columns),
-            "datatypes": [str(t) for t in list(df.dtypes)],  # for serialization purposes
-        }
-    }
+    metadata = {}
+    try:
+        metadata["rows"] = len(df)
+    except TypeError:
+        metadata["rows"] = None
+
+    try:
+        metadata["columns"] = len(df.columns)
+    except (AttributeError, TypeError):
+        metadata["columns"] = None
+
+    try:
+        metadata["column_names"] = list(df.columns)
+    except (AttributeError, TypeError):
+        metadata["column_names"] = None
+
+    try:
+        metadata["datatypes"] = [str(t) for t in list(df.dtypes)]
+    except (AttributeError, TypeError):
+        metadata["datatypes"] = None
+    return {DATAFRAME_METADATA: metadata}


 def get_file_and_dataframe_metadata(path: str, df: pd.DataFrame) -> Dict[str, Any]:
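
The change to `get_dataframe_metadata` replaces direct attribute access with per-field fallbacks, presumably so that objects which cannot report a length (such as a lazy frame) no longer raise. A minimal standard-library sketch of the same pattern follows; `tolerant_metadata` and `FakeLazyTable` are hypothetical names used only for illustration, not part of Hamilton.

```python
from typing import Any, Dict


def tolerant_metadata(obj: Any) -> Dict[str, Any]:
    """Collect size metadata, using None for anything the object cannot report."""
    metadata: Dict[str, Any] = {}
    try:
        metadata["rows"] = len(obj)
    except TypeError:  # raised when the object defines no __len__
        metadata["rows"] = None
    try:
        metadata["column_names"] = list(obj.columns)
    except (AttributeError, TypeError):
        metadata["column_names"] = None
    return metadata


class FakeLazyTable:
    """Hypothetical stand-in for a lazy frame: knows its columns, not its row count."""

    columns = ["signups", "spend"]


print(tolerant_metadata(FakeLazyTable()))
print(tolerant_metadata([1, 2, 3]))
```

The trade-off is that callers must now handle `None` in any metadata field instead of relying on every value being present.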
2 changes: 2 additions & 0 deletions hamilton/plugins/h_polars.py
@@ -40,6 +40,8 @@ def build_result(
             (value,) = outputs.values()  # this works because it's length 1.
             if isinstance(value, pl.DataFrame):  # it's a dataframe
                 return value
+            if isinstance(value, pl.LazyFrame):  # it's a lazyframe
+                return value.collect()
             elif not isinstance(value, pl.Series):  # it's a single scalar/object
                 key, value = outputs.popitem()
                 return pl.DataFrame({key: [value]})
46 changes: 46 additions & 0 deletions hamilton/plugins/h_polars_lazyframe.py
@@ -0,0 +1,46 @@
from typing import Any, Dict, Type, Union

import polars as pl

from hamilton import base


class PolarsLazyFrameResult(base.ResultMixin):
    """A ResultBuilder that produces a polars LazyFrame.

    Use this when you want to create a polars LazyFrame from the outputs. Caveat: you need to ensure that the length
    of the outputs is the same, otherwise you will get an error; mixed outputs aren't that well handled.

    To use:

    .. code-block:: python

        from hamilton import base, driver
        from hamilton.plugins import h_polars_lazyframe
        polars_builder = h_polars_lazyframe.PolarsLazyFrameResult()
        adapter = base.SimplePythonGraphAdapter(polars_builder)
        dr = driver.Driver(config, *modules, adapter=adapter)
        df = dr.execute([...], inputs=...)  # returns polars LazyFrame

    Note: this is just a first attempt at something for Polars. Think it should handle more? Come chat/open a PR!
    """

    def build_result(
        self, **outputs: Dict[str, Union[pl.Series, pl.LazyFrame, Any]]
    ) -> pl.LazyFrame:
        """This is the method that Hamilton will call to build the final result. It will pass in the results
        of the requested outputs that you passed in to the execute() method.

        Note: this function could do smarter things; looking for contributions here!

        :param outputs: The results of the requested outputs.
        :return: a polars LazyFrame.
        """
        if len(outputs) == 1:
            (value,) = outputs.values()  # this works because it's length 1.
            if isinstance(value, pl.LazyFrame):  # it's a lazyframe
                return value
        return pl.LazyFrame(outputs)

    def output_type(self) -> Type:
        return pl.LazyFrame
56 changes: 35 additions & 21 deletions hamilton/plugins/polars_extensions.py
@@ -166,8 +166,6 @@ def _get_loading_kwargs(self):
             kwargs["row_count_name"] = self.row_count_name
         if self.row_count_offset is not None:
             kwargs["row_count_offset"] = self.row_count_offset
-        if self.sample_size is not None:
-            kwargs["sample_size"] = self.sample_size
         if self.eol_char is not None:
             kwargs["eol_char"] = self.eol_char
         if self.raise_if_empty is not None:
@@ -176,6 +174,7 @@ def _get_loading_kwargs(self):

     def load_data(self, type_: Type) -> Tuple[DATAFRAME_TYPE, Dict[str, Any]]:
         df = pl.read_csv(self.file, **self._get_loading_kwargs())
+
         metadata = utils.get_file_and_dataframe_metadata(self.file, df)
         return df, metadata

@@ -206,7 +205,7 @@ class PolarsCSVWriter(DataSaver):

     @classmethod
     def applicable_types(cls) -> Collection[Type]:
-        return [DATAFRAME_TYPE]
+        return [DATAFRAME_TYPE, pl.LazyFrame]

     def _get_saving_kwargs(self):
         kwargs = {}
@@ -236,15 +235,12 @@ def _get_saving_kwargs(self):
             kwargs["quote_style"] = self.quote_style
         return kwargs

-    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
+    def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
+        if isinstance(data, pl.LazyFrame):
+            data = data.collect()
         data.write_csv(self.file, **self._get_saving_kwargs())
         return utils.get_file_and_dataframe_metadata(self.file, data)

-    def load_data(self, type_: Type) -> Tuple[DATAFRAME_TYPE, Dict[str, Any]]:
-        df = pl.read_csv(self.file, **self._get_loading_kwargs())
-        metadata = utils.get_file_and_dataframe_metadata(self.file, df)
-        return df, metadata
-
     @classmethod
     def name(cls) -> str:
         return "csv"
@@ -330,7 +326,7 @@ class PolarsParquetWriter(DataSaver):

     @classmethod
     def applicable_types(cls) -> Collection[Type]:
-        return [DATAFRAME_TYPE]
+        return [DATAFRAME_TYPE, pl.LazyFrame]

     def _get_saving_kwargs(self):
         kwargs = {}
@@ -348,8 +344,12 @@ def _get_saving_kwargs(self):
             kwargs["pyarrow_options"] = self.pyarrow_options
         return kwargs

-    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
+    def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
+        if isinstance(data, pl.LazyFrame):
+            data = data.collect()
+
         data.write_parquet(self.file, **self._get_saving_kwargs())
+
         return utils.get_file_and_dataframe_metadata(self.file, data)

     @classmethod
@@ -422,15 +422,17 @@ class PolarsFeatherWriter(DataSaver):

     @classmethod
     def applicable_types(cls) -> Collection[Type]:
-        return [DATAFRAME_TYPE]
+        return [DATAFRAME_TYPE, pl.LazyFrame]

     def _get_saving_kwargs(self):
         kwargs = {}
         if self.compression is not None:
             kwargs["compression"] = self.compression
         return kwargs

-    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
+    def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
+        if isinstance(data, pl.LazyFrame):
+            data = data.collect()
         data.write_ipc(self.file, **self._get_saving_kwargs())
         return utils.get_file_and_dataframe_metadata(self.file, data)

@@ -484,15 +486,18 @@ class PolarsAvroWriter(DataSaver):

     @classmethod
     def applicable_types(cls) -> Collection[Type]:
-        return [DATAFRAME_TYPE]
+        return [DATAFRAME_TYPE, pl.LazyFrame]

     def _get_saving_kwargs(self):
         kwargs = {}
         if self.compression is not None:
             kwargs["compression"] = self.compression
         return kwargs

-    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
+    def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
+        if isinstance(data, pl.LazyFrame):
+            data = data.collect()
+
         data.write_avro(self.file, **self._get_saving_kwargs())
         return utils.get_file_and_dataframe_metadata(self.file, data)

@@ -547,7 +552,7 @@ class PolarsJSONWriter(DataSaver):

     @classmethod
     def applicable_types(cls) -> Collection[Type]:
-        return [DATAFRAME_TYPE]
+        return [DATAFRAME_TYPE, pl.LazyFrame]

     def _get_saving_kwargs(self):
         kwargs = {}
@@ -557,7 +562,10 @@ def _get_saving_kwargs(self):
             kwargs["row_oriented"] = self.row_oriented
         return kwargs

-    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
+    def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
+        if isinstance(data, pl.LazyFrame):
+            data = data.collect()
+
         data.write_json(self.file, **self._get_saving_kwargs())
         return utils.get_file_and_dataframe_metadata(self.file, data)

@@ -665,7 +673,7 @@ class PolarsSpreadsheetWriter(DataSaver):

     @classmethod
     def applicable_types(cls) -> Collection[Type]:
-        return [DATAFRAME_TYPE]
+        return [DATAFRAME_TYPE, pl.LazyFrame]

     def _get_saving_kwargs(self):
         kwargs = {}
@@ -713,7 +721,10 @@ def _get_saving_kwargs(self):
             kwargs["freeze_panes"] = self.freeze_panes
         return kwargs

-    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
+    def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
+        if isinstance(data, pl.LazyFrame):
+            data = data.collect()
+
         data.write_excel(self.workbook, self.worksheet, **self._get_saving_kwargs())
         return utils.get_file_and_dataframe_metadata(self.workbook, data)

@@ -782,7 +793,7 @@ class PolarsDatabaseWriter(DataSaver):

     @classmethod
     def applicable_types(cls) -> Collection[Type]:
-        return [DATAFRAME_TYPE]
+        return [DATAFRAME_TYPE, pl.LazyFrame]

     def _get_saving_kwargs(self):
         kwargs = {}
@@ -792,7 +803,10 @@ def _get_saving_kwargs(self):
             kwargs["engine"] = self.engine
         return kwargs

-    def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
+    def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
+        if isinstance(data, pl.LazyFrame):
+            data = data.collect()
+
         data.write_database(
             table_name=self.table_name,
             connection=self.connection,