Skip to content

Commit

Permalink
Benchmark geospatial workload submission (#1589)
Browse files (browse the repository at this point in the history)
  • Loading branch information
hendrikmakait authored Nov 12, 2024
1 parent e1ed22d commit 82ad744
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 49 deletions.
61 changes: 52 additions & 9 deletions tests/geospatial/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import time
import uuid

import coiled
Expand Down Expand Up @@ -26,19 +27,32 @@ def cluster_name(request, scale):
return f"geospatial-{module}-{scale}-{uuid.uuid4().hex[:8]}"


@pytest.fixture()
def client_factory(
@pytest.fixture(
params=[
pytest.param("execution", marks=pytest.mark.geo_execution),
pytest.param("submission", marks=pytest.mark.geo_submission),
]
)
def benchmark_type(request):
return request.param


@pytest.fixture
def setup_benchmark(
benchmark_type,
cluster_name,
github_cluster_tags,
benchmark_all,
py_spy_profile,
memray_profile,
py_spy_profile,
performance_report,
):
should_execute = benchmark_type == "execution"
import contextlib

@contextlib.contextmanager
def _(n_workers, env=None, **cluster_kwargs):
n_workers = n_workers if should_execute else 0
with coiled.Cluster(
name=cluster_name,
tags=github_cluster_tags,
Expand All @@ -48,11 +62,40 @@ def _(n_workers, env=None, **cluster_kwargs):
if env:
cluster.send_private_envs(env=env)
with cluster.get_client() as client:
# FIXME https://github.com/coiled/platform/issues/103
client.wait_for_workers(n_workers)
with performance_report(), py_spy_profile(client), memray_profile(
client
), benchmark_all(client):
yield client
if should_execute:
# FIXME https://github.com/coiled/platform/issues/103
client.wait_for_workers(n_workers)
with performance_report(), py_spy_profile(client), memray_profile(
client
), benchmark_all(client):

def benchmark_execution(func, *args, **kwargs):
func(*args, **kwargs).compute()

yield benchmark_execution
else:
submitted = False

def track_submission(event):
nonlocal submitted
ts, msg = event
if not isinstance(msg, dict):
return

if not msg.get("action", None) == "update-graph":
return

submitted = True

def benchmark_submission(func, *args, **kwargs):
_ = client.compute(func(*args, **kwargs), sync=False)
while submitted is False:
time.sleep(0.1)

client.subscribe_topic("scheduler", track_submission)
with performance_report(), py_spy_profile(client), memray_profile(
client
), benchmark_all(client):
yield benchmark_submission

return _
10 changes: 5 additions & 5 deletions tests/geospatial/test_atmospheric_circulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
def test_atmospheric_circulation(
gcs_url,
scale,
client_factory,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks-gcp",
"region": "us-central1",
Expand All @@ -17,12 +17,12 @@ def test_atmospheric_circulation(
"large": {"n_workers": 100},
},
):
with client_factory(
with setup_benchmark(
**scale_kwargs[scale], **cluster_kwargs
) as client: # noqa: F841
result = atmospheric_circulation(
) as benchmark: # noqa: F841
benchmark(
atmospheric_circulation,
scale=scale,
storage_url=gcs_url,
storage_options={"token": CoiledShippedCredentials()},
)
result.compute()
20 changes: 10 additions & 10 deletions tests/geospatial/test_climatology.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
def test_rechunk_map_blocks(
gcs_url,
scale,
client_factory,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks-gcp",
"region": "us-central1",
Expand All @@ -28,21 +28,21 @@ def test_rechunk_map_blocks(
"large": {"n_workers": 100},
},
):
with client_factory(
with setup_benchmark(
**scale_kwargs[scale], **cluster_kwargs
) as client: # noqa: F841
result = rechunk_map_blocks(
) as benchmark: # noqa: F841
benchmark(
rechunk_map_blocks,
scale=scale,
storage_url=gcs_url,
storage_options={"token": CoiledShippedCredentials()},
)
result.compute()


def test_highlevel_api(
gcs_url,
scale,
client_factory,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks-gcp",
"region": "us-central1",
Expand All @@ -54,12 +54,12 @@ def test_highlevel_api(
"large": {"n_workers": 100},
},
):
with client_factory(
with setup_benchmark(
**scale_kwargs[scale], **cluster_kwargs
) as client: # noqa: F841
result = highlevel_api(
) as benchmark: # noqa: F841
benchmark(
highlevel_api,
scale=scale,
storage_url=gcs_url,
storage_options={"token": CoiledShippedCredentials()},
)
result.compute()
9 changes: 4 additions & 5 deletions tests/geospatial/test_cloud_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def test_cloud_optimize(
scale,
s3,
s3_url,
client_factory,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks",
"region": "us-west-2",
Expand All @@ -16,8 +16,7 @@ def test_cloud_optimize(
"large": {"n_workers": 200},
},
):
with client_factory(
with setup_benchmark(
**scale_kwargs[scale], **cluster_kwargs
) as client: # noqa: F841
result = cloud_optimize(scale, s3fs=s3, storage_url=s3_url)
result.compute()
) as benchmark: # noqa: F841
benchmark(cloud_optimize, scale, s3fs=s3, storage_url=s3_url)
10 changes: 5 additions & 5 deletions tests/geospatial/test_rechunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
def test_era5_rechunking(
gcs_url,
scale,
client_factory,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks-gcp",
"region": "us-central1",
Expand All @@ -17,12 +17,12 @@ def test_era5_rechunking(
"large": {"n_workers": 100},
},
):
with client_factory(
with setup_benchmark(
**scale_kwargs[scale], **cluster_kwargs
) as client: # noqa: F841
result = era5_rechunking(
) as benchmark: # noqa: F841
benchmark(
era5_rechunking,
scale=scale,
storage_url=gcs_url,
storage_options={"token": CoiledShippedCredentials()},
)
result.compute()
10 changes: 5 additions & 5 deletions tests/geospatial/test_regridding.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
def test_xesmf(
gcs_url,
scale,
client_factory,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks-gcp",
"region": "us-central1",
Expand All @@ -18,12 +18,12 @@ def test_xesmf(
"large": {"n_workers": 10},
},
):
with client_factory(
with setup_benchmark(
**scale_kwargs[scale], **cluster_kwargs
) as client: # noqa: F841
result = xesmf(
) as benchmark: # noqa: F841
benchmark(
xesmf,
scale=scale,
storage_url=gcs_url,
storage_options={"token": CoiledShippedCredentials()},
)
result.compute()
9 changes: 4 additions & 5 deletions tests/geospatial/test_satellite_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
def test_satellite_filtering(
az_url,
scale,
client_factory,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks-azure",
"region": "westeurope",
Expand All @@ -16,13 +16,12 @@ def test_satellite_filtering(
"large": {"n_workers": 100},
},
):
with client_factory(
with setup_benchmark(
**scale_kwargs[scale],
env={
"AZURE_STORAGE_ACCOUNT_NAME": os.environ["AZURE_STORAGE_ACCOUNT_NAME"],
"AZURE_STORAGE_SAS_TOKEN": os.environ["AZURE_STORAGE_SAS_TOKEN"],
},
**cluster_kwargs,
) as client: # noqa: F841
result = satellite_filtering(scale=scale, storage_url=az_url)
result.compute()
) as benchmark: # noqa: F841
benchmark(satellite_filtering, scale=scale, storage_url=az_url)
Empty file.
16 changes: 11 additions & 5 deletions tests/geospatial/test_zonal_average.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
This example was adapted from https://github.com/dcherian/dask-demo/blob/main/nwm-aws.ipynb
"""

import pytest

from tests.geospatial.workloads.zonal_average import nwm


def test_nwm(
scale,
client_factory,
benchmark_type,
setup_benchmark,
cluster_kwargs={
"workspace": "dask-benchmarks",
"region": "us-east-1",
Expand All @@ -17,8 +20,11 @@ def test_nwm(
"large": {"n_workers": 200, "scheduler_memory": "32 GiB"},
},
):
with client_factory(
if benchmark_type == "submission":
pytest.skip(
reason="FIXME: Submission requires pre-computations, but no workers were requested."
)
with setup_benchmark(
**scale_kwargs[scale], **cluster_kwargs
) as client: # noqa: F841
result = nwm(scale=scale)
result.compute()
) as benchmark: # noqa: F841
benchmark(nwm, scale=scale)

0 comments on commit 82ad744

Please sign in to comment.