From 7cc4088f6738243c99e50f2464a37eb2d9d7322b Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 12 Dec 2023 18:13:48 +0000 Subject: [PATCH] Shuffle metrics 3/4: Capture background metrics --- distributed/dashboard/components/scheduler.py | 13 +-- .../dashboard/tests/test_scheduler_bokeh.py | 21 +++++ distributed/shuffle/_core.py | 91 +++++++++++++++---- distributed/shuffle/_rechunk.py | 16 +++- distributed/shuffle/_scheduler_plugin.py | 4 +- distributed/shuffle/_shuffle.py | 20 +++- distributed/shuffle/_worker_plugin.py | 5 +- distributed/shuffle/tests/test_metrics.py | 49 ++++++++-- distributed/shuffle/tests/test_rechunk.py | 2 + distributed/shuffle/tests/test_shuffle.py | 2 + distributed/spans.py | 11 ++- distributed/worker.py | 4 +- 12 files changed, 196 insertions(+), 42 deletions(-) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index aacd4b21ff1..d068a8d9461 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -3614,21 +3614,22 @@ def _build_data_sources(self) -> None: ) if spans_ext and self.span_tag_selector.value: span = spans_ext.merge_by_tags(*self.span_tag_selector.value) - execute_metrics = span.cumulative_worker_metrics + metrics = span.cumulative_worker_metrics elif spans_ext and spans_ext.spans: # Calculate idle time span = spans_ext.merge_all() - execute_metrics = span.cumulative_worker_metrics + metrics = span.cumulative_worker_metrics else: # Spans extension is not loaded - execute_metrics = { + metrics = { k: v for k, v in self.scheduler.cumulative_worker_metrics.items() - if isinstance(k, tuple) and k[0] == "execute" + if isinstance(k, tuple) } - for (context, function, activity, unit), v in execute_metrics.items(): - assert context == "execute" + for (context, function, activity, unit), v in metrics.items(): + if context != "execute": + continue # TODO visualize 'shuffle' metrics assert isinstance(function, str) assert isinstance(unit, str) assert self.unit_selector.value diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 3faf4657f65..7c597998ee8 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -450,6 +450,27 @@ async def test_FinePerformanceMetrics_no_spans(c, s, a, b): assert sorted(cl.visible_activities) == ["memory-read"] +@gen_cluster(client=True) +async def test_FinePerformanceMetrics_shuffle(c, s, a, b): + da = pytest.importorskip("dask.array") + + x = da.random.random((4, 4), chunks=(1, -1)) + x = x.rechunk((-1, 1), method="p2p") + x = x.sum() + await c.compute(x) + await a.heartbeat() + await b.heartbeat() + + cl = FinePerformanceMetrics(s) + cl.update() + # execute metrics from shuffle + assert "shuffle-barrier" in cl.visible_functions + assert "shuffle-shard-partition-cpu" in cl.visible_activities + # shuffle metrics have been filtered out + assert "background-comms" not in cl.visible_functions + assert "shard-partition-cpu" not in cl.visible_activities + + @gen_cluster(client=True) async def test_ClusterMemory(c, s, a, b): cl = ClusterMemory(s) diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py index 0381a8ea3da..95c9327ec55 100644 --- a/distributed/shuffle/_core.py +++ b/distributed/shuffle/_core.py @@ -7,7 +7,7 @@ import pickle import time from collections import defaultdict -from collections.abc import Callable, Generator, Iterable, Iterator, Sequence +from 
collections.abc import Callable, Generator, Hashable, Iterable, Iterator, Sequence
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass, field
 from enum import Enum
@@ -55,9 +55,11 @@ class ShuffleRun(Generic[_T_partition_id, _T_partition_type]):
 
     id: ShuffleId
     run_id: int
+    span_id: str | None
     local_address: str
     executor: ThreadPoolExecutor
     rpc: Callable[[str], PooledRPCCall]
+    digest_metric: Callable[[Hashable, float], None]
     scheduler: PooledRPCCall
     closed: bool
     _disk_buffer: DiskShardsBuffer | MemoryShardsBuffer
@@ -78,10 +80,12 @@ def __init__(
         self,
         id: ShuffleId,
         run_id: int,
+        span_id: str | None,
         local_address: str,
         directory: str,
         executor: ThreadPoolExecutor,
         rpc: Callable[[str], PooledRPCCall],
+        digest_metric: Callable[[Hashable, float], None],
         scheduler: PooledRPCCall,
         memory_limiter_disk: ResourceLimiter,
         memory_limiter_comms: ResourceLimiter,
@@ -90,23 +94,33 @@
     ):
         self.id = id
         self.run_id = run_id
+        self.span_id = span_id
         self.local_address = local_address
         self.executor = executor
         self.rpc = rpc
+        self.digest_metric = digest_metric
         self.scheduler = scheduler
         self.closed = False
-        if disk:
-            self._disk_buffer = DiskShardsBuffer(
-                directory=directory,
-                read=self.read,
-                memory_limiter=memory_limiter_disk,
-            )
-        else:
-            self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)
-        self._comm_buffer = CommShardsBuffer(
-            send=self.send, memory_limiter=memory_limiter_comms
-        )
+        # Initialize buffers and start background tasks
+        # Don't log metrics issued by the background tasks onto the dask task that
+        # spawned this object
+        with context_meter.clear_callbacks():
+            with self._capture_metrics("background-disk"):
+                if disk:
+                    self._disk_buffer = DiskShardsBuffer(
+                        directory=directory,
+                        read=self.read,
+                        memory_limiter=memory_limiter_disk,
+                    )
+                else:
+                    self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)
+
+            with self._capture_metrics("background-comms"):
+                self._comm_buffer = CommShardsBuffer(
+                    send=self.send, memory_limiter=memory_limiter_comms
+                )
+
         # TODO: reduce number of connections to number of workers
         # MultiComm.max_connections = min(10, n_workers)
@@ -136,6 +150,38 @@ def __str__(self) -> str:
     def __hash__(self) -> int:
         return self.run_id
 
+    @contextlib.contextmanager
+    def _capture_metrics(self, where: str) -> Iterator[None]:
+        """Capture context_meter metrics as
+
+        {('shuffle', <span id>, 'foreground|background...', label, unit): value}
+
+        **Note 1:** When the metric is not logged by a background task
+        (where='foreground'), this produces a duplicated metric under
+
+        {('execute', <span id>, <task prefix>, label, unit): value}
+
+        This is by design so that one can have a holistic view of the whole shuffle
+        process.
+
+        **Note 2:** We're immediately writing to Worker.digests.
+        We don't temporarily store metrics under ShuffleRun as we would lose those
+        recorded between the heartbeat and when the ShuffleRun object is deleted at the
+        end of a run.
+ """ + + def callback(label: Hashable, value: float, unit: str) -> None: + if not isinstance(label, tuple): + label = (label,) + if isinstance(label[0], str) and label[0].startswith("shuffle-"): + label = (label[0][len("shuffle-") :], *label[1:]) + name = ("shuffle", self.span_id, where, *label, unit) + + self.digest_metric(name, value) + + with context_meter.add_callback(callback, allow_offload="background" in where): + yield + @contextlib.contextmanager def time(self, name: str) -> Iterator[None]: start = time.time() @@ -288,12 +334,14 @@ def add_partition( self.raise_if_closed() if self.transferred: raise RuntimeError(f"Cannot add more partitions to {self}") - with ( - context_meter.meter("shuffle-shard-partition-noncpu"), - context_meter.meter("shuffle-shard-partition-cpu", func=thread_time), - ): - shards = self._shard_partition(data, partition_id) - sync(self._loop, self._write_to_comm, shards) + # Log metrics both in the "execute" and in the "shuffle" contexts + with self._capture_metrics("foreground"): + with ( + context_meter.meter("shuffle-shard-partition-noncpu"), + context_meter.meter("shuffle-shard-partition-cpu", func=thread_time), + ): + shards = self._shard_partition(data, partition_id) + sync(self._loop, self._write_to_comm, shards) return self.run_id @abc.abstractmethod @@ -311,6 +359,8 @@ def get_output_partition( raise RuntimeError("`get_output_partition` called before barrier task") sync(self._loop, self.flush_receive) with ( + # Log metrics both in the "execute" and in the "shuffle" contexts + self._capture_metrics("foreground"), context_meter.meter("shuffle-get-output-noncpu"), context_meter.meter("shuffle-get-output-cpu", func=thread_time), ): @@ -372,6 +422,7 @@ class ShuffleRunSpec(Generic[_T_partition_id]): run_id: int = field(init=False, default_factory=partial(next, itertools.count(1))) spec: ShuffleSpec worker_for: dict[_T_partition_id, str] + span_id: str | None @property def id(self) -> ShuffleId: @@ -395,9 +446,10 @@ def pick_worker(self, partition: _T_partition_id, workers: Sequence[str]) -> str def create_new_run( self, worker_for: dict[_T_partition_id, str], + span_id: str | None, ) -> SchedulerShuffleState: return SchedulerShuffleState( - run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for), + run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for, span_id=span_id), participating_workers=set(worker_for.values()), ) @@ -405,6 +457,7 @@ def create_new_run( def create_run_on_worker( self, run_id: int, + span_id: str | None, worker_for: dict[_T_partition_id, str], plugin: ShuffleWorkerPlugin, ) -> ShuffleRun: diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index 54c13e56790..674af988981 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -99,7 +99,7 @@ import mmap import os from collections import defaultdict -from collections.abc import Callable, Generator, Sequence +from collections.abc import Callable, Generator, Hashable, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from itertools import product @@ -312,6 +312,8 @@ class ArrayRechunkRun(ShuffleRun[NDIndex, "np.ndarray"]): A unique `ShuffleID` this belongs to. run_id: A unique identifier of the specific execution of the shuffle this belongs to. + span_id: + Span identifier; see :doc:`spans` local_address: The local address this Shuffle can be contacted by using `rpc`. 
directory: @@ -321,8 +323,11 @@ class ArrayRechunkRun(ShuffleRun[NDIndex, "np.ndarray"]): rpc: A callable returning a PooledRPCCall to contact other Shuffle instances. Typically a ConnectionPool. + digest_metric: + A callable to ingest a performance metric. + Typically Server.digest_metric. scheduler: - A PooledRPCCall to to contact the scheduler. + A PooledRPCCall to contact the scheduler. memory_limiter_disk: memory_limiter_comm: A ``ResourceLimiter`` limiting the total amount of memory used in either @@ -336,10 +341,12 @@ def __init__( new: ChunkedAxes, id: ShuffleId, run_id: int, + span_id: str | None, local_address: str, directory: str, executor: ThreadPoolExecutor, rpc: Callable[[str], PooledRPCCall], + digest_metric: Callable[[Hashable, float], None], scheduler: PooledRPCCall, memory_limiter_disk: ResourceLimiter, memory_limiter_comms: ResourceLimiter, @@ -349,10 +356,12 @@ def __init__( super().__init__( id=id, run_id=run_id, + span_id=span_id, local_address=local_address, directory=directory, executor=executor, rpc=rpc, + digest_metric=digest_metric, scheduler=scheduler, memory_limiter_comms=memory_limiter_comms, memory_limiter_disk=memory_limiter_disk, @@ -485,6 +494,7 @@ def pick_worker(self, partition: NDIndex, workers: Sequence[str]) -> str: def create_run_on_worker( self, run_id: int, + span_id: str | None, worker_for: dict[NDIndex, str], plugin: ShuffleWorkerPlugin, ) -> ShuffleRun: @@ -494,6 +504,7 @@ def create_run_on_worker( new=self.new, id=self.id, run_id=run_id, + span_id=span_id, directory=os.path.join( plugin.worker.local_directory, f"shuffle-{self.id}-{run_id}", @@ -501,6 +512,7 @@ def create_run_on_worker( executor=plugin._executor, local_address=plugin.worker.address, rpc=plugin.worker.rpc, + digest_metric=plugin.worker.digest_metric, scheduler=plugin.worker.scheduler, memory_limiter_disk=plugin.memory_limiter_disk, memory_limiter_comms=plugin.memory_limiter_comms, diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index 3f2f4dc50b6..95746abb45b 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -146,7 +146,9 @@ def get_or_create( self._raise_if_task_not_processing(key) worker_for = self._calculate_worker_for(spec) self._ensure_output_tasks_are_non_rootish(spec) - state = spec.create_new_run(worker_for) + state = spec.create_new_run( + worker_for=worker_for, span_id=self.scheduler.tasks[key].group.span_id + ) self.active_shuffles[spec.id] = state self._shuffles[spec.id].add(state) state.participating_workers.add(worker) diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 22de59d838a..01bd0302922 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -7,6 +7,7 @@ Callable, Collection, Generator, + Hashable, Iterable, Iterator, Sequence, @@ -397,6 +398,8 @@ class DataFrameShuffleRun(ShuffleRun[int, "pd.DataFrame"]): A unique `ShuffleID` this belongs to. run_id: A unique identifier of the specific execution of the shuffle this belongs to. + span_id: + Span identifier; see :doc:`spans` local_address: The local address this Shuffle can be contacted by using `rpc`. directory: @@ -406,8 +409,11 @@ class DataFrameShuffleRun(ShuffleRun[int, "pd.DataFrame"]): rpc: A callable returning a PooledRPCCall to contact other Shuffle instances. Typically a ConnectionPool. + digest_metric: + A callable to ingest a performance metric. + Typically Server.digest_metric. scheduler: - A PooledRPCCall to to contact the scheduler. 
+ A PooledRPCCall to contact the scheduler. memory_limiter_disk: memory_limiter_comm: A ``ResourceLimiter`` limiting the total amount of memory used in either @@ -426,10 +432,12 @@ def __init__( meta: pd.DataFrame, id: ShuffleId, run_id: int, + span_id: str | None, local_address: str, directory: str, executor: ThreadPoolExecutor, rpc: Callable[[str], PooledRPCCall], + digest_metric: Callable[[Hashable, float], None], scheduler: PooledRPCCall, memory_limiter_disk: ResourceLimiter, memory_limiter_comms: ResourceLimiter, @@ -441,10 +449,12 @@ def __init__( super().__init__( id=id, run_id=run_id, + span_id=span_id, local_address=local_address, directory=directory, executor=executor, rpc=rpc, + digest_metric=digest_metric, scheduler=scheduler, memory_limiter_comms=memory_limiter_comms, memory_limiter_disk=memory_limiter_disk, @@ -543,7 +553,11 @@ def pick_worker(self, partition: int, workers: Sequence[str]) -> str: return _get_worker_for_range_sharding(self.npartitions, partition, workers) def create_run_on_worker( - self, run_id: int, worker_for: dict[int, str], plugin: ShuffleWorkerPlugin + self, + run_id: int, + span_id: str | None, + worker_for: dict[int, str], + plugin: ShuffleWorkerPlugin, ) -> ShuffleRun: return DataFrameShuffleRun( column=self.column, @@ -551,6 +565,7 @@ def create_run_on_worker( worker_for=worker_for, id=self.id, run_id=run_id, + span_id=span_id, directory=os.path.join( plugin.worker.local_directory, f"shuffle-{self.id}-{run_id}", @@ -558,6 +573,7 @@ def create_run_on_worker( executor=plugin._executor, local_address=plugin.worker.address, rpc=plugin.worker.rpc, + digest_metric=plugin.worker.digest_metric, scheduler=plugin.worker.scheduler, memory_limiter_disk=plugin.memory_limiter_disk if self.disk diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index 02728e7e20b..0eebf2fdd3e 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -241,7 +241,10 @@ async def _refresh( f" expected run_id > {stale_run_id}" ) shuffle_run = result.spec.create_run_on_worker( - result.run_id, result.worker_for, self._plugin + run_id=result.run_id, + worker_for=result.worker_for, + plugin=self._plugin, + span_id=result.span_id, ) self._active_runs[shuffle_id] = shuffle_run self._runs.add(shuffle_run) diff --git a/distributed/shuffle/tests/test_metrics.py b/distributed/shuffle/tests/test_metrics.py index f4c1b7f6820..96634e929f1 100644 --- a/distributed/shuffle/tests/test_metrics.py +++ b/distributed/shuffle/tests/test_metrics.py @@ -11,13 +11,21 @@ def assert_metrics(s: Scheduler, *keys: tuple[str, ...]) -> None: + metrics = dict(s.cumulative_worker_metrics) + span = s.extensions["spans"].spans_search_by_name["default",][0] + span_metrics = dict(span.cumulative_worker_metrics) + for key in keys: - assert key in s.cumulative_worker_metrics - value = s.cumulative_worker_metrics[key] - if key[-1] == "seconds": - assert value >= 0 - else: # count or bytes - assert value > 0 + value = metrics.pop(key) + assert span_metrics.pop(key) == value + if key[0] == "execute": + key2 = ("shuffle", "foreground", key[2].replace("shuffle-", ""), key[3]) + assert metrics.pop(key2) == value + assert span_metrics.pop(key2) == value + + # Check that the test doesn't omit any metrics + assert [key for key in metrics if key[0] == "shuffle"] == [] + assert [key for key in span_metrics if key[0] == "shuffle"] == [] @gen_cluster(client=True, config={"optimization.fuse.active": False}) @@ -39,6 +47,21 @@ async def test_rechunk(c, s, a, 
b): ("execute", "rechunk", "shuffle-disk-read", "count"), ("execute", "rechunk", "shuffle-get-output-cpu", "seconds"), ("execute", "rechunk", "shuffle-get-output-noncpu", "seconds"), + ("shuffle", "background-comms", "compress", "seconds"), + ("shuffle", "background-comms", "idle", "seconds"), + ("shuffle", "background-comms", "process", "bytes"), + ("shuffle", "background-comms", "process", "count"), + ("shuffle", "background-comms", "process", "seconds"), + ("shuffle", "background-comms", "send", "seconds"), + ("shuffle", "background-comms", "serialize", "seconds"), + ("shuffle", "background-disk", "disk-write", "bytes"), + ("shuffle", "background-disk", "disk-write", "count"), + ("shuffle", "background-disk", "disk-write", "seconds"), + ("shuffle", "background-disk", "idle", "seconds"), + ("shuffle", "background-disk", "process", "bytes"), + ("shuffle", "background-disk", "process", "count"), + ("shuffle", "background-disk", "process", "seconds"), + ("shuffle", "background-disk", "serialize", "seconds"), ) @@ -71,4 +94,18 @@ async def test_dataframe(c, s, a, b): ("execute", "shuffle_p2p", "shuffle-disk-read", "count"), ("execute", "shuffle_p2p", "shuffle-get-output-cpu", "seconds"), ("execute", "shuffle_p2p", "shuffle-get-output-noncpu", "seconds"), + ("shuffle", "background-comms", "compress", "seconds"), + ("shuffle", "background-comms", "idle", "seconds"), + ("shuffle", "background-comms", "process", "bytes"), + ("shuffle", "background-comms", "process", "count"), + ("shuffle", "background-comms", "process", "seconds"), + ("shuffle", "background-comms", "send", "seconds"), + ("shuffle", "background-comms", "serialize", "seconds"), + ("shuffle", "background-disk", "disk-write", "bytes"), + ("shuffle", "background-disk", "disk-write", "count"), + ("shuffle", "background-disk", "disk-write", "seconds"), + ("shuffle", "background-disk", "idle", "seconds"), + ("shuffle", "background-disk", "process", "bytes"), + ("shuffle", "background-disk", "process", "count"), + ("shuffle", "background-disk", "process", "seconds"), ) diff --git a/distributed/shuffle/tests/test_rechunk.py b/distributed/shuffle/tests/test_rechunk.py index 6dfc6e326ef..b75526950c4 100644 --- a/distributed/shuffle/tests/test_rechunk.py +++ b/distributed/shuffle/tests/test_rechunk.py @@ -68,9 +68,11 @@ def new_shuffle( directory=directory / name, id=ShuffleId(name), run_id=next(AbstractShuffleTestPool._shuffle_run_id_iterator), + span_id=None, local_address=name, executor=self._executor, rpc=self, + digest_metric=lambda name, value: None, scheduler=self, memory_limiter_disk=ResourceLimiter(10000000), memory_limiter_comms=ResourceLimiter(10000000), diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index c34a85cd9f0..b9864d62dad 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -1578,9 +1578,11 @@ def new_shuffle( directory=directory / name, id=ShuffleId(name), run_id=next(AbstractShuffleTestPool._shuffle_run_id_iterator), + span_id=None, local_address=name, executor=self._executor, rpc=self, + digest_metric=lambda name, value: None, scheduler=self, memory_limiter_disk=ResourceLimiter(10000000), memory_limiter_comms=ResourceLimiter(10000000), diff --git a/distributed/spans.py b/distributed/spans.py index ef90b073fe8..9a71e1ce86f 100644 --- a/distributed/spans.py +++ b/distributed/spans.py @@ -25,6 +25,9 @@ from distributed.scheduler import TaskGroup, TaskStateState +CONTEXTS_WITH_SPAN_ID = ("execute", "shuffle") + + 
@contextmanager
 def span(*tags: str) -> Iterator[str]:
     """Tag group of tasks to be part of a certain group, called a span.
@@ -316,8 +319,10 @@ def cumulative_worker_metrics(self) -> dict[tuple[Hashable, ...], float]:
 
         At the moment of writing, all keys are
         ``("execute", <task prefix>, <activity>, <unit>)``
-        but more may be added in the future with a different format; please test for
-        ``k[0] == "execute"``.
+        or
+        ``("shuffle", <foreground|background...>, <activity>, <unit>)``
+        but more may be added in the future with a different format; please test e.g.
+        for ``k[0] == "execute"``.
         """
         out = sum_mappings(
             child._cumulative_worker_metrics for child in self.traverse_spans()
@@ -634,7 +639,7 @@ def collect_digests(self) -> None:
         self.digests_total_since_heartbeat = {
             k: v
             for k, v in self.worker.digests_total_since_heartbeat.items()
-            if isinstance(k, tuple) and k[0] == "execute"
+            if isinstance(k, tuple) and k[0] in CONTEXTS_WITH_SPAN_ID
         }
 
     def heartbeat(self) -> dict[tuple[Hashable, ...], float]:
diff --git a/distributed/worker.py b/distributed/worker.py
index d87dfe57c87..55dd5a7724f 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -92,7 +92,7 @@
 from distributed.pubsub import PubSubWorkerExtension
 from distributed.security import Security
 from distributed.sizeof import safe_sizeof as sizeof
-from distributed.spans import SpansWorkerExtension
+from distributed.spans import CONTEXTS_WITH_SPAN_ID, SpansWorkerExtension
 from distributed.threadpoolexecutor import ThreadPoolExecutor
 from distributed.threadpoolexecutor import secede as tpe_secede
 from distributed.utils import (
@@ -1041,7 +1041,7 @@ async def get_metrics(self) -> dict:
         # Don't cast int metrics to float
         digests: defaultdict[Hashable, float] = defaultdict(int)
         for k, v in self.digests_total_since_heartbeat.items():
-            if isinstance(k, tuple) and k[0] == "execute":
+            if isinstance(k, tuple) and k[0] in CONTEXTS_WITH_SPAN_ID:
                 k = k[:1] + k[2:]
             digests[k] += v
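
Reviewer note, not part of the patch: the two key transformations above can be
condensed into a standalone sketch. `shuffle_digest_name` mirrors the `callback`
closure added in ShuffleRun._capture_metrics and `heartbeat_key` mirrors the key
rewrite in Worker.get_metrics; the function names and sample values here are
hypothetical, everything else follows the diff:

    from __future__ import annotations

    from collections.abc import Hashable


    def shuffle_digest_name(
        label: Hashable, unit: str, span_id: str | None, where: str
    ) -> tuple:
        # Normalize the context_meter label to a tuple, strip the redundant
        # "shuffle-" prefix, and file the metric under the
        # ('shuffle', <span id>, <where>, ...) context (see _capture_metrics)
        if not isinstance(label, tuple):
            label = (label,)
        if isinstance(label[0], str) and label[0].startswith("shuffle-"):
            label = (label[0][len("shuffle-") :], *label[1:])
        return ("shuffle", span_id, where, *label, unit)


    def heartbeat_key(k: tuple) -> tuple:
        # Worker.get_metrics drops the span id (k[1]) before aggregating into
        # Scheduler.cumulative_worker_metrics; SpansWorkerExtension keeps it so
        # that metrics can be attributed to individual spans
        return k[:1] + k[2:]


    k = shuffle_digest_name(
        "shuffle-disk-write", "seconds", span_id="abc123", where="background-disk"
    )
    assert k == ("shuffle", "abc123", "background-disk", "disk-write", "seconds")
    assert heartbeat_key(k) == ("shuffle", "background-disk", "disk-write", "seconds")

This also illustrates why foreground metrics show up twice, by design: the same
context_meter event is recorded once under ('execute', ...) by the task machinery
and once under ('shuffle', ..., 'foreground', ...) by _capture_metrics, which is
what assert_metrics in test_metrics.py cross-checks.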