Skip to content

Commit

Permalink
merian-nodes: Graph: Event system
Browse files Browse the repository at this point in the history
  • Loading branch information
LDAP committed Nov 14, 2024
1 parent cade3e5 commit 2aad9a3
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 38 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Merian is split into multiple components:
- [merian-quake](https://github.com/LDAP/merian-quake-rt): A path-tracer for the original Quake game. (coming soon)
- [merian-shadertoy](https://github.com/LDAP/merian-shadertoy): A limited Vulkan implementation Shadertoys with hot reloading.
- [merian-hdr-viewer](https://github.com/LDAP/merian-hdr-viewer): A simple HDR viewer with various exposure and tone-mapping controls.
- [merian-example-sum](https://github.com/LDAP/merian-example-sum): Example on how to compute a sum on the GPU.

Merian aims for compatibility with Windows, Linux as well as all major GPU vendors.

Expand Down
206 changes: 186 additions & 20 deletions include/merian-nodes/graph/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cstdint>
#include <queue>
#include <regex>
#include <set>
#include <unordered_map>
#include <unordered_set>
Expand Down Expand Up @@ -160,7 +161,7 @@ struct NodeData {
}

uint32_t set_index(const uint64_t run_iteration) const {
assert(descriptor_sets.size());
assert(!descriptor_sets.empty());
return run_iteration % descriptor_sets.size();
}
};
Expand Down Expand Up @@ -371,6 +372,12 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
return;
}

{
// let current nodes know that the graph is about to be reconnected.
MERIAN_PROFILE_SCOPE(profiler, "notify nodes");
send_graph_event("connect");
}

// Make sure resources are not in use
{
MERIAN_PROFILE_SCOPE(profiler, "wait for in-flight iterations");
Expand Down Expand Up @@ -421,21 +428,27 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
registry.node_name(node)));
SPDLOG_DEBUG("on_connected node: {} ({})", data.identifier,
registry.node_name(node));
const NodeIOLayout io_layout([&](const InputConnectorHandle& input) {
const NodeIOLayout io_layout(
[&](const InputConnectorHandle& input) {
#ifndef NDEBUG
if (std::find(data.input_connectors.begin(), data.input_connectors.end(),
input) == data.input_connectors.end()) {
throw std::runtime_error{fmt::format(
"Node {} tried to get an output connector for an input {} "
"which was not returned in describe_inputs (which is not "
"how this works).",
registry.node_name(node), input->name)};
}
if (std::find(data.input_connectors.begin(),
data.input_connectors.end(),
input) == data.input_connectors.end()) {
throw std::runtime_error{fmt::format(
"Node {} tried to get an output connector for an input {} "
"which was not returned in describe_inputs (which is not "
"how this works).",
registry.node_name(node), input->name)};
}
#endif
// for optional inputs we inserted a input connection with nullptr in
// search_satisfied_nodes, no problem here.
return data.input_connections.at(input).output;
});
// for optional inputs we inserted a input connection with nullptr in
// search_satisfied_nodes, no problem here.
return data.input_connections.at(input).output;
},
[&](const std::string& event_pattern,
const GraphEvent::Listener& listener) {
register_event_listener_for_connect(event_pattern, listener);
});
const Node::NodeStatusFlags flags =
node->on_connected(io_layout, data.descriptor_set_layout);
needs_reconnect |= flags & Node::NodeStatusFlagBits::NEEDS_RECONNECT;
Expand All @@ -447,6 +460,14 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
}
}
}

{
MERIAN_PROFILE_SCOPE(profiler, "register user event listener");
for (const auto& [event_pattern, event_listener] : user_event_pattern_listener) {
register_event_listener_for_connect(event_pattern, event_listener);
}
}

run_iteration = 0;
last_build_report = profiler->get_report();
time_connect_reference = std::chrono::high_resolution_clock::now();
Expand Down Expand Up @@ -614,6 +635,7 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {

// waits until all in-flight iterations have finished
void wait() {
SPDLOG_DEBUG("wait until all in-flight iterations have finished");
ring_fences.wait_all();
}

Expand All @@ -627,6 +649,7 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
InFlightData& in_flight_data = ring_fences.get(i).user_data;
in_flight_data.in_flight_data.clear();
}

