Shuffle metrics 3/4: Capture background metrics
crusaderky committed Dec 18, 2023
1 parent 8fc2f5e commit 7cc4088
Showing 12 changed files with 196 additions and 42 deletions.
13 changes: 7 additions & 6 deletions distributed/dashboard/components/scheduler.py
@@ -3614,21 +3614,22 @@ def _build_data_sources(self) -> None:
         )
         if spans_ext and self.span_tag_selector.value:
             span = spans_ext.merge_by_tags(*self.span_tag_selector.value)
-            execute_metrics = span.cumulative_worker_metrics
+            metrics = span.cumulative_worker_metrics
         elif spans_ext and spans_ext.spans:
             # Calculate idle time
             span = spans_ext.merge_all()
-            execute_metrics = span.cumulative_worker_metrics
+            metrics = span.cumulative_worker_metrics
         else:
             # Spans extension is not loaded
-            execute_metrics = {
+            metrics = {
                 k: v
                 for k, v in self.scheduler.cumulative_worker_metrics.items()
-                if isinstance(k, tuple) and k[0] == "execute"
+                if isinstance(k, tuple)
             }

-        for (context, function, activity, unit), v in execute_metrics.items():
-            assert context == "execute"
+        for (context, function, activity, unit), v in metrics.items():
+            if context != "execute":
+                continue  # TODO visualize 'shuffle' metrics
             assert isinstance(function, str)
             assert isinstance(unit, str)
             assert self.unit_selector.value
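
With the relaxed filter above, tuple-keyed metrics from other contexts (such as the new "shuffle" context) now reach the loop and are skipped explicitly instead of being dropped in the dict comprehension. A minimal, runnable sketch of that filtering, using made-up values and illustrative key shapes:

    # Illustrative stand-in for the cumulative_worker_metrics mapping; real keys
    # follow the (context, function, activity, unit) tuple layout unpacked above.
    metrics = {
        ("execute", "sum", "thread-cpu", "seconds"): 1.2,
        ("shuffle", "foreground", "shard-partition-cpu", "seconds"): 0.4,
        ("shuffle", "background-comms", "send", "seconds"): 0.3,
    }

    for (context, function, activity, unit), v in metrics.items():
        if context != "execute":
            continue  # shuffle-context metrics are not visualized yet
        print(function, activity, unit, v)  # -> sum thread-cpu seconds 1.2
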
21 changes: 21 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -450,6 +450,27 @@ async def test_FinePerformanceMetrics_no_spans(c, s, a, b):
     assert sorted(cl.visible_activities) == ["memory-read"]


+@gen_cluster(client=True)
+async def test_FinePerformanceMetrics_shuffle(c, s, a, b):
+    da = pytest.importorskip("dask.array")
+
+    x = da.random.random((4, 4), chunks=(1, -1))
+    x = x.rechunk((-1, 1), method="p2p")
+    x = x.sum()
+    await c.compute(x)
+    await a.heartbeat()
+    await b.heartbeat()
+
+    cl = FinePerformanceMetrics(s)
+    cl.update()
+    # execute metrics from shuffle
+    assert "shuffle-barrier" in cl.visible_functions
+    assert "shuffle-shard-partition-cpu" in cl.visible_activities
+    # shuffle metrics have been filtered out
+    assert "background-comms" not in cl.visible_functions
+    assert "shard-partition-cpu" not in cl.visible_activities
+
+
 @gen_cluster(client=True)
 async def test_ClusterMemory(c, s, a, b):
     cl = ClusterMemory(s)
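
The test above exercises the full path: a P2P rechunk populates both "execute" and "shuffle" metrics, and the dashboard only surfaces the former. For interactive exploration, a rough standalone equivalent (a sketch, assuming a recent dask/distributed with P2P rechunking available; the sleep only gives a worker heartbeat time to deliver metrics to the scheduler):

    import time

    import dask.array as da
    from distributed import Client

    if __name__ == "__main__":
        client = Client(processes=False)  # in-process cluster, purely for illustration
        x = da.random.random((4, 4), chunks=(1, -1)).rechunk((-1, 1), method="p2p")
        x.sum().compute()
        time.sleep(2)  # wait for a heartbeat so cumulative_worker_metrics is populated

        scheduler = client.cluster.scheduler
        shuffle_keys = [
            k
            for k in scheduler.cumulative_worker_metrics
            if isinstance(k, tuple) and k[0] == "shuffle"
        ]
        print(shuffle_keys)
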
91 changes: 72 additions & 19 deletions distributed/shuffle/_core.py
@@ -7,7 +7,7 @@
 import pickle
 import time
 from collections import defaultdict
-from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
+from collections.abc import Callable, Generator, Hashable, Iterable, Iterator, Sequence
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass, field
 from enum import Enum
@@ -55,9 +55,11 @@
 class ShuffleRun(Generic[_T_partition_id, _T_partition_type]):
     id: ShuffleId
     run_id: int
+    span_id: str | None
     local_address: str
     executor: ThreadPoolExecutor
     rpc: Callable[[str], PooledRPCCall]
+    digest_metric: Callable[[Hashable, float], None]
     scheduler: PooledRPCCall
     closed: bool
     _disk_buffer: DiskShardsBuffer | MemoryShardsBuffer
@@ -78,10 +80,12 @@ def __init__(
         self,
         id: ShuffleId,
         run_id: int,
+        span_id: str | None,
         local_address: str,
         directory: str,
         executor: ThreadPoolExecutor,
         rpc: Callable[[str], PooledRPCCall],
+        digest_metric: Callable[[Hashable, float], None],
         scheduler: PooledRPCCall,
         memory_limiter_disk: ResourceLimiter,
         memory_limiter_comms: ResourceLimiter,
@@ -90,23 +94,33 @@
     ):
         self.id = id
         self.run_id = run_id
+        self.span_id = span_id
         self.local_address = local_address
         self.executor = executor
         self.rpc = rpc
+        self.digest_metric = digest_metric
         self.scheduler = scheduler
         self.closed = False
-        if disk:
-            self._disk_buffer = DiskShardsBuffer(
-                directory=directory,
-                read=self.read,
-                memory_limiter=memory_limiter_disk,
-            )
-        else:
-            self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)
-
-        self._comm_buffer = CommShardsBuffer(
-            send=self.send, memory_limiter=memory_limiter_comms
-        )
+        # Initialize buffers and start background tasks
+        # Don't log metrics issued by the background tasks onto the dask task that
+        # spawned this object
+        with context_meter.clear_callbacks():
+            with self._capture_metrics("background-disk"):
+                if disk:
+                    self._disk_buffer = DiskShardsBuffer(
+                        directory=directory,
+                        read=self.read,
+                        memory_limiter=memory_limiter_disk,
+                    )
+                else:
+                    self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)
+
+            with self._capture_metrics("background-comms"):
+                self._comm_buffer = CommShardsBuffer(
+                    send=self.send, memory_limiter=memory_limiter_comms
+                )

         # TODO: reduce number of connections to number of workers
         # MultiComm.max_connections = min(10, n_workers)

@@ -136,6 +150,38 @@ def __str__(self) -> str:
     def __hash__(self) -> int:
         return self.run_id

+    @contextlib.contextmanager
+    def _capture_metrics(self, where: str) -> Iterator[None]:
+        """Capture context_meter metrics as
+
+        {('shuffle', <span id>, 'foreground|background...', label, unit): value}
+
+        **Note 1:** When the metric is not logged by a background task
+        (where='foreground'), this produces a duplicated metric under
+
+        {('execute', <span id>, <task prefix>, label, unit): value}
+
+        This is by design so that one can have a holistic view of the whole shuffle
+        process.
+
+        **Note 2:** We're immediately writing to Worker.digests.
+        We don't temporarily store metrics under ShuffleRun as we would lose those
+        recorded between the heartbeat and when the ShuffleRun object is deleted at the
+        end of a run.
+        """
+
+        def callback(label: Hashable, value: float, unit: str) -> None:
+            if not isinstance(label, tuple):
+                label = (label,)
+            if isinstance(label[0], str) and label[0].startswith("shuffle-"):
+                label = (label[0][len("shuffle-") :], *label[1:])
+            name = ("shuffle", self.span_id, where, *label, unit)
+
+            self.digest_metric(name, value)
+
+        with context_meter.add_callback(callback, allow_offload="background" in where):
+            yield
+
     @contextlib.contextmanager
     def time(self, name: str) -> Iterator[None]:
         start = time.time()
@@ -288,12 +334,14 @@ def add_partition(
         self.raise_if_closed()
         if self.transferred:
             raise RuntimeError(f"Cannot add more partitions to {self}")
-        with (
-            context_meter.meter("shuffle-shard-partition-noncpu"),
-            context_meter.meter("shuffle-shard-partition-cpu", func=thread_time),
-        ):
-            shards = self._shard_partition(data, partition_id)
-        sync(self._loop, self._write_to_comm, shards)
+        # Log metrics both in the "execute" and in the "shuffle" contexts
+        with self._capture_metrics("foreground"):
+            with (
+                context_meter.meter("shuffle-shard-partition-noncpu"),
+                context_meter.meter("shuffle-shard-partition-cpu", func=thread_time),
+            ):
+                shards = self._shard_partition(data, partition_id)
+            sync(self._loop, self._write_to_comm, shards)
         return self.run_id

     @abc.abstractmethod
@@ -311,6 +359,8 @@ def get_output_partition(
             raise RuntimeError("`get_output_partition` called before barrier task")
         sync(self._loop, self.flush_receive)
         with (
+            # Log metrics both in the "execute" and in the "shuffle" contexts
+            self._capture_metrics("foreground"),
             context_meter.meter("shuffle-get-output-noncpu"),
             context_meter.meter("shuffle-get-output-cpu", func=thread_time),
         ):
@@ -372,6 +422,7 @@ class ShuffleRunSpec(Generic[_T_partition_id]):
     run_id: int = field(init=False, default_factory=partial(next, itertools.count(1)))
     spec: ShuffleSpec
     worker_for: dict[_T_partition_id, str]
+    span_id: str | None

     @property
     def id(self) -> ShuffleId:
@@ -395,16 +446,18 @@ def pick_worker(self, partition: _T_partition_id, workers: Sequence[str]) -> str
     def create_new_run(
         self,
         worker_for: dict[_T_partition_id, str],
+        span_id: str | None,
     ) -> SchedulerShuffleState:
         return SchedulerShuffleState(
-            run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for),
+            run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for, span_id=span_id),
             participating_workers=set(worker_for.values()),
         )

     @abc.abstractmethod
     def create_run_on_worker(
         self,
         run_id: int,
+        span_id: str | None,
         worker_for: dict[_T_partition_id, str],
         plugin: ShuffleWorkerPlugin,
     ) -> ShuffleRun:
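
The callback inside `_capture_metrics` is the heart of this commit: it strips the `shuffle-` prefix that foreground labels carry for the "execute" view and prepends the span id and the capture context. A small standalone illustration of that naming scheme (the span id and labels below are made up):

    from collections.abc import Hashable


    def metric_name(span_id: str | None, where: str, label: Hashable, unit: str) -> tuple:
        """Replicates the label normalization done by the callback above."""
        if not isinstance(label, tuple):
            label = (label,)
        if isinstance(label[0], str) and label[0].startswith("shuffle-"):
            label = (label[0][len("shuffle-") :], *label[1:])
        return ("shuffle", span_id, where, *label, unit)


    print(metric_name("span-123", "foreground", "shuffle-shard-partition-cpu", "seconds"))
    # ('shuffle', 'span-123', 'foreground', 'shard-partition-cpu', 'seconds')
    print(metric_name("span-123", "background-comms", "send", "seconds"))
    # ('shuffle', 'span-123', 'background-comms', 'send', 'seconds')
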
16 changes: 14 additions & 2 deletions distributed/shuffle/_rechunk.py
@@ -99,7 +99,7 @@
 import mmap
 import os
 from collections import defaultdict
-from collections.abc import Callable, Generator, Sequence
+from collections.abc import Callable, Generator, Hashable, Sequence
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from itertools import product
@@ -312,6 +312,8 @@ class ArrayRechunkRun(ShuffleRun[NDIndex, "np.ndarray"]):
         A unique `ShuffleID` this belongs to.
     run_id:
         A unique identifier of the specific execution of the shuffle this belongs to.
+    span_id:
+        Span identifier; see :doc:`spans`
     local_address:
         The local address this Shuffle can be contacted by using `rpc`.
     directory:
@@ -321,8 +323,11 @@ class ArrayRechunkRun(ShuffleRun[NDIndex, "np.ndarray"]):
     rpc:
         A callable returning a PooledRPCCall to contact other Shuffle instances.
         Typically a ConnectionPool.
+    digest_metric:
+        A callable to ingest a performance metric.
+        Typically Server.digest_metric.
     scheduler:
-        A PooledRPCCall to to contact the scheduler.
+        A PooledRPCCall to contact the scheduler.
     memory_limiter_disk:
     memory_limiter_comm:
         A ``ResourceLimiter`` limiting the total amount of memory used in either
@@ -336,10 +341,12 @@ def __init__(
         new: ChunkedAxes,
         id: ShuffleId,
         run_id: int,
+        span_id: str | None,
         local_address: str,
         directory: str,
         executor: ThreadPoolExecutor,
         rpc: Callable[[str], PooledRPCCall],
+        digest_metric: Callable[[Hashable, float], None],
         scheduler: PooledRPCCall,
         memory_limiter_disk: ResourceLimiter,
         memory_limiter_comms: ResourceLimiter,
@@ -349,10 +356,12 @@
         super().__init__(
             id=id,
             run_id=run_id,
+            span_id=span_id,
             local_address=local_address,
             directory=directory,
             executor=executor,
             rpc=rpc,
+            digest_metric=digest_metric,
             scheduler=scheduler,
             memory_limiter_comms=memory_limiter_comms,
             memory_limiter_disk=memory_limiter_disk,
@@ -485,6 +494,7 @@ def pick_worker(self, partition: NDIndex, workers: Sequence[str]) -> str:
     def create_run_on_worker(
         self,
         run_id: int,
+        span_id: str | None,
         worker_for: dict[NDIndex, str],
         plugin: ShuffleWorkerPlugin,
     ) -> ShuffleRun:
@@ -494,13 +504,15 @@ def create_run_on_worker(
             new=self.new,
             id=self.id,
             run_id=run_id,
+            span_id=span_id,
             directory=os.path.join(
                 plugin.worker.local_directory,
                 f"shuffle-{self.id}-{run_id}",
             ),
             executor=plugin._executor,
             local_address=plugin.worker.address,
             rpc=plugin.worker.rpc,
+            digest_metric=plugin.worker.digest_metric,
             scheduler=plugin.worker.scheduler,
             memory_limiter_disk=plugin.memory_limiter_disk,
             memory_limiter_comms=plugin.memory_limiter_comms,
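
The new `digest_metric` parameter documented above only needs to be a callable accepting a hashable name and a float; the run calls it directly as metrics are produced. A minimal stand-in with the same shape (the real callable is the worker's digest ingestion; the accumulator below is purely for illustration):

    from collections import defaultdict
    from collections.abc import Hashable

    digests: defaultdict[Hashable, float] = defaultdict(float)


    def digest_metric(name: Hashable, value: float) -> None:
        # Cumulative, mirroring how the worker aggregates digests between heartbeats.
        digests[name] += value


    digest_metric(("shuffle", "span-123", "background-disk", "write", "seconds"), 0.25)
    digest_metric(("shuffle", "span-123", "background-disk", "write", "seconds"), 0.10)
    print(digests)
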
4 changes: 3 additions & 1 deletion distributed/shuffle/_scheduler_plugin.py
@@ -146,7 +146,9 @@ def get_or_create(
         self._raise_if_task_not_processing(key)
         worker_for = self._calculate_worker_for(spec)
         self._ensure_output_tasks_are_non_rootish(spec)
-        state = spec.create_new_run(worker_for)
+        state = spec.create_new_run(
+            worker_for=worker_for, span_id=self.scheduler.tasks[key].group.span_id
+        )
         self.active_shuffles[spec.id] = state
         self._shuffles[spec.id].add(state)
         state.participating_workers.add(worker)