From d7d8558b4d0684817b3d4447c12b0f7074f4dfd7 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Mon, 27 Nov 2023 18:54:00 +0100
Subject: [PATCH] Do not pre-serialize numpy shards

---
 distributed/shuffle/_disk.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/distributed/shuffle/_disk.py b/distributed/shuffle/_disk.py
index 8929c2f3651..38ed4ff8b02 100644
--- a/distributed/shuffle/_disk.py
+++ b/distributed/shuffle/_disk.py
@@ -14,7 +14,7 @@
 from distributed.shuffle._buffer import ShardsBuffer
 from distributed.shuffle._limiter import ResourceLimiter
 from distributed.shuffle._pickle import pickle_bytelist
-from distributed.utils import Deadline, log_errors, nbytes
+from distributed.utils import Deadline, empty_context, log_errors, nbytes


 class ReadWriteLock:
@@ -157,15 +157,19 @@ async def _process(self, id: str, shards: list[Any]) -> None:
         if isinstance(shards[0], bytes):
             # Manually serialized dataframes
             frames = shards
+            serialize_meter_ctx: Any = empty_context
         else:
             # Unserialized numpy arrays
-            with context_meter.meter("serialize", func=thread_time):
-                frames = list(concat(pickle_bytelist(shard) for shard in shards))
+            # Note: no calls to pickle_bytelist will happen until we actually start
+            # writing to disk below.
+            frames = concat(pickle_bytelist(shard) for shard in shards)
+            serialize_meter_ctx = context_meter.meter("serialize", func=thread_time)

         with (
             self._directory_lock.read(),
             self.time("write"),
             context_meter.meter("disk-write"),
+            serialize_meter_ctx,
         ):
             # Consider boosting total_size a bit here to account for duplication
             # We only need shared (i.e., read) access to the directory to write