Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Use models literal StructuredDataset to enable sd bypass task #2954

Merged
merged 7 commits into from
Dec 2, 2024
33 changes: 23 additions & 10 deletions flytekit/types/structured/structured_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,25 @@ def return_sd() -> StructuredDataset:
return df

For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5954.

2. Need access to self._literal_sd when converting task output LiteralMap back to flyteidl, please see:
https://github.com/flyteorg/flytekit/blob/f938661ff8413219d1bea77f6914a58c302d5c6c/flytekit/bin/entrypoint.py#L326

For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5956.
"""
to_literal = loop_manager.synced(flyte_dataset_transformer.async_to_literal)
self._literal_sd = to_literal(ctx, self, StructuredDataset, expected).scalar.structured_dataset
if self.metadata is None:
self._metadata = self._literal_sd.metadata

def set_literal(self, ctx: FlyteContext, expected: LiteralType) -> None:
"""
A public wrapper method to set the StructuredDataset Literal.

This method provides external access to the internal _set_literal method.
"""
return self._set_literal(ctx, expected)

def iter(self) -> Generator[DF, None, None]:
if self._dataframe_type is None:
raise ValueError("No dataframe type set. Use open() to set the local dataframe type you want to use.")
Expand Down Expand Up @@ -795,18 +808,18 @@ def dict_to_structured_dataset(
if uri is None:
raise ValueError("StructuredDataset's uri and file format should not be None")

# Instead of using python native StructuredDataset, we need to build a literals.StructuredDataset
# The reason is that _literal_sd of python sd is accessed when task output LiteralMap is converted back to flyteidl
# Hence, _literal_sd must have to_flyte_idl method
# See https://github.com/flyteorg/flytekit/blob/f938661ff8413219d1bea77f6914a58c302d5c6c/flytekit/bin/entrypoint.py#L326
# For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5956.
sdt = StructuredDatasetType(format=file_format)
metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
sd_literal = literals.StructuredDataset(uri=uri, metadata=metad)

return StructuredDatasetTransformerEngine().to_python_value(
FlyteContextManager.current_context(),
Literal(
scalar=Scalar(
structured_dataset=StructuredDataset(
metadata=StructuredDatasetMetadata(
structured_dataset_type=StructuredDatasetType(format=file_format)
),
uri=uri,
)
)
),
Literal(scalar=Scalar(structured_dataset=sd_literal)),
expected_python_type,
)

Expand Down
Loading