Skip to content

Commit

Permalink
Add imageSpec to structured_dataset.py (#1708)
Browse files Browse the repository at this point in the history
* Add imageSpec to structured_dataset.py

Signed-off-by: Kevin Su <[email protected]>

* lint

Signed-off-by: Kevin Su <[email protected]>

* updated other examples

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Jul 17, 2024
1 parent 9597270 commit 783a180
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
1 change: 1 addition & 0 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ jobs:
- name: Register specific tests
run: |
source .venv/bin/activate
export FLYTE_PUSH_IMAGE_SPEC=${{ github.event_name != 'pull_request' }}
while read -r line;
do
pyflyte --config ./boilerplate/flyte/end2end/functional-test-config.yaml \
Expand Down
2 changes: 1 addition & 1 deletion examples/data_types_and_io/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ ENV VENV /opt/venv
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

RUN pip install flytekit==1.12.0 pandas
RUN pip install flytekit pandas
RUN pip install torch --index-url https://download.pytorch.org/whl/cpu

# Copy the actual code
Expand Down
15 changes: 10 additions & 5 deletions examples/data_types_and_io/data_types_and_io/dataclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass

import pandas as pd
from flytekit import task, workflow
from flytekit import ImageSpec, task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
Expand All @@ -14,6 +14,11 @@
# If you're using Flytekit version >= v1.11.1, you don't need to decorate with `@dataclass_json` or
# inherit from Mashumaro's `DataClassJSONMixin`.

image_spec = ImageSpec(
registry="ghcr.io/flyteorg",
packages=["pandas", "pyarrow"],
)


# Python types
# Define a `dataclass` with `int`, `str` and `dict` as the data types
Expand All @@ -25,15 +30,15 @@ class Datum(DataClassJSONMixin):


# Once declared, a dataclass can be returned as an output or accepted as an input
@task
@task(container_image=image_spec)
def stringify(s: int) -> Datum:
"""
A dataclass return will be treated as a single complex JSON return.
"""
return Datum(x=s, y=str(s), z={s: str(s)})


@task
@task(container_image=image_spec)
def add(x: Datum, y: Datum) -> Datum:
"""
Flytekit automatically converts the provided JSON into a data class.
Expand All @@ -51,7 +56,7 @@ class FlyteTypes(DataClassJSONMixin):
directory: FlyteDirectory


@task
@task(container_image=image_spec)
def upload_data() -> FlyteTypes:
"""
Flytekit will upload FlyteFile, FlyteDirectory and StructuredDataset to the blob store,
Expand All @@ -76,7 +81,7 @@ def upload_data() -> FlyteTypes:
return fs


@task
@task(container_image=image_spec)
def download_data(res: FlyteTypes):
assert pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}).equals(res.dataframe.open(pd.DataFrame).all())
f = open(res.file, "r")
Expand Down
17 changes: 11 additions & 6 deletions examples/data_types_and_io/data_types_and_io/structured_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
)
from typing_extensions import Annotated

image_spec = ImageSpec(
registry="ghcr.io/flyteorg",
packages=["pandas", "pyarrow", "numpy"],
)


# Define a task that returns a Pandas DataFrame.
# Flytekit will detect the Pandas dataframe return signature and
# convert the interface for the task to the StructuredDatased type
@task
@task(container_image=image_spec)
def generate_pandas_df(a: int) -> pd.DataFrame:
return pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]})

Expand All @@ -39,7 +44,7 @@ def generate_pandas_df(a: int) -> pd.DataFrame:
# that's supported or added to structured dataset.
# For instance, you can use ``pa.Table`` to convert
# the Pandas DataFrame to a PyArrow table.
@task
@task(container_image=image_spec)
def get_subset_pandas_df(df: Annotated[StructuredDataset, all_cols]) -> Annotated[StructuredDataset, col]:
df = df.open(pd.DataFrame).all()
df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
Expand All @@ -61,7 +66,7 @@ def simple_sd_wf(a: int = 19) -> Annotated[StructuredDataset, col]:
register_csv_handlers()


@task
@task(container_image=image_spec)
def pandas_to_csv(df: pd.DataFrame) -> Annotated[StructuredDataset, CSV]:
return StructuredDataset(dataframe=df)

Expand Down Expand Up @@ -132,12 +137,12 @@ def to_html(self, df: np.ndarray) -> str:

# You can now use `numpy.ndarray` to deserialize the parquet file to NumPy
# and serialize a task's output (NumPy array) to a parquet file.
@task
@task(container_image=image_spec)
def generate_pd_df_with_str() -> pd.DataFrame:
return pd.DataFrame({"Name": ["Tom", "Joseph"]})


@task
@task(container_image=image_spec)
def to_numpy(sd: StructuredDataset) -> Annotated[StructuredDataset, None, PARQUET]:
numpy_array = sd.open(np.ndarray).all()
return StructuredDataset(dataframe=numpy_array)
Expand Down Expand Up @@ -197,7 +202,7 @@ class CompanyField:
MySecondDataClassDataset = Annotated[StructuredDataset, kwtypes(info=InfoField)]
MyNestedDataClassDataset = Annotated[StructuredDataset, kwtypes(info=kwtypes(contacts=ContactsField))]

image = ImageSpec(packages=["pandas", "tabulate"], registry="ghcr.io/flyteorg")
image = ImageSpec(packages=["pandas", "pyarrow", "pandas", "tabulate"], registry="ghcr.io/flyteorg")


@task(container_image=image)
Expand Down

0 comments on commit 783a180

Please sign in to comment.