diff --git a/source/adapters/native_cpu/CMakeLists.txt b/source/adapters/native_cpu/CMakeLists.txt
index 17467bfdef..50559d57ba 100644
--- a/source/adapters/native_cpu/CMakeLists.txt
+++ b/source/adapters/native_cpu/CMakeLists.txt
@@ -35,6 +35,7 @@ add_ur_adapter(${TARGET_NAME}
   ${CMAKE_CURRENT_SOURCE_DIR}/queue.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/queue.hpp
   ${CMAKE_CURRENT_SOURCE_DIR}/sampler.cpp
+  ${CMAKE_CURRENT_SOURCE_DIR}/threadpool.hpp
   ${CMAKE_CURRENT_SOURCE_DIR}/ur_interface_loader.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/usm_p2p.cpp
   ${CMAKE_CURRENT_SOURCE_DIR}/virtual_mem.cpp
@@ -49,6 +50,34 @@ set_target_properties(${TARGET_NAME} PROPERTIES
   SOVERSION "${PROJECT_VERSION_MAJOR}"
 )
 
+# oneTBB is an optional backend for Native CPU and is disabled by default.
+option(NATIVECPU_WITH_ONETBB "Use oneTBB as backend for Native CPU" OFF)
+if(NATIVECPU_WITH_ONETBB)
+  message(STATUS "Configuring Native CPU adapter with oneTBB backend.")
+
+  include(FetchContent)
+  FetchContent_Declare(
+    tbb
+    GIT_REPOSITORY https://github.com/uxlfoundation/oneTBB.git
+    # commit 9d4578723827f31defd79389819a5fbf659577f7
+    # Author: Konstantin Boyarinov <konstantin.boyarinov@intel.com>
+    # Date:   Fri Jan 24 23:23:59 2025 +0200
+    #   Add explicit deduction guides for blocked_nd_range (#1525)
+    GIT_TAG 9d4578723827f31defd79389819a5fbf659577f7
+    CMAKE_ARGS "-DTBB_TEST:BOOL=OFF -DTBB_EXAMPLES:BOOL=OFF -DTBB_BENCH:BOOL=OFF"
+    OVERRIDE_FIND_PACKAGE
+  )
+  set(TBB_TEST OFF CACHE INTERNAL "" FORCE)
+  set(TBB_EXAMPLES OFF CACHE INTERNAL "" FORCE)
+  set(TBB_BENCH OFF CACHE INTERNAL "" FORCE)
+  set(TBB_BUILD ON CACHE INTERNAL "" FORCE)
+  set(TBB_FIND_PACKAGE OFF CACHE INTERNAL "" FORCE)
+  set(TBB_FUZZ_TESTING OFF CACHE INTERNAL "" FORCE)
+  set(TBB_INSTALL ON CACHE INTERNAL "" FORCE)
+  set(CMAKE_INCLUDE_CURRENT_DIR OFF)
+  FetchContent_MakeAvailable(tbb)
+endif()
+
 find_package(Threads REQUIRED)
 
 target_link_libraries(${TARGET_NAME} PRIVATE
@@ -61,3 +90,23 @@ target_link_libraries(${TARGET_NAME} PRIVATE
 target_include_directories(${TARGET_NAME} PRIVATE
   "${CMAKE_CURRENT_SOURCE_DIR}/../../"
 )
+
+if(NATIVECPU_WITH_ONETBB)
+  target_link_libraries(${TARGET_NAME} PRIVATE
+    TBB::tbb
+  )
+  if (NOT MSVC)
+    # oneTBB currently casts away some const qualifiers.
+    # TODO: check whether the compiler actually supports these options.
+    target_compile_options(tbb PRIVATE -Wno-cast-qual)
+    target_compile_options(tbbmalloc PRIVATE -Wno-cast-qual)
+  endif()
+
+  # Undefine the _DEBUG macro in release builds so that the release
+  # version of tbbbind is found.
+  if (NOT uppercase_CMAKE_BUILD_TYPE STREQUAL "DEBUG")
+    target_compile_options(tbb PRIVATE -U_DEBUG)
+  endif()
+
+  target_compile_definitions(${TARGET_NAME} PRIVATE NATIVECPU_WITH_ONETBB)
+endif()
diff --git a/source/adapters/native_cpu/context.hpp b/source/adapters/native_cpu/context.hpp
index b9d2d22dd1..8168e0d10e 100644
--- a/source/adapters/native_cpu/context.hpp
+++ b/source/adapters/native_cpu/context.hpp
@@ -116,7 +116,7 @@ struct ur_context_handle_t_ : RefCounted {
     // We need to ensure that we align to at least alignof(usm_alloc_info),
     // otherwise its start address may be unaligned.
     alignment =
-        std::max<size_t>(alignment, alignof(native_cpu::usm_alloc_info));
+        std::max<uint32_t>(alignment, alignof(native_cpu::usm_alloc_info));
     void *alloc = native_cpu::malloc_impl(alignment, size);
     if (!alloc)
       return nullptr;
diff --git a/source/adapters/native_cpu/enqueue.cpp b/source/adapters/native_cpu/enqueue.cpp
index ec5a6cf339..22cf26602f 100644
--- a/source/adapters/native_cpu/enqueue.cpp
+++ b/source/adapters/native_cpu/enqueue.cpp
@@ -53,8 +53,8 @@ struct NDRDescT {
 } // namespace native_cpu
 
 #ifdef NATIVECPU_USE_OCK
-static native_cpu::state getResizedState(const native_cpu::NDRDescT &ndr,
-                                         size_t itemsPerThread) {
+static inline native_cpu::state getResizedState(const native_cpu::NDRDescT &ndr,
+                                                size_t itemsPerThread) {
   native_cpu::state resized_state(
       ndr.GlobalSize[0], ndr.GlobalSize[1], ndr.GlobalSize[2], itemsPerThread,
       ndr.LocalSize[1], ndr.LocalSize[2], ndr.GlobalOffset[0],
@@ -107,7 +107,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
   auto &tp = hQueue->getDevice()->tp;
   const size_t numParallelThreads = tp.num_threads();
   hKernel->updateMemPool(numParallelThreads);
-  std::vector<std::future<void>> futures;
+  auto Tasks = native_cpu::getScheduler(tp);
   std::vector<std::function<void(size_t, ur_kernel_handle_t_)>> groups;
   auto numWG0 = ndr.GlobalSize[0] / ndr.LocalSize[0];
   auto numWG1 = ndr.GlobalSize[1] / ndr.LocalSize[1];
@@ -159,17 +159,17 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
     for (unsigned g2 = 0; g2 < numWG2; g2++) {
       for (unsigned g1 = 0; g1 < numWG1; g1++) {
         for (unsigned g0 = 0; g0 < new_num_work_groups_0; g0 += 1) {
-          futures.emplace_back(tp.schedule_task(
+          Tasks.schedule(
               [ndr, itemsPerThread, kernel = *hKernel, g0, g1, g2](size_t) {
                 native_cpu::state resized_state =
                     getResizedState(ndr, itemsPerThread);
                 resized_state.update(g0, g1, g2);
                 kernel._subhandler(kernel.getArgs().data(), &resized_state);
-              }));
+              });
         }
         // Peel the remaining work items. Since the local size is 1, we iterate
         // over the work groups.
-        for (unsigned g0 = new_num_work_groups_0 * itemsPerThread; g0 < numWG0;
+        for (size_t g0 = new_num_work_groups_0 * itemsPerThread; g0 < numWG0;
              g0++) {
           state.update(g0, g1, g2);
           hKernel->_subhandler(hKernel->getArgs().data(), &state);
@@ -179,26 +179,25 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
   } else {
     // We are running a parallel_for over an nd_range
-
     if (numWG1 * numWG2 >= numParallelThreads) {
       // Dimensions 1 and 2 have enough work, split them across the threadpool
       for (unsigned g2 = 0; g2 < numWG2; g2++) {
         for (unsigned g1 = 0; g1 < numWG1; g1++) {
-          futures.emplace_back(
-              tp.schedule_task([state, kernel = *hKernel, numWG0, g1, g2,
-                                numParallelThreads](size_t threadId) mutable {
-                for (unsigned g0 = 0; g0 < numWG0; g0++) {
-                  kernel.handleLocalArgs(numParallelThreads, threadId);
-                  state.update(g0, g1, g2);
-                  kernel._subhandler(kernel.getArgs().data(), &state);
-                }
-              }));
+          Tasks.schedule([state, kernel = *hKernel, numWG0, g1, g2,
+                          numParallelThreads](size_t threadId) mutable {
+            for (unsigned g0 = 0; g0 < numWG0; g0++) {
+              kernel.handleLocalArgs(numParallelThreads, threadId);
+              state.update(g0, g1, g2);
+              kernel._subhandler(kernel.getArgs().data(), &state);
+            }
+          });
         }
       }
     } else {
       // Split dimension 0 across the threadpool
       // Here we try to create groups of workgroups in order to reduce
       // synchronization overhead
+      groups.reserve(numWG2 * numWG1 * numWG0);
       for (unsigned g2 = 0; g2 < numWG2; g2++) {
         for (unsigned g1 = 0; g1 < numWG1; g1++) {
           for (unsigned g0 = 0; g0 < numWG0; g0++) {
@@ -214,35 +213,34 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueKernelLaunch(
       }
       auto numGroups = groups.size();
       auto groupsPerThread = numGroups / numParallelThreads;
-      auto remainder = numGroups % numParallelThreads;
-      for (unsigned thread = 0; thread < numParallelThreads; thread++) {
-        futures.emplace_back(
-            tp.schedule_task([groups, thread, groupsPerThread,
-                              kernel = *hKernel](size_t threadId) {
-              for (unsigned i = 0; i < groupsPerThread; i++) {
-                auto index = thread * groupsPerThread + i;
-                groups[index](threadId, kernel);
-              }
-            }));
+      if (groupsPerThread) {
+        for (unsigned thread = 0; thread < numParallelThreads; thread++) {
+          Tasks.schedule([groups, thread, groupsPerThread,
+                          kernel = *hKernel](size_t threadId) {
+            for (unsigned i = 0; i < groupsPerThread; i++) {
+              auto index = thread * groupsPerThread + i;
+              groups[index](threadId, kernel);
+            }
+          });
+        }
       }
 
-      // schedule the remaining tasks
+      auto remainder = numGroups % numParallelThreads;
       if (remainder) {
-        futures.emplace_back(
-            tp.schedule_task([groups, remainder,
-                              scheduled = numParallelThreads * groupsPerThread,
-                              kernel = *hKernel](size_t threadId) {
-              for (unsigned i = 0; i < remainder; i++) {
-                auto index = scheduled + i;
-                groups[index](threadId, kernel);
-              }
-            }));
+        Tasks.schedule([groups, remainder,
+                        scheduled = numParallelThreads * groupsPerThread,
+                        kernel = *hKernel](size_t threadId) {
+          for (unsigned i = 0; i < remainder; i++) {
+            auto index = scheduled + i;
+            groups[index](threadId, kernel);
+          }
+        });
       }
     }
   }
 #endif // NATIVECPU_USE_OCK
-  event->set_futures(futures);
+  event->set_futures(Tasks.getTaskInfo());
 
   *phEvent = event;
   event->set_callback([hKernel, event]() {
@@ -456,7 +454,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueMemBufferFill(
   // TODO: error checking
   // TODO: handle async
   void *startingPtr = hBuffer->_mem + offset;
-  unsigned steps = size / patternSize;
+  size_t steps = size / patternSize;
   for (unsigned i = 0; i < steps; i++) {
     memcpy(static_cast<int8_t *>(startingPtr) + i * patternSize, pPattern,
            patternSize);
@@ -597,7 +595,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueUSMFill(
     break;
   }
   default: {
-    for (unsigned int step{0}; step < size; step += patternSize) {
+    for (size_t step{0}; step < size; step += patternSize) {
       auto *dest = reinterpret_cast<void *>(
           reinterpret_cast<uint8_t *>(ptr) + step);
       memcpy(dest, pPattern, patternSize);
diff --git a/source/adapters/native_cpu/event.cpp b/source/adapters/native_cpu/event.cpp
index 37eaf1f6d1..b03591dc57 100644
--- a/source/adapters/native_cpu/event.cpp
+++ b/source/adapters/native_cpu/event.cpp
@@ -11,6 +11,7 @@
 #include "ur_api.h"
 
 #include "common.hpp"
+#include "device.hpp"
 #include "event.hpp"
 #include "queue.hpp"
 #include <cstdint>
@@ -123,7 +124,7 @@ UR_APIEXPORT ur_result_t UR_APICALL urEnqueueTimestampRecordingExp(
 ur_event_handle_t_::ur_event_handle_t_(ur_queue_handle_t queue,
                                        ur_command_t command_type)
     : queue(queue), context(queue->getContext()), command_type(command_type),
-      done(false) {
+      done(false), futures(queue->getDevice()->tp) {
   this->queue->addEvent(this);
 }
 
@@ -138,9 +139,7 @@ void ur_event_handle_t_::wait() {
   if (done) {
     return;
   }
-  for (auto &f : futures) {
-    f.wait();
-  }
+  this->futures.wait_all();
   queue->removeEvent(this);
   done = true;
   // The callback may need to acquire the lock, so we unlock it here
diff --git a/source/adapters/native_cpu/event.hpp b/source/adapters/native_cpu/event.hpp
index 60176a33a6..ac3a322e21 100644
--- a/source/adapters/native_cpu/event.hpp
+++ b/source/adapters/native_cpu/event.hpp
@@ -9,6 +9,7 @@
 //===----------------------------------------------------------------------===//
 #pragma once
 #include "common.hpp"
+#include "threadpool.hpp"
 #include "ur_api.h"
 #include <cstdint>
 #include <future>
@@ -40,7 +41,8 @@ struct ur_event_handle_t_ : RefCounted {
 
   ur_command_t getCommandType() const { return command_type; }
 
-  void set_futures(std::vector<std::future<void>> &fs) {
+  // TODO: get rid of this function
+  void set_futures(native_cpu::tasksinfo_t &&fs) {
     std::lock_guard<std::mutex> lock(mutex);
     futures = std::move(fs);
   }
@@ -59,7 +61,7 @@ struct ur_event_handle_t_ : RefCounted {
   ur_command_t command_type;
   bool done;
   std::mutex mutex;
-  std::vector<std::future<void>> futures;
+  native_cpu::tasksinfo_t futures;
   std::function<void()> callback;
   uint64_t timestamp_start = 0;
   uint64_t timestamp_end = 0;
diff --git a/source/adapters/native_cpu/kernel.hpp b/source/adapters/native_cpu/kernel.hpp
index 4d2dec85cb..6ca3eae777 100644
--- a/source/adapters/native_cpu/kernel.hpp
+++ b/source/adapters/native_cpu/kernel.hpp
@@ -22,7 +22,7 @@ using nativecpu_task_t = std::function<nativecpu_kernel_t>;
 struct local_arg_info_t {
   uint32_t argIndex;
   size_t argSize;
-  local_arg_info_t(uint32_t argIndex, size_t argSize)
+  inline local_arg_info_t(uint32_t argIndex, size_t argSize)
       : argIndex(argIndex), argSize(argSize) {}
 };
 
@@ -41,7 +41,7 @@ struct ur_kernel_handle_t_ : RefCounted {
     incrementReferenceCount();
   }
 
-  ~ur_kernel_handle_t_() {
+  inline ~ur_kernel_handle_t_() {
     if (decrementReferenceCount() == 0) {
       free(_localMemPool);
       Args.deallocate();
diff --git a/source/adapters/native_cpu/nativecpu_state.hpp b/source/adapters/native_cpu/nativecpu_state.hpp
index 9d6b4f4f06..68743c33cf 100644
--- a/source/adapters/native_cpu/nativecpu_state.hpp
+++ b/source/adapters/native_cpu/nativecpu_state.hpp
@@ -20,9 +20,9 @@ struct state {
   size_t MNumGroups[3];
   size_t MGlobalOffset[3];
   uint32_t NumSubGroups, SubGroup_id, SubGroup_local_id, SubGroup_size;
-  state(size_t globalR0, size_t globalR1, size_t globalR2, size_t localR0,
-        size_t localR1, size_t localR2, size_t globalO0, size_t globalO1,
-        size_t globalO2)
+  inline state(size_t globalR0, size_t globalR1, size_t globalR2,
+               size_t localR0, size_t localR1, size_t localR2, size_t globalO0,
+               size_t globalO1, size_t globalO2)
       : MGlobal_range{globalR0, globalR1, globalR2},
         MWorkGroup_size{localR0, localR1, localR2},
         MNumGroups{globalR0 / localR0, globalR1 / localR1, globalR2 / localR2},
@@ -42,8 +42,8 @@ struct state {
     SubGroup_size = 1;
   }
 
-  void update(size_t group0, size_t group1, size_t group2, size_t local0,
-              size_t local1, size_t local2) {
+  inline void update(size_t group0, size_t group1, size_t group2, size_t local0,
+                     size_t local1, size_t local2) {
     MWorkGroup_id[0] = group0;
     MWorkGroup_id[1] = group1;
     MWorkGroup_id[2] = group2;
@@ -58,7 +58,7 @@ struct state {
                    MWorkGroup_size[2] * MWorkGroup_id[2] + MLocal_id[2] +
                    MGlobalOffset[2];
   }
 
-  void update(size_t group0, size_t group1, size_t group2) {
+  inline void update(size_t group0, size_t group1, size_t group2) {
     MWorkGroup_id[0] = group0;
     MWorkGroup_id[1] = group1;
     MWorkGroup_id[2] = group2;
diff --git a/source/adapters/native_cpu/threadpool.hpp b/source/adapters/native_cpu/threadpool.hpp
index 2f2f79cd5a..a016131f67 100644
--- a/source/adapters/native_cpu/threadpool.hpp
+++ b/source/adapters/native_cpu/threadpool.hpp
@@ -208,7 +208,84 @@ template <typename ThreadPoolT> class threadpool_interface {
     return workerTask->get_future();
   }
 };
+using simple_threadpool_t = threadpool_interface<detail::simple_thread_pool>;
 
-using threadpool_t = threadpool_interface<detail::simple_thread_pool>;
+class TasksInfo_TP {
+  using FType = std::future<void>;
+  std::vector<FType> futures;
+
+public:
+  inline void schedule(FType &&f) { futures.emplace_back(std::move(f)); }
+  inline void wait_all() {
+    for (auto &f : futures)
+      f.wait();
+  }
+  TasksInfo_TP(simple_threadpool_t &) {}
+};
+
+template <class TP, class TaskInfo> struct Scheduler_base {
+  TP &ref;
+  TaskInfo ti;
+  Scheduler_base(TP &ref_) : ref(ref_), ti(ref_) {}
+  TaskInfo getTaskInfo() { return std::move(ti); }
+};
+
+template <class TP> struct Scheduler : Scheduler_base<TP, TasksInfo_TP> {
+  using Scheduler_base<TP, TasksInfo_TP>::Scheduler_base;
+
+  inline void schedule(worker_task_t &&task) {
+    this->ti.schedule(this->ref.schedule_task(std::move(task)));
+  }
+};
+
+template <class TPType> inline Scheduler<TPType> getScheduler(TPType &tp) {
+  return Scheduler<TPType>(tp);
+}
+
+} // namespace native_cpu
+
+#ifdef NATIVECPU_WITH_ONETBB
+// Simple TBB backend
+#include "oneapi/tbb.h"
+namespace native_cpu {
+
+struct TBB_threadpool {
+  oneapi::tbb::task_group tasks;
+  inline size_t num_threads() const noexcept {
+    return oneapi::tbb::info::default_concurrency();
+  }
+};
+
+class TBB_TasksInfo {
+  TBB_threadpool *tp;
+
+public:
+  inline void wait_all() { tp->tasks.wait(); }
+  TBB_TasksInfo(TBB_threadpool &t) : tp(&t) {}
+};
+
+template <>
+struct Scheduler<TBB_threadpool>
+    : Scheduler_base<TBB_threadpool, TBB_TasksInfo> {
+  using Scheduler_base<TBB_threadpool, TBB_TasksInfo>::Scheduler_base;
+  template <class T> inline void schedule(T &&task_) {
+    ref.tasks.run(std::function<void()>([task = std::move(task_)]() mutable {
+      auto thread_id = tbb::this_task_arena::current_thread_index();
+      assert(thread_id >= 0 &&
+             thread_id < oneapi::tbb::info::default_concurrency());
+      task(thread_id);
+    }));
+  }
+};
+
+using tasksinfo_t = TBB_TasksInfo;
+using threadpool_t = TBB_threadpool;
+
+} // namespace native_cpu
+
+#else
+// The default backend
+namespace native_cpu {
+using tasksinfo_t = TasksInfo_TP;
+using threadpool_t = simple_threadpool_t;
 } // namespace native_cpu
+#endif
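
---

Note on the new scheduling abstraction: enqueue code no longer stores `std::future`s directly. It obtains a `Scheduler` from the device's thread pool via `native_cpu::getScheduler()`, posts work with `schedule()`, and moves the resulting `tasksinfo_t` into the event (`event->set_futures(Tasks.getTaskInfo())`), which the event later waits on in `wait_all()`. A minimal sketch of that flow; `run_batch` and `pool` are illustrative names, not part of this patch, and the snippet assumes the adapter's include paths:

```cpp
#include "threadpool.hpp" // adapter-local header changed in this patch
#include <cstdio>

// Schedules a few dummy "work groups" and waits for them, mirroring the
// pattern used in urEnqueueKernelLaunch and ur_event_handle_t_::wait().
void run_batch(native_cpu::threadpool_t &pool) {
  auto Tasks = native_cpu::getScheduler(pool);
  for (size_t wg = 0; wg < 8; wg++) {
    // Each task receives the index of the worker thread running it.
    Tasks.schedule([wg](size_t threadId) {
      std::printf("work-group %zu on thread %zu\n", wg, threadId);
    });
  }
  // Ownership of the pending work moves into a tasksinfo_t; the event
  // keeps it and calls wait_all() when the user waits on the event.
  native_cpu::tasksinfo_t pending = Tasks.getTaskInfo();
  pending.wait_all();
}
```

Because the backend is selected at compile time (`threadpool_t` and `tasksinfo_t` are typedefs switched on `NATIVECPU_WITH_ONETBB`, enabled by configuring with `-DNATIVECPU_WITH_ONETBB=ON`), this sketch compiles unchanged against either the default thread pool or the TBB backend.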
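The TBB specialization maps `schedule()` onto `oneapi::tbb::task_group::run()` and `wait_all()` onto `task_group::wait()`; the thread index passed to each task comes from `tbb::this_task_arena::current_thread_index()`. A self-contained sketch of just those oneTBB primitives, assuming an installed oneTBB to compile and link against:

```cpp
#include <oneapi/tbb/info.h>
#include <oneapi/tbb/task_arena.h>
#include <oneapi/tbb/task_group.h>
#include <cassert>
#include <cstdio>

int main() {
  oneapi::tbb::task_group tasks;
  for (int i = 0; i < 4; i++) {
    // run() hands the functor to the TBB scheduler; the task may execute
    // on any worker in the current arena.
    tasks.run([i] {
      // Which arena slot (worker) this task landed on; this is the
      // thread_id forwarded by Scheduler<TBB_threadpool>::schedule().
      int slot = oneapi::tbb::this_task_arena::current_thread_index();
      assert(slot >= 0 && slot < oneapi::tbb::info::default_concurrency());
      std::printf("task %d on slot %d\n", i, slot);
    });
  }
  tasks.wait(); // what TBB_TasksInfo::wait_all() boils down to
  return 0;
}
```

The `assert` mirrors the one added in `threadpool.hpp`: slot indices are relative to the current arena, which by default has `default_concurrency()` slots.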