diff --git a/.github/workflows/ab_tests.yml b/.github/workflows/ab_tests.yml index 68d88c4545..9cb17abe04 100644 --- a/.github/workflows/ab_tests.yml +++ b/.github/workflows/ab_tests.yml @@ -106,7 +106,8 @@ jobs: DB_NAME: ${{ matrix.runtime-version }}-${{ matrix.repeat }}.db CLUSTER_KWARGS: AB_environments/${{ matrix.runtime-version }}.cluster.yaml H2O_DATASETS: ${{ matrix.h2o_datasets }} - run: pytest --benchmark ${{ matrix.pytest_args }} + MEMRAY_PROFILE: "scheduler" + run: pytest --benchmark --memray $MEMRAY_PROFILE ${{ matrix.pytest_args }} - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ae869f04ed..88ddf541b1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -141,8 +141,9 @@ jobs: COILED_RUNTIME_VERSION: ${{ matrix.runtime-version }} DB_NAME: ${{ matrix.name_prefix }}-${{ matrix.os }}-py${{ matrix.python_version }}.db DASK_DATAFRAME__QUERY_PLANNING: True + MEMRAY_PROFILE: "scheduler" run: | - pytest --benchmark -n 4 --dist loadscope ${{ env.PYTEST_MARKERS }} ${{ matrix.pytest_args }} + pytest --benchmark -n 4 --dist loadscope --memray $MEMRAY_PROFILE ${{ env.PYTEST_MARKERS }} ${{ matrix.pytest_args }} - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml || true diff --git a/.github/workflows/tpch.yml b/.github/workflows/tpch.yml index eb11a2ef65..1166b0bc88 100644 --- a/.github/workflows/tpch.yml +++ b/.github/workflows/tpch.yml @@ -105,7 +105,8 @@ jobs: pytest --benchmark \ ${{ env.PYTEST_BENCHMARKS }} \ -n 4 --dist loadscope \ - --scale ${{ inputs.scale }} + --scale ${{ inputs.scale }} \ + --memray none - name: Upload benchmark results uses: actions/upload-artifact@v4 diff --git a/AB_environments/AB_baseline.conda.yaml b/AB_environments/AB_baseline.conda.yaml index 106174cef8..8ec826ab1c 100644 --- a/AB_environments/AB_baseline.conda.yaml +++ b/AB_environments/AB_baseline.conda.yaml @@ -54,6 +54,7 @@ dependencies: - pystac-client
==0.8.3 - odc-stac ==0.3.10 - adlfs ==2024.7.0 + - memray ==1.13.4 # End copy-paste - pip: diff --git a/AB_environments/AB_sample.conda.yaml b/AB_environments/AB_sample.conda.yaml index 982950bb0d..986f66709e 100644 --- a/AB_environments/AB_sample.conda.yaml +++ b/AB_environments/AB_sample.conda.yaml @@ -60,6 +60,7 @@ dependencies: - pystac-client ==0.8.3 - odc-stac ==0.3.10 - adlfs ==2024.7.0 + - memray ==1.13.4 # End copy-paste - pip: diff --git a/alembic/versions/1095dfdfc4ae_add_column_for_memray_profiles_url.py b/alembic/versions/1095dfdfc4ae_add_column_for_memray_profiles_url.py new file mode 100644 index 0000000000..fee83289ef --- /dev/null +++ b/alembic/versions/1095dfdfc4ae_add_column_for_memray_profiles_url.py @@ -0,0 +1,24 @@ +"""Add column for Memray profiles url + +Revision ID: 1095dfdfc4ae +Revises: 2d2405ad763b +Create Date: 2024-10-23 11:11:15.238042 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '1095dfdfc4ae' +down_revision = '2d2405ad763b' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column('test_run', sa.Column('memray_profiles_url', sa.String(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("test_run", "memray_profiles_url") diff --git a/benchmark_schema.py b/benchmark_schema.py index 3cf3272c9d..20b85697c5 100644 --- a/benchmark_schema.py +++ b/benchmark_schema.py @@ -62,6 +62,7 @@ class TestRun(Base): # Artifacts performance_report_url = Column(String, nullable=True) # Not yet collected cluster_dump_url = Column(String, nullable=True) + memray_profiles_url = Column(String, nullable=True) class TPCHRun(Base): diff --git a/ci/environment.yml b/ci/environment.yml index c32bc01992..979b5117ff 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -56,6 +56,7 @@ dependencies: - pystac-client ==0.8.3 - odc-stac ==0.3.10 - adlfs ==2024.7.0 + - memray ==1.13.4 ######################################################## # PLEASE READ: 
diff --git a/tests/conftest.py b/tests/conftest.py index 97958561d0..5e97102bdf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,7 @@ import pickle import subprocess import sys +import tarfile import threading import time import uuid @@ -28,6 +29,7 @@ import yaml from coiled import Cluster from dask.distributed import Client, WorkerPlugin +from dask.distributed.diagnostics import memray from dask.distributed.diagnostics.memory_sampler import MemorySampler from dask.distributed.scheduler import logger as scheduler_logger from dotenv import load_dotenv @@ -68,6 +70,14 @@ def pytest_addoption(parser): "--benchmark", action="store_true", help="Collect benchmarking data for tests" ) + parser.addoption( + "--memray", + action="store", + default="scheduler", + help="Memray profiles to collect: scheduler or none", + choices=("scheduler", "none"), + ) + def pytest_sessionfinish(session, exitstatus): # https://github.com/pytest-dev/pytest/issues/2393 @@ -652,6 +662,16 @@ def _(**exta_options): return _ +@pytest.fixture(scope="session") +def s3_performance(s3): + profiles_url = f"{S3_BUCKET}/performance" + # Ensure that the performance directory exists, + # but do NOT remove it as multiple test runs could be + # accessing it at the same time + s3.mkdirs(profiles_url, exist_ok=True) + return profiles_url + + @pytest.fixture(scope="session") def s3_scratch(s3): # Ensure that the test-scratch directory exists, @@ -675,6 +695,13 @@ def s3_url(s3, s3_scratch, test_name_uuid): pass +@pytest.fixture(scope="function") +def s3_performance_url(s3, s3_performance, test_name_uuid): + url = f"{s3_performance}/{test_name_uuid}" + s3.mkdirs(url, exist_ok=False) + return url + + GCS_REGION = "us-central1" GCS_BUCKET = "gs://coiled-oss-scratch/benchmarks-bot" @@ -843,3 +870,43 @@ def _(*args, **kwargs): @pytest.fixture(params=[0.1, 1]) def memory_multiplier(request): return request.param + + +@pytest.fixture +def memray_profile( + pytestconfig, + s3, + s3_performance_url, + 
s3_storage_options, + test_run_benchmark, + tmp_path, +): + if not test_run_benchmark: + yield contextlib.nullcontext + else: + memray_option = pytestconfig.getoption("--memray") + + if memray_option == "none": + yield contextlib.nullcontext + elif memray_option != "scheduler": + raise ValueError(f"Unhandled value for --memray: {memray_option}") + else: + + @contextlib.contextmanager + def _memray_profile(client): + profiles_path = tmp_path / "profiles" + profiles_path.mkdir() + try: + with memray.memray_scheduler(directory=profiles_path): + yield + finally: + archive = tmp_path / "memray.tar.gz" + with tarfile.open(archive, mode="w:gz") as tar: + for item in profiles_path.iterdir(): + tar.add(item, arcname=item.name) + test_run_benchmark.memray_profiles_url = ( + f"{s3_performance_url}/{archive.name}" + ) + s3.put(archive, s3_performance_url) + + yield _memray_profile diff --git a/tests/geospatial/conftest.py b/tests/geospatial/conftest.py index 38f6401d0e..6c2ad73a2f 100644 --- a/tests/geospatial/conftest.py +++ b/tests/geospatial/conftest.py @@ -27,7 +27,7 @@ def cluster_name(request, scale): @pytest.fixture() -def client_factory(cluster_name, github_cluster_tags, benchmark_all): +def client_factory(cluster_name, github_cluster_tags, benchmark_all, memray_profile): import contextlib @contextlib.contextmanager @@ -43,7 +43,7 @@ def _(n_workers, env=None, **cluster_kwargs): with cluster.get_client() as client: # FIXME https://github.com/coiled/platform/issues/103 client.wait_for_workers(n_workers) - with benchmark_all(client): + with memray_profile(client), benchmark_all(client): yield client return _