From b206f826b17f905fa20fc4301a41c62e6f0d222b Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 30 Nov 2023 16:24:14 +0000 Subject: [PATCH] Shuffle metrics 3/4: Capture background metrics --- distributed/shuffle/_core.py | 91 ++++++++++++++++++----- distributed/shuffle/_rechunk.py | 16 +++- distributed/shuffle/_scheduler_plugin.py | 4 +- distributed/shuffle/_shuffle.py | 21 +++++- distributed/shuffle/_worker_plugin.py | 5 +- distributed/shuffle/tests/test_metrics.py | 49 ++++++++++-- distributed/shuffle/tests/test_rechunk.py | 2 + distributed/shuffle/tests/test_shuffle.py | 2 + distributed/spans.py | 11 ++- distributed/worker.py | 4 +- 10 files changed, 168 insertions(+), 37 deletions(-) diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py index f722f94a53e..48b22473d69 100644 --- a/distributed/shuffle/_core.py +++ b/distributed/shuffle/_core.py @@ -7,7 +7,7 @@ import pickle import time from collections import defaultdict -from collections.abc import Callable, Iterable, Iterator, Sequence +from collections.abc import Callable, Hashable, Iterable, Iterator, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from enum import Enum @@ -56,9 +56,11 @@ class ShuffleRun(Generic[_T_partition_id, _T_partition_type]): id: ShuffleId run_id: int + span_id: str | None local_address: str executor: ThreadPoolExecutor rpc: Callable[[str], PooledRPCCall] + digest_metric: Callable[[Hashable, float], None] scheduler: PooledRPCCall closed: bool _disk_buffer: DiskShardsBuffer | MemoryShardsBuffer @@ -79,10 +81,12 @@ def __init__( self, id: ShuffleId, run_id: int, + span_id: str | None, local_address: str, directory: str, executor: ThreadPoolExecutor, rpc: Callable[[str], PooledRPCCall], + digest_metric: Callable[[Hashable, float], None], scheduler: PooledRPCCall, memory_limiter_disk: ResourceLimiter, memory_limiter_comms: ResourceLimiter, @@ -91,23 +95,33 @@ def __init__( ): self.id = id self.run_id = run_id + self.span_id = span_id self.local_address = local_address self.executor = executor self.rpc = rpc + self.digest_metric = digest_metric self.scheduler = scheduler self.closed = False - if disk: - self._disk_buffer = DiskShardsBuffer( - directory=directory, - read=self.read, - memory_limiter=memory_limiter_disk, - ) - else: - self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize) - self._comm_buffer = CommShardsBuffer( - send=self.send, memory_limiter=memory_limiter_comms - ) + # Initialize buffers and start background tasks + # Don't log metrics issued by the background tasks onto the dask task that + # spawned this object + with context_meter.clear_callbacks(): + with self._capture_metrics("background-disk"): + if disk: + self._disk_buffer = DiskShardsBuffer( + directory=directory, + read=self.read, + memory_limiter=memory_limiter_disk, + ) + else: + self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize) + + with self._capture_metrics("background-comms"): + self._comm_buffer = CommShardsBuffer( + send=self.send, memory_limiter=memory_limiter_comms + ) + # TODO: reduce number of connections to number of workers # MultiComm.max_connections = min(10, n_workers) @@ -137,6 +151,38 @@ def __str__(self) -> str: def __hash__(self) -> int: return self.run_id + @contextlib.contextmanager + def _capture_metrics(self, where: str) -> Iterator[None]: + """Capture context_meter metrics as + + {('shuffle', , 'foreground|background...', label, unit): value} + + **Note 1:** When the metric is not logged by a background task + (where='foreground'), this produces a duplicated metric under + + {('execute', , , label, unit): value} + + This is by design so that one can have a holistic view of the whole shuffle + process. + + **Note 2:** We're immediately writing to Worker.digests. + We don't temporarily store metrics under ShuffleRun as we would lose those + recorded between the heartbeat and when the ShuffleRun object is deleted at the + end of a run. + """ + + def callback(label: Hashable, value: float, unit: str) -> None: + if not isinstance(label, tuple): + label = (label,) + if isinstance(label[0], str) and label[0].startswith("shuffle-"): + label = (label[0][len("shuffle-") :], *label[1:]) + name = ("shuffle", self.span_id, where, *label, unit) + + self.digest_metric(name, value) + + with context_meter.add_callback(callback, allow_offload="background" in where): + yield + @contextlib.contextmanager def time(self, name: str) -> Iterator[None]: start = time.time() @@ -289,12 +335,14 @@ def add_partition( self.raise_if_closed() if self.transferred: raise RuntimeError(f"Cannot add more partitions to {self}") - with ( - context_meter.meter("shuffle-shard-partition-noncpu"), - context_meter.meter("shuffle-shard-partition-cpu", func=thread_time), - ): - shards = self._shard_partition(data, partition_id) - sync(self._loop, self._write_to_comm, shards) + # Log metrics both in the "execute" and in the "shuffle" contexts + with self._capture_metrics("foreground"): + with ( + context_meter.meter("shuffle-shard-partition-noncpu"), + context_meter.meter("shuffle-shard-partition-cpu", func=thread_time), + ): + shards = self._shard_partition(data, partition_id) + sync(self._loop, self._write_to_comm, shards) return self.run_id @abc.abstractmethod @@ -312,6 +360,8 @@ def get_output_partition( raise RuntimeError("`get_output_partition` called before barrier task") sync(self._loop, self.flush_receive) with ( + # Log metrics both in the "execute" and in the "shuffle" contexts + self._capture_metrics("foreground"), context_meter.meter("shuffle-get-output-noncpu"), context_meter.meter("shuffle-get-output-cpu", func=thread_time), ): @@ -373,6 +423,7 @@ class ShuffleRunSpec(Generic[_T_partition_id]): run_id: int = field(init=False, default_factory=partial(next, itertools.count(1))) spec: ShuffleSpec worker_for: dict[_T_partition_id, str] + span_id: str | None @property def id(self) -> ShuffleId: @@ -387,10 +438,11 @@ class ShuffleSpec(abc.ABC, Generic[_T_partition_id]): def create_new_run( self, plugin: ShuffleSchedulerPlugin, + span_id: str | None, ) -> SchedulerShuffleState: worker_for = self._pin_output_workers(plugin) return SchedulerShuffleState( - run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for), + run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for, span_id=span_id), participating_workers=set(worker_for.values()), ) @@ -404,6 +456,7 @@ def _pin_output_workers( def create_run_on_worker( self, run_id: int, + span_id: str | None, worker_for: dict[_T_partition_id, str], plugin: ShuffleWorkerPlugin, ) -> ShuffleRun: diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py index 0e9eb2a287d..e329a34940a 100644 --- a/distributed/shuffle/_rechunk.py +++ b/distributed/shuffle/_rechunk.py @@ -99,7 +99,7 @@ import mmap import os from collections import defaultdict -from collections.abc import Callable, Sequence +from collections.abc import Callable, Hashable, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from itertools import product @@ -313,6 +313,8 @@ class ArrayRechunkRun(ShuffleRun[NDIndex, "np.ndarray"]): A unique `ShuffleID` this belongs to. run_id: A unique identifier of the specific execution of the shuffle this belongs to. + span_id: + Span identifier; see :doc:`spans` local_address: The local address this Shuffle can be contacted by using `rpc`. directory: @@ -322,8 +324,11 @@ class ArrayRechunkRun(ShuffleRun[NDIndex, "np.ndarray"]): rpc: A callable returning a PooledRPCCall to contact other Shuffle instances. Typically a ConnectionPool. + digest_metric: + A callable to ingest a performance metric. + Typically Server.digest_metric. scheduler: - A PooledRPCCall to to contact the scheduler. + A PooledRPCCall to contact the scheduler. memory_limiter_disk: memory_limiter_comm: A ``ResourceLimiter`` limiting the total amount of memory used in either @@ -337,10 +342,12 @@ def __init__( new: ChunkedAxes, id: ShuffleId, run_id: int, + span_id: str | None, local_address: str, directory: str, executor: ThreadPoolExecutor, rpc: Callable[[str], PooledRPCCall], + digest_metric: Callable[[Hashable, float], None], scheduler: PooledRPCCall, memory_limiter_disk: ResourceLimiter, memory_limiter_comms: ResourceLimiter, @@ -350,10 +357,12 @@ def __init__( super().__init__( id=id, run_id=run_id, + span_id=span_id, local_address=local_address, directory=directory, executor=executor, rpc=rpc, + digest_metric=digest_metric, scheduler=scheduler, memory_limiter_comms=memory_limiter_comms, memory_limiter_disk=memory_limiter_disk, @@ -475,6 +484,7 @@ def _pin_output_workers(self, plugin: ShuffleSchedulerPlugin) -> dict[NDIndex, s def create_run_on_worker( self, run_id: int, + span_id: str | None, worker_for: dict[NDIndex, str], plugin: ShuffleWorkerPlugin, ) -> ShuffleRun: @@ -484,6 +494,7 @@ def create_run_on_worker( new=self.new, id=self.id, run_id=run_id, + span_id=span_id, directory=os.path.join( plugin.worker.local_directory, f"shuffle-{self.id}-{run_id}", @@ -491,6 +502,7 @@ def create_run_on_worker( executor=plugin._executor, local_address=plugin.worker.address, rpc=plugin.worker.rpc, + digest_metric=plugin.worker.digest_metric, scheduler=plugin.worker.scheduler, memory_limiter_disk=plugin.memory_limiter_disk, memory_limiter_comms=plugin.memory_limiter_comms, diff --git a/distributed/shuffle/_scheduler_plugin.py b/distributed/shuffle/_scheduler_plugin.py index cc9ff491acf..0db336a5d2f 100644 --- a/distributed/shuffle/_scheduler_plugin.py +++ b/distributed/shuffle/_scheduler_plugin.py @@ -145,7 +145,9 @@ def get_or_create( # that the shuffle works as intended and should fail instead. self._raise_if_barrier_unknown(spec.id) self._raise_if_task_not_processing(key) - state = spec.create_new_run(self) + state = spec.create_new_run( + self, span_id=self.scheduler.tasks[key].group.span_id + ) self.active_shuffles[spec.id] = state self._shuffles[spec.id].add(state) state.participating_workers.add(worker) diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py index 0ea2cb5524a..16c30c34e98 100644 --- a/distributed/shuffle/_shuffle.py +++ b/distributed/shuffle/_shuffle.py @@ -3,7 +3,7 @@ import logging import os from collections import defaultdict -from collections.abc import Callable, Collection, Iterable, Iterator, Sequence +from collections.abc import Callable, Collection, Hashable, Iterable, Iterator, Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from functools import partial @@ -392,6 +392,8 @@ class DataFrameShuffleRun(ShuffleRun[int, "pd.DataFrame"]): A unique `ShuffleID` this belongs to. run_id: A unique identifier of the specific execution of the shuffle this belongs to. + span_id: + Span identifier; see :doc:`spans` local_address: The local address this Shuffle can be contacted by using `rpc`. directory: @@ -401,8 +403,11 @@ class DataFrameShuffleRun(ShuffleRun[int, "pd.DataFrame"]): rpc: A callable returning a PooledRPCCall to contact other Shuffle instances. Typically a ConnectionPool. + digest_metric: + A callable to ingest a performance metric. + Typically Server.digest_metric. scheduler: - A PooledRPCCall to to contact the scheduler. + A PooledRPCCall to contact the scheduler. memory_limiter_disk: memory_limiter_comm: A ``ResourceLimiter`` limiting the total amount of memory used in either @@ -421,10 +426,12 @@ def __init__( meta: pd.DataFrame, id: ShuffleId, run_id: int, + span_id: str | None, local_address: str, directory: str, executor: ThreadPoolExecutor, rpc: Callable[[str], PooledRPCCall], + digest_metric: Callable[[Hashable, float], None], scheduler: PooledRPCCall, memory_limiter_disk: ResourceLimiter, memory_limiter_comms: ResourceLimiter, @@ -436,10 +443,12 @@ def __init__( super().__init__( id=id, run_id=run_id, + span_id=span_id, local_address=local_address, directory=directory, executor=executor, rpc=rpc, + digest_metric=digest_metric, scheduler=scheduler, memory_limiter_comms=memory_limiter_comms, memory_limiter_disk=memory_limiter_disk, @@ -535,7 +544,11 @@ def _pin_output_workers(self, plugin: ShuffleSchedulerPlugin) -> dict[int, str]: return plugin._pin_output_workers(self.id, self.parts_out, pick_worker) def create_run_on_worker( - self, run_id: int, worker_for: dict[int, str], plugin: ShuffleWorkerPlugin + self, + run_id: int, + span_id: str | None, + worker_for: dict[int, str], + plugin: ShuffleWorkerPlugin, ) -> ShuffleRun: return DataFrameShuffleRun( column=self.column, @@ -543,6 +556,7 @@ def create_run_on_worker( worker_for=worker_for, id=self.id, run_id=run_id, + span_id=span_id, directory=os.path.join( plugin.worker.local_directory, f"shuffle-{self.id}-{run_id}", @@ -550,6 +564,7 @@ def create_run_on_worker( executor=plugin._executor, local_address=plugin.worker.address, rpc=plugin.worker.rpc, + digest_metric=plugin.worker.digest_metric, scheduler=plugin.worker.scheduler, memory_limiter_disk=plugin.memory_limiter_disk if self.disk diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index 02728e7e20b..0eebf2fdd3e 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -241,7 +241,10 @@ async def _refresh( f" expected run_id > {stale_run_id}" ) shuffle_run = result.spec.create_run_on_worker( - result.run_id, result.worker_for, self._plugin + run_id=result.run_id, + worker_for=result.worker_for, + plugin=self._plugin, + span_id=result.span_id, ) self._active_runs[shuffle_id] = shuffle_run self._runs.add(shuffle_run) diff --git a/distributed/shuffle/tests/test_metrics.py b/distributed/shuffle/tests/test_metrics.py index f4c1b7f6820..96634e929f1 100644 --- a/distributed/shuffle/tests/test_metrics.py +++ b/distributed/shuffle/tests/test_metrics.py @@ -11,13 +11,21 @@ def assert_metrics(s: Scheduler, *keys: tuple[str, ...]) -> None: + metrics = dict(s.cumulative_worker_metrics) + span = s.extensions["spans"].spans_search_by_name["default",][0] + span_metrics = dict(span.cumulative_worker_metrics) + for key in keys: - assert key in s.cumulative_worker_metrics - value = s.cumulative_worker_metrics[key] - if key[-1] == "seconds": - assert value >= 0 - else: # count or bytes - assert value > 0 + value = metrics.pop(key) + assert span_metrics.pop(key) == value + if key[0] == "execute": + key2 = ("shuffle", "foreground", key[2].replace("shuffle-", ""), key[3]) + assert metrics.pop(key2) == value + assert span_metrics.pop(key2) == value + + # Check that the test doesn't omit any metrics + assert [key for key in metrics if key[0] == "shuffle"] == [] + assert [key for key in span_metrics if key[0] == "shuffle"] == [] @gen_cluster(client=True, config={"optimization.fuse.active": False}) @@ -39,6 +47,21 @@ async def test_rechunk(c, s, a, b): ("execute", "rechunk", "shuffle-disk-read", "count"), ("execute", "rechunk", "shuffle-get-output-cpu", "seconds"), ("execute", "rechunk", "shuffle-get-output-noncpu", "seconds"), + ("shuffle", "background-comms", "compress", "seconds"), + ("shuffle", "background-comms", "idle", "seconds"), + ("shuffle", "background-comms", "process", "bytes"), + ("shuffle", "background-comms", "process", "count"), + ("shuffle", "background-comms", "process", "seconds"), + ("shuffle", "background-comms", "send", "seconds"), + ("shuffle", "background-comms", "serialize", "seconds"), + ("shuffle", "background-disk", "disk-write", "bytes"), + ("shuffle", "background-disk", "disk-write", "count"), + ("shuffle", "background-disk", "disk-write", "seconds"), + ("shuffle", "background-disk", "idle", "seconds"), + ("shuffle", "background-disk", "process", "bytes"), + ("shuffle", "background-disk", "process", "count"), + ("shuffle", "background-disk", "process", "seconds"), + ("shuffle", "background-disk", "serialize", "seconds"), ) @@ -71,4 +94,18 @@ async def test_dataframe(c, s, a, b): ("execute", "shuffle_p2p", "shuffle-disk-read", "count"), ("execute", "shuffle_p2p", "shuffle-get-output-cpu", "seconds"), ("execute", "shuffle_p2p", "shuffle-get-output-noncpu", "seconds"), + ("shuffle", "background-comms", "compress", "seconds"), + ("shuffle", "background-comms", "idle", "seconds"), + ("shuffle", "background-comms", "process", "bytes"), + ("shuffle", "background-comms", "process", "count"), + ("shuffle", "background-comms", "process", "seconds"), + ("shuffle", "background-comms", "send", "seconds"), + ("shuffle", "background-comms", "serialize", "seconds"), + ("shuffle", "background-disk", "disk-write", "bytes"), + ("shuffle", "background-disk", "disk-write", "count"), + ("shuffle", "background-disk", "disk-write", "seconds"), + ("shuffle", "background-disk", "idle", "seconds"), + ("shuffle", "background-disk", "process", "bytes"), + ("shuffle", "background-disk", "process", "count"), + ("shuffle", "background-disk", "process", "seconds"), ) diff --git a/distributed/shuffle/tests/test_rechunk.py b/distributed/shuffle/tests/test_rechunk.py index d12a5474727..f138aed9fe8 100644 --- a/distributed/shuffle/tests/test_rechunk.py +++ b/distributed/shuffle/tests/test_rechunk.py @@ -68,9 +68,11 @@ def new_shuffle( directory=directory / name, id=ShuffleId(name), run_id=next(AbstractShuffleTestPool._shuffle_run_id_iterator), + span_id=None, local_address=name, executor=self._executor, rpc=self, + digest_metric=lambda name, value: None, scheduler=self, memory_limiter_disk=ResourceLimiter(10000000), memory_limiter_comms=ResourceLimiter(10000000), diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index c04ab49f821..0807bffaeaa 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -1578,9 +1578,11 @@ def new_shuffle( directory=directory / name, id=ShuffleId(name), run_id=next(AbstractShuffleTestPool._shuffle_run_id_iterator), + span_id=None, local_address=name, executor=self._executor, rpc=self, + digest_metric=lambda name, value: None, scheduler=self, memory_limiter_disk=ResourceLimiter(10000000), memory_limiter_comms=ResourceLimiter(10000000), diff --git a/distributed/spans.py b/distributed/spans.py index ef90b073fe8..9a71e1ce86f 100644 --- a/distributed/spans.py +++ b/distributed/spans.py @@ -25,6 +25,9 @@ from distributed.scheduler import TaskGroup, TaskStateState +CONTEXTS_WITH_SPAN_ID = ("execute", "shuffle") + + @contextmanager def span(*tags: str) -> Iterator[str]: """Tag group of tasks to be part of a certain group, called a span. @@ -316,8 +319,10 @@ def cumulative_worker_metrics(self) -> dict[tuple[Hashable, ...], float]: At the moment of writing, all keys are ``("execute", , , )`` - but more may be added in the future with a different format; please test for - ``k[0] == "execute"``. + or + ``("shuffle", , , )`` + but more may be added in the future with a different format; please test e.g. + for ``k[0] == "execute"``. """ out = sum_mappings( child._cumulative_worker_metrics for child in self.traverse_spans() @@ -634,7 +639,7 @@ def collect_digests(self) -> None: self.digests_total_since_heartbeat = { k: v for k, v in self.worker.digests_total_since_heartbeat.items() - if isinstance(k, tuple) and k[0] == "execute" + if isinstance(k, tuple) and k[0] in CONTEXTS_WITH_SPAN_ID } def heartbeat(self) -> dict[tuple[Hashable, ...], float]: diff --git a/distributed/worker.py b/distributed/worker.py index 5e77db87d7c..6b74044c673 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -92,7 +92,7 @@ from distributed.pubsub import PubSubWorkerExtension from distributed.security import Security from distributed.sizeof import safe_sizeof as sizeof -from distributed.spans import SpansWorkerExtension +from distributed.spans import CONTEXTS_WITH_SPAN_ID, SpansWorkerExtension from distributed.threadpoolexecutor import ThreadPoolExecutor from distributed.threadpoolexecutor import secede as tpe_secede from distributed.utils import ( @@ -1045,7 +1045,7 @@ async def get_metrics(self) -> dict: # Don't cast int metrics to float digests: defaultdict[Hashable, float] = defaultdict(int) for k, v in self.digests_total_since_heartbeat.items(): - if isinstance(k, tuple) and k[0] == "execute": + if isinstance(k, tuple) and k[0] in CONTEXTS_WITH_SPAN_ID: k = k[:1] + k[2:] digests[k] += v