Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API Server] Fix admin policy enforcement on validate and optimize #4820

Merged
merged 22 commits into from
Mar 3, 2025

Conversation

romilbhardwaj
Copy link
Collaborator

@romilbhardwaj romilbhardwaj commented Feb 26, 2025

Closes #4818.

Our admin policies were not being enforced during the validate and optimize calls, which would break policies which dynamically fetch Kubernetes credentials.

This PR fixes that but with a caveat: admin policies are not applied in the optimizer during failover. This is consistent with our previous admin policy enforcement, but we should revisit that design.

  • Code formatting: bash format.sh
  • Manually tested with dynamic k8s fetching admin policy
  • All smoke tests: pytest tests/test_smoke.py

@romilbhardwaj
Copy link
Collaborator Author

/smoke-test

@romilbhardwaj
Copy link
Collaborator Author

/smoke-test --kubernetes

Copy link
Collaborator

@Michaelvll Michaelvll left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for identifying and fixing this @romilbhardwaj! Left a comment : )

Comment on lines 217 to 220
cluster_name: Optional[str] = None,
idle_minutes_to_autostop: Optional[int] = None,
down: bool = False, # pylint: disable=redefined-outer-name
dryrun: bool = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we emphasize that these are only required when admin_policy is specified, or more aggressively we just make this:

Suggested change
cluster_name: Optional[str] = None,
idle_minutes_to_autostop: Optional[int] = None,
down: bool = False, # pylint: disable=redefined-outer-name
dryrun: bool = False
admin_policy_request_options: Optional[admin_policy.RequestOptions] = None

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep good idea, I was also thinking about it but wasn't sure if we want to expose admin_policy.RequestOptions in the SDK to end users. Should be ok.

@@ -116,12 +117,14 @@ class CheckBody(RequestBody):
class ValidateBody(RequestBody):
"""The request body for the validate endpoint."""
dag: str
request_options: admin_policy.RequestOptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be made optional and we can error out on the server side if admin_policy is set but RequestOptions are not supplied?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, made optional now. When None, we will now follow the default behavior in admin_policy_utils.apply().

(Current behavior is it will silently accept request_options as None, but we should update that behavior in a future PR. That's a deeper change since serve and jobs launch also invoke admin_policy_utils.apply() without request_options)

Comment on lines 259 to 273

# We make the validation a separate request as it may require expensive
# network calls if an admin policy is applied.
# TODO: Our current launch process is broken down into three calls:
# validate, optimize, and launch. This requires us to apply the admin policy
# in each step, which may be an expensive operation. We should consolidate
# these into a single call or have a TTL cache for (task, admin_policy)
# pairs.
logger.debug(f'Validating tasks: {validate_body.dag}')
try:
dag = dag_utils.load_chain_dag_from_yaml_str(validate_body.dag)
for task in dag.tasks:
# Will validate workdir and file_mounts in the backend, as those
# need to be validated after the files are uploaded to the SkyPilot
# API server with `upload_mounts_to_api_server`.
task.validate_name()
task.validate_run()
for r in task.resources:
r.validate()
except Exception as e: # pylint: disable=broad-except
raise fastapi.HTTPException(
status_code=400, detail=exceptions.serialize_exception(e)) from e
executor.schedule_request(request_id=request.state.request_id,
request_name='validate',
request_body=validate_body,
ignore_return_value=True,
func=core.validate_dag,
schedule_type=requests_lib.ScheduleType.SHORT)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to double check how much additional overhead we add to a normal launch with this modification. It may not worth it to slow down all the launch requests just for the admin policy, which is a less common codepath : )

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for flagging, it does add a significant penalty:

[Master] validate without executor: 0.007498 seconds
[This branch] validate with executor: 0.150482 seconds

I'm going to revert this change in favor of keeping the codepath fast for the common case when there's no admin policy.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark script:

from sky.client import sdk
from sky import Task, Dag, Resources
from sky.utils import dag_utils
import timeit

task = Task(name='test', run="echo 'Hello, World!'")
# resources = Resources(labels={'invalid'*50: 'bar'})
# task.set_resources(resources)
dag = dag_utils.convert_entrypoint_to_dag(task)

# First run once to get the result and warm up any caches
print("Running initial validation...")
sdk.validate(dag)
print("Initial validation complete.")

# Now benchmark with timeit
print("\nStarting benchmark...")
number_of_runs = 10
setup = "from __main__ import sdk, dag"

# Run the benchmark
print(f"Running sdk.validate() {number_of_runs} times...")
execution_time = timeit.timeit("sdk.validate(dag)", setup=setup, number=number_of_runs)
average_time = execution_time / number_of_runs

# Print results
print("\nBenchmark Results:")
print(f"Total time for {number_of_runs} runs: {execution_time:.6f} seconds")
print(f"Average time per run: {average_time:.6f} seconds")

@romilbhardwaj
Copy link
Collaborator Author

/quicktest-core

@romilbhardwaj
Copy link
Collaborator Author

/quicktest-core

@romilbhardwaj
Copy link
Collaborator Author

/smoke-test

Copy link
Collaborator

@Michaelvll Michaelvll left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @romilbhardwaj! Looks mostly good to me. Left some minor comments.

"""The request body for the optimize endpoint."""
dag: str
minimize: common_lib.OptimizeTarget = common_lib.OptimizeTarget.COST
request_options: Optional[admin_policy.RequestOptions]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try if a API server exists, and a user upgrade to the current PR, will the CLIs show clear warning/error that the api server needs to be restarted? We may want to avoid pydantic errors.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case you mentioned works (Old API server, new client), but the reverse (New API server, old client) did not. I've bumped up the API version to 3 to throw a clean error.

@romilbhardwaj romilbhardwaj requested a review from Michaelvll March 2, 2025 22:18
Copy link
Collaborator

@Michaelvll Michaelvll left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @romilbhardwaj for fixing this! It is a great catch.

@@ -212,13 +213,17 @@ def list_accelerator_counts(
@annotations.client_api
def optimize(
dag: 'sky.Dag',
minimize: common.OptimizeTarget = common.OptimizeTarget.COST
minimize: common.OptimizeTarget = common.OptimizeTarget.COST,
admin_policy_request_options: Optional[admin_policy.RequestOptions] = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably fix the error message where Kubernetes raises a None context. We can mention the active/allowed context there?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, will do in another PR

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#4822 should also help alleviate this.

@romilbhardwaj
Copy link
Collaborator Author

/smoke-test

@romilbhardwaj romilbhardwaj merged commit 87a6b59 into master Mar 3, 2025
19 checks passed
@romilbhardwaj romilbhardwaj deleted the api_admin_policy_fix branch March 3, 2025 21:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[API Server] Admin policies don't work in the local API server during validate()
2 participants