Skip to content

Commit

Permalink
Move to C interface.
Browse files Browse the repository at this point in the history
Remove memory allocation needed for the scheduler.
We now have memory allocations only when we start scheduling.
  • Loading branch information
lucteo committed Jan 28, 2024
1 parent 0097cc1 commit 852157b
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 150 deletions.
119 changes: 61 additions & 58 deletions include/exec/__detail/__system_context_default_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,64 +13,72 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "__system_context_if.hpp"
#include "__system_context_if.h"
#include "stdexec/execution.hpp"
#include "exec/static_thread_pool.hpp"

namespace exec { namespace __system_context_default_impl {
namespace exec::__system_context_default_impl {

namespace __if = __system_context_interface;
// TODO: move default implementation to weak pointers

// Low-level APIs
// Phase 2 will move to pointers and ref counting ala COM
// Phase 3 will move these to weak symbols and allow replacement in tests
// Default implementation based on static_thread_pool
struct __exec_system_context_impl : public __if::__exec_system_context_interface {
exec::static_thread_pool __pool_;
struct __system_scheduler_impl : __exec_system_scheduler_interface {
__system_scheduler_impl(exec::static_thread_pool& __pool)
: __pool_scheduler_{__pool.get_scheduler()} {
__get_forward_progress_guarantee = __get_forward_progress_guarantee_impl;
__schedule = __schedule_impl;
__bulk_schedule = __bulk_schedule_impl;
}

__if::__exec_system_scheduler_interface* get_scheduler() noexcept override;
};
private:
/// Scheduler from the underlying thread pool.
decltype(std::declval<exec::static_thread_pool>().get_scheduler()) __pool_scheduler_;

struct __exec_system_scheduler_impl : public __if::__exec_system_scheduler_interface {
__exec_system_scheduler_impl(
__exec_system_context_impl* __ctx,
decltype(__ctx->__pool_.get_scheduler()) __pool_scheduler)
: __ctx_{__ctx}
, __pool_scheduler_{__pool_scheduler} {
static int __get_forward_progress_guarantee_impl(__exec_system_scheduler_interface*) {
return 1; // parallel
}

__exec_system_context_impl* __ctx_;
decltype(__ctx_->__pool_.get_scheduler()) __pool_scheduler_;
static void __schedule_impl(
__exec_system_scheduler_interface* __self,
__exec_system_context_schedule_callback_t __cb,
void* __data);

void schedule(__if::__exec_system_context_schedule_callback_t __cb, void* __data) override;

void bulk_schedule(
__if::__exec_system_context_schedule_callback_t __cb,
__if::__exec_system_context_bulk_item_callback_t __cb_item,
static void __bulk_schedule_impl(
__exec_system_scheduler_interface* __self,
__exec_system_context_schedule_callback_t __cb,
__exec_system_context_bulk_item_callback_t __cb_item,
void* __data,
long __size) override;
long __size);
};

stdexec::forward_progress_guarantee get_forward_progress_guarantee() const override {
return stdexec::forward_progress_guarantee::parallel;
/// Default implementation of a system context, based on `static_thread_pool`
struct __system_context_impl : __exec_system_context_interface {
__system_context_impl() {
__get_scheduler = __get_scheduler_impl;
}

bool equals(const __if::__exec_system_scheduler_interface* __rhs) const override {
auto __rhs_impl = dynamic_cast<const __exec_system_scheduler_impl*>(__rhs);
return __rhs_impl && __rhs_impl->__ctx_ == __ctx_;
private:
/// The underlying thread pool.
exec::static_thread_pool __pool_{};

/// The system scheduler implementation.
__system_scheduler_impl __scheduler_{__pool_};

static __exec_system_scheduler_interface*
__get_scheduler_impl(__exec_system_context_interface* __self) noexcept {
return &static_cast<__system_context_impl*>(__self)->__scheduler_;
}
};

inline __if::__exec_system_scheduler_interface*
__exec_system_context_impl::get_scheduler() noexcept {
// TODO: ref counting etc
return new __exec_system_scheduler_impl(this, __pool_.get_scheduler());
}

// TODO: rename
/// Receiver that calls the callback when the operation completes.
struct __recv {
using receiver_concept = stdexec::receiver_t;
__if::__exec_system_context_schedule_callback_t __cb_;

/// The callback to be called.
__exec_system_context_schedule_callback_t __cb_;

/// The data to be passed to the callback.
void* __data_;

friend void tag_invoke(stdexec::set_value_t, __recv&& __self) noexcept {
Expand All @@ -87,46 +95,41 @@ namespace exec { namespace __system_context_default_impl {
}
};

inline void __exec_system_scheduler_impl::schedule(
__if::__exec_system_context_schedule_callback_t __cb,
inline void __system_scheduler_impl::__schedule_impl(
__exec_system_scheduler_interface* __self,
__exec_system_context_schedule_callback_t __cb,
void* __data) {
auto __sender = stdexec::schedule(__pool_scheduler_);
auto __this = static_cast<__system_scheduler_impl*>(__self);
auto __sender = stdexec::schedule(__this->__pool_scheduler_);
using operation_state_t = stdexec::connect_result_t<decltype(__sender), __recv>;
auto __os = new operation_state_t(stdexec::connect(std::move(__sender), __recv{__cb, __data}));
// TODO: stop leaking
// TODO: we have: size=64, alignment=8
stdexec::start(*__os);
}

inline void __exec_system_scheduler_impl::bulk_schedule(
__if::__exec_system_context_schedule_callback_t __cb,
__if::__exec_system_context_bulk_item_callback_t __cb_item,
inline void __system_scheduler_impl::__bulk_schedule_impl(
__exec_system_scheduler_interface* __self,
__exec_system_context_schedule_callback_t __cb,
__exec_system_context_bulk_item_callback_t __cb_item,
void* __data,
long __size) {
// TODO
auto __this = static_cast<__system_scheduler_impl*>(__self);
auto __sender = stdexec::bulk(
stdexec::schedule(__pool_scheduler_), __size, [__cb_item, __data](long __idx) {
stdexec::schedule(__this->__pool_scheduler_), __size, [__cb_item, __data](long __idx) {
__cb_item(__data, __idx);
// TODO: error, stopped
});

using operation_state_t = stdexec::connect_result_t<decltype(__sender), __recv>;
auto __os = new operation_state_t(stdexec::connect(std::move(__sender), __recv{__cb, __data}));
// TODO: stop leaking
// TODO: we have: size=???, alignment=???
stdexec::start(*__os);
}

// Phase 1 implementation, single implementation
// TODO: Make a weak symbol and replace in a test
static __exec_system_context_impl* __get_exec_system_context_impl() {
static __exec_system_context_impl __impl_;

/// Gets the default system context implementation.
static __system_context_impl* __get_exec_system_context_impl() {
static __system_context_impl __impl_;
return &__impl_;
}

// TODO: Move everything above here to a detail header and wrap in a
// namespace to represent extern "C"


}}
}
72 changes: 72 additions & 0 deletions include/exec/__detail/__system_context_if.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024 Lee Howes, Lucian Radu Teodorescu
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef __EXEC__SYSTEM_CONTEXT_IF_H__
#define __EXEC__SYSTEM_CONTEXT_IF_H__

#ifdef __cplusplus
extern "C" {
#endif

struct __exec_system_context_interface;
struct __exec_system_scheduler_interface;

/// Interface that allows interaction with the system context, allowing scheduling work on the system.
struct __exec_system_context_interface {
/// Returns an interface to the system scheduler.
struct __exec_system_scheduler_interface* (*__get_scheduler)(
struct __exec_system_context_interface* /*self*/);
};

/// Callback to be called by the scheduler when new work can start.
typedef void (*__exec_system_context_schedule_callback_t)(
void*, // data pointer passed to scheduler
int, // completion type: 0 for normal completion, 1 for cancellation, 2 for exception
void*); // If completion type is 2, this is the exception pointer.

/// Callback to be called by the scheduler for each bulk item.
typedef void (*__exec_system_context_bulk_item_callback_t)(
void*, // data pointer passed to scheduler
long); // the index of the work item that is starting

struct __exec_system_scheduler_interface {
/// Gets the forward progress guarantee of the scheduler.
///
/// 0 == concurrent, 1 == parallel, 2 == weakly_parallel
int (*__get_forward_progress_guarantee)(struct __exec_system_scheduler_interface* /*self*/);

/// Schedules new work on the system scheduler, calling `cb` with `data` when the work can start.
void (*__schedule)(
struct __exec_system_scheduler_interface* /*self*/,
__exec_system_context_schedule_callback_t /*cb*/,
void* /*data*/);

/// Schedules new bulk work of size `size` on the system scheduler, calling `cb_item` with `data`
/// for indices in [0, `size`), and calling `cb` on general completion.
void (*__bulk_schedule)(
struct __exec_system_scheduler_interface* /*self*/,
__exec_system_context_schedule_callback_t /*cb*/,
__exec_system_context_bulk_item_callback_t /*cb_item*/,
void* /*data*/,
long /*size*/);
};

#ifdef __cplusplus
}
#endif


#endif
63 changes: 0 additions & 63 deletions include/exec/__detail/__system_context_if.hpp

This file was deleted.

Loading

0 comments on commit 852157b

Please sign in to comment.