diff --git a/editoast/Cargo.lock b/editoast/Cargo.lock index ce32021b246..12a4abb4b02 100644 --- a/editoast/Cargo.lock +++ b/editoast/Cargo.lock @@ -349,6 +349,54 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "amq-protocol" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d40d8b2465c7959dd40cee32ba6ac334b5de57e9fca0cc756759894a4152a5d" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cb2100adae7da61953a2c3a01935d86caae13329fadce3333f524d6d6ce12e2" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "156ff13c8a3ced600b4e54ed826a2ae6242b6069d00dd98466827cef07d3daff" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "751bbd7d440576066233e740576f1b31fdc6ab86cfabfbd48c548de77eca73e4" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -520,6 +568,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-global-executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + [[package]] name = "async-io" version = "1.13.0" @@ -579,6 +638,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io 1.13.0", + "async-trait", + "futures-core", + "reactor-trait", +] + [[package]] name = "async-std" version = "1.12.0" @@ -811,6 +882,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.5.1" @@ -893,6 +973,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdd7a427adc0135366d99db65b36dae9237130997e560ed61118041fb72be6e8" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.90" @@ -944,6 +1033,16 @@ dependencies = [ "windows-targets 0.52.4", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "4.5.4" @@ -1046,6 +1145,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "cookie-factory" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" +dependencies = [ + "futures 0.3.30", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -1279,6 +1387,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "diesel" version = "2.1.5" @@ -1359,6 +1476,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "editoast" version = "0.1.0" @@ -1393,6 +1516,7 @@ dependencies = [ "inventory", "itertools 0.12.1", "json-patch", + "lapin", "mvt", "openssl", "opentelemetry", @@ -1599,6 +1723,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08" +dependencies = [ + "async-trait", +] + [[package]] name = "exr" version = "1.72.0" @@ -1606,7 +1739,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "887d93f60543e9a9362ef8a21beedd0a833c5d9610e18c67abe15a5963dcb1a4" dependencies = [ "bit_field", - "flume", + "flume 0.11.0", "half", "lebe", "miniz_oxide", @@ -1689,6 +1822,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin", +] + [[package]] name = "flume" version = "0.11.0" @@ -2266,6 +2411,16 @@ dependencies = [ "serde", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -2402,6 +2557,28 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" +[[package]] +name = "lapin" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f3067a1fcfbc3fc46455809c023e69b8f6602463201010f4ae5a3b572adb9dc" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume 0.10.14", + "futures-core", + "futures-io", + "parking_lot 0.12.1", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -2990,6 +3167,23 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "p12" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4873306de53fe82e7e484df31e1e947d61514b6ea2ed6cd7b45d63006fd9224" +dependencies = [ + "cbc", + "cipher", + "des", + "getrandom", + "hmac", + "lazy_static", + "rc2", + "sha1", + "yasna", +] + [[package]] name = "par-map" version = "0.1.4" @@ -3136,6 +3330,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cfae3ead413ca051a681152bd266438d3bfa301c9bdf836939a14c721bb2a21" +dependencies = [ + "doc-comment", + "flume 0.11.0", + "parking_lot 0.12.1", + "tracing", +] + [[package]] name = "piper" version = "0.2.1" @@ -3574,6 +3780,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -3583,6 +3798,17 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "redis" version = "0.25.3" @@ -3726,6 +3952,21 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rmp" version = "0.8.12" @@ -3814,6 +4055,42 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-connector" +version = "0.18.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25da151615461c7347114b1ad1a7458b4cdebc69cb220cd140cd5cb324b1dd37" +dependencies = [ + "log", + "rustls", + "rustls-native-certs", + "rustls-webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -3823,6 +4100,16 @@ dependencies = [ "base64", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -3860,6 +4147,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.10.0" @@ -4389,6 +4686,18 @@ version = "0.12.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1fc403891a21bcfb7c37834ba66a547a8f402146eba7265b5a6d88059c9ff2f" +[[package]] +name = "tcp-stream" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4da30af7998f51ee1aa48ab24276fe303a697b004e31ff542b192c088d5630a5" +dependencies = [ + "cfg-if", + "p12", + "rustls-connector", + "rustls-pemfile", +] + [[package]] name = "tempfile" version = "3.10.1" @@ -4839,6 +5148,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "ureq" version = "2.9.6" @@ -5307,6 +5622,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/editoast/Cargo.toml b/editoast/Cargo.toml index a06eb7d23bf..f145a45a389 100644 --- a/editoast/Cargo.toml +++ b/editoast/Cargo.toml @@ -102,6 +102,7 @@ url = "2.5.0" utoipa.workspace = true uuid.workspace = true validator = { version = "0.17.0", features = ["derive"] } +lapin = "2.3.1" [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } diff --git a/editoast/src/core/mod.rs b/editoast/src/core/mod.rs index 50398aed7bc..525509abb2b 100644 --- a/editoast/src/core/mod.rs +++ b/editoast/src/core/mod.rs @@ -4,6 +4,7 @@ pub mod infra_loading; pub mod infra_state; #[cfg(test)] pub mod mocking; +pub mod mq_client; pub mod pathfinding; pub mod simulation; pub mod stdcm; diff --git a/editoast/src/core/mq_client.rs b/editoast/src/core/mq_client.rs new file mode 100644 index 00000000000..b7c05440c5f --- /dev/null +++ b/editoast/src/core/mq_client.rs @@ -0,0 +1,236 @@ +use futures_util::StreamExt; +use lapin::{ + options::{BasicPublishOptions, QueueDeclareOptions}, + types::FieldTable, + BasicProperties, Channel, Connection, ConnectionProperties, +}; +use serde::{Deserialize, Serialize}; +use serde_json::to_vec; +use std::fmt::Debug; +use tokio::time::{timeout, Duration}; +use uuid::Uuid; + +pub struct RabbitMQClient { + connection: Connection, + core_queue_prefix: String, + exchange: String, + timeout: u64, +} + +pub struct Options { + /// format `amqp://username:password@host:port/vhost` + /// for instance: `amqp://osrd:password@localhost:5672/%2f` for the default vhost + pub uri: String, + /// Exchange name + pub exchange: String, + /// Prefix for the core queues + pub core_queue_prefix: String, + /// Default timeout for the response + pub timeout: u64, +} + +#[derive(Serialize, Deserialize, Debug)] +struct MessageEnvelope +where + T: Serialize + Debug, +{ + payload: T, + message_type: String, + respond_to: Option, + infra_expected_version: i64, +} + +impl MessageEnvelope +where + T: Serialize + Debug, +{ + fn new( + payload: T, + message_type: String, + respond_to: Option, + infra_expected_version: i64, + ) -> Self { + MessageEnvelope { + payload, + message_type, + respond_to, + infra_expected_version, + } + } + + fn serialized_payload(&self) -> Result, Error> { + Ok(to_vec(&self).map_err(Error::SerializationError)?) + } +} + +#[derive(Debug)] +pub enum Error { + Lapin(lapin::Error), + SerializationError(serde_json::Error), + ResponseTimeout, +} + +impl RabbitMQClient { + pub async fn new(options: Options) -> Result { + let connection = Connection::connect(&options.uri, ConnectionProperties::default()).await?; + Ok(RabbitMQClient { + connection, + core_queue_prefix: options.core_queue_prefix, + exchange: options.exchange, + timeout: options.timeout, + }) + } + + /// Create the core queue. + /// If it does not exist, the queue will be created with the provided options. + /// If it exists, the queue_declare call will not have any effect. + pub async fn create_queue_for_core( + &self, + channel: &Channel, + infra_id: i64, + ) -> Result { + let core_queue_name = format!("{}-{}", self.core_queue_prefix, infra_id); + + channel + .queue_declare( + &core_queue_name, + QueueDeclareOptions { + exclusive: false, + auto_delete: false, + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await + .map_err(Error::Lapin)?; + + Ok(core_queue_name) + } + + pub async fn call( + &self, + message_type: &str, + infra_id: i64, + infra_expected_version: i64, + payload: T, + ) -> Result<(), Error> + where + T: Serialize + Debug, + { + // Create a channel + let channel = self + .connection + .create_channel() + .await + .map_err(Error::Lapin)?; + + // Prepare the message + let serialized_payload = MessageEnvelope::new( + payload, + message_type.to_string(), + None, + infra_expected_version, + ) + .serialized_payload()?; + + // Prepare the queue and publish the message to the core queue + let core_queue_name = self.create_queue_for_core(&channel, infra_id).await?; + channel + .basic_publish( + &self.exchange, + core_queue_name.as_str(), + BasicPublishOptions::default(), + &serialized_payload, + BasicProperties::default(), + ) + .await + .map_err(Error::Lapin)?; + + Ok(()) + } + + pub async fn call_with_response( + &self, + message_type: &str, + infra_id: i64, + infra_expected_version: i64, + payload: T, + override_timeout: Option, + ) -> Result + where + T: Serialize + Debug, + TR: for<'de> Deserialize<'de> + Debug, + { + // Create a channel + let channel = self + .connection + .create_channel() + .await + .map_err(Error::Lapin)?; + + // Declare a queue with a random name for the response. + // This queue will be deleted after the response is received. + let temp_queue_name = format!("resp-{}", Uuid::new_v4()); + let temp_queue = channel + .queue_declare( + &temp_queue_name, + QueueDeclareOptions { + exclusive: true, + auto_delete: true, + durable: false, + ..Default::default() + }, + FieldTable::default(), + ) + .await + .map_err(Error::Lapin)?; + + // Prepare the message + let serialized_payload = MessageEnvelope::new( + payload, + message_type.to_string(), + Some(temp_queue_name), + infra_expected_version, + ) + .serialized_payload()?; + + let core_queue_name = self.create_queue_for_core(&channel, infra_id).await?; + + // Publish the message to the core queue + channel + .basic_publish( + &self.exchange, + core_queue_name.as_str(), + BasicPublishOptions::default(), + &serialized_payload, + BasicProperties::default(), + ) + .await + .map_err(Error::Lapin)?; + + // Create a consumer for the temporary response queue + let consumer = channel + .basic_consume( + &temp_queue.name().as_str(), + "editoast", + lapin::options::BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .map_err(Error::Lapin)?; + + // Wait for the response, resolve it and return it + let timeout_duration = override_timeout.unwrap_or(self.timeout); + let message = timeout( + Duration::from_secs(timeout_duration), + consumer.into_future(), + ) + .await + .map_err(|_| Error::ResponseTimeout)? + .0 + .unwrap() + .map_err(Error::Lapin)?; + Ok(serde_json::from_slice(&message.data).map_err(Error::SerializationError)?) + } +}