Don't store cluster dumps for benchmarks (#1561)
hendrikmakait authored Oct 15, 2024
1 parent 1a8f583 commit d92edc4
Showing 9 changed files with 26 additions and 81 deletions.
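For context: the benchmark suite previously uploaded cluster state dumps based on a CLUSTER_DUMP environment variable, and this commit removes both that switch and the upload_cluster_dump fixture that consumed it. A condensed sketch of the gate being deleted from tests/conftest.py below (the helper name is illustrative, not part of the repository):

import os


def should_dump_cluster_state(test_failed: bool) -> bool:
    # Mirrors the removed fixture: dump after every test when CLUSTER_DUMP=always,
    # only after failed tests when CLUSTER_DUMP=fail, never otherwise.
    cluster_dump = os.environ.get("CLUSTER_DUMP", "false")
    return cluster_dump == "always" or (cluster_dump == "fail" and test_failed)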
1 change: 0 additions & 1 deletion .github/workflows/ab_tests.yml
@@ -104,7 +104,6 @@ jobs:
           SNOWFLAKE_ROLE: ${{ secrets.SNOWFLAKE_ROLE }}
           AB_VERSION: ${{ matrix.runtime-version }}
           DB_NAME: ${{ matrix.runtime-version }}-${{ matrix.repeat }}.db
-          CLUSTER_DUMP: always
           CLUSTER_KWARGS: AB_environments/${{ matrix.runtime-version }}.cluster.yaml
           H2O_DATASETS: ${{ matrix.h2o_datasets }}
         run: pytest --benchmark ${{ matrix.pytest_args }}
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
@@ -140,7 +140,6 @@ jobs:
           SNOWFLAKE_ROLE: ${{ secrets.SNOWFLAKE_ROLE }}
           COILED_RUNTIME_VERSION: ${{ matrix.runtime-version }}
           DB_NAME: ${{ matrix.name_prefix }}-${{ matrix.os }}-py${{ matrix.python_version }}.db
-          CLUSTER_DUMP: always
           DASK_DATAFRAME__QUERY_PLANNING: True
         run: |
           pytest --benchmark -n 4 --dist loadscope ${{ env.PYTEST_MARKERS }} ${{ matrix.pytest_args }}
1 change: 0 additions & 1 deletion .github/workflows/tpch.yml
@@ -100,7 +100,6 @@ jobs:
           AWS_SECRET_ACCESS_KEY: ${{ secrets.RUNTIME_CI_BOT_AWS_SECRET_ACCESS_KEY }}
           COILED_RUNTIME_VERSION: ${{ matrix.runtime-version }}
           DB_NAME: tpch_${{ inputs.scale }}.db
-          CLUSTER_DUMP: always
           DASK_DATAFRAME__QUERY_PLANNING: True
         run: |
           pytest --benchmark \
4 changes: 2 additions & 2 deletions tests/benchmarks/test_parquet.py
@@ -40,13 +40,13 @@ def parquet_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags):
 
 
 @pytest.fixture
-def parquet_client(parquet_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all):
+def parquet_client(parquet_cluster, cluster_kwargs, benchmark_all):
     n_workers = cluster_kwargs["parquet_cluster"]["n_workers"]
     with distributed.Client(parquet_cluster) as client:
         parquet_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
         client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
+        with benchmark_all(client):
             yield client
 
 
4 changes: 2 additions & 2 deletions tests/benchmarks/test_spill.py
@@ -35,13 +35,13 @@ def spill_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags):
 
 
 @pytest.fixture
-def spill_client(spill_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all):
+def spill_client(spill_cluster, cluster_kwargs, benchmark_all):
     n_workers = cluster_kwargs["spill_cluster"]["n_workers"]
     with Client(spill_cluster) as client:
         spill_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
         client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
+        with benchmark_all(client):
             yield client
 
 
6 changes: 2 additions & 4 deletions tests/benchmarks/test_work_stealing.py
@@ -28,7 +28,6 @@ def test_trivial_workload_should_not_cause_work_stealing(small_client):
 )
 def test_work_stealing_on_scaling_up(
     test_name_uuid,
-    upload_cluster_dump,
     benchmark_all,
     cluster_kwargs,
     dask_env_variables,
@@ -43,7 +42,7 @@ def test_work_stealing_on_scaling_up(
         with Client(cluster) as client:
             # FIXME https://github.com/coiled/platform/issues/103
             client.wait_for_workers(1, timeout=300)
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
                 # Slow task.
                 def func1(chunk):
                     if sum(chunk.shape) != 0:  # Make initialization fast
@@ -89,7 +88,6 @@ def clog(n):
 @run_up_to_nthreads("small_cluster", 100, reason="fixed dataset")
 def test_work_stealing_on_straggling_worker(
     test_name_uuid,
-    upload_cluster_dump,
     benchmark_all,
     cluster_kwargs,
     dask_env_variables,
@@ -105,7 +103,7 @@ def test_work_stealing_on_straggling_worker(
         with Client(cluster) as client:
             # FIXME https://github.com/coiled/platform/issues/103
             client.wait_for_workers(kwargs["n_workers"], timeout=600)
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
 
                 def clog():
                     time.sleep(1)
6 changes: 2 additions & 4 deletions tests/benchmarks/test_xarray.py
@@ -36,15 +36,13 @@ def group_reduction_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags):
 
 
 @pytest.fixture
-def group_reduction_client(
-    group_reduction_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all
-):
+def group_reduction_client(group_reduction_cluster, cluster_kwargs, benchmark_all):
     n_workers = cluster_kwargs["group_reduction_cluster"]["n_workers"]
     with Client(group_reduction_cluster) as client:
         group_reduction_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
         client.restart()
-        with upload_cluster_dump(client), benchmark_all(client):
+        with benchmark_all(client):
             yield client
 
 
81 changes: 17 additions & 64 deletions tests/conftest.py
@@ -558,7 +558,6 @@ def small_client(
     testrun_uid,
     small_cluster,
     cluster_kwargs,
-    upload_cluster_dump,
     benchmark_all,
 ):
     n_workers = cluster_kwargs["small_cluster"]["n_workers"]
@@ -569,26 +568,25 @@
         small_cluster.scale(n_workers)
         client.wait_for_workers(n_workers, timeout=600)
 
-        with upload_cluster_dump(client):
-            log_on_scheduler(client, "Finished client setup of %s", test_label)
+        log_on_scheduler(client, "Finished client setup of %s", test_label)
 
-            with benchmark_all(client):
-                yield client
+        with benchmark_all(client):
+            yield client
 
-            # Note: normally, this RPC call is almost instantaneous. However, in the
-            # case where the scheduler is still very busy when the fixtured test returns
-            # (e.g. test_futures.py::test_large_map_first_work), it can be delayed into
-            # several seconds. We don't want to capture this extra delay with
-            # benchmark_time, as it's beyond the scope of the test.
-            log_on_scheduler(client, "Starting client teardown of %s", test_label)
+        # Note: normally, this RPC call is almost instantaneous. However, in the
+        # case where the scheduler is still very busy when the fixtured test returns
+        # (e.g. test_futures.py::test_large_map_first_work), it can be delayed into
+        # several seconds. We don't want to capture this extra delay with
+        # benchmark_time, as it's beyond the scope of the test.
+        log_on_scheduler(client, "Starting client teardown of %s", test_label)
 
-            client.restart()
-            # Run connects to all workers once and to ensure they're up before we do
-            # something else. With another call of restart when entering this
-            # fixture again, this can trigger a race condition that kills workers
-            # See https://github.com/dask/distributed/issues/7312 Can be removed
-            # after this issue is fixed.
-            client.run(lambda: None)
+        client.restart()
+        # Run connects to all workers once and to ensure they're up before we do
+        # something else. With another call of restart when entering this
+        # fixture again, this can trigger a race condition that kills workers
+        # See https://github.com/dask/distributed/issues/7312 Can be removed
+        # after this issue is fixed.
+        client.run(lambda: None)
 
 
 @pytest.fixture
@@ -597,7 +595,6 @@ def client(
     dask_env_variables,
     cluster_kwargs,
     github_cluster_tags,
-    upload_cluster_dump,
     benchmark_all,
 ):
     name = request.param["name"]
@@ -612,7 +609,7 @@
                 client.upload_file(request.param["upload_file"])
             if request.param["worker_plugin"] is not None:
                 client.register_worker_plugin(request.param["worker_plugin"])
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
                 yield client
 
 
@@ -673,13 +670,6 @@ def s3_url(s3, s3_scratch, test_name_uuid):
     s3.rm(url, recursive=True)
 
 
-@pytest.fixture(scope="session")
-def s3_cluster_dump_url(s3, s3_scratch):
-    dump_url = f"{s3_scratch}/cluster_dumps"
-    s3.mkdirs(dump_url, exist_ok=True)
-    return dump_url
-
-
 GCS_REGION = "us-central1"
 GCS_BUCKET = "gs://coiled-oss-scratch/benchmarks-bot"
 
@@ -723,43 +713,6 @@ def pytest_runtest_makereport(item, call):
     setattr(item, "rep_" + rep.when, rep)
 
 
-@pytest.fixture
-def upload_cluster_dump(
-    request, s3_cluster_dump_url, s3_storage_options, test_run_benchmark
-):
-    @contextlib.contextmanager
-    def _upload_cluster_dump(client):
-        failed = False
-        # the code below is a workaround to make cluster dumps work with clients in fixtures
-        # and outside fixtures.
-        try:
-            yield
-        except Exception:
-            failed = True
-            raise
-        else:
-            # we need this for tests that are not using the client fixture
-            # for those cases request.node.rep_call.failed can't be access.
-            try:
-                failed = request.node.rep_call.failed
-            except AttributeError:
-                failed = False
-        finally:
-            cluster_dump = os.environ.get("CLUSTER_DUMP", "false")
-            if cluster_dump == "always" or (cluster_dump == "fail" and failed):
-                dump_path = (
-                    f"{s3_cluster_dump_url}/{client.cluster.name}/"
-                    f"{test_run_benchmark.path.replace('/', '.')}.{request.node.name}"
-                )
-                test_run_benchmark.cluster_dump_url = dump_path + ".msgpack.gz"
-                logger.info(
-                    f"Cluster state dump can be found at: {dump_path}.msgpack.gz"
-                )
-                client.dump_cluster_state(dump_path, **s3_storage_options)
-
-    yield _upload_cluster_dump
-
-
 requires_p2p_shuffle = pytest.mark.skipif(
     Version(distributed.__version__) < Version("2023.1.0"),
     reason="p2p shuffle not available",
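With the fixture removed, a cluster state dump can still be captured ad hoc for debugging through distributed's Client.dump_cluster_state, which is the call the deleted helper made internally. A minimal sketch; the scheduler address and S3 path are placeholders, and remote writes rely on fsspec credentials being available in the environment:

import distributed

client = distributed.Client("tcp://scheduler-address:8786")  # placeholder address

# Writes <path>.msgpack.gz; extra keyword arguments are forwarded to fsspec
# as storage options when the target is a remote filesystem such as S3.
client.dump_cluster_state(
    "s3://my-scratch-bucket/cluster_dumps/debug-run",  # placeholder path
    format="msgpack",
)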
3 changes: 1 addition & 2 deletions tests/stability/test_deadlock.py
@@ -15,7 +15,6 @@
     reason="https://github.com/dask/distributed/issues/6110",
 )
 def test_repeated_merge_spill(
-    upload_cluster_dump,
     benchmark_all,
     cluster_kwargs,
     dask_env_variables,
@@ -28,7 +27,7 @@ def test_repeated_merge_spill(
         **cluster_kwargs["test_repeated_merge_spill"],
     ) as cluster:
         with Client(cluster) as client:
-            with upload_cluster_dump(client), benchmark_all(client):
+            with benchmark_all(client):
                 ddf = dask.datasets.timeseries(
                     "2020",
                     "2025",
