[WIP] Yet another P2P buffer rewrite #8359

Draft pull request: wants to merge 18 commits into base: main
1 change: 1 addition & 0 deletions distributed/config.py
@@ -57,6 +57,7 @@
"distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length",
"recent-messages-log-length": "distributed.admin.low-level-log-length",
"distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length",
"distributed.p2p.disk": "distributed.p2p.storage.disk",
}

# Affects yaml and env variables configs, as well as calls to dask.config.set()
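
The new entry extends the deprecation table above: configuration written under the old distributed.p2p.disk key is meant to be redirected to distributed.p2p.storage.disk. A minimal sketch of how that could be exercised from user code, assuming the usual dask.config deprecation handling applies to this alias (the warning/rewrite behaviour is provided by machinery outside this diff):

import dask.config

# The new key behaves like any other config value.
with dask.config.set({"distributed.p2p.storage.disk": False}):
    assert dask.config.get("distributed.p2p.storage.disk") is False

# The alias is intended to keep the deprecated key working; whether it
# warns or silently rewrites to the new location is handled by the
# surrounding dask.config deprecation machinery (assumed here).
with dask.config.set({"distributed.p2p.disk": False}):
    print(dask.config.get("distributed.p2p.storage.disk"))
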
35 changes: 31 additions & 4 deletions distributed/distributed-schema.yaml
@@ -1045,6 +1045,14 @@ properties:
description: Configuration settings for Dask communications specific to P2P
properties:

buffer:
type:
- string
- integer
description: |
The maximum amount of data that P2P's comm buffers may hold in memory per worker.

This limit is not absolute; it is used to apply back pressure.
retry:
type: object
description: |
@@ -1066,10 +1074,29 @@ properties:
max:
type: string
description: The maximum delay between retries
disk:
type: boolean
description: |
Whether or not P2P stores intermediate data on disk instead of memory
storage:
type: object
description: Configuration settings for P2P storage
properties:

buffer:
type:
- string
- integer
description: |
The maximum amount of data that P2P's storage buffers may hold in memory per worker.

This limit is not absolute; it is used to apply back pressure.
disk:
type: boolean
description: |
Whether or not P2P stores intermediate data on disk instead of memory
threads:
type: integer
description: Number of threads used for CPU-intensive operations per worker
io-threads:
type: integer
description: Number of threads used for I/O operations per worker

dashboard:
type: object
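
Both new buffer options accept either a human-readable byte string or a plain integer, hence the two types in the schema. For reference, dask already ships a helper that normalizes such values; a short illustration (the exact call site used by the P2P buffers is not part of this diff):

from dask.utils import parse_bytes

# "500 MiB" and its integer equivalent describe the same limit.
assert parse_bytes("500 MiB") == 500 * 2**20 == 524_288_000
assert parse_bytes(524_288_000) == 524_288_000  # integers pass through unchanged

# The limit is a soft target for back pressure: once a buffer holds more
# than this, writers are slowed down rather than rejected outright.
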
9 changes: 8 additions & 1 deletion distributed/distributed.yaml
@@ -301,12 +301,19 @@ distributed:

p2p:
comm:
buffer: 500 MiB
message-bytes-limit: 50 MiB
retry:
count: 10
delay:
min: 1s # the first non-zero delay between retries
max: 30s # the maximum delay between retries
disk: True
concurrency: 4
storage:
buffer: 500 MiB
disk: True
concurrency: 1
threads: 1

###################
# Bokeh dashboard #
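
These defaults can be overridden per process in the usual way. A sketch using dask.config.set with the key names introduced by this diff; since the PR is a WIP draft, the names may still change before merge:

import dask.config

# Trade memory for fewer disk spills: larger buffers, spilling still enabled.
dask.config.set(
    {
        "distributed.p2p.comm.buffer": "1 GiB",
        "distributed.p2p.storage.buffer": "1 GiB",
        "distributed.p2p.storage.disk": True,
        "distributed.p2p.threads": 2,
    }
)

print(dask.config.get("distributed.p2p.storage.buffer"))  # "1 GiB"
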
64 changes: 38 additions & 26 deletions distributed/shuffle/_arrow.py
@@ -6,8 +6,6 @@

from packaging.version import parse

from dask.utils import parse_bytes

if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa
@@ -46,6 +44,27 @@ def check_minimal_arrow_version() -> None:
)


def combine_tables(tables: Iterable[pa.Table], deep_copy: bool = True) -> pa.Table:
"""Concatenate ``tables`` into a single contiguous table.

Taking every row materializes new, contiguous buffers, so the result does
not reference the input tables regardless of ``deep_copy``.
"""
import numpy as np

table = concat_tables(tables)
return table.take(np.arange(start=0, stop=table.num_rows))


def copy_table(table: pa.Table) -> pa.Table:
"""Creates a deep-copy of the table"""
import pyarrow as pa

# concat_arrays forces a deep copy even if the input arrays only have a single chunk.
return pa.table(
[concat_arrays(column.chunks) for column in table.columns], schema=table.schema
)
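
For context on why an explicit copy step exists at all: pa.concat_tables is zero-copy, so its result keeps referencing the chunks of the input tables and would pin their buffers in memory. A standalone illustration of the pyarrow behaviour these helpers rely on (plain pyarrow, independent of this PR):

import numpy as np
import pyarrow as pa

t1 = pa.table({"x": [1, 2, 3]})
t2 = pa.table({"x": [4, 5, 6]})

# Zero-copy concatenation: the column stays chunked, one chunk per input.
combined = pa.concat_tables([t1, t2])
assert combined.column("x").num_chunks == 2

# Gathering every row (as combine_tables does) materializes fresh buffers,
# so the result no longer references t1/t2.
consolidated = combined.take(np.arange(combined.num_rows))
print(consolidated.column("x").num_chunks)  # typically a single contiguous chunk

# Concatenating a column's chunks likewise produces one new Array,
# which is what copy_table uses to force a deep copy.
copied = pa.concat_arrays(combined.column("x").chunks)
assert len(copied) == combined.num_rows
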


def concat_tables(tables: Iterable[pa.Table]) -> pa.Table:
import pyarrow as pa

@@ -100,7 +119,7 @@ def serialize_table(table: pa.Table) -> bytes:
stream = pa.BufferOutputStream()
with pa.ipc.new_stream(stream, table.schema) as writer:
writer.write_table(table)
return stream.getvalue().to_pybytes()
return stream.getvalue()


def deserialize_table(buffer: bytes) -> pa.Table:
@@ -110,32 +129,32 @@ def deserialize_table(buffer: bytes) -> pa.Table:
return reader.read_all()


def write_to_disk(data: list[pa.Table], file: pa.OSFile) -> int:
import pyarrow as pa

table = concat_tables(data)
del data
start = file.tell()
with pa.ipc.new_stream(file, table.schema) as writer:
writer.write_table(table)
return file.tell() - start


def read_from_disk(path: Path) -> tuple[list[pa.Table], int]:
import pyarrow as pa

batch_size = parse_bytes("1 MiB")
batch = []
shards = []

with pa.OSFile(str(path), mode="rb") as f:
size = f.seek(0, whence=2)
f.seek(0)
prev = 0
offset = f.tell()
while offset < size:
while f.tell() < size:
sr = pa.RecordBatchStreamReader(f)
shard = sr.read_all()
offset = f.tell()
batch.append(shard)

if offset - prev >= batch_size:
table = concat_tables(batch)
shards.append(_copy_table(table))
batch = []
prev = offset
if batch:
table = concat_tables(batch)
shards.append(_copy_table(table))
shards.append(shard)

if shards:
shards = [concat_tables(shards)]
return shards, size
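
Each call to write_to_disk appends one self-contained Arrow IPC stream to the file, which is why the reader loops with RecordBatchStreamReader until f.tell() reaches the end of the file. A hedged round-trip sketch against the helpers in this diff (the spill path and tables are made up for illustration; the imports only resolve on this branch):

import pyarrow as pa
from pathlib import Path

# Helpers added/changed by this diff; only importable with this branch checked out.
from distributed.shuffle._arrow import read_from_disk, write_to_disk

path = Path("shard-0.arrow")  # hypothetical spill file
t1 = pa.table({"x": [1, 2, 3]})
t2 = pa.table({"x": [4, 5, 6]})

# Two separate appends -> two back-to-back IPC streams in one file.
with pa.OSFile(str(path), mode="wb") as f:
    n1 = write_to_disk([t1], f)
    n2 = write_to_disk([t2], f)

# read_from_disk walks the file stream by stream and concatenates the result.
shards, size = read_from_disk(path)
assert size == n1 + n2
assert shards[0].num_rows == 6
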


@@ -152,10 +171,3 @@ def concat_arrays(arrays: Iterable[pa.Array]) -> pa.Array:
"P2P shuffling requires pyarrow>=12.0.0 to support extension types."
) from e
raise


def _copy_table(table: pa.Table) -> pa.Table:
import pyarrow as pa

arrs = [concat_arrays(column.chunks) for column in table.columns]
return pa.table(data=arrs, schema=table.schema)
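
One more behavioural change worth calling out: serialize_table appears to return the pyarrow Buffer itself rather than copying it into Python bytes (the dropped .to_pybytes() call). A round trip still works because pa.ipc.open_stream accepts a Buffer as readily as bytes; a short sketch, assuming deserialize_table keeps its current open_stream-based implementation, which is only partially visible in this diff:

import pyarrow as pa

# Helper from this diff; only importable with this branch checked out.
from distributed.shuffle._arrow import serialize_table

table = pa.table({"x": [1, 2, 3]})

buf = serialize_table(table)  # now a pa.Buffer, no extra bytes copy
assert isinstance(buf, pa.Buffer)

with pa.ipc.open_stream(buf) as reader:  # the same call deserialize_table relies on
    roundtripped = reader.read_all()

assert roundtripped.equals(table)
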