Skip to content

Commit

Permalink
stop leaking in schedule and bulk schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
ericniebler committed Jan 29, 2024
1 parent 9be7c3e commit b3d3e18
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions include/exec/__detail/__system_context_default_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ namespace exec::__system_context_default_impl {

// TODO: move default implementation to weak pointers

using __pool_scheduler_t =
decltype(std::declval<exec::static_thread_pool>().get_scheduler());

struct __system_scheduler_impl : __exec_system_scheduler_interface {
__system_scheduler_impl(exec::static_thread_pool& __pool)
: __pool_scheduler_{__pool.get_scheduler()} {
Expand All @@ -33,7 +36,7 @@ namespace exec::__system_context_default_impl {

private:
/// Scheduler from the underlying thread pool.
decltype(std::declval<exec::static_thread_pool>().get_scheduler()) __pool_scheduler_;
__pool_scheduler_t __pool_scheduler_;

static int __get_forward_progress_guarantee_impl(__exec_system_scheduler_interface*) {
return 1; // parallel
Expand Down Expand Up @@ -72,6 +75,10 @@ namespace exec::__system_context_default_impl {
};

/// Receiver that calls the callback when the operation completes.
template <class __Sender>
struct __operation;

template <class __Sender>
struct __recv {
using receiver_concept = stdexec::receiver_t;

Expand All @@ -81,31 +88,42 @@ namespace exec::__system_context_default_impl {
/// The data to be passed to the callback.
void* __data_;

/// The owning operation state, to be deleted when the operation completes.
__operation<__Sender>* __op_;

friend void tag_invoke(stdexec::set_value_t, __recv&& __self) noexcept {
__self.__cb_(__self.__data_, 0, nullptr);
delete __self.__op_;
}

friend void tag_invoke(stdexec::set_stopped_t, __recv&& __self) noexcept {
__self.__cb_(__self.__data_, 1, nullptr);
delete __self.__op_;
}

friend void
tag_invoke(stdexec::set_error_t, __recv&& __self, std::exception_ptr __ptr) noexcept {
__self.__cb_(__self.__data_, 2, *reinterpret_cast<void**>(&__ptr));
delete __self.__op_;
}
};

template <typename __Sender>
struct __operation {
stdexec::connect_result_t<__Sender, __recv<__Sender>> __inner_op_;

__operation(__Sender __sndr, __exec_system_context_schedule_callback_t __cb, void* __data)
: __inner_op_(stdexec::connect(std::move(__sndr), __recv<__Sender>{__cb, __data, this})) {}
};

inline void __system_scheduler_impl::__schedule_impl(
__exec_system_scheduler_interface* __self,
__exec_system_context_schedule_callback_t __cb,
void* __data) {
auto __this = static_cast<__system_scheduler_impl*>(__self);
auto __sender = stdexec::schedule(__this->__pool_scheduler_);
using operation_state_t = stdexec::connect_result_t<decltype(__sender), __recv>;
auto __os = new operation_state_t(stdexec::connect(std::move(__sender), __recv{__cb, __data}));
// TODO: stop leaking
// TODO: we have: size=64, alignment=8
stdexec::start(*__os);
auto __sndr = stdexec::schedule(__this->__pool_scheduler_);
auto __os = new __operation{std::move(__sndr), __cb, __data};
stdexec::start(__os->__inner_op_);
}

inline void __system_scheduler_impl::__bulk_schedule_impl(
Expand All @@ -115,15 +133,12 @@ namespace exec::__system_context_default_impl {
void* __data,
long __size) {
auto __this = static_cast<__system_scheduler_impl*>(__self);
auto __sender = stdexec::bulk(
auto __sndr = stdexec::bulk(
stdexec::schedule(__this->__pool_scheduler_), __size, [__cb_item, __data](long __idx) {
__cb_item(__data, __idx);
});
using operation_state_t = stdexec::connect_result_t<decltype(__sender), __recv>;
auto __os = new operation_state_t(stdexec::connect(std::move(__sender), __recv{__cb, __data}));
// TODO: stop leaking
// TODO: we have: size=???, alignment=???
stdexec::start(*__os);
auto __os = new __operation{std::move(__sndr), __cb, __data};
stdexec::start(__os->__inner_op_);
}

/// Gets the default system context implementation.
Expand Down

0 comments on commit b3d3e18

Please sign in to comment.