Adding support for downloading offloaded literal in copilot #6048

Merged: 2 commits into master from container-task-offloading on Nov 26, 2024

pmahindrakar-oss
Contributor

Why are the changes needed?

Introduces reading of offloaded literals in container tasks. Without it, large literals cannot be supported for this type of task.

What changes were proposed in this pull request?

  • Fixes reading of collections and maps in container tasks, which is currently broken.
  • It is broken because, for a collection or map, a directory with the literal's name must exist before the values can be dumped into it. Added unit tests to cover this; they fail without these changes.
  • Adds reading of offloaded literals.
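
The directory requirement in the second bullet can be illustrated with a minimal Python sketch. This is not the Go code changed in flytecopilot/data/download.go. The helper name `write_collection` and the one-file-per-element layout are assumptions made for illustration, chosen to match the index-named files that the container script in this PR reads.

```python
import os
import tempfile


def write_collection(base_dir, literal_name, values):
    """Hypothetical helper: dump each element of a collection to its own file."""
    target_dir = os.path.join(base_dir, literal_name)
    # Create the directory named after the literal first. Without this call,
    # the open() below raises FileNotFoundError because the parent is missing.
    os.makedirs(target_dir, exist_ok=True)
    for index, value in enumerate(values):
        # Assumed layout: one file per element, named by its index.
        with open(os.path.join(target_dir, str(index)), "w") as f:
            f.write(value)
    return target_dir


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        out = write_collection(tmp, "input", ["a", "b", "c"])
        print(sorted(os.listdir(out)))
```

Removing the `os.makedirs` call makes the first `open()` fail, which is the kind of breakage the new unit tests are described as catching.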

How was this patch tested?

Tested using the following workflow, which writes a large output that is then read by a container task:

from typing import List
from flytekit import ContainerTask, kwtypes, task, workflow, LaunchPlan, map_task
import logging

logging.basicConfig(level=logging.DEBUG)

# Generate individual 1MB-sized strings
@task(cache=True, cache_version="1.1")
def my_1mb_task(i: str) -> str:
    return f"Hello world {i}" * 100 * 1024

# Generate a list of strings
@task(cache=True, cache_version="1.1")
def generate_strs(count: int) -> List[str]:
    return ["a"] * count

# Workflow to generate large output
@workflow
def my_wf(mbs: int) -> List[str]:
    strs = generate_strs(count=mbs)
    return map_task(my_1mb_task)(i=strs)

# ContainerTask to process large output
process_large_output = ContainerTask(
    name="process_large_output",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(input=List[str]),
    outputs=kwtypes(result=str),
    #image="ghcr.io/flyteorg/rawcontainers-shell:v2",  # Replace with your preferred image
    image="localhost:30000/process-large-output:latest",  # Replace with your preferred image
    command=[
        "python",
        "process_large_output.py",
        "/var/inputs/input.json",
        #"{{.inputs.input}}",
        "/var/outputs/result.txt"
    ]
)

# Task to consume outputs
@task(cache=True, cache_version="1.1")
def noop(input: List[str]):
    pass

# New workflow for larger inputs
@workflow
def big_inputs_wf(input: List[str]):
    noop(input=input)

# New workflow with container task processing
@workflow
def big_inputs_with_container_wf(input: List[str]) -> str:
    return process_large_output(input=input)

# Reference workflows
@workflow
def ref_wf(mbs: int):
    big_inputs_wf(input=my_wf(mbs))

@workflow
def ref_wf_with_container(mbs: int) -> str:
    return big_inputs_with_container_wf(input=my_wf(mbs))
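
As a rough size check on the workflow above, the sketch below recomputes the length of the string that `my_1mb_task` returns. The helper name `payload_chars` is made up for this illustration; only the f-string and the repeat factor come from the workflow.

```python
def payload_chars(i: str, repeat: int = 100 * 1024) -> int:
    """Length of the string my_1mb_task would return for input i."""
    return len(f"Hello world {i}") * repeat


if __name__ == "__main__":
    per_item = payload_chars("a")  # generate_strs always yields "a"
    print(per_item)                # characters per mapped output
    print(per_item * 10)           # characters in the list when mbs=10
```

With the input "a", each mapped output is 1,331,200 characters, so the list returned by `my_wf` grows by about 1.3 million characters per unit of `mbs`.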

Built the container image locally:

docker build -t localhost:30000/process-large-output:latest .

Also used a copilot image built locally:

docker build -f Dockerfile.flytecopilot -t localhost:30000/my-flytecopilot-app:latest .

The following is the code for the container:

import sys
import os


def process_large_output(input_list):
    """
    Processes a list of large strings and generates a summary.
    
    Args:
        input_list (list): List of large strings to process.

    Returns:
        str: Summary of the processed input.
    """
    # Example processing: Calculate total characters and number of strings
    total_strings = len(input_list)
    total_chars = sum(len(s) for s in input_list)
    return f"Processed {total_strings} strings with a total of {total_chars} characters."


def read_folder(folder_path):
    """
    Reads files in a folder, each representing a string, and returns a list of their contents.
    
    Args:
        folder_path (str): Path to the folder containing the files.

    Returns:
        list: List of strings read from the files.
    """
    if not os.path.exists(folder_path):
        print(f"Error: Folder '{folder_path}' does not exist.")
        sys.exit(1)

    files = sorted(os.listdir(folder_path), key=lambda x: int(x) if x.isdigit() else float('inf'))
    input_list = []

    for file_name in files:
        file_path = os.path.join(folder_path, file_name)
        if os.path.isfile(file_path):
            with open(file_path, "r") as f:
                input_list.append(f.read().strip())
        else:
            print(f"Skipping non-file entry: {file_path}")

    return input_list


def main():
    # Check if the correct arguments are passed
    if len(sys.argv) < 3:
        print("Usage: python process_large_output.py <input_folder> <output_file>")
        sys.exit(1)

    input_folder = sys.argv[1]
    output_file = sys.argv[2]

    # Read the input folder
    input_data = read_folder(input_folder)

    # Process the input list
    summary = process_large_output(input_data)

    # Write the summary to the output file
    with open(output_file, "w") as f:
        f.write(summary)

    print(f"Processing complete. Summary written to '{output_file}'.")


if __name__ == "__main__":
    main()
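
The sort key in `read_folder` matters once a collection has ten or more elements. The sketch below reuses the same key on a made-up list of file names, assuming element files are named by their index as the script's key implies.

```python
def sort_key(name):
    # Same key as in read_folder: numeric names sort by value,
    # anything else is pushed to the end.
    return int(name) if name.isdigit() else float("inf")


if __name__ == "__main__":
    names = ["10", "2", "0", "1", "notes.txt"]  # hypothetical directory listing
    print(sorted(names))                # lexicographic: "10" lands before "2"
    print(sorted(names, key=sort_key))  # numeric: 0, 1, 2, 10, then non-numeric
```

A plain `sorted()` would reorder the elements of a longer list, so the numeric key keeps the list in its original order.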

[Screenshot: local sandbox output]

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

codecov bot commented Nov 26, 2024

Codecov Report

Attention: Patch coverage is 35.29412% with 11 lines in your changes missing coverage. Please review.

Project coverage is 37.08%. Comparing base (25c89ee) to head (06c4881).
Report is 4 commits behind head on master.

Files with missing lines         Patch %   Lines
flytecopilot/data/download.go    35.29%    8 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6048      +/-   ##
==========================================
+ Coverage   37.04%   37.08%   +0.04%     
==========================================
  Files        1316     1318       +2     
  Lines      132262   132284      +22     
==========================================
+ Hits        48998    49061      +63     
+ Misses      79002    78952      -50     
- Partials     4262     4271       +9     
Flag                      Coverage Δ
unittests-datacatalog     51.58% <ø> (ø)
unittests-flyteadmin      54.10% <ø> (ø)
unittests-flytecopilot    30.99% <35.29%> (+8.76%) ⬆️
unittests-flytectl        62.29% <ø> (-0.18%) ⬇️
unittests-flyteidl        7.23% <ø> (ø)
unittests-flyteplugins    53.73% <ø> (ø)
unittests-flytepropeller  42.63% <ø> (ø)
unittests-flytestdlib     57.59% <ø> (ø)

Contributor

@eapolinario left a comment

Looks good, will approve after you fix the lint warning.

Contributor

@eapolinario left a comment

Thank you!

@eapolinario merged commit ab04192 into master on Nov 26, 2024
51 of 52 checks passed
@eapolinario deleted the container-task-offloading branch on November 26, 2024 20:23