Skip to content

Commit

Permalink
[BUG] Use models literal StructuredDataset to enable sd bypass task (#…
Browse files Browse the repository at this point in the history
…2954)

* fix: Use models literal sd to enable sd bypass task

Signed-off-by: JiaWei Jiang <[email protected]>

* Reuse sd _set_literal with a public wrapper method

Signed-off-by: JiaWei Jiang <[email protected]>

* Use github permanent link

Signed-off-by: JiaWei Jiang <[email protected]>

* Simplify sd_literal building logic

Signed-off-by: JiaWei Jiang <[email protected]>

* Drop unnecessary column handling

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>
Co-authored-by: Future-Outlier <[email protected]>
  • Loading branch information
JiangJiaWei1103 and Future-Outlier authored Dec 2, 2024
1 parent 92d87a8 commit 0b0e338
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions flytekit/types/structured/structured_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,25 @@ def return_sd() -> StructuredDataset:
return df
For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5954.
2. Need access to self._literal_sd when converting task output LiteralMap back to flyteidl, please see:
https://github.com/flyteorg/flytekit/blob/f938661ff8413219d1bea77f6914a58c302d5c6c/flytekit/bin/entrypoint.py#L326
For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5956.
"""
to_literal = loop_manager.synced(flyte_dataset_transformer.async_to_literal)
self._literal_sd = to_literal(ctx, self, StructuredDataset, expected).scalar.structured_dataset
if self.metadata is None:
self._metadata = self._literal_sd.metadata

def set_literal(self, ctx: FlyteContext, expected: LiteralType) -> None:
"""
A public wrapper method to set the StructuredDataset Literal.
This method provides external access to the internal _set_literal method.
"""
return self._set_literal(ctx, expected)

def iter(self) -> Generator[DF, None, None]:
if self._dataframe_type is None:
raise ValueError("No dataframe type set. Use open() to set the local dataframe type you want to use.")
Expand Down Expand Up @@ -795,18 +808,18 @@ def dict_to_structured_dataset(
if uri is None:
raise ValueError("StructuredDataset's uri and file format should not be None")

# Instead of using python native StructuredDataset, we need to build a literals.StructuredDataset
# The reason is that _literal_sd of python sd is accessed when task output LiteralMap is converted back to flyteidl
# Hence, _literal_sd must have to_flyte_idl method
# See https://github.com/flyteorg/flytekit/blob/f938661ff8413219d1bea77f6914a58c302d5c6c/flytekit/bin/entrypoint.py#L326
# For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5956.
sdt = StructuredDatasetType(format=file_format)
metad = literals.StructuredDatasetMetadata(structured_dataset_type=sdt)
sd_literal = literals.StructuredDataset(uri=uri, metadata=metad)

return StructuredDatasetTransformerEngine().to_python_value(
FlyteContextManager.current_context(),
Literal(
scalar=Scalar(
structured_dataset=StructuredDataset(
metadata=StructuredDatasetMetadata(
structured_dataset_type=StructuredDatasetType(format=file_format)
),
uri=uri,
)
)
),
Literal(scalar=Scalar(structured_dataset=sd_literal)),
expected_python_type,
)

Expand Down

0 comments on commit 0b0e338

Please sign in to comment.