Skip to content

Commit

Permalink
Refactor workqueue selection mechanism (#1025)
Browse files Browse the repository at this point in the history
  • Loading branch information
abdelrahim-hentabli authored Dec 7, 2024
1 parent 496ce05 commit 32d65c2
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 43 deletions.
42 changes: 15 additions & 27 deletions sources/middle-layer/dispatcher/hw_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "hw_descriptors_api.h"
#include "hw_device.hpp"
#include "util/topology.hpp"
#include "util/util.hpp"

#ifdef DYNAMIC_LOADING_LIBACCEL_CONFIG
#include "hw_configuration_driver.h"
Expand Down Expand Up @@ -63,30 +64,27 @@ void hw_device::fill_hw_context(hw_accelerator_context* const hw_context_ptr) co
}

// Submits the descriptor at desc_ptr to one of this device's work queues.
// Queues are tried starting from a per-thread rotating index (wq_idx), for at most queue_count_ attempts.
// Returns HW_ACCELERATOR_STATUS_OK on the first successful enqueue,
// HW_ACCELERATOR_NOT_SUPPORTED_BY_WQ when no work queue has the descriptor's operation enabled, and
// HW_ACCELERATOR_WQ_IS_BUSY when eligible queues exist but none accepted the descriptor.
// NOTE(review): this listing appears to interleave the removed lines (op_cfg_enabled_ / is_op_supported_by_wq)
// with the added lines (bitmask128 / queue_selection_) of the commit, so it is not compilable as shown.
auto hw_device::enqueue_descriptor(void* desc_ptr) const noexcept -> hw_accelerator_status {
static thread_local std::uint32_t wq_idx = 0;
bool is_op_supported_by_wq = false;
const uint32_t operation = hw_iaa_descriptor_get_operation((hw_descriptor*)desc_ptr);
// Per-thread index of the next work queue to try; it persists across calls on the same thread
static thread_local std::uint32_t wq_idx = 0;

const uint32_t operation = hw_iaa_descriptor_get_operation((hw_descriptor*)desc_ptr);
// Start with the low queue_count_ bits set, i.e. every available work queue is a candidate
util::bitmask128 bit_index_is_valid_wq = util::bitmask128(queue_count_);

// Must select only workqueues w/ operation enabled
queue_selection_.reduce_by_operation(operation, bit_index_is_valid_wq);
// No candidate bit left: the operation is not enabled on any work queue
if (bit_index_is_valid_wq == 0U) { return HW_ACCELERATOR_NOT_SUPPORTED_BY_WQ; }

// For small low-latency cases WQ with small transfer size may be preferable
// TODO: order WQs by priority and engines capacity, check transfer sizes and other possible features
for (uint64_t try_count = 0U; try_count < queue_count_; ++try_count) {
hw_iaa_descriptor_set_block_on_fault((hw_descriptor*)desc_ptr, working_queues_[wq_idx].get_block_on_fault());
// If OPCFG functionality exists, check OPCFG register before submitting, otherwise try submission
if (!op_cfg_enabled_ || get_operation_supported_on_wq(wq_idx, operation)) {
// For submitting when OPCFG is supported, logic is :
// If all WQs don't support operation, return HW_ACCELERATOR_NOT_SUPPORTED_BY_WQ
// If any WQ supports operation, but submission fails, then return HW_ACCELERATOR_WQ_IS_BUSY
if (bit_index_is_valid_wq[wq_idx]) {
// Copy the selected queue's block-on-fault setting into the descriptor before submitting it
hw_iaa_descriptor_set_block_on_fault((hw_descriptor*)desc_ptr,
working_queues_[wq_idx].get_block_on_fault());
const qpl_status enqueue_status = working_queues_[wq_idx].enqueue_descriptor(desc_ptr);
is_op_supported_by_wq = true;
if (QPL_STS_OK == enqueue_status) { return HW_ACCELERATOR_STATUS_OK; }
}
// Advance round-robin (with wrap-around) so the next attempt uses the following queue
wq_idx = (wq_idx + 1) % queue_count_;
}
if (!is_op_supported_by_wq) {
return HW_ACCELERATOR_NOT_SUPPORTED_BY_WQ;
} else {
return HW_ACCELERATOR_WQ_IS_BUSY;
}
// Reached only when every attempted submission failed on the eligible queues
return HW_ACCELERATOR_WQ_IS_BUSY;
}

auto hw_device::get_indexing_support_enabled() const noexcept -> uint32_t {
Expand Down Expand Up @@ -133,10 +131,6 @@ auto hw_device::get_force_array_output_support() const noexcept -> bool {
return IC_FORCE_ARRAY(iaa_cap_register_);
}

// Reports whether the OPCFG content stored for work queue wq_idx marks the given operation as supported.
auto hw_device::get_operation_supported_on_wq(const uint32_t wq_idx, const uint32_t operation) const noexcept -> bool {
    const bool operation_is_supported = OC_GET_OP_SUPPORTED(op_configs_[wq_idx], operation);
    return operation_is_supported;
}

// Reports the "load partial AECS" capability as extracted from the cached IAACAP register value.
auto hw_device::get_load_partial_aecs_support() const noexcept -> bool {
    const bool partial_aecs_supported = IC_LOAD_PARTIAL_AECS(iaa_cap_register_);
    return partial_aecs_supported;
}
Expand Down Expand Up @@ -245,14 +239,8 @@ auto hw_device::initialize_new_device(descriptor_t* device_descriptor_ptr) noexc

if (queue_count_ == 0) { return HW_ACCELERATOR_WORK_QUEUES_NOT_AVAILABLE; }

// Logic for op_cfg_enabled_ value
op_cfg_enabled_ = working_queues_[0].get_op_configuration_support();

for (uint32_t wq_idx = 0; wq_idx < queue_count_; wq_idx++) {
for (uint32_t register_index = 0; register_index < TOTAL_OP_CFG_BIT_GROUPS; register_index++) {
op_configs_[wq_idx] = working_queues_[wq_idx].get_op_config_register();
}
}
// Initialize queue_selection_ object
queue_selection_ = queue_selector(working_queues_, queue_count_);

return HW_ACCELERATOR_STATUS_OK;
}
Expand Down
27 changes: 11 additions & 16 deletions sources/middle-layer/dispatcher/hw_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "hw_devices.h"
#include "hw_queue.hpp"
#include "hw_status.h"
#include "queue_selector.hpp"

namespace qpl::ml::dispatcher {

Expand All @@ -25,9 +26,7 @@ class hw_device final {

static constexpr uint32_t max_working_queues = MAX_NUM_WQ;

using queues_container_t = std::array<hw_queue, max_working_queues>;
using op_config_register_t = std::array<uint32_t, TOTAL_OP_CFG_BIT_GROUPS>;
using opcfg_container_t = std::array<op_config_register_t, max_working_queues>;
using queues_container_t = std::array<hw_queue, max_working_queues>;

public:
using descriptor_t = void;
Expand Down Expand Up @@ -72,24 +71,20 @@ class hw_device final {

[[nodiscard]] auto get_force_array_output_support() const noexcept -> bool;

[[nodiscard]] auto get_operation_supported_on_wq(const uint32_t wq_idx, const uint32_t operation) const noexcept
-> bool;

[[nodiscard]] auto get_load_partial_aecs_support() const noexcept -> bool;

[[nodiscard]] auto is_matching_user_numa_policy(int32_t user_specified_numa_id) const noexcept -> bool;

private:
queues_container_t working_queues_ = {}; /**< Set of available HW working queues */
opcfg_container_t op_configs_ = {}; /**< Array of OPCFG register content for each available HW working queue */
uint32_t queue_count_ = 0U; /**< Number of working queues that are available */
uint64_t gen_cap_register_ = 0U; /**< GENCAP register content */
uint64_t iaa_cap_register_ = 0U; /**< IAACAP register content */
uint64_t numa_node_id_ = 0U; /**< NUMA node id of the device */
uint32_t version_major_ = 0U; /**< Major version of discovered device */
uint32_t version_minor_ = 0U; /**< Minor version of discovered device */
bool op_cfg_enabled_ = false; /**< Need to check workqueue's OPCFG register */
uint64_t socket_id_ = 0u; /**< Socket id of the device */
queues_container_t working_queues_ = {}; /**< Set of available HW working queues */
uint32_t queue_count_ = 0U; /**< Number of working queues that are available */
uint64_t gen_cap_register_ = 0U; /**< GENCAP register content */
uint64_t iaa_cap_register_ = 0U; /**< IAACAP register content */
uint64_t numa_node_id_ = 0U; /**< NUMA node id of the device */
uint32_t version_major_ = 0U; /**< Major version of discovered device */
uint32_t version_minor_ = 0U; /**< Minor version of discovered device */
uint64_t socket_id_ = 0u; /**< Socket id of the device */
queue_selector queue_selection_; /**< Queue Selection object */
};

#endif
Expand Down
87 changes: 87 additions & 0 deletions sources/middle-layer/dispatcher/queue_selector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*******************************************************************************
* Copyright (C) 2024 Intel Corporation
*
* SPDX-License-Identifier: MIT
******************************************************************************/

#ifndef QPL_SOURCES_MIDDLE_LAYER_DISPATCHER_QUEUE_SELECTOR_HPP_
#define QPL_SOURCES_MIDDLE_LAYER_DISPATCHER_QUEUE_SELECTOR_HPP_

#include <unordered_map>

#include "hw_devices.h"
#include "hw_iaa_flags.h"
#include "hw_queue.hpp"
#include "util/util.hpp"

namespace qpl::ml::dispatcher {

/**
 * @brief Precomputes, for each known operation code, the set of work queues on which that operation is enabled,
 *        and uses it to filter a candidate work queue bitmask.
 */
class queue_selector {
    static constexpr uint32_t max_working_queues = MAX_NUM_WQ;

    // Operation codes for Intel® In-Memory Analytics Accelerator (Intel® IAA)
    // Used to check OPCFG if operation is enabled/disabled
    static constexpr uint32_t opcodes_list[] = {QPL_OPCODE_DECOMPRESS, QPL_OPCODE_COMPRESS, QPL_OPCODE_CRC64,
                                                QPL_OPCODE_SCAN,       QPL_OPCODE_EXTRACT,  QPL_OPCODE_SELECT,
                                                QPL_OPCODE_EXPAND};

    using queues_container_t   = std::array<hw_queue, max_working_queues>;
    using op_config_register_t = std::array<uint32_t, TOTAL_OP_CFG_BIT_GROUPS>;
    using opcfg_container_t    = std::array<op_config_register_t, max_working_queues>;

public:
    /**
     * @brief Creates a selector with an empty operation map; reduce_by_operation() then leaves every mask untouched.
     */
    queue_selector() = default;

    /**
     * @brief Initialize the queue_selector object. This constructor builds the map of operation code
     *        to the bitmask of work queues on which that operation is enabled.
     * @param [in] working_queues  Container of work queues; only the first total_wq_size entries are inspected
     * @param [in] total_wq_size   Number of work queues in use
     *
     * @note OPCFG support is read from working_queues[0] only. When it is not supported, every one of the
     *       first total_wq_size queues is recorded as enabled for every operation in opcodes_list.
     */
    queue_selector(const queues_container_t& working_queues, const uint8_t total_wq_size) {
        bool op_cfg_enabled = working_queues[0].get_op_configuration_support();

        if (!op_cfg_enabled) {
            for (uint32_t operation : opcodes_list) {
                wq_map_operation_enabled_to_bitmask_[operation] = util::bitmask128(total_wq_size);
            }
        } else {
            for (uint32_t operation : opcodes_list) {
                util::bitmask128 bit_index_is_valid_wq;

                for (uint32_t wq_idx = 0; wq_idx < total_wq_size; wq_idx++) {
                    if (OC_GET_OP_SUPPORTED(working_queues[wq_idx].get_op_config_register(), operation)) {
                        // Bits 0..63 are kept in the low word, bits 64 and above in the high word
                        if (wq_idx < 64) {
                            bit_index_is_valid_wq.low |= static_cast<uint64_t>(1U) << wq_idx;
                        } else {
                            bit_index_is_valid_wq.high |= static_cast<uint64_t>(1U) << (wq_idx - 64);
                        }
                    }
                }

                wq_map_operation_enabled_to_bitmask_[operation] = bit_index_is_valid_wq;
            }
        }
    }

    /**
     * @brief Reduce the number of valid WQs based on the operation code. Disabled workqueues are marked as invalid.
     * @param [in] operation                  Operation code
     * @param [in, out] bit_index_is_valid_wq Bitmask of size 128 bits (2 uint64_t) where each bit corresponds to a WQ.
     *                                        If bit is set, WQ is valid, otherwise it is disabled.
     *
     * @note An operation code that is not present in the map leaves the bitmask unchanged.
     */
    void reduce_by_operation(const uint32_t operation, util::bitmask128& bit_index_is_valid_wq) const noexcept {
        // Single lookup: avoids repeated hashing and the throwing at() inside a noexcept function
        const auto enabled_wqs = wq_map_operation_enabled_to_bitmask_.find(operation);
        if (enabled_wqs != wq_map_operation_enabled_to_bitmask_.end()) {
            bit_index_is_valid_wq.low &= enabled_wqs->second.low;
            bit_index_is_valid_wq.high &= enabled_wqs->second.high;
        }
    }

private:
    /* Map of operation to enabled WQ indexes
     * Key: Operation code
     * Value: LE-64 Bitmask of size 128 bits of WQ indexes where operation is enabled
     */
    std::unordered_map<uint32_t, util::bitmask128> wq_map_operation_enabled_to_bitmask_;
};

} // namespace qpl::ml::dispatcher

#endif // QPL_SOURCES_MIDDLE_LAYER_DISPATCHER_QUEUE_SELECTOR_HPP_
31 changes: 31 additions & 0 deletions sources/middle-layer/util/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,37 @@ mask_type build_mask(uint32_t number_of_bits) {
}
}

/**
 * @brief 128-bit mask stored as two 64-bit words.
 *        Bit index i lives in `low` for i in [0, 63] and in `high` for i in [64, 127].
 */
struct bitmask128 {
    uint64_t low;  /**< Bits 0..63 of the mask */
    uint64_t high; /**< Bits 64..127 of the mask */

    /** @brief Creates an empty mask (all bits cleared). */
    bitmask128() noexcept : low(0U), high(0U) {}

    /**
     * @brief Creates a mask with the lowest @p size bits set.
     * @param [in] size Number of low-order bits to set; values of 128 or more set every bit.
     */
    bitmask128(const uint32_t size) noexcept {
        // The shifted operand must be 64 bits wide: shifting the 32-bit literal 1U
        // by 32 or more positions is undefined behavior and yields a wrong mask.
        if (size < 64) {
            low  = (static_cast<uint64_t>(1U) << size) - 1U;
            high = 0U;
        } else if (size < 128) {
            low  = UINT64_MAX;
            high = (static_cast<uint64_t>(1U) << (size - 64)) - 1U;
        } else {
            low  = UINT64_MAX;
            high = UINT64_MAX;
        }
    }

    /**
     * @brief Tests a single bit.
     * @param [in] idx Bit index
     * @return true if bit @p idx is set; false if it is cleared or @p idx is 128 or more.
     */
    bool operator[](const uint32_t idx) const noexcept {
        if (idx < 64) {
            return (low & (static_cast<uint64_t>(1U) << idx)) != 0U;
        } else if (idx < 128) {
            return (high & (static_cast<uint64_t>(1U) << (idx - 64))) != 0U;
        } else {
            return false;
        }
    }

    /**
     * @brief Compares the mask with a 64-bit value.
     * @return true when the low word equals @p rhs and the high word is zero.
     */
    bool operator==(const uint64_t& rhs) const noexcept { return low == rhs && high == 0U; }
};

} // namespace util
} // namespace qpl::ml

Expand Down

0 comments on commit 32d65c2

Please sign in to comment.