
The container image of spark task should be immutable #2956

Merged
merged 8 commits into from
Nov 27, 2024

Conversation

pingsutw
Member

Tracking issue

NA

Why are the changes needed?

flytekit overrides the default_executor_path when the base_image of an ImageSpec is None.

When a workflow contains two Spark tasks, flytekit overrides the default_executor_path only for the first task. This happens because flytekit mutates the base_image of the shared ImageSpec while compiling the first task. As a result, when the second task is compiled, the base_image is no longer None, which prevents flytekit from overriding the default_executor_path for that task.

What changes were proposed in this pull request?

Deep copy the image spec and modify the copy, so the ImageSpec passed in by the user stays immutable.
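The failure mode and the fix can be illustrated with a minimal sketch. The `ImageSpec` dataclass and `compile_task_*` functions below are hypothetical simplifications for illustration, not flytekit's real internals: mutating a shared spec during the first compilation changes what the second compilation sees, while deep-copying first leaves the caller's spec untouched.

```python
from copy import deepcopy
from dataclasses import dataclass
from typing import Optional


@dataclass
class ImageSpec:
    # Simplified stand-in for flytekit's ImageSpec.
    base_image: Optional[str] = None


def compile_task_buggy(spec: ImageSpec) -> str:
    # Mutates the shared spec in place: after the first call sets
    # base_image, every later call sees a non-None value and skips
    # the default_executor_path override.
    if spec.base_image is None:
        spec.base_image = "spark-base"
        return "override applied"
    return "override skipped"


def compile_task_fixed(spec: ImageSpec) -> str:
    # Work on a deep copy, so the caller's spec is never mutated and
    # every task compilation sees the original base_image=None.
    spec = deepcopy(spec)
    if spec.base_image is None:
        spec.base_image = "spark-base"
        return "override applied"
    return "override skipped"


shared = ImageSpec()
print(compile_task_buggy(shared))  # first task: override applied
print(compile_task_buggy(shared))  # second task: override skipped (bug)

shared = ImageSpec()
print(compile_task_fixed(shared))  # first task: override applied
print(compile_task_fixed(shared))  # second task: override applied (fixed)
```

With the deep copy, compiling one task has no side effect on the shared spec, so both Spark tasks in the workflow below get the overridden executor path.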

How was this patch tested?

import datetime
import random
import time
from operator import add

import flytekit
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.spark import Spark

custom_image = ImageSpec(registry="ghcr.io/flyteorg", packages=["flytekitplugins-spark"])


@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.ui.proxyRedirectUri": "https://dogfood.cloud-staging.union.ai",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        }
    ),
    limits=Resources(mem="2000M"),
    container_image=custom_image,
)
def hello_spark1(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)

    pi_val = 4.0 * count / n
    time.sleep(360)
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.ui.proxyRedirectUri": "https://dogfood.cloud-staging.union.ai",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        }
    ),
    limits=Resources(mem="2000M"),
    container_image=custom_image,
)
def hello_spark2(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)

    pi_val = 4.0 * count / n
    time.sleep(360)
    return pi_val


@workflow
def my_spark2(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    Using the workflow is still as any other workflow. As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi1 = hello_spark1(partitions=1)
    pi2 = hello_spark2(partitions=1)
    return pi1

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

NA

Docs link

NA

Signed-off-by: Kevin Su <[email protected]>
@eapolinario
Collaborator

the failures in the different plugins tests are real. Can you take a look?

@pingsutw
Member Author

> the failures in the different plugins tests are real. Can you take a look?

fixed it

eapolinario previously approved these changes Nov 26, 2024

codecov bot commented Nov 26, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 46.62%. Comparing base (fdb7676) to head (b03fa42).
Report is 2 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2956      +/-   ##
==========================================
- Coverage   51.25%   46.62%   -4.63%     
==========================================
  Files         200      200              
  Lines       20835    20851      +16     
  Branches     2688     2691       +3     
==========================================
- Hits        10678     9722     -956     
- Misses       9559    10652    +1093     
+ Partials      598      477     -121     


@pingsutw pingsutw merged commit d5ea440 into master Nov 27, 2024
104 checks passed