From 22a61d3fe7edeae1b799d81ffb17f007111ed633 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Mon, 31 Oct 2022 17:51:36 +0800 Subject: [PATCH 1/7] Draft RFC for topics Signed-off-by: Michael X. Grey --- rfcs/xx-topics.md | 295 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 295 insertions(+) create mode 100644 rfcs/xx-topics.md diff --git a/rfcs/xx-topics.md b/rfcs/xx-topics.md new file mode 100644 index 00000000..bb4e174d --- /dev/null +++ b/rfcs/xx-topics.md @@ -0,0 +1,295 @@ +# Feature Name: Topics + +:warning: This proposal depends on [one-shot systems](https://github.com/bevyengine/bevy/pull/4090), which is not yet a merged feature. + +## Summary + +This RFC introduces a feature to help with developing reactive applications. +Topics are data channels for moving asynchronous messages through many-to-many relationships between publishers and subscribers. +Each subscriber can listen to messages from many publishers, and many subscribers can listen to each publisher. + +## Motivation + +Topics are a common mechanism for middlewares to pass data between distributed systems. +Topics are also called pub-sub (publish-subscribe) and are supported by middlewares like ROS, MQTT, ZeroMQ, Kafka, RabbitMQ, and many others. +In these systems, anonymous publishers are able to push out messages to "topics" (data channels) which anonymous subscribers can listen to. +This allows the development of distributed reactive systems, organized by the abstraction of "topics" so that publishers can be easily discovered by relevant subscribers without needing to know details of how the overall distributed system is implemented. + +This RFC proposes implementing topics inside of Bevy as ergonomic Systems, Resources, and Components. + +Topics can be used inside of Bevy to help Bevy users implement their application using a "distributed" architecture to decouple and organize event flows from various agents/entities. +This is similar to a previous suggestion about [Entity Events](https://github.com/bevyengine/bevy/issues/2070) and has similar motivations. +Bevy's current resource-based Event mechanism is not convenient for representing more complex flows of information that may be entity/agent-driven or entity/agent-sensitive. + +At the same time, defining this generic ergonomic pipeline for topics within Bevy should help facilitate the integration of various middlewares into Bevy. +For example, a `bevy_mqtt` plugin could simply subscribe to bevy topics and then effortlessly publish the received messages to MQTT topics, and vice-versa for two-way communication. + +Bevy users who want their application to have networking or inter-process communication can implement their systems in terms of the bevy topics and services, then plug in whichever middleware suits their overall networked / multi-process system. +They could also change their choice of middleware later without changing how the systems of their application are implemented. +Or multiple middlewares can be used by one application without affecting how the systems are implemented. + +## User-facing explanation + +### Topics + +These are the key structs for topics: +* `Publisher`: a component struct that provides a `publish(data: T)` method +* `Subscriber`: a component struct that provides a `subscribe(publisher: Entity, callback: F)` method +* `Domain`: a component that maintains a dictionary of topic names along with the publishers and subscribers that are interested in each topic +* `GlobalDomain`: a Domain that can be used as a resource instead of as a component + +To send out messages, spawn a `Publisher` component for an entity, optionally giving some initial messages: + +```rust +struct MyMessage { + data: String, +} + +fn make_a_publisher( + commands: Commands, +) { + let mut publisher = Publisher::::new(); + publisher.publish(MyMessage{data: "Hello, world!".to_string()}); + publisher.publish(MyMessage{data: "I am being spawned!".to_string()}); + commands.spawn().insert(publisher); +} +``` + +To use a spawned publisher, query for a mutable reference and use its `publish(data: T)` method: + +```rust +fn use_the_publishers( + publishers: Query<&mut Publisher>, +) { + for publisher in &mut publishers { + publisher.publish(MyMessage{data: "ping".to_string()}); + } +} +``` + +To help subscribers discover the publisher, use the `advertise(publisher: Entity, topic: String)` domain method to make the publisher visible in that domain: + +```rust +fn make_a_publisher( + commands: Commands, + domain: ResMut>, +) { + let publisher_entity = commands.spawn().insert(Publisher::::new()).id(); + domain.advertise(publisher_entity, "my_topic".to_string()); +} +``` + +To receive and respond to messages from a specific publisher, spawn a `Subscriber` component for an entity and use its `subscribe(publisher: Entity, callback: F)` method: + +```rust +struct MySpecialPublisher(Entity); + +#[derive(SystemParam)] +struct MyCallbackParams { + message_counter: ResMut, + message_displays: Query<&mut MessageDisplay>, +} + +fn make_a_subscriber( + commands: Commands, + special_publisher: Res, +) { + let mut subscriber = Subscriber::::new(); + subscriber.subscribe( + special_publisher, + // Create a closure whose first argument is the incoming message + // and whose second argument is a system parameter. This closure will + // be triggered each time a new message comes in. + |msg: &MyMessage, params: MyCallbackParams| { + *params.message_counter += 1; + for display in &mut params.message_displays { + display.show(msg.data); + } + } + ); + + commands.spawn().insert(subscriber); +} +``` + +Alternatively, to automatically subscribe to all publishers (current and future) for a topic, use the `discover(for_subscriber: Entity, on_topic: String, callback: F)` domain method: + +```rust +fn make_a_subscriber( + commands: Commands, + domain: ResMut>, +) { + // The `discover` method will automatically insert a Subscriber component + // for the newly spawned entity. + domain.discover( + commands.spawn(), + "my_topic".to_string(), + |msg: &MyMessage, params: MyCallbackParams| { + *params.message_counter += 1; + for display in &mut params.message_displays { + display.show(msg.data); + } + } + ); +} +``` + +When a subscriber is removed from an entity or an entity that has a subscriber is despawned, the callback that was subscribed will be dropped and not get triggered anymore. + +## Implementation strategy + +Many details of this implementation proposal will likely need significant iteration with help from someone more familiar than I am with the internals of Bevy. +I am currently not clear on how exactly dynamic system construction can/does work, nor the right way to leverage one-shot systems to make this work. + +### Publishing +* Messages are be stored in the `Publisher` struct as a `Smallvec`. +* The `Publisher` will also store a `SmallVec` to keep track of its subscribers. +* Once per cycle, the message queue in the publisher will be processed against all current subscribers. + * For each subscriber of each publisher that has a non-empty queue, generate a one-shot system. The one-shot system would query for: + * immutable reference to the publisher, to have access to its message queue + * the system parameter needed by the subscriber + * The one-shot system would iterate through the message queue of the publisher, passing an immutable reference to each message into the subscriber's callback, along with the system parameter needed by the callback. + * The subscription callbacks will be able to run in parallel, as long as their additional system parameters don't have conflicts. + +### Subscribing +* Subscribers maintain a dictionary of which callback should be used for each `(publisher entity, message type)` pair +* When a new subscription is created using `Subscriber::subscribe`, the entity of the publisher is saved in a special queue for new subscriptions + * On each cycle, before published messages are processed, a system runs through all subscribers to process any changes in subscriptions + * (Consider ways to reduce overhead so this isn't an O(N) operation over all the subscribers on every cycle, maybe using a shared atomic bool to decide if changes even need to be checked) + * When a new subscription is found, the publisher is informed about the entity for that subscriptions + * When an old subscription is removed, the publisher is informed so it does not keep trying to send messages to the subscriber + * Once the publisher is informed of the new subscriber, the publisher's entity is removed from the special new subscription queue +* The one-shot system produced by the publisher will use a private function to retrieve the correct callback from the subscriber based on the publisher's entity + +### Domain management +The Domain needs to keep track of which topics the entities of publishers and subscribers are interested in. +Whenever a publisher is advertised or a subscriber requests discovery, the Domain must form a connection between the two, informing the publisher about the subscriber and vice-versa. + +* New publishers and new subscribers are each kept in a special queue +* On each cycle: + * The queue of new publishers is cross-examined against the queue of new subscribers as well as the dictionary of old subscribers to look for matches + * New subscribers are cross-examined against old publishers to look for matches + * Matching publishers + subscribers are informed of each other + * Dropped publishers/subscribers are identified and any lingering connections to them are erased + +The connections created by the `Domain` need to be semantically different from the connections formed by directly subscribing to a publisher. +For one thing, Domain-created connections need to be reference-counted: If a subscriber is subscribed to multiple topics and a publisher is publishing to more than one of those topics, then when the subscriber unsubscribes from one of the topics it should not unsubscribe from the publisher. +The subscriber's connection to the publisher should only be broken when both conditions are met: +1. Any topics subscriptions that connected the publisher and subscriber are dropped +2. Any direct connection that was made between the publisher and subscriber is dropped + +### Additional thoughts + +To facilitate cleanup of spontaneously dropped connections (e.g. a Publisher or Subscriber component is suddenly removed from an entity) it might be necessary to automatically insert a secondary component on each Publisher/Subscriber entity that keeps track of what its last set of connections were before the component was removed. +Otherwise each time a Publisher/Subscriber is removed from an entity an O(N) operation will be needed to cleanup its connections. +It may be possible for [relations](https://github.com/bevyengine/bevy/issues/3742) :rainbow: to help with this. + +We may want to consider having a `Subscription` object that acts like a reference counter to keep direct publisher<->subscriber connections alive. +Otherwise there could be conflicts where one system tells a subscriber to disconnect from a publisher while another system still wants the connection to exist. + +## Drawbacks + +## Rationale and alternatives + +### One message at a time +In the current design of Bevy's `Events`, systems are given access to the entire `Event` queue whereas this proposal only gives the callback system access to one `&Message` at a time. +This is done to reflect the following information: +1. The callback will clearly only be triggered when at least one new message has arrived, so there is no need to question whether the system will be triggered with an empty message queue. +2. Users do not have to be concerned about how the message queue is implemented or that a message queue even exists at all. + +### Callback driven + +Being driven by callbacks provides a few helpful benefits: +* The user does not need to think of where to place the responding system within their application stages +* The user can copy runtime data into their callback which wouldn't be available at compile time +* Callbacks offer a simple way to turn reactions on/off by toggling subscriptions + +However, if the user has no control over when the callbacks are processed it may be harder for them to synchronize the overall behavior of their application. +Ideally users should implement their applications to not be sensitive to callback ordering, but that might not be a realistic requirement to impose. +Maybe subscriptions should also be able to specify what stage their callback should be processed in. +This concern will also likely be affected by the upcoming stageless systems design. + +## Prior art + +As mentioned in the motivation section, [pub-sub patterns](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) are common throughout middlewares to support inter-process and network communication. + +The concept is also associated with a more general pattern of "reactive programming", which can be seen in frameworks like [ReactiveX](https://reactivex.io/). + +## Unresolved questions + +### Domain struct name +Should topics and services use the same `Domain` struct or should they namespaced, e.g. `bevy_topics::Domain` and `bevy_services::Domain`? Or should each of these structs be given different names entirely? + +### Topic names +Should topic names use `String` or should they be more like `StageLabel` where they can use any data structure with Hash and Eq traits? + +### Callback timing +In what stage should subscription callbacks be processed? Is that something that users should be able to set, and if so how? Should that be customizable per subscription? + +## Future possibilities + +### Publisher `Description` + +In addition to the message type field, allow publishers to have a `Description` generic parameter that allows them to describe themselves. +This may help subscribers filter publishers whose outputs are not relevant for them. + +Using the `Description` type, the `Domain` discovery feature should allow subscribers to provide a callback to filter out publishers for the topics that the subscriber is trying to subscribe to. + +### Connection events + +Allow a subscriber to define an `on_connect` and `on_disconnect` callback that will be triggered when successfully connecting or disconnecting with a publisher. If we implement the `Description` generic parameter mentioned above, then that `Description` value can be passed into these callbacks. + +### History + +Allow publishers to be configured to maintain a history that subscribers can choose to review. E.g.: + +```rust +impl Publisher { + /// Keep the last n messages that were published so that new subscribers can + /// look through them. By default this is 0. + pub fn keep_last(self, n: usize) -> Self; + + /// Keep the all messages that are ever published so that new subscribers + /// can look through them. + pub fn keep_all(self) -> Self; +} + +/// Returned by `Subscribe::subscribe` +impl Subscription { + /// Receives the last n messages that were published before this subscriber + /// subscribed, if and only if the publisher has been maintaining its history. + pub fn receive_last(self, n: usize) -> Self; + + /// Receives all available messages in the publisher's history. + pub fn receive_all(self) -> Self; +} +``` + +This may require adding a second message queue to the `Publisher` struct. + +### Parallel Publishing + +In the current implementation proposal, a system needs to obtain a mutable reference to a publisher. +This means at most one system can be publishing for a certain type of publisher at a time (without needing complicated system-deconflicting strategies). +It would be good to have ways for systems to publish with the same publisher in parallel. Some possibilities: + +* Custom command that flushes commands into publishers + * Advantage: commands are a familiar way of marshalling parallel requests in Bevy + * Disadvantage: it may feel clunky for users because they would need to explicitly communicate the publisher entity to the command +* Additional method `Publisher::parallel_publish(&self, msg: T)` that uses a mutex to protect access to the message queue + * Advantage: ergonomic and easy to understand + * Disadvantage: introducing mutexes could mean a noticeable performance hit + +### Custom message buffer / pre-allocated size + +It would be good if it were possible for users to specify how they want the publisher to store messages. +Even if we lock users into `SmallVec` (which is probably a good choice for +90% of use cases), users may still want to specify the default pre-allocation size based on their knowledge of what publishing patterns they can expect. + +Since the container or size information would need to be part of the Publisher's type information as a generic parameter, the publisher would become harder to query, and the queries would become sensitive to the choice of container. +To work nicely, this capability would require [trait queries](https://crates.io/crates/bevy-trait-query) where `Publisher` is a trait instead of a struct. + +### Stream operations + +It may be worth considering whether/how bevy can facilitate transformations and operations on data streams the way [ReactiveX](https://reactivex.io/documentation/operators.html) does. + +As a simple example, consider an operator that zips two topics/publishers `Publisher`, `Publisher` together to effectively provide `Publisher<(U, V)>` that can be subscribed to. The zipped publisher would publish a new message each time a new message has arrived for both `U` and `V`. From 739d98e42df48af84d20514cc8d25c1ad9213e85 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 1 Nov 2022 23:00:38 +0800 Subject: [PATCH 2/7] Iterate on topics proposal Signed-off-by: Michael X. Grey --- rfcs/xx-topics.md | 216 +++++++++++++++++++++++++++++++++------------- 1 file changed, 155 insertions(+), 61 deletions(-) diff --git a/rfcs/xx-topics.md b/rfcs/xx-topics.md index bb4e174d..03a8484b 100644 --- a/rfcs/xx-topics.md +++ b/rfcs/xx-topics.md @@ -1,6 +1,6 @@ # Feature Name: Topics -:warning: This proposal depends on [one-shot systems](https://github.com/bevyengine/bevy/pull/4090), which is not yet a merged feature. +:warning: This proposal depends on [one-shot systems](https://github.com/bevyengine/bevy/pull/4090), which is not yet a merged feature, and possibly [trait queries](https://github.com/bevyengine/rfcs/pull/39) which is not yet a merged RFC. ## Summary @@ -11,16 +11,34 @@ Each subscriber can listen to messages from many publishers, and many subscriber ## Motivation Topics are a common mechanism for middlewares to pass data between distributed systems. -Topics are also called pub-sub (publish-subscribe) and are supported by middlewares like ROS, MQTT, ZeroMQ, Kafka, RabbitMQ, and many others. +Often called pub-sub (publish-subscribe), this pattern is supported by middlewares like ROS, MQTT, ZeroMQ, Kafka, RabbitMQ, and many others. In these systems, anonymous publishers are able to push out messages to "topics" (data channels) which anonymous subscribers can listen to. This allows the development of distributed reactive systems, organized by the abstraction of "topics" so that publishers can be easily discovered by relevant subscribers without needing to know details of how the overall distributed system is implemented. -This RFC proposes implementing topics inside of Bevy as ergonomic Systems, Resources, and Components. +This RFC proposes implementing publishers and subscribers, with topic-based discovery inside of Bevy as ergonomic Systems, Resources, and Components. Topics can be used inside of Bevy to help Bevy users implement their application using a "distributed" architecture to decouple and organize event flows from various agents/entities. This is similar to a previous suggestion about [Entity Events](https://github.com/bevyengine/bevy/issues/2070) and has similar motivations. Bevy's current resource-based Event mechanism is not convenient for representing more complex flows of information that may be entity/agent-driven or entity/agent-sensitive. +### Example Use-Case +* A game has NPCs that can react to various events. +* The NPCs are split into two teams: `"Red"` and `"Blue"` team and there are team-wide events. +* There are team-agnostic event types, but not all NPCs should listen to all of these events. +* Not all NPCs should react to events in the same way. + +To program the logic for this game with the existing `Events` system, the programmer would need to define one single system that accounts for every possible combination of these factors. +The many conditional combinations will cause the single system to have many complex branches. + +With pub-sub and topics, the programmer could break down the problem as follows: +* NPC entities on the `"Red"` team subscribe to the `"Red"` topic. +* NPC entities on the `"Blue"` team subscribe to the `"Blue"` topic. +* Additional topics are only subscribed by each entity if they are relevant for the entity. +* Each subscription specifies the appropriate behavior for its own NPC. + +The different conditional combinations that need to be accounted for are expressed more easily through topic subscription management. + +### Networking At the same time, defining this generic ergonomic pipeline for topics within Bevy should help facilitate the integration of various middlewares into Bevy. For example, a `bevy_mqtt` plugin could simply subscribe to bevy topics and then effortlessly publish the received messages to MQTT topics, and vice-versa for two-way communication. @@ -41,28 +59,41 @@ These are the key structs for topics: To send out messages, spawn a `Publisher` component for an entity, optionally giving some initial messages: ```rust -struct MyMessage { - data: String, +enum Mission { + Attack, + Hold, + Retreat, } -fn make_a_publisher( +/// Colonels issue missions to their battalions, so they need to publish Mission messages +fn connect_colonels( commands: Commands, + new_colonels: Query<(Entity, &Battalion), Added>, + domain: ResMut>, ) { - let mut publisher = Publisher::::new(); - publisher.publish(MyMessage{data: "Hello, world!".to_string()}); - publisher.publish(MyMessage{data: "I am being spawned!".to_string()}); - commands.spawn().insert(publisher); + for (e, battalion) in &new_colonels { + let mut publisher = Publisher::::new(); + publisher.publish(Mission::Hold); + commands.entity(e).insert(publisher); + } } ``` To use a spawned publisher, query for a mutable reference and use its `publish(data: T)` method: ```rust -fn use_the_publishers( - publishers: Query<&mut Publisher>, +/// On each update cycle, the colonels decide what action to take +fn update_colonels( + armies: Query<(&BattleConditions, &mut Publisher), With>, ) { - for publisher in &mut publishers { - publisher.publish(MyMessage{data: "ping".to_string()}); + for (condition, mission) in &mut publishers { + if condition.is_critical() { + mission.publish(Mission::Retreat); + } else if condition.have_advantage() { + mission.publish(Mission::Attack); + } else { + mission.publish(Mission::Hold); + } } } ``` @@ -70,67 +101,112 @@ fn use_the_publishers( To help subscribers discover the publisher, use the `advertise(publisher: Entity, topic: String)` domain method to make the publisher visible in that domain: ```rust -fn make_a_publisher( +/// Colonels issue missions to their battalions, so they need to publish Mission messages +fn connect_colonels( commands: Commands, - domain: ResMut>, + new_colonels: Query<(Entity, &Battalion), Added>, + domain: ResMut>, ) { - let publisher_entity = commands.spawn().insert(Publisher::::new()).id(); - domain.advertise(publisher_entity, "my_topic".to_string()); + for (e, battalion) in &new_colonels { + let mut publisher = Publisher::::new(); + publisher.publish(Mission::Hold); + commands.entity(e).insert(publisher); + // Colonels advertise their mission publisher to the battalion-wide mission topic + domain.advertise(e, battalion.mission_topic()); + } } ``` To receive and respond to messages from a specific publisher, spawn a `Subscriber` component for an entity and use its `subscribe(publisher: Entity, callback: F)` method: ```rust -struct MySpecialPublisher(Entity); - -#[derive(SystemParam)] -struct MyCallbackParams { - message_counter: ResMut, - message_displays: Query<&mut MessageDisplay>, -} - -fn make_a_subscriber( +/// Soldiers follow the missions issued to them by their colonels +fn connect_soldiers( commands: Commands, - special_publisher: Res, + new_cavalries: Query<(Entity, &Battalion), Added>, + new_archers: Query<(Entity, &Battalion), Added>, ) { - let mut subscriber = Subscriber::::new(); - subscriber.subscribe( - special_publisher, - // Create a closure whose first argument is the incoming message - // and whose second argument is a system parameter. This closure will - // be triggered each time a new message comes in. - |msg: &MyMessage, params: MyCallbackParams| { - *params.message_counter += 1; - for display in &mut params.message_displays { - display.show(msg.data); + // Cavalry units listen for missions from their colonel and then charge, defend, or flee + for (e, battalion) in initial_soldiers.cavalry() { + let mut subscriber = Subscriber::::new(); + subscriber.subscribe( + battalion.colonel(), + move |mission: &Mission, orders: Query<&mut CavalryOrders>| { + let orders = orders.get_mut(e).unwrap(); + match mission { + Mission::Attack => { + orders.charge_forward(); + } + Mission::Hold => { + orders.defense_formation(); + } + Mission::Retreat => { + orders.flee(); + } + } } - } - ); + ); + + commands.entity(e).insert(subscriber); + } + + // Archer units listen for missions from the colonel and then fire, aim, or flee + for (e, battalion) in initial_soldiers.archers() { + let mut subscriber = Subscriber::::new(); + subscriber.subscribe( + battalion.colonel(), + move |mission: &Mission, orders: Query<&mut ArcherOrders>| { + let orders = orders.get_mut(unit).unwrap(); + match mission { + Mission::Attack => { + orders.fire_salvo(); + } + Mission::Hold => { + orders.take_aim(); + } + Mission::Retreat => { + orders.flee(); + } + } + } + ); - commands.spawn().insert(subscriber); + commands.entity(e).insert(subscriber); + } } ``` Alternatively, to automatically subscribe to all publishers (current and future) for a topic, use the `discover(for_subscriber: Entity, on_topic: String, callback: F)` domain method: ```rust -fn make_a_subscriber( +/// Spies intercept or interfere with the mission information of a battalion +fn connect_spies( commands: Commands, - domain: ResMut>, + new_informants: Query<(Entity, &Informant, &Battalion), Added>, + new_imposters: Query<(Entity, &Imposter), Added>, + domain: ResMut>, ) { - // The `discover` method will automatically insert a Subscriber component - // for the newly spawned entity. - domain.discover( - commands.spawn(), - "my_topic".to_string(), - |msg: &MyMessage, params: MyCallbackParams| { - *params.message_counter += 1; - for display in &mut params.message_displays { - display.show(msg.data); + for (e, informant, battalion) in &new_informants { + // Informants listen to the battalion missions and then re-publish them + // for an enemy army to hear. + domain.advertise(e, informant.report_topic()); + let battalion_id = battalion.id(); + commands.entity(e).insert(Publisher::<(Entity, Mission)>::new()); + domain.discover( + battalion.mission_topic(), + move |mission: &Mission, report: Query<&mut Publisher<(Entity, Mission)>| { + report.get_mut(e).publish((battalion_id, mission.clone())); } - } - ); + ) + } + + for (e, imposter) in &new_imposters { + // Imposters also publish battalion missions to confuse informants. + // Ordinary soldiers are subscribed directly to their colonels so they + // are not confused by imposters. + commands.entity(e).insert(Publisher::::new()); + domain.advertise(e, imposter.mission_topic()); + } } ``` @@ -142,7 +218,7 @@ Many details of this implementation proposal will likely need significant iterat I am currently not clear on how exactly dynamic system construction can/does work, nor the right way to leverage one-shot systems to make this work. ### Publishing -* Messages are be stored in the `Publisher` struct as a `Smallvec`. +* Published messages are stored in the `Publisher` struct as a `Smallvec`. * The `Publisher` will also store a `SmallVec` to keep track of its subscribers. * Once per cycle, the message queue in the publisher will be processed against all current subscribers. * For each subscriber of each publisher that has a non-empty queue, generate a one-shot system. The one-shot system would query for: @@ -217,8 +293,13 @@ The concept is also associated with a more general pattern of "reactive programm ## Unresolved questions -### Domain struct name -Should topics and services use the same `Domain` struct or should they namespaced, e.g. `bevy_topics::Domain` and `bevy_services::Domain`? Or should each of these structs be given different names entirely? +### How to automatically operate on publishers/subscribers with different message types +The `bevy_topics` plugin somehow needs to create systems that operate on `Publisher` and `Subscriber` components where `T` may vary. + +One option would be to require users to register each type they want to support messages for when adding the `bevy_topics` plugin. +I would prefer to avoid this because requirement because I think users would be confused when they create a connection between entities with `Publisher` and `Subscriber` components but the connection quietly fails because they forgot to register `Foo`. + +The only other way I can think of making this work would be to use something like [trait queries](https://github.com/bevyengine/rfcs/pull/39) where `Publisher` and `Subscriber` implement traits that allow them to be discovered by the `bevy_topics` system. ### Topic names Should topic names use `String` or should they be more like `StageLabel` where they can use any data structure with Hash and Eq traits? @@ -230,10 +311,23 @@ In what stage should subscription callbacks be processed? Is that something that ### Publisher `Description` -In addition to the message type field, allow publishers to have a `Description` generic parameter that allows them to describe themselves. +In addition to the message type field, allow publishers to provide a "description" value that allows them to describe themselves. This may help subscribers filter publishers whose outputs are not relevant for them. -Using the `Description` type, the `Domain` discovery feature should allow subscribers to provide a callback to filter out publishers for the topics that the subscriber is trying to subscribe to. +Using the description, the `Domain` discovery feature should allow subscribers to provide a callback to filter out publishers for the topics that the subscriber is trying to subscribe to. + +To allow descriptions to be generic without being coupled to the `Publisher` type, we could use the following pattern: + +```rust +impl Publisher { + /// Add a description of any kind to this publisher + pub fn add_description(&mut self, description: Description); + + /// View a description of any kind for this publisher, if the publisher + /// provides it. + pub fn get_description(&self) -> Option<&Description>; +} +``` ### Connection events @@ -273,7 +367,7 @@ In the current implementation proposal, a system needs to obtain a mutable refer This means at most one system can be publishing for a certain type of publisher at a time (without needing complicated system-deconflicting strategies). It would be good to have ways for systems to publish with the same publisher in parallel. Some possibilities: -* Custom command that flushes commands into publishers +* Custom command that flushes messages into publishers * Advantage: commands are a familiar way of marshalling parallel requests in Bevy * Disadvantage: it may feel clunky for users because they would need to explicitly communicate the publisher entity to the command * Additional method `Publisher::parallel_publish(&self, msg: T)` that uses a mutex to protect access to the message queue @@ -286,7 +380,7 @@ It would be good if it were possible for users to specify how they want the publ Even if we lock users into `SmallVec` (which is probably a good choice for +90% of use cases), users may still want to specify the default pre-allocation size based on their knowledge of what publishing patterns they can expect. Since the container or size information would need to be part of the Publisher's type information as a generic parameter, the publisher would become harder to query, and the queries would become sensitive to the choice of container. -To work nicely, this capability would require [trait queries](https://crates.io/crates/bevy-trait-query) where `Publisher` is a trait instead of a struct. +To work nicely, this capability would require [trait queries](https://github.com/bevyengine/rfcs/pull/39) where `Publisher` is a trait instead of a struct. ### Stream operations From 6c200b208badbe84ff66d0a4716990d633c069ca Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 1 Nov 2022 23:03:29 +0800 Subject: [PATCH 3/7] Remove mention of services Signed-off-by: Michael X. Grey --- rfcs/xx-topics.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rfcs/xx-topics.md b/rfcs/xx-topics.md index 03a8484b..2bd88578 100644 --- a/rfcs/xx-topics.md +++ b/rfcs/xx-topics.md @@ -42,7 +42,7 @@ The different conditional combinations that need to be accounted for are express At the same time, defining this generic ergonomic pipeline for topics within Bevy should help facilitate the integration of various middlewares into Bevy. For example, a `bevy_mqtt` plugin could simply subscribe to bevy topics and then effortlessly publish the received messages to MQTT topics, and vice-versa for two-way communication. -Bevy users who want their application to have networking or inter-process communication can implement their systems in terms of the bevy topics and services, then plug in whichever middleware suits their overall networked / multi-process system. +Bevy users who want their application to have networking or inter-process communication can implement their systems in terms of the bevy topics, then plug in whichever middleware suits their overall networked / multi-process system. They could also change their choice of middleware later without changing how the systems of their application are implemented. Or multiple middlewares can be used by one application without affecting how the systems are implemented. From c09f3a1012c5ceeec13df9b81b4a01ba172f3de9 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 1 Nov 2022 23:13:49 +0800 Subject: [PATCH 4/7] Tweak example code Signed-off-by: Michael X. Grey --- rfcs/xx-topics.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/rfcs/xx-topics.md b/rfcs/xx-topics.md index 2bd88578..b669f3b1 100644 --- a/rfcs/xx-topics.md +++ b/rfcs/xx-topics.md @@ -56,7 +56,7 @@ These are the key structs for topics: * `Domain`: a component that maintains a dictionary of topic names along with the publishers and subscribers that are interested in each topic * `GlobalDomain`: a Domain that can be used as a resource instead of as a component -To send out messages, spawn a `Publisher` component for an entity, optionally giving some initial messages: +To send out messages, insert a `Publisher` component for an entity, optionally giving some initial messages: ```rust enum Mission { @@ -68,10 +68,10 @@ enum Mission { /// Colonels issue missions to their battalions, so they need to publish Mission messages fn connect_colonels( commands: Commands, - new_colonels: Query<(Entity, &Battalion), Added>, + new_colonels: Query>, domain: ResMut>, ) { - for (e, battalion) in &new_colonels { + for e in &new_colonels { let mut publisher = Publisher::::new(); publisher.publish(Mission::Hold); commands.entity(e).insert(publisher); @@ -82,11 +82,11 @@ fn connect_colonels( To use a spawned publisher, query for a mutable reference and use its `publish(data: T)` method: ```rust -/// On each update cycle, the colonels decide what action to take +/// On each update cycle, the colonels decide what action to take based on their battle conditions fn update_colonels( - armies: Query<(&BattleConditions, &mut Publisher), With>, + colonels: Query<(&BattleConditions, &mut Publisher), With>, ) { - for (condition, mission) in &mut publishers { + for (condition, mission) in &mut colonels { if condition.is_critical() { mission.publish(Mission::Retreat); } else if condition.have_advantage() { @@ -117,7 +117,7 @@ fn connect_colonels( } ``` -To receive and respond to messages from a specific publisher, spawn a `Subscriber` component for an entity and use its `subscribe(publisher: Entity, callback: F)` method: +To receive and respond to messages from a specific publisher, insert a `Subscriber` component for an entity and use its `subscribe(publisher: Entity, callback: F)` method: ```rust /// Soldiers follow the missions issued to them by their colonels @@ -127,7 +127,7 @@ fn connect_soldiers( new_archers: Query<(Entity, &Battalion), Added>, ) { // Cavalry units listen for missions from their colonel and then charge, defend, or flee - for (e, battalion) in initial_soldiers.cavalry() { + for (e, battalion) in &new_cavalries { let mut subscriber = Subscriber::::new(); subscriber.subscribe( battalion.colonel(), @@ -151,7 +151,7 @@ fn connect_soldiers( } // Archer units listen for missions from the colonel and then fire, aim, or flee - for (e, battalion) in initial_soldiers.archers() { + for (e, battalion) in &new_archers { let mut subscriber = Subscriber::::new(); subscriber.subscribe( battalion.colonel(), @@ -201,7 +201,7 @@ fn connect_spies( } for (e, imposter) in &new_imposters { - // Imposters also publish battalion missions to confuse informants. + // Imposters publish battalion missions to confuse informants. // Ordinary soldiers are subscribed directly to their colonels so they // are not confused by imposters. commands.entity(e).insert(Publisher::::new()); From 3ad08da6d7b08549b5c008d589b3b5f7b9f2bfd0 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 1 Nov 2022 23:28:30 +0800 Subject: [PATCH 5/7] Iterating Signed-off-by: Michael X. Grey --- rfcs/xx-topics.md | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/rfcs/xx-topics.md b/rfcs/xx-topics.md index b669f3b1..243dcd55 100644 --- a/rfcs/xx-topics.md +++ b/rfcs/xx-topics.md @@ -223,7 +223,7 @@ I am currently not clear on how exactly dynamic system construction can/does wor * Once per cycle, the message queue in the publisher will be processed against all current subscribers. * For each subscriber of each publisher that has a non-empty queue, generate a one-shot system. The one-shot system would query for: * immutable reference to the publisher, to have access to its message queue - * the system parameter needed by the subscriber + * the system parameter needed by the subscriber's callback * The one-shot system would iterate through the message queue of the publisher, passing an immutable reference to each message into the subscriber's callback, along with the system parameter needed by the callback. * The subscription callbacks will be able to run in parallel, as long as their additional system parameters don't have conflicts. @@ -232,7 +232,7 @@ I am currently not clear on how exactly dynamic system construction can/does wor * When a new subscription is created using `Subscriber::subscribe`, the entity of the publisher is saved in a special queue for new subscriptions * On each cycle, before published messages are processed, a system runs through all subscribers to process any changes in subscriptions * (Consider ways to reduce overhead so this isn't an O(N) operation over all the subscribers on every cycle, maybe using a shared atomic bool to decide if changes even need to be checked) - * When a new subscription is found, the publisher is informed about the entity for that subscriptions + * When a new subscription is found, the publisher is informed about the subscriber entity for that subscription * When an old subscription is removed, the publisher is informed so it does not keep trying to send messages to the subscriber * Once the publisher is informed of the new subscriber, the publisher's entity is removed from the special new subscription queue * The one-shot system produced by the publisher will use a private function to retrieve the correct callback from the subscriber based on the publisher's entity @@ -249,7 +249,7 @@ Whenever a publisher is advertised or a subscriber requests discovery, the Domai * Dropped publishers/subscribers are identified and any lingering connections to them are erased The connections created by the `Domain` need to be semantically different from the connections formed by directly subscribing to a publisher. -For one thing, Domain-created connections need to be reference-counted: If a subscriber is subscribed to multiple topics and a publisher is publishing to more than one of those topics, then when the subscriber unsubscribes from one of the topics it should not unsubscribe from the publisher. +For one thing, Domain-created connections need to be reference-counted: If a subscriber is subscribed to multiple topics and a publisher is publishing to more than one of those topics, then when the subscriber unsubscribes from one of the topics it should not fully unsubscribe from the publisher. The subscriber's connection to the publisher should only be broken when both conditions are met: 1. Any topics subscriptions that connected the publisher and subscriber are dropped 2. Any direct connection that was made between the publisher and subscriber is dropped @@ -307,6 +307,15 @@ Should topic names use `String` or should they be more like `StageLabel` where t ### Callback timing In what stage should subscription callbacks be processed? Is that something that users should be able to set, and if so how? Should that be customizable per subscription? +### Repeat subscriptions +What should happen if a subscriber gets subscribed to the same publisher multiple times with different subscriptions? +What should happen if a publisher advertises on multiple topics and the same subscriber is discovering those multiple topics? + +Should all of the subscribed callbacks be triggered? +Should only the most recently subscribed callback be triggered? + +Maybe instead of `Subscriber` we should have `Subscription` that repesents only a single connection and a single callback, stored on its own entity. Then we would have `Subscriber` without a `T` parameter as a parent and manager of the `Subscription` entities. + ## Future possibilities ### Publisher `Description` From e89bb1b3d9259df64cc7341bca55bb548bf42fd7 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 1 Nov 2022 23:42:20 +0800 Subject: [PATCH 6/7] Iterating Signed-off-by: Michael X. Grey --- rfcs/xx-topics.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rfcs/xx-topics.md b/rfcs/xx-topics.md index 243dcd55..44c8775c 100644 --- a/rfcs/xx-topics.md +++ b/rfcs/xx-topics.md @@ -281,9 +281,9 @@ Being driven by callbacks provides a few helpful benefits: * Callbacks offer a simple way to turn reactions on/off by toggling subscriptions However, if the user has no control over when the callbacks are processed it may be harder for them to synchronize the overall behavior of their application. -Ideally users should implement their applications to not be sensitive to callback ordering, but that might not be a realistic requirement to impose. +Ideally users should implement their applications to not be sensitive to callback timing, but that might not be a realistic requirement to impose. Maybe subscriptions should also be able to specify what stage their callback should be processed in. -This concern will also likely be affected by the upcoming stageless systems design. +This concern will certainly be affected by the upcoming stageless systems design. ## Prior art @@ -297,7 +297,7 @@ The concept is also associated with a more general pattern of "reactive programm The `bevy_topics` plugin somehow needs to create systems that operate on `Publisher` and `Subscriber` components where `T` may vary. One option would be to require users to register each type they want to support messages for when adding the `bevy_topics` plugin. -I would prefer to avoid this because requirement because I think users would be confused when they create a connection between entities with `Publisher` and `Subscriber` components but the connection quietly fails because they forgot to register `Foo`. +I would prefer to avoid this requirement because I think users would be confused when they create a connection between entities with `Publisher` and `Subscriber` components but the connection quietly fails because they forgot to register `Foo`. The only other way I can think of making this work would be to use something like [trait queries](https://github.com/bevyengine/rfcs/pull/39) where `Publisher` and `Subscriber` implement traits that allow them to be discovered by the `bevy_topics` system. @@ -308,13 +308,13 @@ Should topic names use `String` or should they be more like `StageLabel` where t In what stage should subscription callbacks be processed? Is that something that users should be able to set, and if so how? Should that be customizable per subscription? ### Repeat subscriptions -What should happen if a subscriber gets subscribed to the same publisher multiple times with different subscriptions? +What should happen if a subscriber gets subscribed to the same publisher multiple times with different subscription callbacks? What should happen if a publisher advertises on multiple topics and the same subscriber is discovering those multiple topics? Should all of the subscribed callbacks be triggered? Should only the most recently subscribed callback be triggered? -Maybe instead of `Subscriber` we should have `Subscription` that repesents only a single connection and a single callback, stored on its own entity. Then we would have `Subscriber` without a `T` parameter as a parent and manager of the `Subscription` entities. +Maybe instead of `Subscriber` we should have `Subscription` that repesents only a single connection and a single callback, stored on its own entity. Then we would have `Subscriber` without a `T` parameter as a parent and manager of the `Subscription` entities that are created for it. ## Future possibilities @@ -368,7 +368,7 @@ impl Subscription { } ``` -This may require adding a second message queue to the `Publisher` struct. +This may require adding another message queue to the `Publisher` struct to save the history. ### Parallel Publishing @@ -379,7 +379,7 @@ It would be good to have ways for systems to publish with the same publisher in * Custom command that flushes messages into publishers * Advantage: commands are a familiar way of marshalling parallel requests in Bevy * Disadvantage: it may feel clunky for users because they would need to explicitly communicate the publisher entity to the command -* Additional method `Publisher::parallel_publish(&self, msg: T)` that uses a mutex to protect access to the message queue +* Alternative publish method `Publisher::parallel_publish(&self, msg: T)` that uses a mutex to protect access to the message queue * Advantage: ergonomic and easy to understand * Disadvantage: introducing mutexes could mean a noticeable performance hit @@ -395,4 +395,4 @@ To work nicely, this capability would require [trait queries](https://github.com It may be worth considering whether/how bevy can facilitate transformations and operations on data streams the way [ReactiveX](https://reactivex.io/documentation/operators.html) does. -As a simple example, consider an operator that zips two topics/publishers `Publisher`, `Publisher` together to effectively provide `Publisher<(U, V)>` that can be subscribed to. The zipped publisher would publish a new message each time a new message has arrived for both `U` and `V`. +As a simple example, consider an operator that zips two topics/publishers `Publisher`, `Publisher` together to effectively provide a `Publisher<(U, V)>` that can be subscribed to. The zipped publisher would publish a new message each time a new message has arrived for both `U` and `V`. From d6bc24636a98ef4e717060674c355b5641095fb9 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Wed, 2 Nov 2022 00:51:22 +0800 Subject: [PATCH 7/7] Incorporate the PR number Signed-off-by: Michael X. Grey --- rfcs/{xx-topics.md => 65-topics.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename rfcs/{xx-topics.md => 65-topics.md} (100%) diff --git a/rfcs/xx-topics.md b/rfcs/65-topics.md similarity index 100% rename from rfcs/xx-topics.md rename to rfcs/65-topics.md