diff --git a/.github/workflows/ci-test.yaml b/.github/workflows/ci-test.yaml index 2a744ed5..190b0380 100644 --- a/.github/workflows/ci-test.yaml +++ b/.github/workflows/ci-test.yaml @@ -14,15 +14,23 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, ubuntu-22.04] - gcc-version: [11, 12] + os: [ubuntu-24.04, ubuntu-22.04] + gcc-version: [11, 12, 13, 14] mpi-type: [mpich, openmpi] exclude: - - os: ubuntu-latest - gcc-version: 8 + - os: ubuntu-22.04 + gcc-version: 13 + - os: ubuntu-22.04 + gcc-version: 14 + - os: ubuntu-24.04 + mpi-type: mpich runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 + - name: Update apt + run: | + sudo add-apt-repository -y universe + sudo apt-get update - name: Cache boost uses: actions/cache@v4 id: cache-boost @@ -49,8 +57,8 @@ jobs: if: matrix.mpi-type == 'openmpi' run: sudo apt-get install openmpi-bin libopenmpi-dev - name: Install GCC-${{ matrix.gcc-version }} - if: matrix.gcc-version == '8' - run: sudo apt-get install gcc-8 g++-8 + if: (matrix.gcc-version == '11' && matrix.os == 'ubuntu-24.04') + run: sudo apt-get install gcc-11 g++-11 - name: Make run: | echo Run 'make' diff --git a/.gitignore b/.gitignore index 7d9106a8..cb84f20f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ *#* build* .vscode* -.idea* \ No newline at end of file +.idea* +.cache/ diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 2ea25151..dab9ff42 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -2,7 +2,7 @@ version: 2 # Set OS and Python versions build: - os: ubuntu-22.04 + os: ubuntu-24.04 tools: python: "3.12" @@ -13,4 +13,4 @@ python: # Change the location of the configuration file sphinx: - configuration: docs/rtd/conf.py \ No newline at end of file + configuration: docs/rtd/conf.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 027214a1..31a372bb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -144,7 +144,7 @@ endif () # include(FindArrowParquet) option(YGM_REQUIRE_ARROW_PARQUET "YGM requires Apache 
Arrow Parquet." OFF) -find_arrow_parquet() +find_or_install_arrow_parquet() # # Create the YGM target library diff --git a/Readme.md b/Readme.md index f7c12e9f..8ac66c22 100644 --- a/Readme.md +++ b/Readme.md @@ -1,33 +1,70 @@ -# What is YGM? - -YGM is an asynchronous communication library designed for irregular communication patterns. It is built on a -communicator abstraction, much like MPI, but communication is handled asynchronously and is initiated by senders without -any interaction with receivers. YGM features -* **Message buffering** - Increases application throughput. -* **Fire-and-Forget RPC Semantics** - A sender provides the function and function arguments for execution on a specified - destination rank through an `async` call. This function will complete on the destination rank at an unspecified time - in the future, but YGM does not explicitly make the sender aware of this completion. -* **Storage Containers** - YGM provides a collection of distributed storage containers with asynchronous - interfaces, used for many common distributed memory operations. Containers are designed to partition data, allowing -insertions to occur from any rank. Data is accessed through collective `for_all` operations that execute a user-provided -function on every stored object, or, when a particular piece of data's location is known, `visit`-type operations that -perform a user-provided function only on the desired data. These containers are found -[here](/include/ygm/container/). - -# Getting Started +## What is YGM? + +YGM is an asynchronous communication library written in C++ and designed for high-performance computing (HPC) use cases featuring +irregular communication patterns. YGM includes a collection of +distributed-memory storage containers designed to express common algorithmic and data-munging tasks. 
These containers +automatically partition data, allowing insertions and, with most containers, processing of individual elements to be +initiated from any running YGM process. + +Underlying YGM's containers is a communicator abstraction. This communicator asynchronously sends messages spawned by +senders with receivers needing no knowledge of incoming messages prior to their arrival. YGM communications take the +form of *active messages*; each message contains a function object to execute (often in the form of C++ lambdas), data +and/or pointers to data for this function to execute on, and a destination process for the message to be executed at. + +YGM also includes a set of I/O primitives for parsing collections of input documents in parallel as independent lines of +text and streaming output lines to +large numbers of destination files. Current parsing functionality supports reading input as CSV, ndjson, and +unstructured lines of data. + +## General YGM Operations + +YGM is built on its ability to communicate active messages asynchronously between running processes. This does not +capture every operation that can be useful, for instance collective operations are still widely needed. YGM uses +prefixes on function names to distinguish their behaviors in terms of the processes involved. These prefixes are: + * `async_`: Asynchronous operation initiated on a single process. The execution of the underlying function may + occur on a remote process. + * `local_`: Function performs only local operations on data of the current process. In uses within YGM containers + with partitioning schemes that determine item ownership, care must be taken to ensure the process a `local_` + operation is called from aligns with the item's owner. For instance, calling `ygm::container::map::local_insert` + will store an item on the process where the call is made, but the `ygm::container::map` may not be able to look + up this location if it is on the wrong process.
+ * No Prefix: Collective operation that must be called from all processes. + +The primary workhorse functions in YGM fall into the two categories of `async_` and `for_all` operations. In an +`async_` operation, a lambda is asynchronously sent to a (potentially) remote process for execution. In many cases +with YGM containers, the lambda being executed is not provided by the user and is instead part of the function itself, +e.g. `async_insert` calls on most containers. A `for_all` operation is a collective operation in which a lambda is +executed locally on every process while iterating over all locally held items of some YGM object. The items iterated +over can be items in a YGM container, items coming from a map, filter, or flatten applied to a container, or all lines +in a collection of files in a YGM I/O parser. + +### Lambda Capture Rules +Certain `async_` and `for_all` operations require users to provide lambdas as part of their executions. The lambdas +that can be accepted by these two classes of functions follow different rules pertaining to the capturing of variables: + * `async_` calls cannot capture (most) variables in lambdas. Variables necessary for lambda execution must be + provided as arguments to the `async_` call. In the event that the data for the lambda resides on the remote + process the lambda will execute on, a `ygm::ygm_ptr` should be passed as an argument to the `async_`. + * `for_all` calls assume lambdas take only the arguments inherently provided by the YGM object being iterated over. + All other necessary variables *must* be captured. The types of arguments provided to the lambda can be identified + by the `for_all_args` type within the YGM object. + +These differences in behavior arise from the distinction that `async_` lambdas may execute on a remote process, while +`for_all` lambdas are guaranteed to execute locally to a process. 
In the case of `async_` operations, the lambda and +all arguments must be serialized for communication, but C++ does not provide a method for inspection of variables +captured in the closure of a lambda. In the case of `for_all` operations, the execution is equivalent to calling +[`std::for_each`](https://en.cppreference.com/w/cpp/algorithm/for_each) on the entire collection of items held locally. ## Requirements -* C++17 - GCC versions 8, 9 and 10 are tested. Your mileage may vary with other compilers. +* C++20 - GCC versions 11 through 14 are tested. Your mileage may vary with other compilers. * [Cereal](https://github.com/USCiLab/cereal) - C++ serialization library * MPI * Optionally, Boost 1.77 to enable Boost.JSON support. - ## Using YGM with CMake YGM is a header-only library that is easy to incorporate into a project through CMake. Adding the following to CMakeLists.txt will install YGM and its dependencies as part of your project: ``` -set(DESIRED_YGM_VERSION 0.4) +set(DESIRED_YGM_VERSION 0.6) find_package(ygm ${DESIRED_YGM_VERSION} CONFIG) if (NOT ygm_FOUND) FetchContent_Declare( @@ -52,62 +89,6 @@ else () endif () ``` -# Anatomy of a YGM Program -Here we will walk through a basic "hello world" YGM program. The [examples directory](/examples/) contains several other -examples, including many using YGM's storage containers. - -To begin, headers for a YGM communicator are needed -``` C++ -#include -``` - -At the beginning of the program, a YGM communicator must be constructed. It will be given `argc` and `argv` like -`MPI_Init`, and it has an optional third argument that specifies the aggregate size (in bytes) allowed for all send -buffers before YGM begins flushing sends. Here, we will make a buffer with 32MB of aggregate send buffer space. -``` C++ -ygm::comm world(&argc, &argv, 32*1024*1024); -``` - -Next, we need a lambda to send through YGM. We'll do a simple hello\_world type of lambda.
-``` C++ -auto hello_world_lambda = [](const std::string &name) { - std::cout << "Hello " << name << std::endl; -}; -``` - -Finally, we use this lambda inside of our `async` calls. In this case, we will have rank 0 send a message to rank 1, -telling it to greet the world -``` C++ -if (world.rank0()) { - world.async(1, hello_world_lambda, std::string("world")); -} -``` - -The full, compilable version of this example is found [here](/examples/hello_world.cpp). Running it prints a single -"Hello world". - -# Potential Pitfalls - -## Allowed Lambdas -There are two distinct classes of lambdas that can be given to YGM: *remote lambdas* and *local lambdas*, each of which -has different requirements. - -### Remote Lambdas -A *remote lambda* is any lambda that may potentially be executed on a different rank. These lambdas are identified as -being those given to a `ygm::comm` or any of the storage containers through a function prefixed by `async_`. - -The defining feature of remote lambdas is they **must not** capture any variables; all variables must be provided as -arguments. This limitation is due to the lack of -ability for YGM to inspect and extract these arguments when serializing messages to be sent to other ranks. - -### Local Lambdas -A *local lambda* is any lambda that is guaranteed not to be sent to a remote rank. These lambdas are identified as being -those given to a `for_all` operation on a storage container. - -The defining feature of local lambdas is that all arguments besides what is stored in the container must be captured. -Internally, these lambdas may be given to a [`std::for_each`](https://en.cppreference.com/w/cpp/algorithm/for_each) that -iterates over the container's elements stored locally on each rank. - # License YGM is distributed under the MIT license. 
diff --git a/cmake/FindArrowParquet.cmake b/cmake/FindArrowParquet.cmake index a40e6cab..b922d07f 100644 --- a/cmake/FindArrowParquet.cmake +++ b/cmake/FindArrowParquet.cmake @@ -1,42 +1,4 @@ -# Find Arrow and Parquet using find_package -function(find_arrow_parquet_config) - # Find Arrow >- 8.0 - foreach (VERSION 16.0 15.0 14.0 13.0 12.0 11.0 10.0 9.0 8.0) - find_package(Arrow ${VERSION} QUIET) - if (Arrow_FOUND) - break() - endif () - endforeach () - set(Arrow_FOUND ${Arrow_FOUND} PARENT_SCOPE) - - # Find Parquet - if (Arrow_FOUND) - find_package(Parquet QUIET PATHS ${Arrow_DIR}) - endif () - set(Parquet_FOUND ${Parquet_FOUND} PARENT_SCOPE) - - # Show Arrow and Parquet info - if (Arrow_FOUND AND Parquet_FOUND) - if (Arrow_FOUND) - message(STATUS ${PROJECT_NAME} " found Arrow") - message(STATUS "Arrow version: ${ARROW_VERSION}") - message(STATUS "Arrow SO version: ${ARROW_FULL_SO_VERSION}") - endif () - - if (Parquet_FOUND) - message(STATUS ${PROJECT_NAME} " found Parquet") - message(STATUS "Parquet version: ${PARQUET_VERSION}") - message(STATUS "Parquet SO version: ${PARQUET_FULL_SO_VERSION}") - endif () - else () - if (YGM_REQUIRE_ARROW_PARQUET) - message(FATAL_ERROR "${PROJECT_NAME} requires Arrow Parquet >= 8.0 but Arrow Parquet was not found.") - else () - message(WARNING "${PROJECT_NAME} did not find Arrow Parquet >= 8.0. Building without Arrow Parquet.") - endif () - endif () -endfunction() - +include(PythonUtilities) # Find Arrow and Parquet installed along with pyarrow by pip. # @@ -58,7 +20,7 @@ endfunction() # If Arrow and Parquet are found, set Arrow_FOUND and Parquet_FOUND to TRUE. # Also, Arrow::arrow_shared and Parquet::parquet_shared are created as imported targets. # Those targets can be used to link Arrow and Parquet as find_package() is used. 
-function(find_pyarrow) +function(find_pip_installed_pyarrow) if (PIP_PYARROW_ROOT) # Find libarrow file(GLOB Arrow_LIBRARIES LIST_DIRECTORIES false "${PIP_PYARROW_ROOT}/libarrow.so.*") @@ -114,12 +76,6 @@ function(find_pyarrow) endif () message(STATUS "Arrow include dir: ${Arrow_INCLUDE_DIRS}") - else () # Arrow or Parquet not found - if (YGM_REQUIRE_ARROW_PARQUET) - message(FATAL_ERROR "${PROJECT_NAME} requires Arrow Parquet but Arrow Parquet was not found.") - else () - message(WARNING "${PROJECT_NAME} did not find Arrow Parquet. Building without Arrow Parquet.") - endif () endif () else () message(FATAL_ERROR "PIP_PYARROW_ROOT is not set. PIP_PYARROW_ROOT must be set to the root of the pyarrow installation.") @@ -128,24 +84,149 @@ function(find_pyarrow) endfunction() -# Find Arrow and Parquet using find_arrow or find_pyarrow -# If PIP_PYARROW_ROOT is set, find_pyarrow is used. +# Find the directory where pyarrow is installed. +# This function executes a Python script to find the pyarrow module and +# **does not assume that pyarrow is installed by pip**. # # Output: -# Arrow_FOUND and Parquet_FOUND are set to TRUE if Arrow and Parquet are found. -function(find_arrow_parquet) - if (PIP_PYARROW_ROOT) - find_pyarrow() - else () - find_arrow_parquet_config() +# PYARROW_ROOT is set to the root of the pyarrow installation. +function(find_pyarrow_package) + find_python3_module(pyarrow) + if (PYTHON3_MODULE_PATH) + get_filename_component(PYARROW_ROOT ${PYTHON3_MODULE_PATH} DIRECTORY) + set(PYARROW_ROOT ${PYARROW_ROOT} PARENT_SCOPE) endif () +endfunction() + +# Install pyarrow using pip +# Output: +# PIP_PYARROW_ROOT is set to the root of the pyarrow installation. 
+function(install_pyarrow_in_venv) + setup_python_venv() + if (NOT PYTHON_VENV_ROOT) + return() + endif () + + activate_python_venv(${PYTHON_VENV_ROOT}) + if (NOT PYTHON_VENV_ACTIVATED) + return() + endif () + + # Use only the Python 3 interpreter in the virtual environment + set(Python3_FIND_VIRTUALENV ONLY) + + # Upgrade pip + # Ignore the error status as failing to upgrade is not the end of the world + upgrade_pip() + + # Install pyarrow + pip_install_python_package("pyarrow==16.1.*") + if (PIP_INSTALL_SUCCEEDED) + find_pyarrow_package() + if (PYARROW_ROOT) + set(PIP_PYARROW_ROOT ${PYARROW_ROOT} PARENT_SCOPE) + endif () + endif () + + deactivate_python_venv() +endfunction() + + +# Find Arrow and Parquet using find_package +# Output: +# Arrow_FOUND is set to TRUE if Arrow is found. +# Parquet_FOUND is set to TRUE if Parquet is found. +function(find_arrow_parquet_config) + # Find Arrow >= 8.0. + # Start major version from 100 so that we do not have to update + # this code every time Arrow releases a major version. + foreach (MAJOR_VERSION RANGE 100 8 -1) + find_package(Arrow "${MAJOR_VERSION}.0" QUIET) + if (Arrow_FOUND) + break() + endif () + endforeach () set(Arrow_FOUND ${Arrow_FOUND} PARENT_SCOPE) + + # Find Parquet + if (Arrow_FOUND) + find_package(Parquet QUIET PATHS ${Arrow_DIR}) + endif () set(Parquet_FOUND ${Parquet_FOUND} PARENT_SCOPE) + + # Show Arrow and Parquet info + if (Arrow_FOUND AND Parquet_FOUND) + if (Arrow_FOUND) + message(STATUS ${PROJECT_NAME} " found Arrow") + message(STATUS "Arrow version: ${ARROW_VERSION}") + message(STATUS "Arrow SO version: ${ARROW_FULL_SO_VERSION}") + endif () + + if (Parquet_FOUND) + message(STATUS ${PROJECT_NAME} " found Parquet") + message(STATUS "Parquet version: ${PARQUET_VERSION}") + message(STATUS "Parquet SO version: ${PARQUET_FULL_SO_VERSION}") + endif () + endif () +endfunction() + +# Find Arrow and Parquet. If not found, install pyarrow using pip in a Python virtual environmental space. 
+# Input: +# PIP_PYARROW_ROOT (option) The root directory of a pyarrow installed by pip. +# YGM_REQUIRE_ARROW_PARQUET (option) If TRUE, a fatal error is thrown when Arrow Parquet is not found. +# Output: +# Arrow_FOUND and Parquet_FOUND are defined and set to TRUE if Arrow and Parquet are found. +function(find_or_install_arrow_parquet) + if (PIP_PYARROW_ROOT) + find_pip_installed_pyarrow() + if (NOT Arrow_FOUND OR NOT Parquet_FOUND) + if (YGM_REQUIRE_ARROW_PARQUET) + message(FATAL_ERROR "${PROJECT_NAME} requires Arrow Parquet but Arrow Parquet was not found in ${PIP_PYARROW_ROOT}.") + else () + message(WARNING "${PROJECT_NAME} did not find Arrow Parquet in ${PIP_PYARROW_ROOT}. Building without Arrow Parquet.") + endif () + return() + endif () + endif () + + if (NOT Arrow_FOUND OR NOT Parquet_FOUND) + find_arrow_parquet_config() + endif () + + if (NOT Arrow_FOUND OR NOT Parquet_FOUND) + find_pyarrow_package() + if (PYARROW_ROOT) + # Assume that the found pyarrow was installed by pip. + set(PIP_PYARROW_ROOT ${PYARROW_ROOT}) + find_pip_installed_pyarrow() + endif () + endif () + + if (NOT Arrow_FOUND OR NOT Parquet_FOUND) + install_pyarrow_in_venv() + if (PIP_PYARROW_ROOT) + find_pip_installed_pyarrow() + endif () + endif () + + if (NOT Arrow_FOUND OR NOT Parquet_FOUND) + message(STATUS "${PROJECT_NAME} could not find Arrow Parquet.") + message(STATUS "If this is an unexpected result, try the following command to install pyarrow: export Python3_ROOT_DIR=/path/to/python3; /path/to/python3 -m pip install pyarrow") + if (YGM_REQUIRE_ARROW_PARQUET) + message(FATAL_ERROR "${PROJECT_NAME} requires Arrow Parquet.") + else () + message(WARNING "${PROJECT_NAME} keep the build process without Arrow Parquet.") + endif () + return() + endif () + + set(Arrow_FOUND TRUE PARENT_SCOPE) + set(Parquet_FOUND TRUE PARENT_SCOPE) endfunction() # Link Arrow and Parquet to the target -# This function must be called after find_arrow_parquet().
+# This function must be called after find_or_install_arrow_parquet(). function(link_arrow_parquet target) if (Arrow_FOUND AND Parquet_FOUND) target_link_libraries(${target} PUBLIC @@ -153,4 +234,4 @@ function(link_arrow_parquet target) else () message(WARNING "Arrow or Parquet not found. Not linking Arrow or Parquet.") endif () -endfunction() \ No newline at end of file +endfunction() diff --git a/cmake/FindPython3Module.cmake b/cmake/FindPython3Module.cmake new file mode 100644 index 00000000..9d3b4c8e --- /dev/null +++ b/cmake/FindPython3Module.cmake @@ -0,0 +1,18 @@ +# Find a Python3 module using CMake's FindPython3 module. +# Input: module name to find +# Python3_ROOT_DIR can be used as a hint to find Python3 +# +# Output: PYTHON3_MODULE_PATH is set to the path of the module if found +function(find_python3_module module_name) + find_package(Python3 COMPONENTS Interpreter REQUIRED) + + execute_process( + COMMAND ${Python3_EXECUTABLE} -c "import importlib.util; import sys; module_name = '${module_name}'; spec = importlib.util.find_spec(module_name); print(spec.origin if spec else ''); sys.exit(0 if spec else 1)" + OUTPUT_VARIABLE MODULE_PATH + OUTPUT_STRIP_TRAILING_WHITESPACE + ) + + if (Python3_FOUND AND MODULE_PATH) + set(PYTHON3_MODULE_PATH ${MODULE_PATH} PARENT_SCOPE) + endif () +endfunction() \ No newline at end of file diff --git a/cmake/PythonUtilities.cmake b/cmake/PythonUtilities.cmake new file mode 100644 index 00000000..709f1bf6 --- /dev/null +++ b/cmake/PythonUtilities.cmake @@ -0,0 +1,103 @@ +# Create a Python3 virtual environment (activation is done separately) +# +# Output: PYTHON_VENV_ROOT is set to the path of the virtual environment +# if created successfully +function(setup_python_venv) + find_package(Python3 COMPONENTS Interpreter QUIET) + if (NOT Python3_Interpreter_FOUND) + message(WARNING "Python3 interpreter not found") + return() + endif() + + set(PYTHON_VENV_ROOT "${CMAKE_BINARY_DIR}/${PROJECT_NAME}-venv") + execute_process( + COMMAND ${Python3_EXECUTABLE} -m
venv ${PYTHON_VENV_ROOT} + RESULT_VARIABLE result + OUTPUT_QUIET + ) + if (result EQUAL "0") + message(STATUS "Created Python virtual environment in ${PYTHON_VENV_ROOT}") + set(PYTHON_VENV_ROOT ${PYTHON_VENV_ROOT} PARENT_SCOPE) + endif() +endfunction() + +# Activate a Python3 virtual environment by setting the VIRTUAL_ENV environment variable +# Input: A path to the virtual environment +# Output: PYTHON_VENV_ACTIVATED is set to TRUE (no check of the path is performed) +function(activate_python_venv venv_path) + set (ENV{VIRTUAL_ENV} ${venv_path}) + set(PYTHON_VENV_ACTIVATED TRUE PARENT_SCOPE) +endfunction() + +# Deactivate a Python3 virtual environment +function(deactivate_python_venv) + unset(ENV{VIRTUAL_ENV}) + set(PYTHON_VENV_ACTIVATED FALSE PARENT_SCOPE) +endfunction() + +# Upgrade pip in the Python3 interpreter +# Output: PIP_UPGRADE_SUCCEEDED is set to TRUE if pip was upgraded successfully +function(upgrade_pip) + find_package(Python3 COMPONENTS Interpreter QUIET) + if (NOT Python3_Interpreter_FOUND) + message(WARNING "Python3 interpreter not found") + return() + endif() + + execute_process( + COMMAND ${Python3_EXECUTABLE} -m pip install --upgrade pip + RESULT_VARIABLE result + OUTPUT_QUIET + ) + if(result EQUAL "0") + set(PIP_UPGRADE_SUCCEEDED TRUE PARENT_SCOPE) + endif() +endfunction() + +# Install a Python3 package using pip +# +# Input: A package name (pip is run as "${Python3_EXECUTABLE} -m pip") +# Output: PIP_INSTALL_SUCCEEDED is set to TRUE +# if the package was installed successfully +function(pip_install_python_package package_name) + find_package(Python3 COMPONENTS Interpreter QUIET) + if (NOT Python3_Interpreter_FOUND) + message(WARNING "Python3 interpreter not found") + return() + endif() + + execute_process( + COMMAND ${Python3_EXECUTABLE} -m pip install ${package_name} + RESULT_VARIABLE result + OUTPUT_QUIET + ) + if(result EQUAL "0") + message(STATUS "Installed ${package_name}") + set(PIP_INSTALL_SUCCEEDED TRUE PARENT_SCOPE) + endif() +endfunction() + +# Find a Python3 module using CMake's FindPython3 module.
+# Input: module name to find +# Python3_ROOT_DIR can be used as a hint to find Python3 +# +# Output: PYTHON3_MODULE_PATH is set to the path of the module if found +function(find_python3_module module_name) + find_package(Python3 COMPONENTS Interpreter QUIET) + if (NOT Python3_Interpreter_FOUND) + message(WARNING "Python3 interpreter not found") + return() + endif() + + execute_process( + COMMAND ${Python3_EXECUTABLE} -c "import importlib.util; import sys; module_name = '${module_name}'; spec = importlib.util.find_spec(module_name); print(spec.origin if spec else ''); sys.exit(0 if spec else 1)" + OUTPUT_VARIABLE MODULE_PATH + OUTPUT_STRIP_TRAILING_WHITESPACE + RESULT_VARIABLE result + ) + + if (result EQUAL "0") + set(PYTHON3_MODULE_PATH ${MODULE_PATH} PARENT_SCOPE) + message(STATUS "Found Python module ${module_name} at ${MODULE_PATH}") + endif() +endfunction() \ No newline at end of file diff --git a/docs/CMakeLists.txt b/docs/CMakeLists.txt index 55a1d082..8265c559 100644 --- a/docs/CMakeLists.txt +++ b/docs/CMakeLists.txt @@ -29,4 +29,4 @@ doxygen_add_docs(doxygen COMMENT "Generate documentation by Doxygen" ) -add_subdirectory(rtd) \ No newline at end of file +add_subdirectory(rtd) diff --git a/docs/rtd/getting_started.rst b/docs/rtd/getting_started.rst new file mode 100644 index 00000000..2b88bf84 --- /dev/null +++ b/docs/rtd/getting_started.rst @@ -0,0 +1,117 @@ +Getting Started +*************** + +What is YGM? +============ + +YGM is an asynchronous communication library written in C++ and designed for high-performance computing (HPC) use cases featuring +irregular communication patterns. YGM includes a collection of +distributed-memory storage containers designed to express common algorithmic and data-munging tasks. These containers +automatically partition data, allowing insertions and, with most containers, processing of individual elements to be +initiated from any running YGM process. + +Underlying YGM's containers is a communicator abstraction.
This communicator asynchronously sends messages spawned by +senders with receivers needing no knowledge of incoming messages prior to their arrival. YGM communications take the +form of *active messages*; each message contains a function object to execute (often in the form of C++ lambdas), data +and/or pointers to data for this function to execute on, and a destination process for the message to be executed at. + +YGM also includes a set of I/O primitives for parsing collections of input documents in parallel as independent lines of +text and streaming output lines to +large numbers of destination files. Current parsing functionality supports reading input as CSV, ndjson, and +unstructured lines of data. + +General YGM Operations +====================== + +YGM is built on its ability to communicate active messages asynchronously between running processes. This does not +capture every operation that can be useful, for instance collective operations are still widely needed. YGM uses +prefixes on function names to distinguish their behaviors in terms of the processes involved. These prefixes are: + * ``async_``: Asynchronous operation initiated on a single process. The execution of the underlying function may + occur on a remote process. + * ``local_``: Function performs only local operations on data of the current process. In uses within YGM containers + with partitioning schemes that determine item ownership, care must be taken to ensure the process a ``local_`` + operation is called from aligns with the item's owner. For instance, calling ``ygm::container::map::local_insert`` + will store an item on the process where the call is made, but the ``ygm::container::map`` may not be able to look + up this location if it is on the wrong process. + * No Prefix: Collective operation that must be called from all processes. + +The primary workhorse functions in YGM fall into the two categories of ``async_`` and ``for_all`` operations. 
In an +``async_`` operation, a lambda is asynchronously sent to a (potentially) remote process for execution. In many cases +with YGM containers, the lambda being executed is not provided by the user and is instead part of the function itself, +e.g. ``async_insert`` calls on most containers. A ``for_all`` operation is a collective operation in which a lambda is +executed locally on every process while iterating over all locally held items of some YGM object. The items iterated +over can be items in a YGM container, items coming from a map, filter, or flatten applied to a container, or all lines +in a collection of files in a YGM I/O parser. + +Lambda Capture Rules +-------------------- +Certain ``async_`` and ``for_all`` operations require users to provide lambdas as part of their executions. The lambdas +that can be accepted by these two classes of functions follow different rules pertaining to the capturing of variables: + * ``async_`` calls cannot capture (most) variables in lambdas. Variables necessary for lambda execution must be + provided as arguments to the ``async_`` call. In the event that the data for the lambda resides on the remote + process the lambda will execute on, a ``ygm::ygm_ptr`` should be passed as an argument to the ``async_``. + * ``for_all`` calls assume lambdas take only the arguments inherently provided by the YGM object being iterated over. + All other necessary variables *must* be captured. The types of arguments provided to the lambda can be identified + by the ``for_all_args`` type within the YGM object. + +These differences in behavior arise from the distinction that ``async_`` lambdas may execute on a remote process, while +``for_all`` lambdas are guaranteed to execute locally to a process. In the case of ``async_`` operations, the lambda and +all arguments must be serialized for communication, but C++ does not provide a method for inspection of variables +captured in the closure of a lambda. 
In the case of ``for_all`` operations, the execution is equivalent to calling +`std::for_each <https://en.cppreference.com/w/cpp/algorithm/for_each>`_ on the entire collection of items held locally. + +Requirements +============ + +* C++20 - GCC versions 11 through 14 are tested. Your mileage may vary with other compilers. +* `Cereal <https://github.com/USCiLab/cereal>`_ - C++ serialization library +* MPI +* Optionally, Boost 1.77 to enable Boost.JSON support. + + +Using YGM with CMake +==================== +YGM is a header-only library that is easy to incorporate into a project through CMake. Adding the following to +CMakeLists.txt will install YGM and its dependencies as part of your project: + +.. code-block:: CMake + + set(DESIRED_YGM_VERSION 0.6) + find_package(ygm ${DESIRED_YGM_VERSION} CONFIG) + if (NOT ygm_FOUND) + FetchContent_Declare( + ygm + GIT_REPOSITORY https://github.com/LLNL/ygm + GIT_TAG v${DESIRED_YGM_VERSION} + ) + FetchContent_GetProperties(ygm) + if (ygm_POPULATED) + message(STATUS "Found already populated ygm dependency: " + ${ygm_SOURCE_DIR} + ) + else () + set(JUST_INSTALL_YGM ON) + set(YGM_INSTALL ON) + FetchContent_Populate(ygm) + add_subdirectory(${ygm_SOURCE_DIR} ${ygm_BINARY_DIR}) + message(STATUS "Cloned ygm dependency " ${ygm_SOURCE_DIR}) + endif () + else () + message(STATUS "Found installed ygm dependency " ${ygm_DIR}) + endif () + +License +======= +YGM is distributed under the MIT license. + +All new contributions must be made under the MIT license. + +See `LICENSE-MIT `_, `NOTICE +`_, and `COPYRIGHT `_ for +details.
+ +SPDX-License-Identifier: MIT + +Release +======= +LLNL-CODE-789122 diff --git a/docs/rtd/index.rst b/docs/rtd/index.rst index a32c2d2b..0f429a5a 100644 --- a/docs/rtd/index.rst +++ b/docs/rtd/index.rst @@ -10,6 +10,7 @@ YGM library documentation :maxdepth: 2 :caption: Contents: + getting_started ygm/comm ygm/container @@ -26,4 +27,4 @@ Indices and tables * :ref:`genindex` * :ref:`modindex` -* :ref:`search` \ No newline at end of file +* :ref:`search` diff --git a/docs/rtd/ygm/comm.rst b/docs/rtd/ygm/comm.rst index 7c4340a7..440073eb 100644 --- a/docs/rtd/ygm/comm.rst +++ b/docs/rtd/ygm/comm.rst @@ -3,10 +3,59 @@ :code:`ygm::comm` class reference. ================================== +Communicator Overview +===================== + The communicator :code:`ygm::comm` is the central object in YGM. The communicator controls an interface to an MPI communicator, and its functionality can be modified by additional optional parameters. +Communicator Features: + * **Message Buffering** - Increases application throughput at the expense of increased message latency. + * **Message Routing** - Extends benefits of message buffering to extremely large HPC allocations. + * **Fire-and-Forget RPC Semantics** - A sender provides the function and function arguments for execution on a specified + destination rank through an ``async`` call. This function will complete on the destination rank at an unspecified time + in the future, but YGM does not explicitly make the sender aware of this completion. + +Communicator Hello World +======================== + +Here we will walk through a basic "hello world" YGM program. The `examples directory <https://github.com/LLNL/ygm/tree/HEAD/examples>`_ contains several other +examples, including many using YGM's storage containers. + +To begin, headers for a YGM communicator are needed: + +.. code-block:: C++ + + #include <ygm/comm.hpp> + +At the beginning of the program, a YGM communicator must be constructed. It will be given ``argc`` and ``argv`` like +``MPI_Init``. + +..
code-block:: C++ + + ygm::comm world(&argc, &argv); + +Next, we need a lambda to send through YGM. We'll do a simple hello\_world type of lambda. + +.. code-block:: C++ + + auto hello_world_lambda = [](const std::string &name) { + std::cout << "Hello " << name << std::endl; + }; + +Finally, we use this lambda inside of our ``async`` calls. In this case, we will have rank 0 send a message to rank 1, +telling it to greet the world: + +.. code-block:: C++ + + if (world.rank0()) { + world.async(1, hello_world_lambda, std::string("world")); + } + +The full, compilable version of this example is found `here `_. Running it prints a single +"Hello world". + .. doxygenclass:: ygm::comm :members: - :undoc-members: \ No newline at end of file + :undoc-members: diff --git a/docs/rtd/ygm/container.rst b/docs/rtd/ygm/container.rst index 05b3ba85..758c9b74 100644 --- a/docs/rtd/ygm/container.rst +++ b/docs/rtd/ygm/container.rst @@ -11,34 +11,68 @@ operations that need to be performed on the data stored in a container while abstracting the locality and access details of said data. While insiration is taken from STL, the top priority is to provide expressive and performant tools within the YGM framework. -Interaction with containers occurs in one of two classes of operations: -:code:`for_all` and `async_visit`. - -Both classes expect a function as a primary argument, similar to -:code:`ygm::comm::async`. -However, the passed function signature must match the contents of the container. -Value store containers holding :code:`value_type` objects expect the first -argument of passed functions to address objects with the syntax -:code:`[](value_type &data_item){}`. -Key-value store containers expect these functions instead to support separate -:code:`key_type` (which must be immutable) and :code:`value_type` arguments with -the syntax :code:`[](key_type key, value_type &value){}`. 
-Although all of these operations agree as to how contained objects are addressed -by functions, the interfaces are subtly different and support additional -optional features. + +Implemented Storage Containers +============================== + +The currently implemented containers include a mix of distributed versions of familiar containers and +distributed-specific containers: + + * ``ygm::container::bag`` - An unordered collection of objects partitioned across processes. Ideally suited for + iteration over all items with no capability for identifying or searching for an individual item within the bag. + * ``ygm::container::set`` - Analogous to ``std::set``. An unordered collection of unique objects with the ability to iterate + and search for individual items. Insertion and iteration are slower than a ``ygm::container::bag``. + * ``ygm::container::multiset`` - Analogous to ``std::multiset``. A set where multiple instances of the same object + may appear. + * ``ygm::container::map`` - Analogous to ``std::map``. A collection of keys with assigned values. Keys and values can + be inserted and looked up individually or iterated over collectively. + * ``ygm::container::multimap`` - Analogous to ``std::multimap``. A map where keys may appear with multiple values. + * ``ygm::container::array`` - A collection of items indexed by an integer type. Items can be inserted and looked up + by their index values independently or iterated over collectively. Differs from a ``std::array`` in that sizes do + not need to be known at compile-time, and a ``ygm::container::array`` can be dynamically resized through a + (potentially expensive) function at runtime. + * ``ygm::container::counting_set`` - A container for counting occurrences of items. Can be thought of as a + ``ygm::container::map`` that maps items to integer counts but optimized for the case of frequent duplication of + keys. + * ``ygm::container::disjoint_set`` - A distributed disjoint set data structure. 
Implements asynchronous union + operation for maintaining membership of items within mathematical disjoint sets. Eschews the find operation of most + disjoint set data structures and instead allows for execution of user-provided lambdas upon successful completion + of set merges. + +Typical Container Operations +============================ + +Most interaction with containers occurs in one of two classes of operations: +:code:`for_all` and :code:`async_`. + +:code:`for_all` Operations +-------------------------- :code:`for_all`-class operations are barrier-inducing collectives that direct -ranks to iteratively apply the passed function to all locally-held data. +ranks to iteratively apply a user-provided function to all locally-held data. Functions passed to the :code:`for_all` interface do not support additional variadic parameters. However, these functions are stored and executed locally on each rank, and so can capture objects in rank-local scope. -:code:`async_visit`-class operations provide a mechanism for executing a -function at a particular piece of data stored within a container. -YGM handles the creation and invocation of a YGM communicator :code:`async` -call, freeing the user to consider algorithmic details. -Not all containers support :code:`async_visit`-class operations. +:code:`async_` Operations +------------------------- + +Operations prefixed with ``async_`` perform operations on containers that can be spawned from any process and +execute on the correct process using YGM's asynchronous runtime. The most common ``async`` operations are: + + * ``async_insert`` - Inserts an item or a key and value, depending on the container being used. The process responsible + for storing the inserted object is determined using the container's partitioner. Depending on the container, this + partitioner may determine this location using a hash of the item or by heuristics that attempt to evenly spread + data across processes (in the case of ``ygm::container::bag``). 
+ * ``async_visit`` - Items within YGM containers will be distributed across the universe of running processes. Instead + of providing operations to look up this data directly, which would involve a round-trip communication with the + process storing the item of interest, most YGM containers provide ``async_visit``. A call to ``async_visit`` takes + a function to execute and arguments to pass to the function and asynchronously executes the provided function with + arguments that are the item stored in the container and the additional arguments passed to ``async_visit``. + +Specific containers may have additional ``async_`` operations (or may be missing some of the above) based on the +capabilities of the container. Consult the documentation of individual containers for more details. .. toctree:: :maxdepth: 2 @@ -53,7 +87,20 @@ Not all containers support :code:`async_visit`-class operations. container/multiset container/set -YGM also supports adaptor classes and functions that wrap an existing class to -either add or modify operation functionality. +YGM Container Example +===================== + +.. literalinclude:: ../../../examples/container/map_visit.cpp + :language: C++ + +Container Transformation Objects +================================ + +``ygm::container`` provides a number of transformation objects that can be applied to containers to alter the appearance +of items passed to ``for_all`` operations without modifying the items within the container itself. The currently +supported transformation objects are: -.. doxygenfunction:: ygm::container::reduce_by_key_map \ No newline at end of file + * ``filter`` - Filters items in a container to only execute on the portion of the container satisfying a provided + boolean function. + * ``flatten`` - Extract the elements from tuple-like objects before passing to the user's ``for_all`` function. + * ``map`` - Apply a generic function to the container's items before passing to the user's ``for_all`` function. 
diff --git a/examples/container/bag_filter.cpp b/examples/container/bag_filter.cpp index 8dae39c3..928bd1cd 100644 --- a/examples/container/bag_filter.cpp +++ b/examples/container/bag_filter.cpp @@ -80,7 +80,7 @@ int main(int argc, char **argv) { ygm::container::map word_count(world); bag3.filter([](std::string s) { return s.size() == 3; }) - .map([](std::string s) { return std::make_pair(s, size_t(1)); }) + .transform([](std::string s) { return std::make_pair(s, size_t(1)); }) .reduce_by_key(word_count, std::plus()); word_count.for_all( @@ -92,7 +92,7 @@ int main(int argc, char **argv) { ygm::io::line_parser lp(world, {"dummy"}); lp.filter([](std::string s) { return s.size() == 3; }) - .map([](std::string s) { return std::make_pair(s, size_t(1)); }) + .transform([](std::string s) { return std::make_pair(s, size_t(1)); }) .reduce_by_key(word_count, std::plus()); } diff --git a/examples/container/word_counter.cpp b/examples/container/word_counter.cpp index 2da77508..9fecdc50 100644 --- a/examples/container/word_counter.cpp +++ b/examples/container/word_counter.cpp @@ -60,7 +60,7 @@ int main(int argc, char **argv) { to_gather = {"freedom"}; } - auto counts = word_counter.key_gather(to_gather); + auto counts = word_counter.gather_keys(to_gather); for (auto &word_count : counts) { std::cout << word_count.first << " -> " << word_count.second << std::endl; diff --git a/include/ygm/collective.hpp b/include/ygm/collective.hpp index 056f2db1..6c4dd9dd 100644 --- a/include/ygm/collective.hpp +++ b/include/ygm/collective.hpp @@ -23,7 +23,7 @@ T prefix_sum(const T &value, const comm &c) { T to_return{0}; c.barrier(); MPI_Comm mpi_comm = c.get_mpi_comm(); - ASSERT_MPI(MPI_Exscan(&value, &to_return, 1, detail::mpi_typeof(value), + YGM_ASSERT_MPI(MPI_Exscan(&value, &to_return, 1, detail::mpi_typeof(value), MPI_SUM, mpi_comm)); return to_return; } @@ -42,7 +42,7 @@ T sum(const T &value, const comm &c) { T to_return; c.barrier(); MPI_Comm mpi_comm = c.get_mpi_comm(); - 
ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(T()), + YGM_ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(T()), MPI_SUM, mpi_comm)); return to_return; } @@ -61,7 +61,7 @@ T min(const T &value, const comm &c) { T to_return; c.barrier(); MPI_Comm mpi_comm = c.get_mpi_comm(); - ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(T()), + YGM_ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(T()), MPI_MIN, mpi_comm)); return to_return; } @@ -80,7 +80,7 @@ T max(const T &value, const comm &c) { T to_return; c.barrier(); MPI_Comm mpi_comm = c.get_mpi_comm(); - ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(T()), + YGM_ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(T()), MPI_MAX, mpi_comm)); return to_return; } @@ -98,7 +98,7 @@ inline bool logical_and(bool value, const comm &c) { bool to_return; c.barrier(); MPI_Comm mpi_comm = c.get_mpi_comm(); - ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(bool()), + YGM_ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(bool()), MPI_LAND, mpi_comm)); return to_return; } @@ -116,7 +116,7 @@ inline bool logical_or(bool value, const comm &c) { bool to_return; c.barrier(); MPI_Comm mpi_comm = c.get_mpi_comm(); - ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(bool()), + YGM_ASSERT_MPI(MPI_Allreduce(&value, &to_return, 1, detail::mpi_typeof(bool()), MPI_LOR, mpi_comm)); return to_return; } @@ -133,7 +133,7 @@ template void bcast(T &to_bcast, int root, const comm &cm) { if constexpr (std::is_trivially_copyable::value && std::is_standard_layout::value) { - ASSERT_MPI( + YGM_ASSERT_MPI( MPI_Bcast(&to_bcast, sizeof(T), MPI_BYTE, root, cm.get_mpi_comm())); } else { ygm::detail::byte_vector packed; @@ -142,13 +142,13 @@ void bcast(T &to_bcast, int root, const comm &cm) { oarchive(to_bcast); } size_t packed_size = packed.size(); - ASSERT_RELEASE(packed_size < 1024 * 1024 * 1024); - 
ASSERT_MPI(MPI_Bcast(&packed_size, 1, ygm::detail::mpi_typeof(packed_size), + YGM_ASSERT_RELEASE(packed_size < 1024 * 1024 * 1024); + YGM_ASSERT_MPI(MPI_Bcast(&packed_size, 1, ygm::detail::mpi_typeof(packed_size), root, cm.get_mpi_comm())); if (cm.rank() != root) { packed.resize(packed_size); } - ASSERT_MPI(MPI_Bcast(packed.data(), packed_size, MPI_BYTE, root, + YGM_ASSERT_MPI(MPI_Bcast(packed.data(), packed_size, MPI_BYTE, root, cm.get_mpi_comm())); if (cm.rank() != root) { diff --git a/include/ygm/container/array.hpp b/include/ygm/container/array.hpp index 14e11bae..ab0cf519 100644 --- a/include/ygm/container/array.hpp +++ b/include/ygm/container/array.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -27,8 +28,10 @@ class array public detail::base_misc, std::tuple>, public detail::base_async_visit, std::tuple>, - public detail::base_iteration, - std::tuple> { + public detail::base_iteration_key_value, + std::tuple>, + public detail::base_async_reduce, + std::tuple> { friend class detail::base_misc, std::tuple>; public: @@ -98,7 +101,7 @@ class array key_type max_index{0}; for (const auto& [index, value] : l) { - ASSERT_RELEASE(index >= 0); + YGM_ASSERT_RELEASE(index >= 0); max_index = std::max(max_index, index); } @@ -126,9 +129,10 @@ class array } template - array(ygm::comm& comm, const T& t) requires detail::HasForAll && - detail::SingleItemTuple && - std::same_as> + array(ygm::comm& comm, const T& t) + requires detail::HasForAll && + detail::SingleItemTuple && + std::same_as> : m_comm(comm), pthis(this), m_default_value{}, partitioner(comm, 0) { pthis.check(m_comm); @@ -144,17 +148,19 @@ class array } template - array(ygm::comm& comm, const T& t) requires detail::HasForAll && - detail::SingleItemTuple && detail:: - DoubleItemTuple> && - std::convertible_to< - std::tuple_element_t< - 0, std::tuple_element_t<0, typename T::for_all_args>>, - key_type> && - std::convertible_to< - std::tuple_element_t< - 1, 
std::tuple_element_t<0, typename T::for_all_args>>, - mapped_type> + array(ygm::comm& comm, const T& t) + requires detail::HasForAll && + detail::SingleItemTuple && + detail::DoubleItemTuple< + std::tuple_element_t<0, typename T::for_all_args>> && + std::convertible_to< + std::tuple_element_t< + 0, std::tuple_element_t<0, typename T::for_all_args>>, + key_type> && + std::convertible_to< + std::tuple_element_t< + 1, std::tuple_element_t<0, typename T::for_all_args>>, + mapped_type> : m_comm(comm), pthis(this), m_default_value{}, partitioner(comm, 0) { pthis.check(m_comm); @@ -175,12 +181,15 @@ class array } template - array(ygm::comm& comm, const T& t) requires detail::HasForAll && - detail::DoubleItemTuple && std::convertible_to< - - std::tuple_element_t<0, typename T::for_all_args>, key_type> && - std::convertible_to, - mapped_type> + array(ygm::comm& comm, const T& t) + requires detail::HasForAll && + detail::DoubleItemTuple && + std::convertible_to< + std::tuple_element_t<0, typename T::for_all_args>, + key_type> && + std::convertible_to< + std::tuple_element_t<0, typename T::for_all_args>, + mapped_type> : m_comm(comm), pthis(this), m_default_value{}, partitioner(comm, 0) { pthis.check(m_comm); @@ -201,9 +210,10 @@ class array } template - array(ygm::comm& comm, const T& t) requires detail::STLContainer && - (not detail::SingleItemTuple)&&std:: - convertible_to + array(ygm::comm& comm, const T& t) + requires detail::STLContainer && + (not detail::SingleItemTuple) && + std::convertible_to : m_comm(comm), pthis(this), m_default_value{}, partitioner(comm, 0) { pthis.check(m_comm); @@ -221,11 +231,15 @@ class array } template - array(ygm::comm& comm, const T& t) requires detail::STLContainer && - detail::DoubleItemTuple && std::convertible_to< - std::tuple_element_t<0, typename T::value_type>, key_type> && - std::convertible_to, - mapped_type> + array(ygm::comm& comm, const T& t) + requires detail::STLContainer && + detail::DoubleItemTuple && + std::convertible_to< 
+ std::tuple_element_t<0, typename T::value_type>, + key_type> && + std::convertible_to< + std::tuple_element_t<1, typename T::value_type>, + mapped_type> : m_comm(comm), pthis(this), m_default_value{}, partitioner(comm, 0) { pthis.check(m_comm); @@ -278,7 +292,7 @@ class array void async_binary_op_update_value(const key_type index, const mapped_type& value, const BinaryOp& b) { - ASSERT_RELEASE(index < m_global_size); + YGM_ASSERT_RELEASE(index < m_global_size); auto updater = [](const key_type i, mapped_type& v, const mapped_type& new_value) { BinaryOp* binary_op; @@ -326,7 +340,7 @@ class array template void async_unary_op_update_value(const key_type index, const UnaryOp& u) { - ASSERT_RELEASE(index < m_global_size); + YGM_ASSERT_RELEASE(index < m_global_size); auto updater = [](const key_type i, mapped_type& v) { UnaryOp* u; v = (*u)(v); @@ -355,8 +369,10 @@ class array std::vector> tmp_values; tmp_values.reserve(local_size()); local_for_all( - [&tmp_values](const key_type& index, const mapped_type& value) { - tmp_values.push_back(std::make_pair(index, value)); + [&tmp_values, size](const key_type& index, const mapped_type& value) { + if (index < size) { + tmp_values.push_back(std::make_pair(index, value)); + } }); m_global_size = size; @@ -368,14 +384,14 @@ class array // Repopulate array values for (const auto& [index, value] : tmp_values) { - if (index < size) { - async_set(index, value); - } + async_set(index, value); } m_comm.barrier(); } + void resize(const size_type size) { resize(size, m_default_value); } + size_t local_size() { return partitioner.local_size(); } size_t size() const { @@ -383,8 +399,6 @@ class array return m_global_size; } - void resize(const size_type size) { resize(size, m_default_value); } - void local_clear() { resize(0); } void local_swap(self_type& other) { @@ -413,6 +427,13 @@ class array } } + template + void local_reduce(const key_type index, const mapped_type& value, + ReductionOp reducer) { + 
m_local_vec[partitioner.local_index(index)] = + reducer(value, m_local_vec[partitioner.local_index(index)]); + } + void sort() { const key_type samples_per_pivot = std::max( std::min(20, m_global_size / m_comm.size()), 1); @@ -446,7 +467,8 @@ class array } m_comm.barrier(); - ASSERT_RELEASE(samples.size() == samples_per_pivot * (m_comm.size() - 1)); + YGM_ASSERT_RELEASE(samples.size() == + samples_per_pivot * (m_comm.size() - 1)); std::sort(samples.begin(), samples.end()); for (size_t i = samples_per_pivot - 1; i < samples.size(); i += samples_per_pivot) { @@ -455,7 +477,7 @@ class array samples.clear(); samples.shrink_to_fit(); - ASSERT_RELEASE(pivots.size() == m_comm.size() - 1); + YGM_ASSERT_RELEASE(pivots.size() == m_comm.size() - 1); // // Partition using pivots diff --git a/include/ygm/container/bag.hpp b/include/ygm/container/bag.hpp index b5c1ab62..1ed3806c 100644 --- a/include/ygm/container/bag.hpp +++ b/include/ygm/container/bag.hpp @@ -21,7 +21,7 @@ template class bag : public detail::base_async_insert_value, std::tuple>, public detail::base_count, std::tuple>, public detail::base_misc, std::tuple>, - public detail::base_iteration, std::tuple> { + public detail::base_iteration_value, std::tuple> { friend class detail::base_misc, std::tuple>; public: @@ -37,24 +37,38 @@ class bag : public detail::base_async_insert_value, std::tuple>, bag(ygm::comm &comm, std::initializer_list l) : m_comm(comm), pthis(this), partitioner(comm) { - m_comm.cout0("initializer_list assumes all ranks are equal"); pthis.check(m_comm); if (m_comm.rank0()) { for (const Item &i : l) { async_insert(i); } } + m_comm.barrier(); } template - bag(ygm::comm &comm, const STLContainer &cont) + bag(ygm::comm &comm, + const STLContainer &cont) requires detail::STLContainer && + std::convertible_to : m_comm(comm), pthis(this), partitioner(comm) { - m_comm.cout0("STLContainer assumes all ranks are different"); pthis.check(m_comm); for (const Item &i : cont) { - async_insert(i); + 
this->async_insert(i); } + m_comm.barrier(); + } + + template + bag(ygm::comm &comm, + const YGMContainer &yc) requires detail::HasForAll && + detail::SingleItemTuple + : m_comm(comm), pthis(this), partitioner(comm) { + pthis.check(m_comm); + + yc.for_all([this](const Item &value) { this->async_insert(value); }); + + m_comm.barrier(); } ~bag() { m_comm.barrier(); } @@ -228,7 +242,7 @@ class bag : public detail::base_async_insert_value, std::tuple>, private: std::vector local_pop(int n) { - ASSERT_RELEASE(n <= local_size()); + YGM_ASSERT_RELEASE(n <= local_size()); size_t new_size = local_size() - n; auto pop_start = m_local_bag.begin() + new_size; @@ -245,4 +259,4 @@ class bag : public detail::base_async_insert_value, std::tuple>, typename ygm::ygm_ptr pthis; }; -} // namespace ygm::container \ No newline at end of file +} // namespace ygm::container diff --git a/include/ygm/container/bag_orig.hpp b/include/ygm/container/bag_orig.hpp deleted file mode 100644 index 2ecd6795..00000000 --- a/include/ygm/container/bag_orig.hpp +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2019-2021 Lawrence Livermore National Security, LLC and other YGM -// Project Developers. See the top-level COPYRIGHT file for details. 
-// -// SPDX-License-Identifier: MIT - -#pragma once - -#include -#include - -namespace ygm::container { -template > -class bag { - public: - using self_type = bag; - using value_type = Item; - using size_type = size_t; - using ygm_for_all_types = std::tuple; - using container_type = ygm::container::bag_tag; - - bag(ygm::comm &comm); - ~bag(); - - void async_insert(const value_type &item); - void async_insert(const value_type &item, int dest); - void async_insert(const std::vector &items, int dest); - - template - void for_all(Function fn); - - void clear(); - - size_type size(); - size_type local_size(); - - void rebalance(); - - void swap(self_type &s); - - template - void local_shuffle(RandomFunc &r); - void local_shuffle(); - - template - void global_shuffle(RandomFunc &r); - void global_shuffle(); - - template - void local_for_all(Function fn); - - ygm::comm &comm(); - - void serialize(const std::string &fname); - void deserialize(const std::string &fname); - std::vector gather_to_vector(int dest); - std::vector gather_to_vector(); - - private: - std::vector local_pop(int n); - - template - void local_for_all_pair_types(Function fn); - - private: - size_t m_round_robin = 0; - ygm::comm &m_comm; - std::vector m_local_bag; - typename ygm::ygm_ptr pthis; -}; -} // namespace ygm::container - -#include diff --git a/include/ygm/container/counting_set.hpp b/include/ygm/container/counting_set.hpp index cb1e364b..4bab7ec5 100644 --- a/include/ygm/container/counting_set.hpp +++ b/include/ygm/container/counting_set.hpp @@ -19,8 +19,8 @@ template class counting_set : public detail::base_count, std::tuple>, public detail::base_misc, std::tuple>, - public detail::base_iteration, - std::tuple> { + public detail::base_iteration_key_value, + std::tuple> { friend class detail::base_misc, std::tuple>; public: @@ -34,24 +34,62 @@ class counting_set const size_type count_cache_size = 1024 * 1024; counting_set(ygm::comm &comm) - : m_map(comm /*, mapped_type(0)*/), - m_comm(comm), - 
partitioner(m_map.partitioner), - pthis(this) { + : m_map(comm), m_comm(comm), partitioner(m_map.partitioner), pthis(this) { + pthis.check(m_comm); m_count_cache.resize(count_cache_size, {key_type(), -1}); } counting_set() = delete; - void async_insert(const key_type &key) { cache_insert(key); } + counting_set(ygm::comm &comm, std::initializer_list l) + : m_map(comm), m_comm(comm), partitioner(m_map.partitioner), pthis(this) { + pthis.check(m_comm); + m_count_cache.resize(count_cache_size, {key_type(), -1}); + if (m_comm.rank0()) { + for (const Key &i : l) { + async_insert(i); + } + } + m_comm.barrier(); + } - // void async_erase(const key_type& key) { cache_erase(key); } + template + counting_set(ygm::comm &comm, const STLContainer &cont) requires + detail::STLContainer && + std::convertible_to + : m_map(comm), m_comm(comm), pthis(this), partitioner(comm) { + pthis.check(m_comm); + m_count_cache.resize(count_cache_size, {key_type(), -1}); + for (const Key &i : cont) { + this->async_insert(i); + } + m_comm.barrier(); + } + + template + counting_set(ygm::comm &comm, const YGMContainer &yc) requires + detail::HasForAll && + detail::SingleItemTuple + : m_map(comm), m_comm(comm), pthis(this), partitioner(comm) { + pthis.check(m_comm); + m_count_cache.resize(count_cache_size, {key_type(), -1}); + yc.for_all([this](const Key &value) { this->async_insert(value); }); + + m_comm.barrier(); + } + + void async_insert(const key_type &key) { cache_insert(key); } template void local_for_all(Function fn) { m_map.local_for_all(fn); } + template + void local_for_all(Function fn) const { + m_map.local_for_all(fn); + } + void local_clear() { // What to do here m_map.local_clear(); clear_cache(); @@ -95,9 +133,9 @@ class counting_set // return m_map.all_gather(keys); // } - std::map key_gather( + std::map gather_keys( const std::vector &keys) { - return m_map.key_gather(keys); + return m_map.gather_keys(keys); } typename ygm::ygm_ptr get_ygm_ptr() const { return pthis; } @@ -130,12 
+168,12 @@ class counting_set m_count_cache[slot].second = 1; } else { // flush slot, fill with key - ASSERT_DEBUG(m_count_cache[slot].second > 0); + YGM_ASSERT_DEBUG(m_count_cache[slot].second > 0); if (m_count_cache[slot].first == key) { m_count_cache[slot].second++; } else { count_cache_flush(slot); - ASSERT_DEBUG(m_count_cache[slot].second == -1); + YGM_ASSERT_DEBUG(m_count_cache[slot].second == -1); m_count_cache[slot].first = key; m_count_cache[slot].second = 1; } @@ -148,7 +186,7 @@ class counting_set void count_cache_flush(size_t slot) { auto key = m_count_cache[slot].first; auto cached_count = m_count_cache[slot].second; - ASSERT_DEBUG(cached_count > 0); + YGM_ASSERT_DEBUG(cached_count > 0); m_map.async_visit( key, [](const key_type &key, size_t &count, int32_t to_add) { diff --git a/include/ygm/container/detail/bag_orig.ipp b/include/ygm/container/detail/bag_orig.ipp deleted file mode 100644 index f64f1e9c..00000000 --- a/include/ygm/container/detail/bag_orig.ipp +++ /dev/null @@ -1,271 +0,0 @@ -// Copyright 2019-2021 Lawrence Livermore National Security, LLC and other YGM -// Project Developers. See the top-level COPYRIGHT file for details. 
-// -// SPDX-License-Identifier: MIT - -#pragma once - -#include -#include -#include - -namespace ygm::container { - -template -bag::bag(ygm::comm &comm) : m_comm(comm), pthis(this) { - pthis.check(m_comm); -} - -template -bag::~bag() { - m_comm.barrier(); -} - -template -void bag::async_insert(const value_type &item) { - auto inserter = [](auto mailbox, auto map, const value_type &item) { - map->m_local_bag.push_back(item); - }; - int dest = (m_round_robin++ + m_comm.rank()) % m_comm.size(); - m_comm.async(dest, inserter, pthis, item); -} - -template -void bag::async_insert(const value_type &item, int dest) { - auto inserter = [](auto mailbox, auto map, const value_type &item) { - map->m_local_bag.push_back(item); - }; - m_comm.async(dest, inserter, pthis, item); -} - -template -void bag::async_insert(const std::vector &items, - int dest) { - auto inserter = [](auto mailbox, auto map, - const std::vector &item) { - map->m_local_bag.insert(map->m_local_bag.end(), item.begin(), item.end()); - }; - m_comm.async(dest, inserter, pthis, items); -} - -template -template -void bag::for_all(Function fn) { - m_comm.barrier(); - local_for_all(fn); -} - -template -void bag::clear() { - m_comm.barrier(); - m_local_bag.clear(); -} - -template -typename bag::size_type bag::size() { - m_comm.barrier(); - return m_comm.all_reduce_sum(m_local_bag.size()); -} - -template -typename bag::size_type bag::local_size() { - return m_local_bag.size(); -} - -template -void bag::rebalance() { - m_comm.barrier(); - - // Find current rank's prefix val and desired target size - size_t prefix_val = ygm::prefix_sum(local_size(), m_comm); - size_t target_size = std::ceil((size() * 1.0) / m_comm.size()); - - // Init to_send array where index is dest and value is the num to send - // int to_send[m_comm.size()] = {0}; - std::unordered_map to_send; - - auto global_size = size(); - size_t small_block_size = global_size / m_comm.size(); - size_t large_block_size = - global_size / m_comm.size() + 
((global_size / m_comm.size()) > 0); - - for (size_t i = 0; i < local_size(); i++) { - size_t idx = prefix_val + i; - size_t target_rank; - - // Determine target rank to match partitioning in ygm::container::array - if (idx < (global_size % m_comm.size()) * large_block_size) { - target_rank = idx / large_block_size; - } else { - target_rank = (global_size % m_comm.size()) + - (idx - (global_size % m_comm.size()) * large_block_size) / - small_block_size; - } - - if (target_rank != m_comm.rank()) { - to_send[target_rank]++; - } - } - m_comm.barrier(); - - // Build and send bag indexes as calculated by to_send - for (auto &kv_pair : to_send) { - async_insert(local_pop(kv_pair.second), kv_pair.first); - } - - m_comm.barrier(); -} - -template -void bag::swap(self_type &s) { - m_comm.barrier(); - m_local_bag.swap(s.m_local_bag); -} - -template -template -void bag::local_shuffle(RandomFunc &r) { - m_comm.barrier(); - std::shuffle(m_local_bag.begin(), m_local_bag.end(), r); -} - -template -void bag::local_shuffle() { - ygm::default_random_engine<> r(m_comm, std::random_device()()); - local_shuffle(r); -} - -template -template -void bag::global_shuffle(RandomFunc &r) { - m_comm.barrier(); - std::vector old_local_bag; - std::swap(old_local_bag, m_local_bag); - - auto send_item = [](auto bag, const value_type &item) { - bag->m_local_bag.push_back(item); - }; - - std::uniform_int_distribution<> distrib(0, m_comm.size() - 1); - for (value_type i : old_local_bag) { - m_comm.async(distrib(r), send_item, pthis, i); - } -} - -template -void bag::global_shuffle() { - ygm::default_random_engine<> r(m_comm, std::random_device()()); - global_shuffle(r); -} - -template -ygm::comm &bag::comm() { - return m_comm; -} - -template -void bag::serialize(const std::string &fname) { - m_comm.barrier(); - std::string rank_fname = fname + std::to_string(m_comm.rank()); - std::ofstream os(rank_fname, std::ios::binary); - cereal::JSONOutputArchive oarchive(os); - oarchive(m_local_bag, m_round_robin, 
m_comm.size()); -} - -template -void bag::deserialize(const std::string &fname) { - m_comm.barrier(); - - std::string rank_fname = fname + std::to_string(m_comm.rank()); - std::ifstream is(rank_fname, std::ios::binary); - - cereal::JSONInputArchive iarchive(is); - int comm_size; - iarchive(m_local_bag, m_round_robin, comm_size); - - if (comm_size != m_comm.size()) { - m_comm.cerr0( - "Attempting to deserialize bag_impl using communicator of " - "different size than serialized with"); - } -} - -template -template -void bag::local_for_all(Function fn) { - if constexpr (ygm::detail::is_std_pair) { - local_for_all_pair_types(fn); // pairs get special handling - } else { - if constexpr (std::is_invocable()) { - std::for_each(m_local_bag.begin(), m_local_bag.end(), fn); - } else { - static_assert(ygm::detail::always_false<>, - "local bag lambdas must be invocable with (value_type &) " - "signatures"); - } - } -} - -template -std::vector::value_type> -bag::gather_to_vector(int dest) { - std::vector result; - auto p_res = m_comm.make_ygm_ptr(result); - m_comm.barrier(); - auto gatherer = [](auto res, const std::vector &outer_data) { - res->insert(res->end(), outer_data.begin(), outer_data.end()); - }; - m_comm.async(dest, gatherer, p_res, m_local_bag); - m_comm.barrier(); - return result; -} - -template -std::vector::value_type> -bag::gather_to_vector() { - std::vector result; - auto p_res = m_comm.make_ygm_ptr(result); - m_comm.barrier(); - auto result0 = gather_to_vector(0); - if (m_comm.rank0()) { - auto distribute = [](auto res, const std::vector &data) { - res->insert(res->end(), data.begin(), data.end()); - }; - m_comm.async_bcast(distribute, p_res, result0); - } - m_comm.barrier(); - return result; -} - -template -std::vector::value_type> bag::local_pop( - int n) { - ASSERT_RELEASE(n <= local_size()); - - size_t new_size = local_size() - n; - auto pop_start = m_local_bag.begin() + new_size; - std::vector ret; - ret.assign(pop_start, m_local_bag.end()); - 
m_local_bag.resize(new_size); - return ret; -} - -template -template -void bag::local_for_all_pair_types(Function fn) { - if constexpr (std::is_invocable()) { - std::for_each(m_local_bag.begin(), m_local_bag.end(), fn); - } else if constexpr (std::is_invocable()) { - for (auto &kv : m_local_bag) { - fn(kv.first, kv.second); - } - } else { - static_assert(ygm::detail::always_false<>, - "local bag lambdas must be invocable with (pair &) " - "or (pair::first_type &, pair::second_type &) signatures"); - } -} - -} // namespace ygm::container diff --git a/include/ygm/container/detail/base_async_contains.hpp b/include/ygm/container/detail/base_async_contains.hpp index 6e8fd3e6..2565052c 100644 --- a/include/ygm/container/detail/base_async_contains.hpp +++ b/include/ygm/container/detail/base_async_contains.hpp @@ -7,33 +7,35 @@ #include #include +#include +#include namespace ygm::container::detail { template struct base_async_contains { - - template + template void async_contains(const std::tuple_element<0, for_all_args>::type& value, - Function fn, const FuncArgs&... args) { + Function fn, const FuncArgs&... args) { + YGM_CHECK_ASYNC_LAMBDA_COMPLIANCE(Function, + "ygm::container::async_contains()"); derived_type* derived_this = static_cast(this); int dest = derived_this->partitioner.owner(value); - auto lambda = [](auto pcont, - const std::tuple_element<0, for_all_args>::type& value, - const FuncArgs&... args) { - - Function* fn = nullptr; + auto lambda = [fn](auto pcont, + const std::tuple_element<0, for_all_args>::type& value, + const FuncArgs&... 
args) mutable { bool contains = static_cast(pcont->local_count(value)); - ygm::meta::apply_optional(*fn, std::make_tuple(pcont), - std::forward_as_tuple(contains, value, args...)); + ygm::meta::apply_optional( + fn, std::make_tuple(pcont), + std::forward_as_tuple(contains, value, args...)); }; - derived_this->comm().async(dest, lambda, derived_this->get_ygm_ptr(), - value, args...); + derived_this->comm().async(dest, lambda, derived_this->get_ygm_ptr(), value, + args...); } }; -} // namespace ygm::container::detail \ No newline at end of file +} // namespace ygm::container::detail diff --git a/include/ygm/container/detail/base_async_insert_contains.hpp b/include/ygm/container/detail/base_async_insert_contains.hpp index de17cc14..6c26a641 100644 --- a/include/ygm/container/detail/base_async_insert_contains.hpp +++ b/include/ygm/container/detail/base_async_insert_contains.hpp @@ -7,37 +7,40 @@ #include #include +#include +#include namespace ygm::container::detail { template struct base_async_insert_contains { - - template - void async_insert_contains(const std::tuple_element<0, for_all_args>::type& value, - Function fn, const FuncArgs&... args) { + template + void async_insert_contains( + const std::tuple_element<0, for_all_args>::type& value, Function fn, + const FuncArgs&... args) { + YGM_CHECK_ASYNC_LAMBDA_COMPLIANCE( + Function, "ygm::container::async_insert_contains()"); derived_type* derived_this = static_cast(this); int dest = derived_this->partitioner.owner(value); - auto lambda = [](auto pcont, - const std::tuple_element<0, for_all_args>::type& value, - const FuncArgs&... args) { - - Function* fn = nullptr; + auto lambda = [fn](auto pcont, + const std::tuple_element<0, for_all_args>::type& value, + const FuncArgs&... 
args) mutable { bool contains = static_cast(pcont->local_count(value)); if (!contains) { pcont->local_insert(value); - } + } - ygm::meta::apply_optional(*fn, std::make_tuple(pcont), - std::forward_as_tuple(contains, value, args...)); + ygm::meta::apply_optional( + fn, std::make_tuple(pcont), + std::forward_as_tuple(contains, value, args...)); }; - derived_this->comm().async(dest, lambda, derived_this->get_ygm_ptr(), - value, args...); + derived_this->comm().async(dest, lambda, derived_this->get_ygm_ptr(), value, + args...); } }; -} // namespace ygm::container::detail \ No newline at end of file +} // namespace ygm::container::detail diff --git a/include/ygm/container/detail/base_async_reduce.hpp b/include/ygm/container/detail/base_async_reduce.hpp index e2d3d2bf..6ae652fd 100644 --- a/include/ygm/container/detail/base_async_reduce.hpp +++ b/include/ygm/container/detail/base_async_reduce.hpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace ygm::container::detail { @@ -18,20 +19,23 @@ struct base_async_reduce { const typename std::tuple_element<0, for_all_args>::type& key, const typename std::tuple_element<1, for_all_args>::type& value, ReductionOp reducer) { + YGM_CHECK_ASYNC_LAMBDA_COMPLIANCE(ReductionOp, + "ygm::container::async_reduce()"); + derived_type* derived_this = static_cast(this); int dest = derived_this->partitioner.owner(key); - auto rlambda = [reducer]( - auto pcont, - const std::tuple_element<0, for_all_args>::type& key, - const std::tuple_element<1, for_all_args>::type& value) { - pcont->local_reduce(key, value, reducer); - }; + auto rlambda = + [reducer]( + auto pcont, const std::tuple_element<0, for_all_args>::type& key, + const std::tuple_element<1, for_all_args>::type& value) mutable { + pcont->local_reduce(key, value, reducer); + }; derived_this->comm().async(dest, rlambda, derived_this->get_ygm_ptr(), key, value); } }; -} // namespace ygm::container::detail \ No newline at end of file +} // namespace ygm::container::detail diff 
--git a/include/ygm/container/detail/base_async_visit.hpp b/include/ygm/container/detail/base_async_visit.hpp index 9a611818..6f9c22c8 100644 --- a/include/ygm/container/detail/base_async_visit.hpp +++ b/include/ygm/container/detail/base_async_visit.hpp @@ -9,6 +9,7 @@ #include #include #include +#include namespace ygm::container::detail { @@ -19,15 +20,17 @@ struct base_async_visit { Visitor visitor, const VisitorArgs&... args) requires DoubleItemTuple { + YGM_CHECK_ASYNC_LAMBDA_COMPLIANCE(Visitor, "ygm::container::async_visit()"); + derived_type* derived_this = static_cast(this); int dest = derived_this->partitioner.owner(key); - auto vlambda = [](auto pcont, - const std::tuple_element<0, for_all_args>::type& key, - const VisitorArgs&... args) { - Visitor* vis = nullptr; - pcont->local_visit(key, *vis, args...); + auto vlambda = [visitor]( + auto pcont, + const std::tuple_element<0, for_all_args>::type& key, + const VisitorArgs&... args) mutable { + pcont->local_visit(key, visitor, args...); }; derived_this->comm().async(dest, vlambda, derived_this->get_ygm_ptr(), key, @@ -40,15 +43,18 @@ struct base_async_visit { const VisitorArgs&... args) requires DoubleItemTuple { + YGM_CHECK_ASYNC_LAMBDA_COMPLIANCE( + Visitor, "ygm::container::async_visit_if_contains()"); + derived_type* derived_this = static_cast(this); int dest = derived_this->partitioner.owner(key); - auto vlambda = [](auto pcont, - const std::tuple_element<0, for_all_args>::type& key, - const VisitorArgs&... args) { - Visitor* vis = nullptr; - pcont->local_visit_if_contains(key, *vis, args...); + auto vlambda = [visitor]( + auto pcont, + const std::tuple_element<0, for_all_args>::type& key, + const VisitorArgs&... args) mutable { + pcont->local_visit_if_contains(key, visitor, args...); }; derived_this->comm().async(dest, vlambda, derived_this->get_ygm_ptr(), key, @@ -61,15 +67,18 @@ struct base_async_visit { const VisitorArgs&... 
args) const requires DoubleItemTuple { + YGM_CHECK_ASYNC_LAMBDA_COMPLIANCE( + Visitor, "ygm::container::async_visit_if_contains()"); + const derived_type* derived_this = static_cast(this); int dest = derived_this->partitioner.owner(key); - auto vlambda = [](const auto pcont, - const std::tuple_element<0, for_all_args>::type& key, - const VisitorArgs&... args) { - Visitor* vis = nullptr; - pcont->local_visit_if_contains(key, *vis, args...); + auto vlambda = [visitor]( + const auto pcont, + const std::tuple_element<0, for_all_args>::type& key, + const VisitorArgs&... args) mutable { + pcont->local_visit_if_contains(key, visitor, args...); }; derived_this->comm().async(dest, vlambda, derived_this->get_ygm_ptr(), key, @@ -79,4 +88,4 @@ struct base_async_visit { // todo: async_insert_visit() }; -} // namespace ygm::container::detail \ No newline at end of file +} // namespace ygm::container::detail diff --git a/include/ygm/container/detail/base_batch_erase.hpp b/include/ygm/container/detail/base_batch_erase.hpp new file mode 100644 index 00000000..2ae842cd --- /dev/null +++ b/include/ygm/container/detail/base_batch_erase.hpp @@ -0,0 +1,135 @@ +// Copyright 2019-2021 Lawrence Livermore National Security, LLC and other YGM +// Project Developers. See the top-level COPYRIGHT file for details. 
+// +// SPDX-License-Identifier: MIT + +#pragma once + +#include +#include +#include + +namespace ygm::container::detail { + +template +struct base_batch_erase_key { + using key_type = std::tuple_element_t<0, for_all_args>; + + template + void erase(const Container &cont) requires detail::HasForAll && + SingleItemTuple && std::convertible_to< + std::tuple_element_t<0, typename Container::for_all_args>, key_type> { + derived_type *derived_this = static_cast(this); + + cont.for_all( + [derived_this](const auto &key) { derived_this->async_erase(key); }); + + derived_this->comm().barrier(); + } + + template + void erase(const Container &cont) requires STLContainer && + AtLeastOneItemTuple && + std::convertible_to { + derived_type *derived_this = static_cast(this); + + for (const auto &key : cont) { + derived_this->async_erase(key); + } + + derived_this->comm().barrier(); + } +}; + +template +struct base_batch_erase_key_value { + using key_type = std::tuple_element_t<0, for_all_args>; + using mapped_type = std::tuple_element_t<1, for_all_args>; + + template + void erase(const Container &cont) requires HasForAll && + DoubleItemTuple && std::convertible_to< + std::tuple_element_t<0, typename Container::for_all_args>, + key_type> && + std::convertible_to< + std::tuple_element_t<1, typename Container::for_all_args>, + mapped_type> { + derived_type *derived_this = static_cast(this); + + cont.for_all([derived_this](const auto &key, const auto &value) { + derived_this->async_erase(key, value); + }); + + derived_this->comm().barrier(); + } + + template + void erase(const Container &cont) requires HasForAll && + SingleItemTuple && DoubleItemTuple< + std::tuple_element_t<0, typename Container::for_all_args>> && + std::convertible_to< + std::tuple_element_t< + 0, std::tuple_element_t<0, typename Container::for_all_args>>, + key_type> && + std::convertible_to< + std::tuple_element_t< + 1, std::tuple_element_t<0, typename Container::for_all_args>>, + mapped_type> { + derived_type 
*derived_this = static_cast(this); + + cont.for_all([derived_this](const auto &key_value) { + const auto &[key, value] = key_value; + + derived_this->async_erase(key, value); + }); + + derived_this->comm().barrier(); + } + + template + void erase(const Container &cont) requires STLContainer && + DoubleItemTuple && std::convertible_to< + std::tuple_element_t<0, typename Container::value_type>, key_type> && + std::convertible_to< + std::tuple_element_t<1, typename Container::value_type>, + mapped_type> { + derived_type *derived_this = static_cast(this); + + derived_this->comm().barrier(); + + for (const auto &key_value : cont) { + const auto &[key, value] = key_value; + derived_this->async_erase(key, value); + } + + derived_this->comm().barrier(); + } + + // Copies of base_batch_erase_key functions to allow deletions from keys alone + template + void erase(const Container &cont) requires detail::HasForAll && + SingleItemTuple && std::convertible_to< + std::tuple_element_t<0, typename Container::for_all_args>, key_type> { + derived_type *derived_this = static_cast(this); + + cont.for_all( + [derived_this](const auto &key) { derived_this->async_erase(key); }); + + derived_this->comm().barrier(); + } + + template + void erase(const Container &cont) requires STLContainer && + AtLeastOneItemTuple && + std::convertible_to { + derived_type *derived_this = static_cast(this); + + for (const auto &key : cont) { + derived_this->async_erase(key); + } + + derived_this->comm().barrier(); + } +}; + +} // namespace ygm::container::detail diff --git a/include/ygm/container/detail/base_concepts.hpp b/include/ygm/container/detail/base_concepts.hpp index 3881a832..9f82abc2 100644 --- a/include/ygm/container/detail/base_concepts.hpp +++ b/include/ygm/container/detail/base_concepts.hpp @@ -11,23 +11,44 @@ namespace ygm::container::detail { template -concept SingleItemTuple = requires(T v) { - requires std::tuple_size::value == 1; -}; +concept SingleItemTuple = + requires(T v) { requires 
std::tuple_size::value == 1; }; + +template +concept DoubleItemTuple = + requires(T v) { requires std::tuple_size::value == 2; }; + +template +concept AtLeastOneItemTuple = + requires(T v) { requires std::tuple_size::value >= 1; }; + +template +concept HasForAll = requires(T v) { typename T::for_all_args; }; template -concept DoubleItemTuple = requires(T v) { - requires std::tuple_size::value == 2; +concept HasAsyncReduceWithReductionOp = requires(T v) { + { + std::declval().async_reduce( + std::declval(), + std::declval(), + [](const typename T::mapped_type a, const typename T::mapped_type b) { + return a; + }) + } -> std::same_as; }; template -concept AtLeastOneItemTuple = requires(T v) { - requires std::tuple_size::value >= 1; +concept HasAsyncReduceWithoutReductionOp = requires(T v) { + { + std::declval().async_reduce(std::declval(), + std::declval()) + } -> std::same_as; }; template -concept HasForAll = requires(T v) { - typename T::for_all_args; +concept HasAsyncReduce = requires(T v) { + requires HasAsyncReduceWithReductionOp or + HasAsyncReduceWithoutReductionOp; }; // Copied solution for an STL container concept from @@ -61,5 +82,4 @@ concept STLContainer = requires(ContainerType a, const ContainerType b) { { a.max_size() } -> std::same_as; { a.empty() } -> std::same_as; }; - } // namespace ygm::container::detail diff --git a/include/ygm/container/detail/base_iteration.hpp b/include/ygm/container/detail/base_iteration.hpp index 7f0e7cd7..51434b09 100644 --- a/include/ygm/container/detail/base_iteration.hpp +++ b/include/ygm/container/detail/base_iteration.hpp @@ -6,43 +6,54 @@ #pragma once #include +#include #include #include namespace ygm::container::detail { template -class filter_proxy; +class filter_proxy_value; +template +class filter_proxy_key_value; -template -class map_proxy; +template +class transform_proxy_value; +template +class transform_proxy_key_value; template -class flatten_proxy; +class flatten_proxy_value; +template +class 
flatten_proxy_key_value; + +template +struct base_iteration_value { + using value_type = typename std::tuple_element<0, for_all_args>::type; -template -struct base_iteration { template void for_all(Function fn) { - derived_type* derived_this = static_cast(this); + auto* derived_this = static_cast(this); derived_this->comm().barrier(); derived_this->local_for_all(fn); } template void for_all(Function fn) const { - const derived_type* derived_this = static_cast(this); + const auto* derived_this = static_cast(this); derived_this->comm().barrier(); derived_this->local_for_all(fn); } template void gather(STLContainer& gto, int rank) const { + static_assert( + std::is_same_v); // TODO, make an all gather version that defaults to rank = -1 & uses a temp // container. bool all_gather = (rank == -1); static STLContainer* spgto = >o; - const derived_type* derived_this = static_cast(this); + const auto* derived_this = static_cast(this); const ygm::comm& mycomm = derived_this->comm(); auto glambda = [&mycomm, rank](const auto& value) { @@ -51,78 +62,284 @@ struct base_iteration { value); }; - for_all(glambda); + derived_this->for_all(glambda); derived_this->comm().barrier(); } - template - std::tuple_element<0, for_all_args>::type reduce(MergeFunction merge) const + template > + std::vector gather_topk( + size_t k, Compare comp = std::greater()) const requires SingleItemTuple { - const derived_type* derived_this = static_cast(this); + const auto* derived_this = static_cast(this); + const ygm::comm& mycomm = derived_this->comm(); + std::vector local_topk; + + // + // Find local top_k + for_all([&local_topk, comp, k](const value_type& value) { + local_topk.push_back(value); + std::sort(local_topk.begin(), local_topk.end(), comp); + if (local_topk.size() > k) { + local_topk.pop_back(); + } + }); + + // + // All reduce global top_k + auto to_return = mycomm.all_reduce( + local_topk, [comp, k](const std::vector& va, + const std::vector& vb) { + std::vector out(va.begin(), 
va.end()); + out.insert(out.end(), vb.begin(), vb.end()); + std::sort(out.begin(), out.end(), comp); + while (out.size() > k) { + out.pop_back(); + } + return out; + }); + return to_return; + } + + template + value_type reduce(MergeFunction merge) const { + const auto* derived_this = static_cast(this); derived_this->comm().barrier(); - ASSERT_RELEASE(derived_this->local_size() > - 0); // empty partition not handled yet using value_type = typename std::tuple_element<0, for_all_args>::type; bool first = true; - value_type to_return; + value_type local_reduce; - auto rlambda = [&to_return, &first, &merge](const value_type& value) { + auto rlambda = [&local_reduce, &first, &merge](const value_type& value) { if (first) { - to_return = value; - first = false; + local_reduce = value; + first = false; } else { - to_return = merge(to_return, value); + local_reduce = merge(local_reduce, value); } }; derived_this->for_all(rlambda); - derived_this->comm().barrier(); + std::optional to_reduce; + if (!first) { + to_reduce = local_reduce; + } - return ::ygm::all_reduce(to_return, merge, derived_this->comm()); + std::optional to_return = + ::ygm::all_reduce(to_reduce, merge, derived_this->comm()); + YGM_ASSERT_RELEASE(to_return.has_value()); + return to_return.value(); } template void collect(YGMContainer& c) const { + const auto* derived_this = static_cast(this); + auto clambda = [&c](const value_type& item) { c.async_insert(item); }; + derived_this->for_all(clambda); + } + + template + void reduce_by_key(MapType& map, ReductionOp reducer) const { + // TODO: static_assert MapType is ygm::container::map + const auto* derived_this = static_cast(this); + using reduce_key_type = typename MapType::key_type; + using reduce_value_type = typename MapType::mapped_type; + static_assert(std::is_same_v>, + "value_type must be a std::pair"); + + auto rbklambda = + [&map, reducer](std::pair kvp) { + map.async_reduce(kvp.first, kvp.second, reducer); + }; + derived_this->for_all(rbklambda); + } 
+ + template + transform_proxy_value transform( + TransformFunction ffn); + + flatten_proxy_value flatten(); + + template + filter_proxy_value filter(FilterFunction ffn); + + private: + template + requires requires(STLContainer stc, Value v) { stc.push_back(v); } + static void generic_insert(STLContainer& stc, const Value& value) { + stc.push_back(value); + } + + template + requires requires(STLContainer stc, Value v) { stc.insert(v); } + static void generic_insert(STLContainer& stc, const Value& value) { + stc.insert(value); + } +}; + +// For Associative Containers +template +struct base_iteration_key_value { + using key_type = typename std::tuple_element<0, for_all_args>::type; + using mapped_type = typename std::tuple_element<1, for_all_args>::type; + + template + void for_all(Function fn) { + auto* derived_this = static_cast(this); + derived_this->comm().barrier(); + derived_this->local_for_all(fn); + } + + template + void for_all(Function fn) const { + const auto* derived_this = static_cast(this); + derived_this->comm().barrier(); + derived_this->local_for_all(fn); + } + + template + void gather(STLContainer& gto, int rank) const { + static_assert(std::is_same_v>); + // TODO, make an all gather version that defaults to rank = -1 & uses a temp + // container. 
+ bool all_gather = (rank == -1); + static STLContainer* spgto = >o; + const derived_type* derived_this = static_cast(this); + const ygm::comm& mycomm = derived_this->comm(); + + auto glambda = [&mycomm, rank](const key_type& key, + const mapped_type& value) { + mycomm.async( + rank, + [](const key_type& key, const mapped_type& value) { + generic_insert(*spgto, std::make_pair(key, value)); + }, + key, value); + }; + + derived_this->for_all(glambda); + + derived_this->comm().barrier(); + } + + template >> + std::vector> gather_topk( + size_t k, Compare comp = Compare()) const { + const auto* derived_this = static_cast(this); + const ygm::comm& mycomm = derived_this->comm(); + using vec_type = std::vector>; + vec_type local_topk; + + // + // Find local top_k + for_all( + [&local_topk, comp, k](const key_type& key, const mapped_type& mapped) { + local_topk.push_back(std::make_pair(key, mapped)); + std::sort(local_topk.begin(), local_topk.end(), comp); + if (local_topk.size() > k) { + local_topk.pop_back(); + } + }); + + // + // All reduce global top_k + auto to_return = mycomm.all_reduce( + local_topk, [comp, k](const vec_type& va, const vec_type& vb) { + vec_type out(va.begin(), va.end()); + out.insert(out.end(), vb.begin(), vb.end()); + std::sort(out.begin(), out.end(), comp); + while (out.size() > k) { + out.pop_back(); + } + return out; + }); + return to_return; + } + + /* Its unclear this makes sense for an associative container. 
+ template + std::pair reduce(MergeFunction merge) const { const derived_type* derived_this = static_cast(this); - auto clambda = [&c](const std::tuple_element<0, for_all_args>::type& item) { - c.async_insert(item); + derived_this->comm().barrier(); + + bool first = true; + + std::pair local_reduce; + + auto rlambda = [&local_reduce, &first, + &merge](const std::pair& value) { + if (first) { + local_reduce = value; + first = false; + } else { + local_reduce = merge(local_reduce, value); + } + }; + + derived_this->for_all(rlambda); + + std::optional> to_reduce; + if (!first) { // local partition was empty! + to_reduce = std::move(local_reduce); + } + + std::optional> to_return = + ::ygm::all_reduce(to_reduce, merge, derived_this->comm()); + YGM_ASSERT_RELEASE(to_return.has_value()); + return to_return.value(); + } + */ + + template + void collect(YGMContainer& c) const { + const auto* derived_this = static_cast(this); + auto clambda = [&c](const key_type& key, const mapped_type& value) { + c.async_insert(std::make_pair(key, value)); }; derived_this->for_all(clambda); } template void reduce_by_key(MapType& map, ReductionOp reducer) const { - const derived_type* derived_this = static_cast(this); + const auto* derived_this = static_cast(this); // static_assert ygm::map using reduce_key_type = typename MapType::key_type; using reduce_value_type = typename MapType::mapped_type; - if constexpr (std::tuple_size::value == 1) { - // must be a std::pair - auto rbklambda = [&map, reducer](std::pair kvp) { - map.async_reduce(kvp.first, kvp.second, reducer); - }; - derived_this->for_all(rbklambda); - } else { - static_assert(std::tuple_size::value == 2); - auto rbklambda = [&map, reducer](const reduce_key_type& key, const reduce_value_type& value) { - map.async_reduce(key, value, reducer); - }; - derived_this->for_all(rbklambda); - } + + static_assert(std::tuple_size::value == 2); + auto rbklambda = [&map, reducer](const reduce_key_type& key, + const reduce_value_type& value) { 
+ map.async_reduce(key, value, reducer); + }; + derived_this->for_all(rbklambda); + } + + template + transform_proxy_key_value transform( + TransformFunction ffn); + + auto keys() { + return transform([](const key_type& key, + const mapped_type& value) -> key_type { return key; }); } - template - map_proxy map(MapFunction ffn); + auto values() { + return transform( + [](const key_type& key, const mapped_type& value) -> mapped_type { + return value; + }); + } - flatten_proxy flatten(); + flatten_proxy_key_value flatten(); template - filter_proxy filter(FilterFunction ffn); + filter_proxy_key_value filter( + FilterFunction ffn); private: template @@ -142,33 +359,64 @@ struct base_iteration { #include #include -#include +#include namespace ygm::container::detail { -template -template -map_proxy -base_iteration::map(MapFunction ffn) { - derived_type* derived_this = static_cast(this); - return map_proxy(*derived_this, ffn); +template +template +transform_proxy_value +base_iteration_value::transform( + TransformFunction ffn) { + auto* derived_this = static_cast(this); + return transform_proxy_value(*derived_this, + ffn); } -template -inline flatten_proxy -base_iteration::flatten() { +template +inline flatten_proxy_value +base_iteration_value::flatten() { // static_assert( // type_traits::is_vector>::value); - derived_type* derived_this = static_cast(this); - return flatten_proxy(*derived_this); + auto* derived_this = static_cast(this); + return flatten_proxy_value(*derived_this); } -template +template template -filter_proxy -base_iteration::filter(FilterFunction ffn) { - derived_type* derived_this = static_cast(this); - return filter_proxy(*derived_this, ffn); +filter_proxy_value +base_iteration_value::filter(FilterFunction ffn) { + auto* derived_this = static_cast(this); + return filter_proxy_value(*derived_this, ffn); } -} // namespace ygm::container::detail \ No newline at end of file +template +template +transform_proxy_key_value +base_iteration_key_value::transform( 
+ TransformFunction ffn) { + auto* derived_this = static_cast(this); + return transform_proxy_key_value( + *derived_this, ffn); +} + +template +inline flatten_proxy_key_value +base_iteration_key_value::flatten() { + // static_assert( + // type_traits::is_vector>::value); + auto* derived_this = static_cast(this); + return flatten_proxy_key_value(*derived_this); +} + +template +template +filter_proxy_key_value +base_iteration_key_value::filter( + FilterFunction ffn) { + auto* derived_this = static_cast(this); + return filter_proxy_key_value(*derived_this, + ffn); +} + +} // namespace ygm::container::detail diff --git a/include/ygm/container/detail/block_partitioner.hpp b/include/ygm/container/detail/block_partitioner.hpp index 83e88a7b..3ff26914 100644 --- a/include/ygm/container/detail/block_partitioner.hpp +++ b/include/ygm/container/detail/block_partitioner.hpp @@ -48,7 +48,7 @@ struct block_partitioner { int to_return; // Owner depends on whether index is before switching to small blocks if (index < (m_partitioned_size % m_comm_size) * m_large_block_size) { - ASSERT_RELEASE(m_large_block_size > 0); + YGM_ASSERT_RELEASE(m_large_block_size > 0); to_return = index / m_large_block_size; } else { if (m_small_block_size == 0) { @@ -56,26 +56,26 @@ struct block_partitioner { << m_partitioned_size << "\t" << m_comm_size << "\t" << index << std::endl; } - ASSERT_RELEASE(m_small_block_size > 0); + YGM_ASSERT_RELEASE(m_small_block_size > 0); to_return = (m_partitioned_size % m_comm_size) + (index - (m_partitioned_size % m_comm_size) * m_large_block_size) / m_small_block_size; } - ASSERT_RELEASE((to_return >= 0) && (to_return < m_comm_size)); + YGM_ASSERT_RELEASE((to_return >= 0) && (to_return < m_comm_size)); return to_return; } index_type local_index(const index_type &global_index) { index_type to_return = global_index - m_local_start_index; - ASSERT_RELEASE((to_return >= 0) && (to_return <= m_small_block_size)); + YGM_ASSERT_RELEASE((to_return >= 0) && (to_return < 
m_local_size)); return to_return; } index_type global_index(const index_type &local_index) { index_type to_return = m_local_start_index + local_index; - ASSERT_RELEASE(to_return < m_partitioned_size); + YGM_ASSERT_RELEASE(to_return < m_partitioned_size); return to_return; } diff --git a/include/ygm/container/detail/disjoint_set_impl.hpp b/include/ygm/container/detail/disjoint_set_impl.hpp index 0748ced8..f4a79798 100644 --- a/include/ygm/container/detail/disjoint_set_impl.hpp +++ b/include/ygm/container/detail/disjoint_set_impl.hpp @@ -50,7 +50,7 @@ class disjoint_set_impl { private: void increase_rank(const rank_type new_rank) { - ASSERT_RELEASE(m_rank < new_rank); + YGM_ASSERT_RELEASE(m_rank < new_rank); m_rank = new_rank; // Only called on roots @@ -184,12 +184,12 @@ class disjoint_set_impl { const auto &my_parent = item_data.second.get_parent(); const auto my_parent_rank_est = item_data.second.get_parent_rank_estimate(); - ASSERT_RELEASE(my_rank >= merging_rank); + YGM_ASSERT_RELEASE(my_rank >= merging_rank); if (my_rank > merging_rank) { return; } else { - ASSERT_RELEASE(my_rank == merging_rank); + YGM_ASSERT_RELEASE(my_rank == merging_rank); if (my_parent == my_item) { // Merging new item onto root. Need to increase rank. 
item_data.second.increase_rank(merging_rank + 1); @@ -331,12 +331,12 @@ class disjoint_set_impl { const auto &my_parent = item_data.second.get_parent(); const auto my_parent_rank_est = item_data.second.get_parent_rank_estimate(); - ASSERT_RELEASE(my_rank >= merging_rank); + YGM_ASSERT_RELEASE(my_rank >= merging_rank); if (my_rank > merging_rank) { return; } else { - ASSERT_RELEASE(my_rank == merging_rank); + YGM_ASSERT_RELEASE(my_rank == merging_rank); if (my_parent == my_item) { // Has not found new parent item_data.second.increase_rank(merging_rank + 1); } else { // Tell merging item about new parent diff --git a/include/ygm/container/detail/filter_proxy.hpp b/include/ygm/container/detail/filter_proxy.hpp index 44e3725c..80714a4f 100644 --- a/include/ygm/container/detail/filter_proxy.hpp +++ b/include/ygm/container/detail/filter_proxy.hpp @@ -5,17 +5,60 @@ #pragma once - namespace ygm::container::detail { template -class filter_proxy - : public base_iteration, - typename Container::for_all_args> { +class filter_proxy_value + : public base_iteration_value, + typename Container::for_all_args> { + public: + using for_all_args = typename Container::for_all_args; + + filter_proxy_value(Container& rc, FilterFunction filter) + : m_rcontainer(rc), m_filter_fn(filter) {} + + template + void for_all(Function fn) { + auto flambda = [fn, this](auto&... xs) { + bool b = m_filter_fn(std::forward(xs)...); + if (b) { + fn(std::forward(xs)...); + } + }; + + m_rcontainer.for_all(flambda); + } + + template + void for_all(Function fn) const { + auto flambda = [fn, this](const auto&... 
xs) { + bool b = m_filter_fn(std::forward(xs)...); + if (b) { + fn(std::forward(xs)...); + } + }; + + m_rcontainer.for_all(flambda); + } + + ygm::comm& comm() { return m_rcontainer.comm(); } + + const ygm::comm& comm() const { return m_rcontainer.comm(); } + + private: + Container& m_rcontainer; + FilterFunction m_filter_fn; +}; + +template +class filter_proxy_key_value + : public base_iteration_key_value< + filter_proxy_key_value, + typename Container::for_all_args> { public: using for_all_args = typename Container::for_all_args; - filter_proxy(Container& rc, FilterFunction filter) + filter_proxy_key_value(Container& rc, FilterFunction filter) : m_rcontainer(rc), m_filter_fn(filter) {} template @@ -51,4 +94,4 @@ class filter_proxy FilterFunction m_filter_fn; }; -} \ No newline at end of file +} // namespace ygm::container::detail diff --git a/include/ygm/container/detail/flatten_proxy.hpp b/include/ygm/container/detail/flatten_proxy.hpp index d8ee4000..8dde155e 100644 --- a/include/ygm/container/detail/flatten_proxy.hpp +++ b/include/ygm/container/detail/flatten_proxy.hpp @@ -8,15 +8,15 @@ namespace ygm::container::detail { template -class flatten_proxy - : public base_iteration, - std::tuple>> { +class flatten_proxy_value + : public base_iteration_value, + std::tuple>> { public: using for_all_args = std::tuple>; - flatten_proxy(Container& rc) : m_rcontainer(rc) {} + flatten_proxy_value(Container& rc) : m_rcontainer(rc) {} template void for_all(Function fn) { @@ -52,4 +52,49 @@ class flatten_proxy Container& m_rcontainer; }; +template +class flatten_proxy_key_value + : public base_iteration_key_value< + flatten_proxy_value, + std::tuple< + std::tuple_element_t<0, typename Container::for_all_args>>> { + public: + using for_all_args = + std::tuple>; + + flatten_proxy_key_value(Container& rc) : m_rcontainer(rc) {} + + template + void for_all(Function fn) { + auto flambda = + [fn](std::tuple_element_t<0, typename Container::for_all_args>& + stlcont) { + for (auto& v : 
stlcont) { + fn(v); + } + }; + + m_rcontainer.for_all(flambda); + } + + template + void for_all(Function fn) const { + auto flambda = + [fn](std::tuple_element_t<0, typename Container::for_all_args>& + stlcont) { + for (const auto& v : stlcont) { + fn(v); + } + }; + + m_rcontainer.for_all(flambda); + } + + ygm::comm& comm() { return m_rcontainer.comm(); } + + const ygm::comm& comm() const { return m_rcontainer.comm(); } + + private: + Container& m_rcontainer; +}; } // namespace ygm::container::detail diff --git a/include/ygm/container/detail/map_impl.hpp b/include/ygm/container/detail/map_impl.hpp deleted file mode 100644 index 40d617a3..00000000 --- a/include/ygm/container/detail/map_impl.hpp +++ /dev/null @@ -1,379 +0,0 @@ -// Copyright 2019-2021 Lawrence Livermore National Security, LLC and other YGM -// Project Developers. See the top-level COPYRIGHT file for details. -// -// SPDX-License-Identifier: MIT - -#pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace ygm::container::detail { - -template , - typename Compare = std::less, - class Alloc = std::allocator>> -class map_impl { - public: - using self_type = map_impl; - using ptr_type = typename ygm::ygm_ptr; - using mapped_type = Value; - using key_type = Key; - using size_type = size_t; - using ygm_for_all_types = std::tuple; - using container_type = ygm::container::map_tag; - - Partitioner partitioner; - - map_impl(ygm::comm &comm) : m_default_value{}, m_comm(comm), pthis(this) { - pthis.check(m_comm); - } - - map_impl(ygm::comm &comm, const mapped_type &dv) - : m_default_value(dv), m_comm(comm), pthis(this) { - pthis.check(m_comm); - } - - map_impl(const self_type &rhs) - : m_default_value(rhs.m_default_value), m_comm(rhs.m_comm), pthis(this) { - m_local_map.insert(std::begin(rhs.m_local_map), std::end(rhs.m_local_map)); - pthis.check(m_comm); - } - - ~map_impl() { m_comm.barrier(); } - - void async_insert_unique(const key_type &key, 
const mapped_type &value) { - auto inserter = [](auto mailbox, auto map, const key_type &key, - const mapped_type &value) { - auto itr = map->m_local_map.find(key); - if (itr != map->m_local_map.end()) { - itr->second = value; - } else { - map->m_local_map.insert(std::make_pair(key, value)); - } - }; - int dest = owner(key); - m_comm.async(dest, inserter, pthis, key, value); - } - - void async_insert_if_missing(const key_type &key, const mapped_type &value) { - async_insert_if_missing_else_visit( - key, value, - [](const key_type &k, const mapped_type &v, - const mapped_type &new_value) {}); - } - - void async_insert_multi(const key_type &key, const mapped_type &value) { - auto inserter = [](auto mailbox, auto map, const key_type &key, - const mapped_type &value) { - map->m_local_map.insert(std::make_pair(key, value)); - }; - int dest = owner(key); - m_comm.async(dest, inserter, pthis, key, value); - } - - template - void async_visit(const key_type &key, Visitor visitor, - const VisitorArgs &...args) { - int dest = owner(key); - auto visit_wrapper = [](auto pcomm, auto pmap, const key_type &key, - const VisitorArgs &...args) { - auto range = pmap->m_local_map.equal_range(key); - if (range.first == range.second) { // check if not in range - pmap->m_local_map.insert(std::make_pair(key, pmap->m_default_value)); - range = pmap->m_local_map.equal_range(key); - ASSERT_DEBUG(range.first != range.second); - } - Visitor *vis = nullptr; - pmap->local_visit(key, *vis, args...); - }; - - m_comm.async(dest, visit_wrapper, pthis, key, - std::forward(args)...); - } - - template - void async_visit_group(const key_type &key, Visitor visitor, - const VisitorArgs &...args) { - int dest = owner(key); - auto visit_wrapper = [](auto pcomm, auto pmap, const key_type &key, - const VisitorArgs &...args) { - auto range = pmap->m_local_map.equal_range(key); - if (range.first == range.second) { // check if not in range - pmap->m_local_map.insert(std::make_pair(key, pmap->m_default_value)); - 
range = pmap->m_local_map.equal_range(key); - ASSERT_DEBUG(range.first != range.second); - } - - ygm::detail::interrupt_mask mask(pmap->m_comm); - - Visitor *vis = nullptr; - ygm::meta::apply_optional( - *vis, std::make_tuple(pmap), - std::forward_as_tuple(range.first, range.second, args...)); - }; - - m_comm.async(dest, visit_wrapper, pthis, key, - std::forward(args)...); - } - - template - void async_visit_if_exists(const key_type &key, Visitor visitor, - const VisitorArgs &...args) { - int dest = owner(key); - auto visit_wrapper = [](auto pcomm, auto pmap, const key_type &key, - const VisitorArgs &...args) { - Visitor *vis = nullptr; - pmap->local_visit(key, *vis, args...); - }; - - m_comm.async(dest, visit_wrapper, pthis, key, - std::forward(args)...); - } - - template - void async_insert_if_missing_else_visit(const key_type &key, - const mapped_type &value, - Visitor visitor, - const VisitorArgs &...args) { - int dest = owner(key); - auto insert_else_visit_wrapper = [](auto pmap, const key_type &key, - const mapped_type &value, - const VisitorArgs &...args) { - auto itr = pmap->m_local_map.find(key); - if (itr == pmap->m_local_map.end()) { - pmap->m_local_map.insert(std::make_pair(key, value)); - } else { - Visitor *vis = nullptr; - pmap->local_visit(key, *vis, value, args...); - } - }; - - m_comm.async(dest, insert_else_visit_wrapper, pthis, key, value, - std::forward(args)...); - } - - template - void async_reduce(const key_type &key, const mapped_type &value, - ReductionOp reducer) { - int dest = owner(key); - auto reduce_wrapper = [](auto pmap, const key_type &key, - const mapped_type &value) { - auto itr = pmap->m_local_map.find(key); - if (itr == pmap->m_local_map.end()) { - pmap->m_local_map.insert(std::make_pair(key, value)); - } else { - ReductionOp *reducer = nullptr; - itr->second = (*reducer)(itr->second, value); - } - }; - - m_comm.async(dest, reduce_wrapper, pthis, key, value); - } - - void async_erase(const key_type &key) { - int dest = 
owner(key); - auto erase_wrapper = [](auto pcomm, auto pmap, const key_type &key) { - pmap->local_erase(key); - }; - - m_comm.async(dest, erase_wrapper, pthis, key); - } - - size_t local_count(const key_type &key) { return m_local_map.count(key); } - - template - void for_all(Function fn) { - m_comm.barrier(); - local_for_all(fn); - } - - void clear() { - m_comm.barrier(); - m_local_map.clear(); - } - - size_type size() { - m_comm.barrier(); - return m_comm.all_reduce_sum(m_local_map.size()); - } - - size_t count(const key_type &key) { - m_comm.barrier(); - return m_comm.all_reduce_sum(m_local_map.count(key)); - } - - // Doesn't swap pthis. - // should we check comm is equal? -- probably - void swap(self_type &s) { - m_comm.barrier(); - std::swap(m_default_value, s.m_default_value); - m_local_map.swap(s.m_local_map); - } - - template - void all_gather(const STLKeyContainer &keys, MapKeyValue &output) { - ygm::ygm_ptr preturn(&output); - - auto fetcher = [](auto pcomm, int from, const key_type &key, auto pmap, - auto pcont) { - auto returner = [](auto pcomm, const key_type &key, - const std::vector &values, auto pcont) { - for (const auto &v : values) { - pcont->insert(std::make_pair(key, v)); - } - }; - auto values = pmap->local_get(key); - pcomm->async(from, returner, key, values, pcont); - }; - - m_comm.barrier(); - for (const auto &key : keys) { - int o = owner(key); - m_comm.async(o, fetcher, m_comm.rank(), key, pthis, preturn); - } - m_comm.barrier(); - } - - typename ygm::ygm_ptr get_ygm_ptr() const { return pthis; } - - void serialize(const std::string &fname) { - m_comm.barrier(); - std::string rank_fname = fname + std::to_string(m_comm.rank()); - std::ofstream os(rank_fname, std::ios::binary); - cereal::JSONOutputArchive oarchive(os); - oarchive(m_local_map, m_default_value, m_comm.size()); - } - - void deserialize(const std::string &fname) { - m_comm.barrier(); - - std::string rank_fname = fname + std::to_string(m_comm.rank()); - std::ifstream 
is(rank_fname, std::ios::binary); - - cereal::JSONInputArchive iarchive(is); - int comm_size; - iarchive(m_local_map, m_default_value, comm_size); - - if (comm_size != m_comm.size()) { - m_comm.cerr0( - "Attempting to deserialize map_impl using communicator of " - "different size than serialized with"); - } - } - - int owner(const key_type &key) const { - auto [owner, rank] = partitioner(key, m_comm.size(), 1024); - return owner; - } - - bool is_mine(const key_type &key) const { - return owner(key) == m_comm.rank(); - } - - std::vector local_get(const key_type &key) { - std::vector to_return; - - auto range = m_local_map.equal_range(key); - for (auto itr = range.first; itr != range.second; ++itr) { - to_return.push_back(itr->second); - } - - return to_return; - } - - template - void local_visit(const key_type &key, Function &fn, - const VisitorArgs &...args) { - ygm::detail::interrupt_mask mask(m_comm); - - auto range = m_local_map.equal_range(key); - if constexpr (std::is_invocable() || - std::is_invocable()) { - for (auto itr = range.first; itr != range.second; ++itr) { - ygm::meta::apply_optional( - fn, std::make_tuple(pthis), - std::forward_as_tuple(itr->first, itr->second, args...)); - } - } else { - static_assert(ygm::detail::always_false<>, - "remote map lambda signature must be invocable with (const " - "&key_type, mapped_type&, ...) or (ptr_type, const " - "&key_type, mapped_type&, ...) 
signatures"); - } - } - - void local_erase(const key_type &key) { m_local_map.erase(key); } - - void local_clear() { m_local_map.clear(); } - - size_type local_size() const { return m_local_map.size(); } - - size_t local_const(const key_type &k) const { return m_local_map.count(k); } - - ygm::comm &comm() { return m_comm; } - - template - void local_for_all(Function fn) { - if constexpr (std::is_invocable()) { - for (std::pair &kv : m_local_map) { - fn(kv.first, kv.second); - } - } else { - static_assert(ygm::detail::always_false<>, - "local map lambda signature must be invocable with (const " - "&key_type, mapped_type&) signature"); - } - } - - template - std::vector> topk(size_t k, - CompareFunction cfn) { - using vec_type = std::vector>; - - m_comm.barrier(); - - vec_type local_topk; - for (const auto &kv : m_local_map) { - local_topk.push_back(kv); - std::sort(local_topk.begin(), local_topk.end(), cfn); - if (local_topk.size() > k) { - local_topk.pop_back(); - } - } - - auto to_return = m_comm.all_reduce( - local_topk, [cfn, k](const vec_type &va, const vec_type &vb) { - vec_type out(va.begin(), va.end()); - out.insert(out.end(), vb.begin(), vb.end()); - std::sort(out.begin(), out.end(), cfn); - while (out.size() > k) { - out.pop_back(); - } - return out; - }); - return to_return; - } - - const mapped_type &default_value() const { return m_default_value; } - - protected: - map_impl() = delete; - - mapped_type m_default_value; - std::multimap m_local_map; - ygm::comm &m_comm; - ptr_type pthis; -}; -} // namespace ygm::container::detail diff --git a/include/ygm/container/detail/map_proxy.hpp b/include/ygm/container/detail/map_proxy.hpp deleted file mode 100644 index 2dcab046..00000000 --- a/include/ygm/container/detail/map_proxy.hpp +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2019-2021 Lawrence Livermore National Security, LLC and other YGM -// Project Developers. See the top-level COPYRIGHT file for details. 
-// -// SPDX-License-Identifier: MIT - -#pragma once - -#include -#include -#include - -namespace ygm::container::detail { - -namespace type_traits { -template