From 131f89d8328686a2994ee7b0d4ef0dcdc6b852bc Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Tue, 19 Dec 2023 11:41:41 +0100 Subject: [PATCH 1/6] [Dask.order] Ignore data tasks when ordering (#10706) --- dask/order.py | 57 ++++++++++++++++++++++++++++------ dask/tests/test_order.py | 66 +++++++++++++++++++++++++++++++++++----- 2 files changed, 106 insertions(+), 17 deletions(-) diff --git a/dask/order.py b/dask/order.py index da7d6082f96..0592eeda2e7 100644 --- a/dask/order.py +++ b/dask/order.py @@ -46,6 +46,7 @@ arise, and the order we would like to be determined. """ +import copy from collections import defaultdict, deque, namedtuple from collections.abc import Iterable, Mapping, MutableMapping from typing import Any, Callable, Literal, NamedTuple, overload @@ -109,16 +110,16 @@ def order( expected_len = len(dsk) if dependencies is None: + dependencies_are_copy = True dependencies = {k: get_dependencies(dsk, k) for k in dsk} + else: + # Below we're removing items from the sets in this dict + # We need a deepcopy for that but we only want to do this if necessary + # since this only happens for special cases. + dependencies_are_copy = False + dependencies = dict(dependencies) dependents = reverse_dict(dependencies) - num_needed, total_dependencies = ndependencies(dependencies, dependents) - if len(total_dependencies) != len(dsk): - cycle = getcycle(dsk, None) - raise RuntimeError( - "Cycle detected between the following keys:\n -> %s" - % "\n -> ".join(str(x) for x in cycle) - ) leaf_nodes = {k for k, v in dependents.items() if not v} root_nodes = {k for k, v in dependencies.items() if not v} @@ -128,11 +129,20 @@ def order( # Normalize the graph by removing leaf nodes that are not actual tasks, see # for instance da.store where the task is merely an alias # to multiple keys, i.e. [key1, key2, ...,] + + # Similarly, we are removing root nodes that are pure data tasks. Those task + # are embedded in the run_spec of a task and are not runnable. We have to + # assign a priority but their priority has no impact on performance. 
+ # The removal of those tasks typically transforms the graph topology in a + # way that is simpler to handle all_tasks = False n_removed_leaves = 0 + requires_data_task = defaultdict(set) while not all_tasks: all_tasks = True for leaf in list(leaf_nodes): + if leaf in root_nodes: + continue if ( not istask(dsk[leaf]) # Having a linear chain is fine @@ -155,6 +165,30 @@ def order( if not dependents[dep]: leaf_nodes.add(dep) + for root in list(root_nodes): + if root in leaf_nodes: + continue + if not istask(dsk[root]) and len(dependents[root]) > 1: + if not dependencies_are_copy: + dependencies_are_copy = True + dependencies = copy.deepcopy(dependencies) + root_nodes.remove(root) + for dep in dependents[root]: + requires_data_task[dep].add(root) + dependencies[dep].remove(root) + if not dependencies[dep]: + root_nodes.add(dep) + del dsk[root] + del dependencies[root] + del dependents[root] + + num_needed, total_dependencies = ndependencies(dependencies, dependents) + if len(total_dependencies) != len(dsk): + cycle = getcycle(dsk, None) + raise RuntimeError( + "Cycle detected between the following keys:\n -> %s" + % "\n -> ".join(str(x) for x in cycle) + ) assert dependencies is not None roots_connected, max_dependents = _connecting_to_roots(dependencies, dependents) leafs_connected, _ = _connecting_to_roots(dependents, dependencies) @@ -180,8 +214,8 @@ def sort_key(x: Key) -> tuple[int, int, int, int, str]: except KeyError: assert dependencies is not None _sort_keys_cache[x] = rv = ( - len(dependencies[x]), total_dependencies[x], + len(dependencies[x]), len(roots_connected[x]), -max_dependents[x], # Converting to str is actually faster than creating some @@ -199,12 +233,15 @@ def add_to_result(item: Key) -> None: nonlocal i while next_items: item = next_items.pop() - assert not num_needed[item] runnable_hull.discard(item) reachable_hull.discard(item) leaf_nodes.discard(item) if item in result: continue + + while requires_data_task[item]: + add_to_result(requires_data_task[item].pop()) + if return_stats: result[item] = Order(i, crit_path_counter - _crit_path_counter_offset) else: @@ -215,7 +252,7 @@ def add_to_result(item: Key) -> None: # the final result since the `process_runnable` should produce # equivalent results regardless of the order in which runnable is # populated (not identical but equivalent) - for dep in dependents[item]: + for dep in dependents.get(item, ()): num_needed[dep] -= 1 reachable_hull.add(dep) if not num_needed[dep]: diff --git a/dask/tests/test_order.py b/dask/tests/test_order.py index c25e9e7699d..9b528c5627a 100644 --- a/dask/tests/test_order.py +++ b/dask/tests/test_order.py @@ -84,6 +84,7 @@ def test_ordering_keeps_groups_together(abcde): d = {(a, i): (f,) for i in range(4)} d.update({(b, 0): (f, (a, 0), (a, 1)), (b, 1): (f, (a, 2), (a, 3))}) o = order(d) + assert_topological_sort(d, o) assert abs(o[(a, 0)] - o[(a, 1)]) == 1 assert abs(o[(a, 2)] - o[(a, 3)]) == 1 @@ -91,6 +92,7 @@ def test_ordering_keeps_groups_together(abcde): d = {(a, i): (f,) for i in range(4)} d.update({(b, 0): (f, (a, 0), (a, 2)), (b, 1): (f, (a, 1), (a, 3))}) o = order(d) + assert_topological_sort(d, o) assert abs(o[(a, 0)] - o[(a, 2)]) == 1 assert abs(o[(a, 1)] - o[(a, 3)]) == 1 @@ -116,6 +118,7 @@ def test_avoid_broker_nodes(abcde): (b, 2): (f, (a, 1)), } o = order(dsk) + assert_topological_sort(dsk, o) assert o[(a, 1)] < o[(b, 0)] or (o[(b, 1)] < o[(a, 0)] and o[(b, 2)] < o[(a, 0)]) @@ -142,6 +145,7 @@ def test_base_of_reduce_preferred(abcde): dsk[c] = 1 o = order(dsk) + 
assert_topological_sort(dsk, o) assert o[(b, 0)] <= 1 assert o[(b, 1)] <= 3 @@ -175,6 +179,7 @@ def test_avoid_upwards_branching(abcde): } o = order(dsk) + assert_topological_sort(dsk, o) assert o[(d, 1)] < o[(b, 1)] @@ -214,6 +219,7 @@ def test_avoid_upwards_branching_complex(abcde): } o = order(dsk) + assert_topological_sort(dsk, o) assert o[(c, 1)] < o[(b, 1)] assert abs(o[(d, 2)] - o[(d, 3)]) == 1 @@ -238,6 +244,7 @@ def test_deep_bases_win_over_dependents(abcde): dsk = {a: (f, b, c, d), b: (f, d, e), c: (f, d), d: 1, e: 2} o = order(dsk) + assert_topological_sort(dsk, o) assert o[b] < o[c] @@ -255,6 +262,7 @@ def test_prefer_deep(abcde): dsk = {a: 1, b: (f, a), c: (f, b), d: 1, e: (f, d)} o = order(dsk) + assert_topological_sort(dsk, o) assert o[a] < o[d] assert o[b] < o[d] @@ -272,6 +280,7 @@ def test_break_ties_by_str(abcde): dsk["y"] = (f, list(x_keys)) o = order(dsk) + assert_topological_sort(dsk, o) expected = {"y": 10} expected.update({k: i for i, k in enumerate(x_keys[::-1])}) @@ -293,6 +302,7 @@ def test_gh_3055(): dsk = dict(w.__dask_graph__()) o = order(dsk) + assert_topological_sort(dsk, o) assert max(diagnostics(dsk, o=o)[1]) <= 8 L = [o[k] for k in w.__dask_keys__()] assert sum(x < len(o) / 2 for x in L) > len(L) / 3 # some complete quickly @@ -323,6 +333,7 @@ def test_favor_longest_critical_path(abcde): dsk = {c: (f,), d: (f, c), e: (f, c), b: (f, c), a: (f, b)} o = order(dsk) + assert_topological_sort(dsk, o) assert o[d] > o[b] assert o[e] > o[b] @@ -351,6 +362,7 @@ def test_run_smaller_sections(abcde): dd: (f, cc), } o = order(dsk) + assert_topological_sort(dsk, o) assert max(diagnostics(dsk)[1]) <= 4 # optimum is 3 # This is a mildly ambiguous example # https://github.com/dask/dask/pull/10535/files#r1337528255 @@ -399,7 +411,8 @@ def _(*args): c1: (f(c1), c2), } - order(dsk) + o = order(dsk) + assert_topological_sort(dsk, o) dask.get(dsk, [a1, b1, c1]) # trigger computation assert log == expected @@ -436,6 +449,7 @@ def test_nearest_neighbor(abcde): } o = order(dsk) + assert_topological_sort(dsk, o) assert 3 < sum(o[a + i] < len(o) / 2 for i in "123456789") < 7 assert 1 < sum(o[b + i] < len(o) / 2 for i in "1234") < 4 @@ -445,6 +459,7 @@ def test_string_ordering(): """Prefer ordering tasks by name first""" dsk = {("a", 1): (f,), ("a", 2): (f,), ("a", 3): (f,)} o = order(dsk) + assert_topological_sort(dsk, o) assert o == {("a", 1): 0, ("a", 2): 1, ("a", 3): 2} or o == { ("a", 1): 2, ("a", 2): 1, @@ -462,6 +477,7 @@ def test_string_ordering_dependents(): # See comment in add_to_result dsk = {("a", 1): (f, "b"), ("a", 2): (f, "b"), ("a", 3): (f, "b"), "b": (f,)} o = order(dsk) + assert_topological_sort(dsk, o) assert o == {"b": 0, ("a", 1): 1, ("a", 2): 2, ("a", 3): 3} or o == { "b": 0, ("a", 1): 3, @@ -483,6 +499,7 @@ def test_prefer_short_narrow(abcde): (c, 2): (f, (c, 1), (a, 1), (b, 1)), } o = order(dsk) + assert_topological_sort(dsk, o) assert o[(b, 0)] < o[(b, 1)] assert o[(b, 0)] < o[(c, 2)] assert o[(c, 1)] < o[(c, 2)] @@ -540,6 +557,7 @@ def test_prefer_short_ancestor(abcde): (c, 2): (f, (c, 1), (a, 1), (b, 1)), } o = order(dsk) + assert_topological_sort(dsk, o) assert o[(a, 0)] < o[(a, 1)] assert o[(b, 0)] < o[(b, 1)] @@ -581,6 +599,7 @@ def test_map_overlap(abcde): } o = order(dsk) + assert_topological_sort(dsk, o) assert o[(b, 1)] < o[(e, 5)] or o[(b, 5)] < o[(e, 1)] @@ -614,6 +633,7 @@ def test_use_structure_not_keys(abcde): (b, 0): (f, (a, 3), (a, 8), (a, 1)), } o = order(dsk) + assert_topological_sort(dsk, o) assert max(diagnostics(dsk, o=o)[1]) == 3 As = 
sorted(val for (letter, _), val in o.items() if letter == a) @@ -642,6 +662,7 @@ def test_dont_run_all_dependents_too_early(abcde): dsk[(c, i)] = (f, (c, 0)) dsk[(d, i)] = (f, (d, i - 1), (b, i), (c, i)) o = order(dsk) + assert_topological_sort(dsk, o) expected = [3, 6, 9, 12, 15, 18, 21, 24, 27, 30] actual = sorted(v for (letter, num), v in o.items() if letter == d) @@ -680,6 +701,7 @@ def test_many_branches_use_ndependencies(abcde): (a, 3): (f, (a, 2), (b, 2), (c, 1), (dd, 3), (ee, 3)), } o = order(dsk) + assert_topological_sort(dsk, o) # run all d's and e's first expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] actual = sorted(v for (letter, _), v in o.items() if letter in {d, dd, e, ee}) @@ -742,6 +764,7 @@ def test_order_with_equal_dependents(abcde): } ) o = order(dsk) + assert_topological_sort(dsk, o) total = 0 for x in abc: for i in range(len(abc)): @@ -757,6 +780,7 @@ def test_order_with_equal_dependents(abcde): for i in range(len(abc)): dsk2[(x, 7, i, 0)] = (f, (x, 6, i, 0)) o = order(dsk2) + assert_topological_sort(dsk2, o) total = 0 for x in abc: for i in range(len(abc)): @@ -772,6 +796,7 @@ def test_order_with_equal_dependents(abcde): for i in range(len(abc)): del dsk3[(x, 6, i, 1)] o = order(dsk3) + assert_topological_sort(dsk3, o) total = 0 for x in abc: for i in range(len(abc)): @@ -787,6 +812,7 @@ def test_order_with_equal_dependents(abcde): for i in range(len(abc)): del dsk4[(x, 6, i, 0)] o = order(dsk4) + assert_topological_sort(dsk4, o) pressure = diagnostics(dsk4, o=o)[1] assert max(pressure) <= max_pressure for x in abc: @@ -848,6 +874,7 @@ def test_terminal_node_backtrack(): ), } o = order(dsk) + assert_topological_sort(dsk, o) assert o[("a", 2)] < o[("a", 3)] @@ -857,9 +884,8 @@ def test_array_store_final_order(tmpdir): # but with the graph actually generated by da.store. da = pytest.importorskip("dask.array") zarr = pytest.importorskip("zarr") - import numpy as np - arrays = [da.from_array(np.ones((110, 4)), chunks=(100, 2)) for i in range(4)] + arrays = [da.ones((110, 4), chunks=(100, 2)) for i in range(4)] x = da.concatenate(arrays, axis=0).rechunk((100, 2)) store = zarr.DirectoryStore(tmpdir) @@ -867,6 +893,7 @@ def test_array_store_final_order(tmpdir): dest = root.empty_like(name="dest", data=x, chunks=x.chunksize, overwrite=True) d = x.store(dest, lock=False, compute=False) o = order(d.dask) + assert_topological_sort(dict(d.dask), o) # Find the lowest store. Dask starts here. 
stores = [k for k in o if isinstance(k, tuple) and k[0].startswith("store-map-")] first_store = min(stores, key=lambda k: o[k]) @@ -968,8 +995,8 @@ def test_eager_to_compute_dependent_to_free_parent(): "a70": (f, f), "a71": (f, f), } - dependencies, dependents = get_deps(dsk) o = order(dsk) + assert_topological_sort(dsk, o) _, pressure = diagnostics(dsk, o=o) assert max(pressure) <= 8 @@ -1015,6 +1042,7 @@ def test_diagnostics(abcde): (e, 1): (f, (e, 0)), } o = order(dsk) + assert_topological_sort(dsk, o) info, memory_over_time = diagnostics(dsk) # this is ambiguous, depending on whether we start from left or right assert all(o[key] == val.order for key, val in info.items()) @@ -1150,6 +1178,7 @@ def test_xarray_like_reduction(): } ) o = order(dsk) + assert_topological_sort(dsk, o) _, pressure = diagnostics(dsk, o=o) assert max(pressure) <= 9 @@ -1298,6 +1327,7 @@ def test_anom_mean_raw(abcde): } o = order(dsk) + assert_topological_sort(dsk, o) # The left hand computation branch should complete before we start loading # more data nodes_to_finish_before_loading_more_data = [ @@ -1751,6 +1781,7 @@ def test_flox_reduction(abcde): (g, 2, 2): (f, (a, 0), (e, 1)), } o = order(dsk) + assert_topological_sort(dsk, o) of1 = list(o[(g, 1, ix)] for ix in range(3)) of2 = list(o[(g, 2, ix)] for ix in range(3)) assert max(of1) < min(of2) or max(of2) < min(of1) @@ -1788,7 +1819,12 @@ def random(**kwargs): dsk = collections_to_dsk([graph], optimize_graph=optimize) dependencies, dependents = get_deps(dsk) # Verify assumptions + before = len(dsk) + before_dsk = dsk.copy() o = order(dsk) + assert_topological_sort(dsk, o) + assert before == len(o) == len(dsk) + assert before_dsk == dsk # Verify assumptions (specifically that the reducers are sum-aggregate) assert ({"object", "sum", "sum-aggregate"}).issubset({key_split(k) for k in o}) reducers = {k for k in o if key_split(k) == "sum-aggregate"} @@ -1806,6 +1842,15 @@ def random(**kwargs): ) +def assert_topological_sort(dsk, order): + dependencies, dependents = get_deps(dsk) + num_needed = {k: len(dependencies[k]) for k in dsk} + for k in sorted(dsk, key=order.__getitem__): + assert num_needed[k] == 0 + for dep in dependents[k]: + num_needed[dep] -= 1 + + def test_doublediff(abcde): a, b, c, d, e = abcde dsk = { @@ -1925,6 +1970,7 @@ def test_gh_3055_explicit(abcde): assert len(con_r) == len(dsk) assert con_r[(e, 0)] == {("root", 0), (a, 1)} o = order(dsk) + assert_topological_sort(dsk, o) assert max(diagnostics(dsk, o=o)[1]) <= 5 assert o[(e, 0)] < o[(a, 3)] < o[(a, 4)] assert o[(a, 2)] < o[(a, 3)] < o[(a, 4)] @@ -1959,6 +2005,7 @@ def test_order_flox_reduction_2(abcde): (d, 1, 1): (c, 1, 1), } o = order(dsk) + assert_topological_sort(dsk, o) final_nodes = sorted( [(d, ix, jx) for ix in range(2) for jx in range(2)], key=o.__getitem__ ) @@ -2004,6 +2051,7 @@ def f(ds): assert len(shared_roots) == 1 shared_root = shared_roots.pop() o = order(dsk) + assert_topological_sort(dsk, o) previous = None step = 0 @@ -2137,8 +2185,12 @@ def test_do_not_mutate_input(): "b": (f, 1), "c": np.array([[1, 2], [3, 4]]), "d": ["a", "b", "c"], + "e": (f, "d"), } + dependencies, __build_class__ = get_deps(dsk) + dependencies_copy = dependencies.copy() dsk_copy = dsk.copy() - o = order(dsk) - assert o["d"] == len(dsk) - 1 - assert len(dsk) == len(dsk_copy) + o = order(dsk, dependencies=dependencies) + assert_topological_sort(dsk, o) + assert dsk == dsk_copy + assert dependencies == dependencies_copy From d6597e5908d01aa5d4fe3bf7f5a16532e330ab69 Mon Sep 17 00:00:00 2001 From: Patrick 
Hoefler <61934744+phofl@users.noreply.github.com> Date: Tue, 19 Dec 2023 15:55:17 +0100 Subject: [PATCH 2/6] Don't duplicate unobserved categories in GroupBy.nunqiue if ``split_out>1`` (#10716) --- dask/dataframe/groupby.py | 16 ++++++++++++---- dask/dataframe/tests/test_categorical.py | 12 ++++++++++-- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/dask/dataframe/groupby.py b/dask/dataframe/groupby.py index 955f19191a2..4d39d057494 100644 --- a/dask/dataframe/groupby.py +++ b/dask/dataframe/groupby.py @@ -750,8 +750,11 @@ def _drop_duplicates_reindex(df): def _nunique_df_chunk(df, *by, **kwargs): name = kwargs.pop("name") + group_keys = {} + if PANDAS_GE_150: + group_keys["group_keys"] = True - g = _groupby_raise_unaligned(df, by=by) + g = _groupby_raise_unaligned(df, by=by, **group_keys) if len(df) > 0: grouped = ( g[[name]].apply(_drop_duplicates_reindex).reset_index(level=-1, drop=True) @@ -767,16 +770,21 @@ def _nunique_df_chunk(df, *by, **kwargs): def _nunique_df_combine(df, levels, sort=False): result = ( - df.groupby(level=levels, sort=sort) + df.groupby(level=levels, sort=sort, observed=True) .apply(_drop_duplicates_reindex) .reset_index(level=-1, drop=True) ) + if result.index.dtype == "O": + print() + # print(result.index) + # print(df) return result def _nunique_df_aggregate(df, levels, name, sort=False): - with check_observed_deprecation(): - return df.groupby(level=levels, sort=sort)[name].nunique() + result = df.groupby(level=levels, sort=sort, observed=True)[name].nunique() + # print(result.index) + return result def _nunique_series_chunk(df, *by, **_ignored_): diff --git a/dask/dataframe/tests/test_categorical.py b/dask/dataframe/tests/test_categorical.py index ebd3e7e63ea..e9636ed312b 100644 --- a/dask/dataframe/tests/test_categorical.py +++ b/dask/dataframe/tests/test_categorical.py @@ -146,9 +146,13 @@ def test_concat_unions_categoricals(): ), ], ) +@pytest.mark.parametrize("npartitions", [None, 10]) +@pytest.mark.parametrize("split_out", [1, 4]) @pytest.mark.filterwarnings("ignore:The default value of numeric_only") @pytest.mark.filterwarnings("ignore:Dropping") -def test_unknown_categoricals(shuffle_method, numeric_only): +def test_unknown_categoricals( + shuffle_method, numeric_only, npartitions, split_out, request +): ddf = dd.DataFrame( {("unknown", i): df for (i, df) in enumerate(frames)}, "unknown", @@ -158,6 +162,10 @@ def test_unknown_categoricals(shuffle_method, numeric_only): ), [None] * 4, ) + if npartitions == 10 and not PANDAS_GE_150: + request.applymarker(pytest.mark.xfail(reason="group_keys not supported")) + if npartitions is not None: + ddf = ddf.repartition(npartitions=npartitions) # Compute df = ddf.compute() @@ -179,7 +187,7 @@ def test_unknown_categoricals(shuffle_method, numeric_only): with ctx: expected = df.groupby(df.w).y.nunique() with ctx: - result = ddf.groupby(ddf.w).y.nunique() + result = ddf.groupby(ddf.w, sort=False).y.nunique(split_out=split_out) assert_eq(result, expected) with ctx: From 2b5c65a783eea71073e50c5da9556f60a2529cef Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 19 Dec 2023 13:12:50 -0600 Subject: [PATCH 3/6] Update environment file upload step in CI (#10726) --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3d56e44f7dc..e859732409b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -102,9 +102,9 @@ jobs: # This environment file is created in 
continuous_integration/scripts/install.sh # and can be useful when debugging locally - name: Upload environment file - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: env-${{ matrix.os }}-${{ matrix.environment }} + name: env-${{ matrix.os }}-${{ matrix.environment }}-${{ matrix.extra }} path: env.yaml - name: Run tests From 55617708f4d819cf00d580e92b870c1f8196b4b4 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 19 Dec 2023 15:20:53 -0600 Subject: [PATCH 4/6] Update tab styling on "10 Minutes to Dask" page (#10728) --- docs/source/10-minutes-to-dask.rst | 72 ++++++++++++++++++------------ 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/docs/source/10-minutes-to-dask.rst b/docs/source/10-minutes-to-dask.rst index 4ce05ac9e4d..91f356736fb 100644 --- a/docs/source/10-minutes-to-dask.rst +++ b/docs/source/10-minutes-to-dask.rst @@ -32,9 +32,11 @@ Creating a Dask Object You can create a Dask object from scratch by supplying existing data and optionally including information about how the chunks should be structured. -.. tabs:: - .. group-tab:: DataFrame +.. tab-set:: + + .. tab-item:: DataFrame + :sync: dataframe See :doc:`dataframe`. @@ -84,7 +86,8 @@ including information about how the chunks should be structured. 2021-09-21 ... ... Dask Name: blocks, 11 tasks - .. group-tab:: Array + .. tab-item:: Array + :sync: array See :doc:`array`. @@ -112,7 +115,8 @@ including information about how the chunks should be structured. # access a particular block of data a.blocks[1, 3] - .. group-tab:: Bag + .. tab-item:: Bag + :sync: bag See :doc:`bag`. @@ -131,9 +135,10 @@ Indexing Indexing Dask collections feels just like slicing NumPy arrays or pandas DataFrame. -.. tabs:: +.. tab-set:: - .. group-tab:: DataFrame + .. tab-item:: DataFrame + :sync: dataframe .. code-block:: python @@ -156,13 +161,15 @@ Indexing Dask collections feels just like slicing NumPy arrays or pandas DataFra 2021-10-09 05:00:59.999999999 ... ... Dask Name: loc, 11 tasks - .. group-tab:: Array + .. tab-item:: Array + :sync: array - .. jupyter-execute:: + .. jupyter-execute:: - a[:50, 200] + a[:50, 200] - .. group-tab:: Bag + .. tab-item:: Bag + :sync: bag A Bag is an unordered collection allowing repeats. So it is like a list, but it doesnโ€™t guarantee an ordering among elements. There is no way to index Bags since they are @@ -177,9 +184,10 @@ you ask for it. Instead, a Dask task graph for the computation is produced. Anytime you have a Dask object and you want to get the result, call ``compute``: -.. tabs:: +.. tab-set:: - .. group-tab:: DataFrame + .. tab-item:: DataFrame + :sync: dataframe .. code-block:: python @@ -199,7 +207,8 @@ Anytime you have a Dask object and you want to get the result, call ``compute``: [198 rows x 2 columns] - .. group-tab:: Array + .. tab-item:: Array + :sync: array .. code-block:: python @@ -211,7 +220,8 @@ Anytime you have a Dask object and you want to get the result, call ``compute``: 18200, 18700, 19200, 19700, 20200, 20700, 21200, 21700, 22200, 22700, 23200, 23700, 24200, 24700]) - .. group-tab:: Bag + .. tab-item:: Bag + :sync: bag .. code-block:: python @@ -225,9 +235,10 @@ Methods Dask collections match existing numpy and pandas methods, so they should feel familiar. Call the method to set up the task graph, and then call ``compute`` to get the result. -.. tabs:: +.. tab-set:: - .. group-tab:: DataFrame + .. tab-item:: DataFrame + :sync: dataframe .. 
code-block:: python @@ -280,7 +291,8 @@ Call the method to set up the task graph, and then call ``compute`` to get the r 2021-10-09 05:00:00 161963 Freq: H, Name: a, Length: 198, dtype: int64 - .. group-tab:: Array + .. tab-item:: Array + :sync: array .. code-block:: python @@ -332,7 +344,8 @@ Call the method to set up the task graph, and then call ``compute`` to get the r array([100009, 99509, 99009, 98509, 98009, 97509, 97009, 96509, 96009, 95509]) - .. group-tab:: Bag + .. tab-item:: Bag + :sync: bag Dask Bag implements operations like ``map``, ``filter``, ``fold``, and ``groupby`` on collections of generic Python objects. @@ -369,9 +382,10 @@ Visualize the Task Graph So far we've been setting up computations and calling ``compute``. In addition to triggering computation, we can inspect the task graph to figure out what's going on. -.. tabs:: +.. tab-set:: - .. group-tab:: DataFrame + .. tab-item:: DataFrame + :sync: dataframe .. code-block:: python @@ -391,7 +405,8 @@ triggering computation, we can inspect the task graph to figure out what's going .. image:: images/10_minutes_dataframe_graph.png :alt: Dask task graph for the Dask dataframe computation. The task graph shows a "loc" and "getitem" operations selecting a small section of the dataframe values, before applying a cumulative sum "cumsum" operation, then finally subtracting a value from the result. - .. group-tab:: Array + .. tab-item:: Array + :sync: array .. code-block:: python @@ -410,7 +425,8 @@ triggering computation, we can inspect the task graph to figure out what's going .. image:: images/10_minutes_array_graph.png :alt: Dask task graph for the Dask array computation. The task graph shows many "amax" operations on each chunk of the Dask array, that are then aggregated to find "amax" along the first array axis, then reversing the order of the array values with a "getitem" slicing operation, before an "add" operation to get the final result. - .. group-tab:: Bag + .. tab-item:: Bag + :sync: bag .. code-block:: python @@ -431,9 +447,9 @@ Low-Level Interfaces Often when parallelizing existing code bases or building custom algorithms, you run into code that is parallelizable, but isn't just a big DataFrame or array. -.. tabs:: +.. tab-set:: - .. group-tab:: Delayed: Lazy + .. tab-item:: Delayed: Lazy :doc:`delayed` lets you to wrap individual function calls into a lazily constructed task graph: @@ -455,7 +471,7 @@ run into code that is parallelizable, but isn't just a big DataFrame or array. c = c.compute() # This triggers all of the above computations - .. group-tab:: Futures: Immediate + .. tab-item:: Futures: Immediate Unlike the interfaces described so far, Futures are eager. Computation starts as soon as the function is submitted (see :doc:`futures`). @@ -500,9 +516,9 @@ If you want more control, use the distributed scheduler instead. Despite having "distributed" in it's name, the distributed scheduler works well on both single and multiple machines. Think of it as the "advanced scheduler". -.. tabs:: +.. tab-set:: - .. group-tab:: Local + .. tab-item:: Local This is how you set up a cluster that uses only your own computer. @@ -514,7 +530,7 @@ on both single and multiple machines. Think of it as the "advanced scheduler". ... client - .. group-tab:: Remote + .. tab-item:: Remote This is how you connect to a cluster that is already running. 
From f6115511a09c309e02c2d40946b6ca74c6d148db Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 20 Dec 2023 03:59:37 -0600 Subject: [PATCH 5/6] Use new Xarray logo (#10729) --- docs/source/index.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index 32b2ab394c2..14a26b527a1 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -158,7 +158,7 @@ Dask provides several APIs. Choose one that works best for you: .. grid-item:: :columns: 12 12 5 5 - .. figure:: https://docs.xarray.dev/en/stable/_static/dataset-diagram-logo.png + .. figure:: https://docs.xarray.dev/en/stable/_static/logos/Xarray_Logo_RGB_Final.png :align: center .. tab-item:: Bags From 31b26dfa4bf15f23a7070155ac67e6ba285de76f Mon Sep 17 00:00:00 2001 From: Sarah Charlotte Johnson Date: Wed, 20 Dec 2023 14:02:14 -0800 Subject: [PATCH 6/6] Add task graph animation to docs homepage (#10730) --- docs/source/index.rst | 21 +++++++++++++++++---- pyproject.toml | 2 +- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index 14a26b527a1..b76162d2a57 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -2,11 +2,24 @@ Dask ==== -*Dask is a Python library for parallel and distributed computing.* Dask is ... +.. grid:: 1 1 2 2 + + .. grid-item:: + :columns: 12 12 6 6 + + *Dask is a Python library for parallel and distributed computing.* Dask is: + + - **Easy** to use and set up (it's just a Python library) + - **Powerful** at providing scale, and unlocking complex algorithms + - and **Fun** ๐ŸŽ‰ + + .. grid-item:: + :columns: 12 12 6 6 + + .. raw:: html + +
+            [raw HTML for the embedded task graph animation video omitted from this excerpt]
 
-- **Easy** to use and set up (it's just a Python library)
-- **Powerful** at providing scale, and unlocking complex algorithms
-- and **Fun** 🎉
 
 How to Use Dask
 ---------------
diff --git a/pyproject.toml b/pyproject.toml
index cd2cfe9b7f0..1b75522db6e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -185,7 +185,7 @@ module = [
 allow_untyped_defs = false
 
 [tool.codespell]
-ignore-words-list = "coo,nd"
+ignore-words-list = "coo,nd,medias"
 skip = "docs/source/changelog.rst"
 
 [tool.coverage.run]
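
The change in patch 1 is exercised throughout dask/tests/test_order.py via the new
assert_topological_sort helper: every ordering returned by dask.order.order must be a
valid topological sort, i.e. a key is only assigned a priority once all of its
dependencies have been assigned earlier ones. A minimal standalone sketch of that check
follows; the helper is adapted from the diff above (its parameter is renamed so it does
not shadow the order import), while the toy graph and the import locations are
illustrative assumptions rather than part of the patch.

    from dask.core import get_deps
    from dask.order import order


    def assert_topological_sort(dsk, o):
        # Walk the keys in priority order and verify that each key's
        # dependencies received earlier priorities (same logic as the helper
        # added to test_order.py in patch 1).
        dependencies, dependents = get_deps(dsk)
        num_needed = {k: len(dependencies[k]) for k in dsk}
        for k in sorted(dsk, key=o.__getitem__):
            assert num_needed[k] == 0
            for dep in dependents[k]:
                num_needed[dep] -= 1


    def f(*args):
        return None


    # Toy graph (illustrative only): two independent inputs feeding one combine task.
    dsk = {"a": (f,), "b": (f,), "c": (f, "a", "b")}
    priorities = order(dsk)
    assert_topological_sort(dsk, priorities)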