Skip to content

Commit

Permalink
Do not pre-serialize numpy shards
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Nov 30, 2023
1 parent eb9c499 commit d7d8558
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions distributed/shuffle/_disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from distributed.shuffle._buffer import ShardsBuffer
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._pickle import pickle_bytelist
from distributed.utils import Deadline, log_errors, nbytes
from distributed.utils import Deadline, empty_context, log_errors, nbytes


class ReadWriteLock:
Expand Down Expand Up @@ -157,15 +157,19 @@ async def _process(self, id: str, shards: list[Any]) -> None:
if isinstance(shards[0], bytes):
# Manually serialized dataframes
frames = shards
serialize_meter_ctx: Any = empty_context
else:
# Unserialized numpy arrays
with context_meter.meter("serialize", func=thread_time):
frames = list(concat(pickle_bytelist(shard) for shard in shards))
# Note: no calls to pickle_bytelist will happen until we actually start
# writing to disk below.
frames = concat(pickle_bytelist(shard) for shard in shards)
serialize_meter_ctx = context_meter.meter("serialize", func=thread_time)

with (
self._directory_lock.read(),
self.time("write"),
context_meter.meter("disk-write"),
serialize_meter_ctx,
):
# Consider boosting total_size a bit here to account for duplication
# We only need shared (i.e., read) access to the directory to write
Expand Down

0 comments on commit d7d8558

Please sign in to comment.