needs_reconnect = true;
}

Expand All @@ -643,10 +666,30 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
return std::as_const(node_for_identifier) | std::ranges::views::keys;
}

// --- Events ---

void send_event(const std::string& event_name,
const GraphEvent::Data& data = {},
const bool notify_all = true) {
send_event(GraphEvent::Info{nullptr, "", "user", event_name}, data, notify_all);
}

void register_event_listener(const std::string& event_pattern,
const GraphEvent::Listener& event_listener) {
user_event_pattern_listener.push_back(std::make_pair(event_pattern, event_listener));
}

// --- Properties / Graph (de)serialize ---

void properties(Properties& props) {
needs_reconnect |= props.config_bool("Rebuild");
props.st_no_space();
props.output_text("Run iteration: {}", run_iteration);
if (props.is_ui() && props.config_text("send event", props_send_event, true) &&
!props_send_event.empty()) {
send_event(props_send_event);
props_send_event.clear();
}
if (props.st_begin_child("graph_properties", "Graph Properties",
Properties::ChildFlagBits::FRAMED)) {
props.output_text("Run iteration: {}", run_iteration);
Expand Down Expand Up @@ -956,6 +999,12 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
throw std::invalid_argument{fmt::format(
"graph already contains a node with identifier '{}'", identifier.value())};
}
if (*identifier == "user") {
throw std::invalid_argument{"the identifier 'user' is reserved"};
}
if (*identifier == "graph") {
throw std::invalid_argument{"the identifier 'graph' is reserved"};
}
node_identifier = identifier.value();
} else {
uint32_t i = 0;
Expand Down Expand Up @@ -1315,11 +1364,12 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
void reset_connections() {
SPDLOG_DEBUG("reset connections");

this->flat_topology.clear();
this->maybe_connected_inputs.clear();
flat_topology.clear();
maybe_connected_inputs.clear();
for (auto& [node, data] : node_data) {
data.reset();
}
event_listeners.clear();
}

// Calls the describe_inputs() methods of the nodes and caches the result in
Expand Down Expand Up @@ -1393,8 +1443,8 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
// not be connected.
void cache_node_output_connectors(const NodeHandle& node, NodeData& data) {
try {
data.output_connectors =
node->describe_outputs(NodeIOLayout([&](const InputConnectorHandle& input) {
data.output_connectors = node->describe_outputs(NodeIOLayout(
[&](const InputConnectorHandle& input) {
#ifndef NDEBUG
if (input->delay > 0) {
throw std::runtime_error{fmt::format(
Expand All @@ -1415,6 +1465,9 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
// for optional inputs we inserted a input connection with nullptr in
// search_satisfied_nodes, no problem here.
return data.input_connections.at(input).output;
},
[&](const std::string& event_pattern, const GraphEvent::Listener& listener) {
register_event_listener_for_connect(event_pattern, listener);
}));
} catch (const graph_errors::node_error& e) {
data.errors.emplace_back(e.what());
Expand Down Expand Up @@ -1908,6 +1961,12 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
},
[&, dst_node]() -> std::any& {
return ring_fences.get().user_data.in_flight_data[dst_node];
},
[&, dst_node](const std::string& event_name, const GraphEvent::Data& data,
const bool notify_all) {
send_event(GraphEvent::Info{dst_node, registry.node_name(dst_node),
dst_data.identifier, event_name},
data, notify_all);
});
}
}
Expand All @@ -1921,6 +1980,103 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
input->name, data.identifier, registry.node_name(node));
}

void register_event_listener_for_connect(const std::string& event_pattern,
const GraphEvent::Listener& event_listener) {
split(event_pattern, ",", [&](const std::string& split_pattern) {
std::smatch match;
if (!std::regex_match(split_pattern, match, EVENT_REGEX)) {
SPDLOG_WARN("invalid event pattern '{}'", split_pattern);
return;
}
const std::string& node_name = match[1];
const std::string& node_identifier = match[2];
const std::string& event_name = match[3];

bool registered = false;
if (node_name.empty()) {
registered = true;
if (node_identifier.empty()) {
event_listeners["user"][event_name].emplace_back(event_listener);
event_listeners["graph"][event_name].emplace_back(event_listener);
} else if (node_identifier == "user" || node_identifier == "graph") {
event_listeners[node_identifier][event_name].emplace_back(event_listener);
} else {
registered = false;
}
}
for (const auto& [identifier, node] : node_for_identifier) {
if ((node_name.empty() || registry.node_name(node) == node_name) &&
(node_identifier.empty() || identifier == node_identifier)) {
event_listeners[identifier][event_name].emplace_back(event_listener);
registered = true;
}
}

if (registered) {
SPDLOG_DEBUG("registered listener for event pattern '{}'", split_pattern);
} else {
SPDLOG_WARN("no listener registered for event pattern '{}'. (no node type and node "
"identifier matched)",
split_pattern);
}
});
}

void send_graph_event(const std::string& event_name,
const GraphEvent::Data& data = {},
const bool notify_all = true) {
send_event(GraphEvent::Info{nullptr, "", "graph", event_name}, data, notify_all);
}

void send_event(const GraphEvent::Info& event_info,
const GraphEvent::Data& data,
const bool notify_all) const {
assert(!event_info.event_name.empty() && "event name cannot be empty.");
assert(!event_info.identifier.empty() && "identifier cannot be empty.");
assert((event_info.event_name.find('/') == event_info.event_name.npos) &&
"event name cannot contain '/'.");

SPDLOG_TRACE("sending event: {}/{}/{}, notify all={}", event_info.node_name,
event_info.identifier, event_info.event_name, notify_all);

const auto identifier_it = event_listeners.find(event_info.identifier);
if (identifier_it == event_listeners.end()) {
return;
}

// exact match
const auto event_it = identifier_it->second.find(event_info.event_name);
if (event_it != identifier_it->second.end()) {
if (notify_all) {
for (const auto& listener : event_it->second) {
listener(event_info, data);
}
} else {
for (const auto& listener : event_it->second) {
if (listener(event_info, data)) {
break;
}
}
}
}

// any
const auto event_any_it = identifier_it->second.find("");
if (event_any_it != identifier_it->second.end()) {
if (notify_all) {
for (const auto& listener : event_any_it->second) {
listener(event_info, data);
}
} else {
for (const auto& listener : event_any_it->second) {
if (listener(event_info, data)) {
break;
}
}
}
}
}

public:
// --- Callback setter ---

Expand Down Expand Up @@ -1978,8 +2134,8 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
std::chrono::nanoseconds duration_elapsed_since_connect;
std::chrono::nanoseconds duration_elapsed;
int time_overwrite = 0; // NONE, TIME, DIFFERENCE
// this is also used for overwrite time. In this case this should only be applied once and then
// reset.
// this is also used for overwrite time. In this case this should only be applied once and
// then reset.
float time_delta_overwrite_ms = 0.;
// across builds. Might be not 0 at begin of run.
std::chrono::nanoseconds time_delta = 0ns;
Expand Down Expand Up @@ -2010,7 +2166,17 @@ class Graph : public std::enable_shared_from_this<Graph<ITERATIONS_IN_FLIGHT>> {
// There may still be an invalid connection or an outputing node might be actually disabled.
std::unordered_map<InputConnectorHandle, NodeHandle> maybe_connected_inputs;

// Events
// (NodeHandle == nullptr means user events, event with name "" means "any")
std::map<std::string, std::map<std::string, std::vector<GraphEvent::Listener>>> event_listeners;
inline static const std::regex EVENT_REGEX{"([^/]*)/([^/]*)/([^/]*)"};

// cached here when the user calls register_event_listener and added to the data structure
// above when the graph is built.
std::vector<std::pair<std::string, GraphEvent::Listener>> user_event_pattern_listener;

// Properties helper
std::string props_send_event;
int new_node_selected = 0;
std::string new_node_identifier;
int remove_node_selected = 0;
Expand Down
Loading

0 comments on commit 2aad9a3

Please sign in to comment.