Skip to content

Commit

Permalink
Merge pull request #804 from kordejong/gh484
Browse files Browse the repository at this point in the history
Fix from_numpy crash
  • Loading branch information
kordejong authored Feb 6, 2025
2 parents f4a5d5e + 81f7f1a commit 78b92b2
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 232 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -750,27 +750,27 @@ namespace lue {

auto const& ondp = std::get<0>(policies.outputs_policies()).output_no_data_policy();

// A single instance of this class are used to instantiate all new partitions. This
// happens asynchronously. A partition client (a future to the server instance)
// is returned immediately, but the server instance itself is not created yet.
// To be able to create the server instance later on, when the
// InstantiateFromBuffer instance is already gone, we still need to be able to
// access to the buffer. Because of that, we pass a copy of the BufferHandle
// instance and the GrabBuffer function to the lambda. Copying the BufferHandle
// A single instance of this class is used to instantiate all new partitions. This happens
// asynchronously. A partition client (a future to the server instance) is returned
// immediately, but the server instance itself is not created yet. To be able to create the
// server instance later on, when the InstantiateFromBuffer instance is already gone, we still
// need to be able to access to the buffer. Because of that, we pass a copy of the
// BufferHandle instance and the GrabBuffer function to the lambda. Copying the BufferHandle
// instance increases the reference count and keeps the underlying buffer alive.

return hpx::async(

[locality_id,
ondp,
array_shape,
offset,
partition_shape,
buffer_handle = _buffer_handle,
buffer_handle = _buffer_handle, // Copy, increases reference count
grab_buffer = _grab_buffer,
no_data_value = _no_data_value]()
{
// Create a partition instance and copy the relevant cells from
// the input buffer to the partition instance
// Create a partition instance and copy the relevant cells from the input buffer to
// the partition instance

// A 1D array of elements to copy
Element const* buffer = grab_buffer(buffer_handle);
Expand Down Expand Up @@ -863,10 +863,7 @@ namespace lue {


template<typename Functor>
concept PartitionInstantiator = requires(Functor functor)
{
functor.instantiate_per_locality;
};
concept PartitionInstantiator = requires(Functor functor) { functor.instantiate_per_locality; };


/*!
Expand Down Expand Up @@ -1004,7 +1001,8 @@ namespace lue {
// create array, fill with fill value --------------------------------------------------------------------

template<typename Element, Rank rank>
requires std::is_arithmetic_v<Element> auto create_partitioned_array(
requires std::is_arithmetic_v<Element>
auto create_partitioned_array(
Shape<Count, rank> const& array_shape,
Shape<Count, rank> const& partition_shape,
Scalar<Element> const& fill_value) -> PartitionedArray<Element, rank>
Expand All @@ -1018,7 +1016,8 @@ namespace lue {


template<typename Element, Rank rank>
requires std::is_arithmetic_v<Element> auto create_partitioned_array(
requires std::is_arithmetic_v<Element>
auto create_partitioned_array(
Shape<Count, rank> const& array_shape,
Shape<Count, rank> const& partition_shape,
Element const fill_value) -> PartitionedArray<Element, rank>
Expand All @@ -1028,17 +1027,18 @@ namespace lue {


template<typename Element, Rank rank>
requires std::is_arithmetic_v<Element> auto create_partitioned_array(
Shape<Count, rank> const& array_shape,
Scalar<Element> const& fill_value) -> PartitionedArray<Element, rank>
requires std::is_arithmetic_v<Element>
auto create_partitioned_array(Shape<Count, rank> const& array_shape, Scalar<Element> const& fill_value)
-> PartitionedArray<Element, rank>
{
return create_partitioned_array(array_shape, default_partition_shape(array_shape), fill_value);
}


template<typename Element, Rank rank>
requires std::is_arithmetic_v<Element> auto create_partitioned_array(
Shape<Count, rank> const& array_shape, Element const fill_value) -> PartitionedArray<Element, rank>
requires std::is_arithmetic_v<Element>
auto create_partitioned_array(Shape<Count, rank> const& array_shape, Element const fill_value)
-> PartitionedArray<Element, rank>
{
return create_partitioned_array(
array_shape, default_partition_shape(array_shape), Scalar<Element>{fill_value});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ BOOST_AUTO_TEST_CASE(use_case_2)
using BufferHandle = std::shared_ptr<Buffer>;

BufferHandle buffer_handle = std::make_shared<Buffer>(nr_values);
BOOST_CHECK_EQUAL(buffer_handle.use_count(), 1);
std::iota(buffer_handle->begin(), buffer_handle->end(), 0);

// Create a partitioned array, passing in the buffer
Expand All @@ -352,9 +353,9 @@ BOOST_AUTO_TEST_CASE(use_case_2)
auto const& partitions = array.partitions();
lue::wait_all(partitions);

// buffer_handle and the functor containing a copy of buffer_handle
// buffer_handle and the functor contain a copy of buffer_handle
// At least once this test failed (use_count was 3). Could be this specific test is wrong.
// BOOST_CHECK_EQUAL(buffer_handle.use_count(), 2);
BOOST_CHECK_EQUAL(buffer_handle.use_count(), 2);

auto const [nr_partitions0, nr_partitions1] = array.partitions().shape();

Expand Down
68 changes: 23 additions & 45 deletions source/framework/python/source/numpy/from_numpy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@ namespace lue::framework {
namespace {

template<typename T>
using Array = pybind11::array_t<T>;

template<typename T>
using Object = std::tuple<Array<T>, pybind11::buffer_info>;

template<typename T>
using ReferenceCountedObject = std::shared_ptr<Object<T>>;
using ReferenceCountedObject = std::shared_ptr<T[]>;

} // Anonymous namespace

Expand All @@ -30,18 +24,6 @@ namespace lue::framework {

namespace lue::detail {

// template<
// typename T>
// class ArrayTraits<
// pybind11::array_t<T>>
// {

// public:

// using Element = T;

// };

template<typename T>
class ArrayTraits<lue::framework::ReferenceCountedObject<T>>
{
Expand All @@ -60,7 +42,7 @@ namespace lue::framework {

template<typename Element>
auto from_numpy_py(
pybind11::array_t<Element> const& array,
pybind11::array_t<Element, pybind11::array::c_style> const& array,
std::optional<pybind11::tuple> const& partition_shape,
std::optional<Element> const& no_data_value) -> pybind11::object
{
Expand Down Expand Up @@ -99,41 +81,37 @@ namespace lue::framework {
static_partition_shape = default_partition_shape(static_array_shape);
}

// The goal is to create a partitioned array, given a Numpy array. We want to do
// this asynchronously. So, even though the function returns an array immediately,
// the partitions might not be ready to use yet.
// This is nice, because other code can already attach continuations to the
// partitions. Or do something else entirely.
// This is a bit dangerous as well:
// - Modifying the Numpy array while partitions are still being created
// results in rubish in the resulting array. Don't do this.
// - The Numpy array must not be deleted while array partitions are still being
// created. This is prevented by copying array instances around. This increases
// the reference count of the underlying pybind11 object.

// The reference counted object is a shared pointer of:
// - The array passed in
// - The pybind11::buffer_info instance of the array
// This second thing provides us with a pointer to the buffer array elements. Obtaining
// it should only be done once, for all partitions. Therefore we do that here and glue
// it to the array. A shared pointer to this tuple is passed around in the create_array
// function. We don't use the reference counting of the array itself, apart from
// the one copy we make when we create the tuple.
// The goal is to create a partitioned array, given a Numpy array. We want to do this
// asynchronously. So, even though the function returns an array immediately, the partitions might
// not be ready to use yet.
//
// This is nice, because other code can already attach continuations to the partitions. Or do
// something else entirely.
//
// The problem is that we can't easily use the Python API in threads. This requires obtaining and
// releasing the GIL from/to the thread running the Python interpreter.
// A solution for this is to just copy the data here, and then asynchronously create the
// partitions

using Policies = lue::policy::create_partitioned_array::DefaultValuePolicies<Element>;
using Functor = lue::InstantiateFromBuffer<ReferenceCountedObject<Element>, 2>;

ReferenceCountedObject<Element> object{
std::make_shared<Object<Element>>(std::make_tuple(array, array.request()))};
// The source array buffer
pybind11::buffer_info source_array_buffer_info{array.request()};
Element const* source_buffer{static_cast<Element*>(source_array_buffer_info.ptr)};

// The target array buffer
auto const nr_elements(lue::nr_elements(static_array_shape));
ReferenceCountedObject<Element> copy_buffer(std::make_shared<Element[]>(nr_elements));
std::copy(source_buffer, source_buffer + nr_elements, copy_buffer.get());

return pybind11::cast(lue::create_partitioned_array(
Policies{},
static_array_shape,
static_partition_shape,
Functor{
object, // Copy: increments reference count
[](ReferenceCountedObject<Element> const& object) -> Element*
{ return static_cast<Element*>(std::get<1>(*object).ptr); },
std::move(copy_buffer),
[](ReferenceCountedObject<Element> const& buffer) -> Element* { return buffer.get(); },
no_data_value}));
}

Expand Down
Loading

0 comments on commit 78b92b2

Please sign in to comment.