From 20d9ad48d6d0805e684755f3494556622b854e4c Mon Sep 17 00:00:00 2001 From: meryacine Date: Tue, 2 Aug 2022 09:22:24 +0200 Subject: [PATCH 01/14] feat(lightning): add tower messages Add message (de)serilaization to teos-common This commit adds lightning messages and message (de)serialization using helper local modules (ser_utils, ser_macros) and LDK v0.0.108 instead of the attempted fork used before (https://github.com/meryacine/rust-lightning/tree/expose-tlv-macros) Most of the serialization macros and utils are copied from LDK's own serialization framework though. Also we might remove some of the locally implemented TLV stuff if they ever get exposed by LDK. But some of the macros have been edited and diverged from LDK's first implementation: for example, the `vec` arms for the macros were adjusted so that it works with `impl_writeable_msg` macro to send arrays as TLV (which is not the case for LDK ATM). Note that we could have had vectors(arrays) be inside `Option`s and put in a TLV, but this will waste extra two bytes for vector length annotation (we don't need the vector length annotated in the V(value) of the TLV since L(length) already embeds this information) We also now have another macro match arm `str_opt`, which indicates an optional string inside a TLV, this was created to avoid adding extra length field for strings just like `vec`. --- teos-common/src/lib.rs | 1 + teos-common/src/lightning/messages.rs | 243 ++++++++++++++++++++++++ teos-common/src/lightning/mod.rs | 3 + teos-common/src/lightning/ser_macros.rs | 200 +++++++++++++++++++ teos-common/src/lightning/ser_utils.rs | 172 +++++++++++++++++ 5 files changed, 619 insertions(+) create mode 100644 teos-common/src/lightning/messages.rs create mode 100644 teos-common/src/lightning/mod.rs create mode 100644 teos-common/src/lightning/ser_macros.rs create mode 100644 teos-common/src/lightning/ser_utils.rs diff --git a/teos-common/src/lib.rs b/teos-common/src/lib.rs index 7c8b621c..ac628258 100644 --- a/teos-common/src/lib.rs +++ b/teos-common/src/lib.rs @@ -13,6 +13,7 @@ pub mod constants; pub mod cryptography; pub mod dbm; pub mod errors; +pub mod lightning; pub mod net; pub mod receipts; pub mod ser; diff --git a/teos-common/src/lightning/messages.rs b/teos-common/src/lightning/messages.rs new file mode 100644 index 00000000..c7a113aa --- /dev/null +++ b/teos-common/src/lightning/messages.rs @@ -0,0 +1,243 @@ +//! Watchtower custom lightning messages that implement LDK's [`Readable`] & [`Writeable`] traits. +//! +//! [`Readable`]: lightning::util::ser::Readable + +use crate::appointment::Locator; +use crate::lightning::ser_macros::{impl_writeable_msg, set_msg_type}; +use bitcoin::secp256k1::PublicKey; +use bitcoin::Txid; +use lightning::io::Error; +use lightning::ln::wire; +use lightning::util::ser::{Writeable, Writer}; + +// Re-exporting this for other crates to use. +pub use crate::lightning::ser_utils::Type; + +/// The register message sent by the user to subscribe for the watching service. +#[derive(Debug)] +pub struct Register { + pub pubkey: PublicKey, + pub appointment_slots: u32, + pub subscription_period: u32, +} + +/// The subscription details message that is sent to the user after registering or toping up. +/// This message is the response to the register message. +#[derive(Debug)] +pub struct SubscriptionDetails { + pub appointment_max_size: u16, + pub amount_msat: u32, + // Optional TLV. + pub invoice: Option, + pub signature: Option, +} + +/// The add/update appointment message sent by the user. +#[derive(Debug)] +pub struct AddUpdateAppointment { + pub locator: Locator, + // NOTE: LDK will prefix varying size fields (e.g. vectors and strings) with their length. + pub encrypted_blob: Vec, + pub signature: String, + // Optional TLV. + pub to_self_delay: Option, +} + +/// The appointment accepted message that is sent after an accepted add/update appointment message. +#[derive(Debug)] +pub struct AppointmentAccepted { + pub locator: Locator, + pub start_block: u32, + // Optional TLV. + pub receipt_signature: Option, +} + +/// The appointment rejected message that is sent if an add/update appointment message was rejected. +#[derive(Debug)] +pub struct AppointmentRejected { + pub locator: Locator, + pub rcode: u16, + pub reason: String, +} + +/// The get appointment message sent by the user to retrieve a previously sent appointment from the tower. +#[derive(Debug)] +pub struct GetAppointment { + pub locator: Locator, + pub signature: String, +} + +/// The appointment data message sent by the tower after a get appointment message. +#[derive(Debug)] +pub struct AppointmentData { + pub locator: Locator, + pub encrypted_blob: Vec, +} + +/// The tracker data message sent by the tower when the requested appointment has been acted upon. +#[derive(Debug)] +pub struct TrackerData { + pub dispute_txid: Txid, + pub penalty_txid: Txid, + pub penalty_rawtx: Vec, +} + +/// The appointment not found message sent by the tower in response to a get appointment message +/// whose locator didn't match any known appointment. +#[derive(Debug)] +pub struct AppointmentNotFound { + pub locator: Locator, +} + +/// The get subscription info message (a TEOS custom message, not a bolt13 one). +#[derive(Debug)] +pub struct GetSubscriptionInfo { + pub signature: String, +} + +/// The subscription info message sent by the tower in response to get subscription info message. +#[derive(Debug)] +pub struct SubscriptionInfo { + pub available_slots: u32, + pub subscription_expiry: u32, + // Sent as a TLV. Defaults to an empty vector. + pub locators: Vec, +} + +impl_writeable_msg!(Register, { + pubkey, + appointment_slots, + subscription_period +}, {}); + +impl_writeable_msg!(SubscriptionDetails, { + appointment_max_size, + amount_msat, +}, { + // Use `opt_str` and not `opt` to avoid writing a length prefix for strings + // since it's already written in the length part of the TLV. + (1, invoice, opt_str), + (3, signature, opt_str), +}); + +impl_writeable_msg!(AddUpdateAppointment, { + locator, + encrypted_blob, + signature, +}, { + (1, to_self_delay, opt), +}); + +impl_writeable_msg!(AppointmentAccepted, { + locator, + start_block, +}, { + // Use `opt_str` and not `opt` to avoid writing a length prefix for strings + // since it's already written in the length part of the TLV. + (1, receipt_signature, opt_str), +}); + +impl_writeable_msg!(AppointmentRejected, { + locator, + rcode, + reason, +}, {}); + +impl_writeable_msg!(GetAppointment, { + locator, + signature, +}, {}); + +impl_writeable_msg!(AppointmentData, { + locator, + encrypted_blob, +}, {}); + +impl_writeable_msg!(TrackerData, { + dispute_txid, + penalty_txid, + penalty_rawtx, +}, {}); + +impl_writeable_msg!(AppointmentNotFound, { + locator, +}, {}); + +impl_writeable_msg!(GetSubscriptionInfo, { + signature, +}, {}); + +impl_writeable_msg!(SubscriptionInfo, { + available_slots, + subscription_expiry, +}, { + (1, locators, vec) +}); + +set_msg_type!(Register, 48848); +set_msg_type!(SubscriptionDetails, 48850); +set_msg_type!(AddUpdateAppointment, 48852); +set_msg_type!(AppointmentAccepted, 48854); +set_msg_type!(AppointmentRejected, 48856); +set_msg_type!(GetAppointment, 48858); +set_msg_type!(AppointmentData, 48860); +set_msg_type!(TrackerData, 48862); +set_msg_type!(AppointmentNotFound, 48864); +// Let these messages get odd types since they are auxiliary messages. +set_msg_type!(GetSubscriptionInfo, 48865); +set_msg_type!(SubscriptionInfo, 48867); + +#[derive(Debug)] +pub enum TowerMessage { + // Register messages + Register(Register), + SubscriptionDetails(SubscriptionDetails), + // Appointment submission messages + AddUpdateAppointment(AddUpdateAppointment), + AppointmentAccepted(AppointmentAccepted), + AppointmentRejected(AppointmentRejected), + // Appointment fetching messages + GetAppointment(GetAppointment), + AppointmentData(AppointmentData), + TrackerData(TrackerData), + AppointmentNotFound(AppointmentNotFound), + // User subscription messages + GetSubscriptionInfo(GetSubscriptionInfo), + SubscriptionInfo(SubscriptionInfo), +} + +impl wire::Type for TowerMessage { + fn type_id(&self) -> u16 { + match self { + TowerMessage::Register(..) => Register::TYPE, + TowerMessage::SubscriptionDetails(..) => SubscriptionDetails::TYPE, + TowerMessage::AddUpdateAppointment(..) => AddUpdateAppointment::TYPE, + TowerMessage::AppointmentAccepted(..) => AppointmentAccepted::TYPE, + TowerMessage::AppointmentRejected(..) => AppointmentRejected::TYPE, + TowerMessage::GetAppointment(..) => GetAppointment::TYPE, + TowerMessage::AppointmentData(..) => AppointmentData::TYPE, + TowerMessage::TrackerData(..) => TrackerData::TYPE, + TowerMessage::AppointmentNotFound(..) => AppointmentNotFound::TYPE, + TowerMessage::GetSubscriptionInfo(..) => GetSubscriptionInfo::TYPE, + TowerMessage::SubscriptionInfo(..) => SubscriptionInfo::TYPE, + } + } +} + +impl Writeable for TowerMessage { + fn write(&self, writer: &mut W) -> Result<(), Error> { + match self { + TowerMessage::Register(msg) => msg.write(writer), + TowerMessage::SubscriptionDetails(msg) => msg.write(writer), + TowerMessage::AddUpdateAppointment(msg) => msg.write(writer), + TowerMessage::AppointmentAccepted(msg) => msg.write(writer), + TowerMessage::AppointmentRejected(msg) => msg.write(writer), + TowerMessage::GetAppointment(msg) => msg.write(writer), + TowerMessage::AppointmentData(msg) => msg.write(writer), + TowerMessage::TrackerData(msg) => msg.write(writer), + TowerMessage::AppointmentNotFound(msg) => msg.write(writer), + TowerMessage::GetSubscriptionInfo(msg) => msg.write(writer), + TowerMessage::SubscriptionInfo(msg) => msg.write(writer), + } + } +} diff --git a/teos-common/src/lightning/mod.rs b/teos-common/src/lightning/mod.rs new file mode 100644 index 00000000..5a788a7a --- /dev/null +++ b/teos-common/src/lightning/mod.rs @@ -0,0 +1,3 @@ +pub mod messages; +mod ser_macros; +mod ser_utils; diff --git a/teos-common/src/lightning/ser_macros.rs b/teos-common/src/lightning/ser_macros.rs new file mode 100644 index 00000000..675b09db --- /dev/null +++ b/teos-common/src/lightning/ser_macros.rs @@ -0,0 +1,200 @@ +//! A module containing some trait implementation macros to avoid repetition. +//! Most of this file is taken/inspired from [here](https://github.com/lightningdevkit/rust-lightning/blob/3676a056c85f54347e7e079e913317a79e26a2ae/lightning/src/util/ser_macros.rs). + +/* This file is licensed under either of + * Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or + * MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT) + * at your option. +*/ + +macro_rules! encode_tlv { + ($stream: expr, $type: expr, $field: expr, opt) => { + if let Some(ref field) = $field { + ser_macros::encode_tlv!($stream, $type, field); + } + }; + ($stream: expr, $type: expr, $field: expr, vec) => { + // Don't write the vector if it's empty. + if !$field.is_empty() { + // We can't just pass `$field` since this will move it out of the struct we are implementing + // this serialization for (but we could have cloned). That's why we pass a reference to it. + let lightning_vec = ser_utils::LightningVecWriter(&$field); + ser_macros::encode_tlv!($stream, $type, lightning_vec); + } + }; + ($stream: expr, $type: expr, $field: expr, opt_str) => { + if let Some(ref field) = $field { + let lightning_str = ser_utils::LightningVecWriter(field.as_bytes()); + ser_macros::encode_tlv!($stream, $type, lightning_str); + } + }; + ($stream: expr, $type: expr, $field: expr) => { + BigSize($type).write($stream)?; + BigSize($field.serialized_length() as u64).write($stream)?; + $field.write($stream)?; + }; +} + +macro_rules! encode_tlv_stream { + ($stream: expr, {$(($type: expr, $field: expr, $fieldty: tt)),* $(,)*}) => { { + #[allow(unused_imports)] + use { + lightning::util::ser::BigSize, + $crate::lightning::{ser_macros, ser_utils}, + }; + + $( + ser_macros::encode_tlv!($stream, $type, $field, $fieldty); + )* + + #[allow(unused_mut, unused_variables, unused_assignments, unused_comparisons)] + #[cfg(debug_assertions)] + { + let mut last_seen: Option = None; + $( + if let Some(t) = last_seen { + debug_assert!(t < $type, "{} <= {}; TLV types must be strictly increasing", $type, t); + } + last_seen = Some($type); + )* + } + } } +} + +macro_rules! decode_tlv { + ($reader: expr, $field: ident, opt) => { + $field = Some(Readable::read(&mut $reader)?); + }; + ($reader: expr, $field: ident, vec) => { + let lightning_vec = ser_utils::LightningVecReader::read(&mut $reader)?; + $field = lightning_vec.0; + }; + ($reader: expr, $field: ident, opt_str) => { + let lightning_str = ser_utils::LightningVecReader::read(&mut $reader)?; + let inner_str = + String::from_utf8(lightning_str.0).map_err(|_| DecodeError::InvalidValue)?; + $field = Some(inner_str); + }; +} + +macro_rules! decode_tlv_stream { + ($stream: expr, {$(($type: expr, $field: ident, $fieldty: tt)),* $(,)*}) => { { + #[allow(unused_imports)] + use { + lightning::ln::msgs::DecodeError, + lightning::util::ser::{BigSize, Readable}, + $crate::lightning::{ser_macros, ser_utils}, + }; + + let mut last_seen_type: Option = None; + let mut stream_ref = $stream; + + loop { + // First decode the type of this TLV: + let typ: BigSize = { + let mut tracking_reader = ser_utils::ReadTrackingReader::new(&mut stream_ref); + match Readable::read(&mut tracking_reader) { + Err(DecodeError::ShortRead) => { + if !tracking_reader.have_read { + break; + } else { + return Err(DecodeError::ShortRead); + } + }, + Err(e) => return Err(e), + Ok(t) => t, + } + }; + + // Types must be unique and monotonically increasing: + match last_seen_type { + Some(t) if typ.0 <= t => { + return Err(DecodeError::InvalidValue); + }, + _ => {}, + } + last_seen_type = Some(typ.0); + + // Finally, read the length and value itself: + let length: BigSize = Readable::read(&mut stream_ref)?; + let mut s = ser_utils::FixedLengthReader::new(&mut stream_ref, length.0); + match typ.0 { + $($type => { + ser_macros::decode_tlv!(s, $field, $fieldty); + if s.bytes_remain() { + s.eat_remaining()?; // Return ShortRead if there's actually not enough bytes + return Err(DecodeError::InvalidValue); + } + },)* + x if x % 2 == 0 => { + return Err(DecodeError::UnknownRequiredFeature); + }, + _ => {}, + } + s.eat_remaining()?; + } + } } +} + +macro_rules! init_tlv_field_var { + ($field: ident, opt) => { + let mut $field = None; + }; + ($field: ident, vec) => { + let mut $field = Vec::new(); + }; + ($field: ident, opt_str) => { + let mut $field = None; + }; +} + +macro_rules! impl_writeable_msg { + ($st: ty, {$($field: ident),* $(,)*}, {$(($type: expr, $tlvfield: ident, $fieldty: tt)),* $(,)*}) => { + impl lightning::util::ser::Writeable for $st { + fn write(&self, w: &mut W) -> Result<(), lightning::io::Error> { + $(self.$field.write(w)?;)* + $crate::lightning::ser_macros::encode_tlv_stream!(w, {$(($type, self.$tlvfield, $fieldty)),*}); + Ok(()) + } + } + + impl lightning::util::ser::Readable for $st { + fn read(r: &mut R) -> Result { + $(let $field = lightning::util::ser::Readable::read(r)?;)* + $($crate::lightning::ser_macros::init_tlv_field_var!($tlvfield, $fieldty);)* + $crate::lightning::ser_macros::decode_tlv_stream!(r, {$(($type, $tlvfield, $fieldty)),*}); + Ok(Self { + $($field),*, + $($tlvfield),* + }) + } + } + + #[cfg(test)] + impl std::cmp::PartialEq for $st { + fn eq(&self, other: &Self) -> bool { + true + $(&& self.$field == other.$field)* + $(&& self.$tlvfield == other.$tlvfield)* + } + } + } +} + +macro_rules! set_msg_type { + ($st: ty, $type: expr) => { + impl $crate::lightning::ser_utils::Type for $st { + const TYPE: u16 = $type; + } + }; +} + +// Macros used by `impl_writeable_msg`. +pub(super) use decode_tlv; +pub(super) use decode_tlv_stream; +pub(super) use encode_tlv; +pub(super) use encode_tlv_stream; +pub(super) use init_tlv_field_var; + +pub(super) use impl_writeable_msg; +pub(super) use set_msg_type; diff --git a/teos-common/src/lightning/ser_utils.rs b/teos-common/src/lightning/ser_utils.rs new file mode 100644 index 00000000..36bfad30 --- /dev/null +++ b/teos-common/src/lightning/ser_utils.rs @@ -0,0 +1,172 @@ +//! A helper module containing some lightning messages serialization stuff. +//! Most of this file is taken/inspired from [here](https://github.com/lightningdevkit/rust-lightning/blob/3676a056c85f54347e7e079e913317a79e26a2ae/lightning/src/util/ser.rs). + +/* This file is licensed under either of + * Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or + * MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT) + * at your option. +*/ + +use crate::appointment::{Locator, LOCATOR_LEN}; + +use lightning::io::{copy, sink, Error, Read}; +use lightning::ln::msgs::DecodeError; +use lightning::util::ser::{MaybeReadable, Readable, Writeable, Writer}; + +/// A trait that associates a u16 [`Type::TYPE`] constant with a lightning message. +pub trait Type { + /// The type identifying the message payload. + const TYPE: u16; +} + +// Deserialization for a Locator inside a lightning message. +impl Readable for Locator { + fn read(reader: &mut R) -> Result { + let mut buf = [0; LOCATOR_LEN]; + reader.read_exact(&mut buf)?; + let locator = Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue)?; + Ok(locator) + } +} + +// Serialization for a Locator inside a lighting message. +impl Writeable for Locator { + fn write(&self, writer: &mut W) -> Result<(), Error> { + writer.write_all(&self.to_vec())?; + Ok(()) + } +} + +/// A read wrapper around a vector inside a lightning message. +/// This wrapper mainly exists because we cannot implement LDK's (de)serialization traits +/// for Vec (since neither the traits nor Vec are defined in our crate (the orphan rule)). +/// +/// [`Readable`] implementation for this struct assumes that there is no length prefix. +/// It will read the vector until there are no more items in the stream (Don't use with non-TLV field). +pub(super) struct LightningVecReader(pub Vec); + +// Deserialization for a vector of items inside a lightning message. +impl Readable for LightningVecReader { + #[inline] + fn read(mut reader: &mut R) -> Result { + let mut values = Vec::new(); + loop { + let mut track_read = ReadTrackingReader::new(&mut reader); + match MaybeReadable::read(&mut track_read) { + Ok(Some(v)) => { + values.push(v); + } + Ok(None) => {} + // If we failed to read any bytes at all, we reached the end of our TLV + // stream and have simply exhausted all entries. + Err(e) if e == DecodeError::ShortRead && !track_read.have_read => break, + Err(e) => return Err(e), + } + } + Ok(Self(values)) + } +} + +/// A write wrapper around a vector/slice inside a lightning message. +/// Similar to [`LightningVecReader`] but the inner vector is a slice reference to avoid cloning. +/// +/// Note that we don't prefix the vector/slice with its length when serializing it, that's because this struct +/// is used in TLV streams which already has a BigSize length prefix (Don't use with non-TLV field). +pub(super) struct LightningVecWriter<'a, T>(pub &'a [T]); + +// Serialization for a vector of items inside a lighting message. +impl<'a, T: Writeable> Writeable for LightningVecWriter<'a, T> { + #[inline] + fn write(&self, writer: &mut W) -> Result<(), Error> { + for item in self.0.iter() { + item.write(writer)?; + } + Ok(()) + } +} + +/// Essentially std::io::Take but a bit simpler and with a method to walk the underlying stream +/// forward to ensure we always consume exactly the fixed length specified. +pub(super) struct FixedLengthReader { + read: R, + bytes_read: u64, + total_bytes: u64, +} + +impl FixedLengthReader { + /// Returns a new FixedLengthReader. + pub fn new(read: R, total_bytes: u64) -> Self { + Self { + read, + bytes_read: 0, + total_bytes, + } + } + + /// Returns whether there are remaining bytes or not. + #[inline] + pub fn bytes_remain(&mut self) -> bool { + self.bytes_read != self.total_bytes + } + + /// Consume the remaining bytes. + #[inline] + pub fn eat_remaining(&mut self) -> Result<(), DecodeError> { + copy(self, &mut sink()).unwrap(); + if self.bytes_read != self.total_bytes { + Err(DecodeError::ShortRead) + } else { + Ok(()) + } + } +} + +impl Read for FixedLengthReader { + #[inline] + fn read(&mut self, dest: &mut [u8]) -> Result { + if self.total_bytes == self.bytes_read { + Ok(0) + } else { + let read_len = core::cmp::min(dest.len() as u64, self.total_bytes - self.bytes_read); + match self.read.read(&mut dest[0..(read_len as usize)]) { + Ok(v) => { + self.bytes_read += v as u64; + Ok(v) + } + Err(e) => Err(e), + } + } + } +} + +/// A Read which tracks whether any bytes have been read at all. This allows us to distinguish +/// between "EOF reached before we started" and "EOF reached mid-read". +pub(super) struct ReadTrackingReader { + read: R, + /// Tells whether we have read from this reader or not yet. + pub have_read: bool, +} + +impl ReadTrackingReader { + /// Returns a new ReadTrackingReader. + pub fn new(read: R) -> Self { + Self { + read, + have_read: false, + } + } +} + +impl Read for ReadTrackingReader { + #[inline] + fn read(&mut self, dest: &mut [u8]) -> Result { + match self.read.read(dest) { + Ok(0) => Ok(0), + Ok(len) => { + self.have_read = true; + Ok(len) + } + Err(e) => Err(e), + } + } +} From ceefb85f2a6ee24bb769c4fc0d6a70e7bdb03835 Mon Sep 17 00:00:00 2001 From: meryacine Date: Tue, 2 Aug 2022 12:19:17 +0200 Subject: [PATCH 02/14] tests: Add tests for serialization macros and tower messages --- Cargo.lock | 13 ++ teos-common/Cargo.toml | 5 +- teos-common/src/lightning/messages.rs | 108 ++++++++++++++ teos-common/src/lightning/ser_macros.rs | 190 ++++++++++++++++++++++++ teos-common/src/lightning/ser_utils.rs | 28 ++++ 5 files changed, 343 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0a9edf9c..703f3868 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,6 +324,7 @@ dependencies = [ "base64-compat", "bech32", "bitcoin_hashes", + "bitcoinconsensus", "secp256k1", "serde", ] @@ -337,6 +338,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bitcoinconsensus" +version = "0.19.0-3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a8aa43b5cd02f856cb126a9af819e77b8910fdd74dd1407be649f2f5fe3a1b5" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "bitcoincore-rpc" version = "0.15.0" @@ -1713,6 +1724,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d885bf509066af86ae85354c8959028ad6192c22a2657ef8271e94029d30f9d0" dependencies = [ "bitcoin", + "hex", + "regex", ] [[package]] diff --git a/teos-common/Cargo.toml b/teos-common/Cargo.toml index be0cb489..04fcc2e7 100644 --- a/teos-common/Cargo.toml +++ b/teos-common/Cargo.toml @@ -24,4 +24,7 @@ bitcoin = { version = "0.28.0", features = [ "use-serde" ] } lightning = "0.0.108" [build-dependencies] -tonic-build = "0.6" \ No newline at end of file +tonic-build = "0.6" + +[dev-dependencies] +lightning = { version = "0.0.108", features = ["_test_utils"] } \ No newline at end of file diff --git a/teos-common/src/lightning/messages.rs b/teos-common/src/lightning/messages.rs index c7a113aa..dcd41312 100644 --- a/teos-common/src/lightning/messages.rs +++ b/teos-common/src/lightning/messages.rs @@ -241,3 +241,111 @@ impl Writeable for TowerMessage { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::cryptography::{get_random_bytes, get_random_keypair}; + use crate::lightning::ser_utils::{get_random_locator, get_random_txid, TestVecWriter}; + use lightning::io::Cursor; + use lightning::util::ser::{Readable, Writeable}; + use std::cmp::PartialEq; + use std::fmt::Debug; + use std::iter::FromIterator; + + fn test_msg(msg: T) { + // Get a writer and write the message to it. + let mut stream = TestVecWriter(Vec::new()); + msg.write(&mut stream).ok().unwrap(); + // Create a reader out of the written buffer. + let mut stream = Cursor::new(stream.0); + let read_msg: T = Readable::read(&mut stream).ok().unwrap(); + // Assert the serialized then deserialized message is the same as the original one. + assert_eq!(msg, read_msg); + } + + #[test] + fn test_tower_messages_empty_tlvs() { + test_msg(Register { + pubkey: get_random_keypair().1, + appointment_slots: 4300, + subscription_period: 4032, + }); + test_msg(SubscriptionDetails { + appointment_max_size: 3032, + amount_msat: 41893, + invoice: None, + signature: None, + }); + test_msg(AddUpdateAppointment { + locator: get_random_locator(), + encrypted_blob: get_random_bytes(542), + signature: String::from("sign: locator || encrypted_blob || to_self_delay?"), + to_self_delay: None, + }); + test_msg(AppointmentAccepted { + locator: get_random_locator(), + start_block: 500310, + receipt_signature: None, + }); + test_msg(AppointmentRejected { + locator: get_random_locator(), + rcode: 539, + reason: String::from("You have no more slots. 😒πŸ₯ΊπŸ’”"), + }); + test_msg(GetAppointment { + locator: get_random_locator(), + signature: String::from("this is my signature. and is real."), + }); + test_msg(AppointmentData { + locator: get_random_locator(), + encrypted_blob: get_random_bytes(678), + }); + test_msg(TrackerData { + dispute_txid: get_random_txid(), + penalty_txid: get_random_txid(), + penalty_rawtx: get_random_bytes(432), + }); + test_msg(AppointmentNotFound { + locator: get_random_locator(), + }); + test_msg(GetSubscriptionInfo { + signature: String::from("sign: get subscription info"), + }); + test_msg(SubscriptionInfo { + available_slots: 429, + subscription_expiry: 1093, + locators: Vec::new(), + }); + } + + #[test] + fn test_tower_message_with_tlvs() { + test_msg(SubscriptionDetails { + appointment_max_size: 4498, + amount_msat: 891431, + invoice: Some(String::from( + "lnbc100p1psj9jhxdqud3jxktt5w46x7unfv9kz6mn0v3jsnp4q0d3p2sfluzdx45...", + )), + signature: Some(String::from( + "sign: user_pubkey || appointment_max_size || amount_msat || invoice_id?", + )), + }); + test_msg(AddUpdateAppointment { + locator: get_random_locator(), + encrypted_blob: get_random_bytes(542), + signature: String::from("sign: locator || encrypted_blob || to_self_delay?"), + to_self_delay: Some(56), + }); + test_msg(AppointmentAccepted { + locator: get_random_locator(), + start_block: 500310, + receipt_signature: Some(String::from("sign: user_signature || start_block")), + }); + test_msg(SubscriptionInfo { + available_slots: 429, + subscription_expiry: 1093, + locators: Vec::from_iter((0..10).map(|_| get_random_locator())), + }); + } +} diff --git a/teos-common/src/lightning/ser_macros.rs b/teos-common/src/lightning/ser_macros.rs index 675b09db..76e30ae9 100644 --- a/teos-common/src/lightning/ser_macros.rs +++ b/teos-common/src/lightning/ser_macros.rs @@ -198,3 +198,193 @@ pub(super) use init_tlv_field_var; pub(super) use impl_writeable_msg; pub(super) use set_msg_type; + +#[cfg(test)] +mod tests { + use crate::appointment::{Locator, LOCATOR_LEN}; + use crate::lightning::ser_utils::Type; + use crate::lightning::{ser_macros, ser_utils}; + use lightning::io; + use lightning::ln::msgs::DecodeError; + use lightning::util::ser::{BigSize, Readable, Writeable}; + + macro_rules! encode_decode_tlv { + ($typ: expr, $field: expr, $fieldty: tt) => {{ + // Encode the TLV to a stream. + let mut stream = ser_utils::TestVecWriter(Vec::new()); + encode_tlv!(&mut stream, $typ, $field, $fieldty); + // Get a reader with the writer's buffer. + let mut cursor = io::Cursor::new(stream.0); + let mut stream = ser_utils::ReadTrackingReader::new(&mut cursor); + init_tlv_field_var!(read_field, $fieldty); + #[allow(unreachable_code)] + if false { + unreachable!(); + // This assignment will let the compiler infer the type of `read_field`. + read_field = $field; + } + // Try to read from the stream. Note that the stream might be empty if `$field` + // carried no info to be written in the first place. + let read_typ_result = BigSize::read(&mut stream); + if let Ok(read_typ) = read_typ_result { + let read_len = BigSize::read(&mut stream)?; + let mut stream = ser_utils::FixedLengthReader::new(&mut cursor, read_len.0); + decode_tlv!(stream, read_field, $fieldty); + Ok((read_typ.0, read_len.0, read_field)) + } else if !stream.have_read { + Ok(($typ, 0, read_field)) + } else { + Err(read_typ_result.err().unwrap()) + } + }}; + } + + macro_rules! test_encode_decode_tlv { + ($typ: expr, $len: expr, $val: expr, $fieldty: tt) => { + let (typ, len, val) = encode_decode_tlv!($typ, $val, $fieldty)?; + assert_eq!($typ as u64, typ, "Invalid TLV type"); + assert_eq!($len as u64, len, "Invalid TLV length"); + assert_eq!($val, val, "Invalid TLV value"); + }; + } + + macro_rules! test_opt_ranged_type { + ($type: ident, $expected_len: expr) => { + // This will try some values in the range of `$type`. + for i in ($type::MIN..$type::MAX) + .step_by(($type::MAX / 4 + 1) as usize) + .chain(vec![$type::MAX]) + { + test_encode_decode_tlv!(i as u64, $expected_len, Some(i), opt); + } + }; + } + + #[test] + fn test_encode_decode_tlv() -> Result<(), DecodeError> { + // All the Nones and empty vectors should have a length of zero. + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + test_encode_decode_tlv!(1, 0, Option::::None, opt); + let v: Vec = Vec::new(); + test_encode_decode_tlv!(1, 0, v, vec); + let v: Vec = Vec::new(); + test_encode_decode_tlv!(1, 0, v, vec); + let v: Vec = Vec::new(); + test_encode_decode_tlv!(1, 0, v, vec); + + // Non-None primitives should have their in-memory byte length as follows. + test_opt_ranged_type!(u8, 1); + test_opt_ranged_type!(u16, 2); + test_opt_ranged_type!(u32, 4); + test_opt_ranged_type!(u64, 8); + + // Other types. + let s = Some(String::from("teos")); + test_encode_decode_tlv!(1, s.as_ref().unwrap().len() + 2, s, opt); + test_encode_decode_tlv!(1, s.as_ref().unwrap().len(), s, opt_str); + + let v = vec![1_u8, 2, 3, 6]; + test_encode_decode_tlv!(1, v.len(), v, vec); + + let l = ser_utils::get_random_locator(); + let v = vec![l; 5]; + test_encode_decode_tlv!(1, v.len() * LOCATOR_LEN, v, vec); + + Ok(()) + } + + macro_rules! encode_decode_tlv_stream { + ({$(($type: expr, $field_name: ident, $fieldty: tt)),* $(,)*}) => {{ + // Encode the TLVs to a stream. + let mut stream = ser_utils::TestVecWriter(Vec::new()); + encode_tlv_stream!(&mut stream, {$(($type, $field_name, $fieldty)),*}); + // Re-initialize the fields to their default values before decoding. + $(init_tlv_field_var!($field_name, $fieldty);)* + // Decode with a reader with the writer's buffer. + decode_tlv_stream!(io::Cursor::new(stream.0), {$(($type, $field_name, $fieldty)),*}); + ($($field_name),*) + }}; + } + + macro_rules! test_encode_decode_tlv_stream { + ({$(($type: expr, $field_name: ident, $field: expr, $fieldty: tt)),* $(,)*}) => { + let original_stream = ($($field),*); + #[allow(unused_assignments)] + let decoded_stream = { + // Initialize the fields we will read from. An unused_assignment happens here. + $(init_tlv_field_var!($field_name, $fieldty);)* + $($field_name = $field;)* + encode_decode_tlv_stream!({$(($type, $field_name, $fieldty)),*}) + }; + assert_eq!(original_stream, decoded_stream, "The decoded stream doesn't match the original one"); + }; + } + + macro_rules! test_encode_decode_tlv_stream_should_panic { + ($args: tt) => { + // Allow unreachable patterns which happen when we have some non-unique tlv types. + #[allow(unreachable_patterns)] + // Runs `test_encode_decode_tlv_stream` inside a closure so that we don't need to + // return a `Result<(), DecodeError` from the test functions. + // `should_panic` tests must not return anything. + (|| { + test_encode_decode_tlv_stream!($args); + Ok(()) + })() + // Unwrap to panic on error. + .unwrap(); + }; + } + + #[test] + fn test_encode_decode_tlv_stream() -> Result<(), DecodeError> { + test_encode_decode_tlv_stream!({ + (0, a, Option::::None, opt), + (1, b, Some(3_u32), opt), + (2, c, Some(String::from("teos")), opt), + (31, d, Some(String::from("teos")), opt_str), + }); + + let l = ser_utils::get_random_locator(); + test_encode_decode_tlv_stream!({ + (12, a, vec![1_u32, 2, 3], vec), + (24, b, Some(vec![1_u8, 2, 3]), opt), + (31, c, vec!["one".to_owned(), "two".to_owned(), "3".to_owned()], vec), + (59, d, Vec::::new(), vec), + (78, e, vec![l; 4], vec), + }); + + Ok(()) + } + + #[test] + #[should_panic] + fn test_encode_decode_tlv_stream_decreasing_type() { + test_encode_decode_tlv_stream_should_panic!({ + (0, a, Some(0_u8), opt), + (2, b, Some(1_u32), opt), + (1, c, Vec::::new(), vec), + }); + } + + #[test] + #[should_panic] + fn test_encode_decode_tlv_stream_non_increasing_type() { + test_encode_decode_tlv_stream_should_panic!({ + (0, a, Some(0_u8), opt), + (1, b, Some(1_u32), opt), + (1, c, Vec::::new(), vec), + }); + } + + #[test] + fn test_set_msg_type() { + struct Test1; + set_msg_type!(Test1, 10); + + assert_eq!(Test1::TYPE, 10); + } +} diff --git a/teos-common/src/lightning/ser_utils.rs b/teos-common/src/lightning/ser_utils.rs index 36bfad30..98b4bb77 100644 --- a/teos-common/src/lightning/ser_utils.rs +++ b/teos-common/src/lightning/ser_utils.rs @@ -170,3 +170,31 @@ impl Read for ReadTrackingReader { } } } + +#[cfg(test)] +mod test_utils { + use crate::appointment::{Locator, LOCATOR_LEN}; + use crate::cryptography::get_random_bytes; + use bitcoin::hashes::Hash; + use bitcoin::Txid; + pub use lightning::util::test_utils::TestVecWriter; + + pub fn get_random_locator() -> Locator { + let bytes = get_random_bytes(LOCATOR_LEN); + Locator::from_slice(&bytes).unwrap() + } + + pub fn get_random_txid() -> Txid { + let bytes = get_random_bytes(32); + Txid::from_slice(&bytes).unwrap() + } + + #[allow(dead_code)] + pub fn get_random_string(size: usize) -> String { + let bytes = get_random_bytes(size); + String::from_utf8_lossy(&bytes).into_owned() + } +} + +#[cfg(test)] +pub(super) use test_utils::*; From 9f12f20f9eac8b0c976fb4c71b30a8960b60a60b Mon Sep 17 00:00:00 2001 From: meryacine Date: Tue, 2 Aug 2022 12:02:26 +0200 Subject: [PATCH 03/14] Add `start_block` to `subscription_details` --- teos-common/src/lightning/messages.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/teos-common/src/lightning/messages.rs b/teos-common/src/lightning/messages.rs index dcd41312..a5c27462 100644 --- a/teos-common/src/lightning/messages.rs +++ b/teos-common/src/lightning/messages.rs @@ -26,6 +26,7 @@ pub struct Register { #[derive(Debug)] pub struct SubscriptionDetails { pub appointment_max_size: u16, + pub start_block: u32, pub amount_msat: u32, // Optional TLV. pub invoice: Option, @@ -112,6 +113,7 @@ impl_writeable_msg!(Register, { impl_writeable_msg!(SubscriptionDetails, { appointment_max_size, + start_block, amount_msat, }, { // Use `opt_str` and not `opt` to avoid writing a length prefix for strings @@ -273,6 +275,7 @@ mod tests { }); test_msg(SubscriptionDetails { appointment_max_size: 3032, + start_block: 358943, amount_msat: 41893, invoice: None, signature: None, @@ -323,12 +326,13 @@ mod tests { fn test_tower_message_with_tlvs() { test_msg(SubscriptionDetails { appointment_max_size: 4498, + start_block: 4934503, amount_msat: 891431, invoice: Some(String::from( "lnbc100p1psj9jhxdqud3jxktt5w46x7unfv9kz6mn0v3jsnp4q0d3p2sfluzdx45...", )), signature: Some(String::from( - "sign: user_pubkey || appointment_max_size || amount_msat || invoice_id?", + "sign: user_pubkey || appointment_max_size || start_block || amount_msat || invoice_id?", )), }); test_msg(AddUpdateAppointment { From 23b87e49f4bf302786e2ba473a8724108524352b Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sun, 20 Nov 2022 15:30:15 +0200 Subject: [PATCH 04/14] Reviews from mariocynicys --- teos-common/src/lightning/messages.rs | 4 ++-- teos-common/src/lightning/ser_macros.rs | 18 ++++++++++-------- teos-common/src/lightning/ser_utils.rs | 7 +++---- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/teos-common/src/lightning/messages.rs b/teos-common/src/lightning/messages.rs index a5c27462..06926200 100644 --- a/teos-common/src/lightning/messages.rs +++ b/teos-common/src/lightning/messages.rs @@ -258,10 +258,10 @@ mod tests { fn test_msg(msg: T) { // Get a writer and write the message to it. let mut stream = TestVecWriter(Vec::new()); - msg.write(&mut stream).ok().unwrap(); + msg.write(&mut stream).unwrap(); // Create a reader out of the written buffer. let mut stream = Cursor::new(stream.0); - let read_msg: T = Readable::read(&mut stream).ok().unwrap(); + let read_msg: T = Readable::read(&mut stream).unwrap(); // Assert the serialized then deserialized message is the same as the original one. assert_eq!(msg, read_msg); } diff --git a/teos-common/src/lightning/ser_macros.rs b/teos-common/src/lightning/ser_macros.rs index 76e30ae9..2b61e950 100644 --- a/teos-common/src/lightning/ser_macros.rs +++ b/teos-common/src/lightning/ser_macros.rs @@ -201,7 +201,9 @@ pub(super) use set_msg_type; #[cfg(test)] mod tests { - use crate::appointment::{Locator, LOCATOR_LEN}; + use std::mem::size_of; + + use crate::appointment::Locator; use crate::lightning::ser_utils::Type; use crate::lightning::{ser_macros, ser_utils}; use lightning::io; @@ -287,11 +289,14 @@ mod tests { test_encode_decode_tlv!(1, s.as_ref().unwrap().len(), s, opt_str); let v = vec![1_u8, 2, 3, 6]; - test_encode_decode_tlv!(1, v.len(), v, vec); + test_encode_decode_tlv!(1, v.len() * size_of::(), v, vec); + + let v = vec![1_u16, 2, 3, 809]; + test_encode_decode_tlv!(1, v.len() * size_of::(), v, vec); let l = ser_utils::get_random_locator(); let v = vec![l; 5]; - test_encode_decode_tlv!(1, v.len() * LOCATOR_LEN, v, vec); + test_encode_decode_tlv!(1, v.len() * size_of::(), v, vec); Ok(()) } @@ -312,11 +317,8 @@ mod tests { macro_rules! test_encode_decode_tlv_stream { ({$(($type: expr, $field_name: ident, $field: expr, $fieldty: tt)),* $(,)*}) => { let original_stream = ($($field),*); - #[allow(unused_assignments)] let decoded_stream = { - // Initialize the fields we will read from. An unused_assignment happens here. - $(init_tlv_field_var!($field_name, $fieldty);)* - $($field_name = $field;)* + $(let $field_name = $field;)* encode_decode_tlv_stream!({$(($type, $field_name, $fieldty)),*}) }; assert_eq!(original_stream, decoded_stream, "The decoded stream doesn't match the original one"); @@ -328,7 +330,7 @@ mod tests { // Allow unreachable patterns which happen when we have some non-unique tlv types. #[allow(unreachable_patterns)] // Runs `test_encode_decode_tlv_stream` inside a closure so that we don't need to - // return a `Result<(), DecodeError` from the test functions. + // return a `Result<(), DecodeError`> from the test functions. // `should_panic` tests must not return anything. (|| { test_encode_decode_tlv_stream!($args); diff --git a/teos-common/src/lightning/ser_utils.rs b/teos-common/src/lightning/ser_utils.rs index 98b4bb77..42b6400c 100644 --- a/teos-common/src/lightning/ser_utils.rs +++ b/teos-common/src/lightning/ser_utils.rs @@ -9,7 +9,7 @@ use crate::appointment::{Locator, LOCATOR_LEN}; -use lightning::io::{copy, sink, Error, Read}; +use lightning::io::{copy, sink, Error, ErrorKind, Read}; use lightning::ln::msgs::DecodeError; use lightning::util::ser::{MaybeReadable, Readable, Writeable, Writer}; @@ -32,8 +32,7 @@ impl Readable for Locator { // Serialization for a Locator inside a lighting message. impl Writeable for Locator { fn write(&self, writer: &mut W) -> Result<(), Error> { - writer.write_all(&self.to_vec())?; - Ok(()) + writer.write_all(&self.to_vec()) } } @@ -112,7 +111,7 @@ impl FixedLengthReader { /// Consume the remaining bytes. #[inline] pub fn eat_remaining(&mut self) -> Result<(), DecodeError> { - copy(self, &mut sink()).unwrap(); + copy(self, &mut sink()).or(Err(DecodeError::Io(ErrorKind::Other)))?; if self.bytes_read != self.total_bytes { Err(DecodeError::ShortRead) } else { From 195b325a157a7c210c14b15ee7da101540399e29 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sun, 20 Nov 2022 15:49:38 +0200 Subject: [PATCH 05/14] Removing extra spaces in macros' arms --- teos-common/src/lightning/ser_macros.rs | 40 ++++++++++++------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/teos-common/src/lightning/ser_macros.rs b/teos-common/src/lightning/ser_macros.rs index 2b61e950..f27d0f11 100644 --- a/teos-common/src/lightning/ser_macros.rs +++ b/teos-common/src/lightning/ser_macros.rs @@ -8,12 +8,12 @@ */ macro_rules! encode_tlv { - ($stream: expr, $type: expr, $field: expr, opt) => { + ($stream:expr, $type:expr, $field:expr, opt) => { if let Some(ref field) = $field { ser_macros::encode_tlv!($stream, $type, field); } }; - ($stream: expr, $type: expr, $field: expr, vec) => { + ($stream:expr, $type:expr, $field:expr, vec) => { // Don't write the vector if it's empty. if !$field.is_empty() { // We can't just pass `$field` since this will move it out of the struct we are implementing @@ -22,13 +22,13 @@ macro_rules! encode_tlv { ser_macros::encode_tlv!($stream, $type, lightning_vec); } }; - ($stream: expr, $type: expr, $field: expr, opt_str) => { + ($stream:expr, $type:expr, $field:expr, opt_str) => { if let Some(ref field) = $field { let lightning_str = ser_utils::LightningVecWriter(field.as_bytes()); ser_macros::encode_tlv!($stream, $type, lightning_str); } }; - ($stream: expr, $type: expr, $field: expr) => { + ($stream:expr, $type:expr, $field:expr) => { BigSize($type).write($stream)?; BigSize($field.serialized_length() as u64).write($stream)?; $field.write($stream)?; @@ -36,7 +36,7 @@ macro_rules! encode_tlv { } macro_rules! encode_tlv_stream { - ($stream: expr, {$(($type: expr, $field: expr, $fieldty: tt)),* $(,)*}) => { { + ($stream:expr, {$(($type:expr, $field:expr, $fieldty:tt)),* $(,)*}) => { { #[allow(unused_imports)] use { lightning::util::ser::BigSize, @@ -62,14 +62,14 @@ macro_rules! encode_tlv_stream { } macro_rules! decode_tlv { - ($reader: expr, $field: ident, opt) => { + ($reader:expr, $field:ident, opt) => { $field = Some(Readable::read(&mut $reader)?); }; - ($reader: expr, $field: ident, vec) => { + ($reader:expr, $field:ident, vec) => { let lightning_vec = ser_utils::LightningVecReader::read(&mut $reader)?; $field = lightning_vec.0; }; - ($reader: expr, $field: ident, opt_str) => { + ($reader:expr, $field:ident, opt_str) => { let lightning_str = ser_utils::LightningVecReader::read(&mut $reader)?; let inner_str = String::from_utf8(lightning_str.0).map_err(|_| DecodeError::InvalidValue)?; @@ -78,7 +78,7 @@ macro_rules! decode_tlv { } macro_rules! decode_tlv_stream { - ($stream: expr, {$(($type: expr, $field: ident, $fieldty: tt)),* $(,)*}) => { { + ($stream:expr, {$(($type:expr, $field:ident, $fieldty:tt)),* $(,)*}) => { { #[allow(unused_imports)] use { lightning::ln::msgs::DecodeError, @@ -137,19 +137,19 @@ macro_rules! decode_tlv_stream { } macro_rules! init_tlv_field_var { - ($field: ident, opt) => { + ($field:ident, opt) => { let mut $field = None; }; - ($field: ident, vec) => { + ($field:ident, vec) => { let mut $field = Vec::new(); }; - ($field: ident, opt_str) => { + ($field:ident, opt_str) => { let mut $field = None; }; } macro_rules! impl_writeable_msg { - ($st: ty, {$($field: ident),* $(,)*}, {$(($type: expr, $tlvfield: ident, $fieldty: tt)),* $(,)*}) => { + ($st:ty, {$($field:ident),* $(,)*}, {$(($type:expr, $tlvfield:ident, $fieldty:tt)),* $(,)*}) => { impl lightning::util::ser::Writeable for $st { fn write(&self, w: &mut W) -> Result<(), lightning::io::Error> { $(self.$field.write(w)?;)* @@ -182,7 +182,7 @@ macro_rules! impl_writeable_msg { } macro_rules! set_msg_type { - ($st: ty, $type: expr) => { + ($st:ty, $type:expr) => { impl $crate::lightning::ser_utils::Type for $st { const TYPE: u16 = $type; } @@ -211,7 +211,7 @@ mod tests { use lightning::util::ser::{BigSize, Readable, Writeable}; macro_rules! encode_decode_tlv { - ($typ: expr, $field: expr, $fieldty: tt) => {{ + ($typ:expr, $field:expr, $fieldty:tt) => {{ // Encode the TLV to a stream. let mut stream = ser_utils::TestVecWriter(Vec::new()); encode_tlv!(&mut stream, $typ, $field, $fieldty); @@ -242,7 +242,7 @@ mod tests { } macro_rules! test_encode_decode_tlv { - ($typ: expr, $len: expr, $val: expr, $fieldty: tt) => { + ($typ:expr, $len:expr, $val:expr, $fieldty:tt) => { let (typ, len, val) = encode_decode_tlv!($typ, $val, $fieldty)?; assert_eq!($typ as u64, typ, "Invalid TLV type"); assert_eq!($len as u64, len, "Invalid TLV length"); @@ -251,7 +251,7 @@ mod tests { } macro_rules! test_opt_ranged_type { - ($type: ident, $expected_len: expr) => { + ($type:ident, $expected_len:expr) => { // This will try some values in the range of `$type`. for i in ($type::MIN..$type::MAX) .step_by(($type::MAX / 4 + 1) as usize) @@ -302,7 +302,7 @@ mod tests { } macro_rules! encode_decode_tlv_stream { - ({$(($type: expr, $field_name: ident, $fieldty: tt)),* $(,)*}) => {{ + ({$(($type:expr, $field_name:ident, $fieldty:tt)),* $(,)*}) => {{ // Encode the TLVs to a stream. let mut stream = ser_utils::TestVecWriter(Vec::new()); encode_tlv_stream!(&mut stream, {$(($type, $field_name, $fieldty)),*}); @@ -315,7 +315,7 @@ mod tests { } macro_rules! test_encode_decode_tlv_stream { - ({$(($type: expr, $field_name: ident, $field: expr, $fieldty: tt)),* $(,)*}) => { + ({$(($type:expr, $field_name:ident, $field:expr, $fieldty:tt)),* $(,)*}) => { let original_stream = ($($field),*); let decoded_stream = { $(let $field_name = $field;)* @@ -326,7 +326,7 @@ mod tests { } macro_rules! test_encode_decode_tlv_stream_should_panic { - ($args: tt) => { + ($args:tt) => { // Allow unreachable patterns which happen when we have some non-unique tlv types. #[allow(unreachable_patterns)] // Runs `test_encode_decode_tlv_stream` inside a closure so that we don't need to From ab9699d23cba9d9e752dcaaebb80f882eb1d3675 Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sun, 20 Nov 2022 17:06:59 +0200 Subject: [PATCH 06/14] Reviews from sr-gi 1 --- teos-common/src/lightning/messages.rs | 25 +++++++++++++------------ teos-common/src/lightning/ser_utils.rs | 19 +++++++++++++++++++ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/teos-common/src/lightning/messages.rs b/teos-common/src/lightning/messages.rs index 06926200..d871f827 100644 --- a/teos-common/src/lightning/messages.rs +++ b/teos-common/src/lightning/messages.rs @@ -4,7 +4,8 @@ use crate::appointment::Locator; use crate::lightning::ser_macros::{impl_writeable_msg, set_msg_type}; -use bitcoin::secp256k1::PublicKey; +use crate::UserId; + use bitcoin::Txid; use lightning::io::Error; use lightning::ln::wire; @@ -16,7 +17,7 @@ pub use crate::lightning::ser_utils::Type; /// The register message sent by the user to subscribe for the watching service. #[derive(Debug)] pub struct Register { - pub pubkey: PublicKey, + pub pubkey: UserId, pub appointment_slots: u32, pub subscription_period: u32, } @@ -27,8 +28,8 @@ pub struct Register { pub struct SubscriptionDetails { pub appointment_max_size: u16, pub start_block: u32, - pub amount_msat: u32, // Optional TLV. + pub amount_msat: Option, pub invoice: Option, pub signature: Option, } @@ -37,7 +38,6 @@ pub struct SubscriptionDetails { #[derive(Debug)] pub struct AddUpdateAppointment { pub locator: Locator, - // NOTE: LDK will prefix varying size fields (e.g. vectors and strings) with their length. pub encrypted_blob: Vec, pub signature: String, // Optional TLV. @@ -114,12 +114,12 @@ impl_writeable_msg!(Register, { impl_writeable_msg!(SubscriptionDetails, { appointment_max_size, start_block, - amount_msat, }, { + (1, amount_msat, opt), // Use `opt_str` and not `opt` to avoid writing a length prefix for strings // since it's already written in the length part of the TLV. - (1, invoice, opt_str), - (3, signature, opt_str), + (3, invoice, opt_str), + (5, signature, opt_str), }); impl_writeable_msg!(AddUpdateAppointment, { @@ -247,8 +247,9 @@ impl Writeable for TowerMessage { #[cfg(test)] mod tests { use super::*; - use crate::cryptography::{get_random_bytes, get_random_keypair}; + use crate::cryptography::get_random_bytes; use crate::lightning::ser_utils::{get_random_locator, get_random_txid, TestVecWriter}; + use crate::test_utils::get_random_user_id; use lightning::io::Cursor; use lightning::util::ser::{Readable, Writeable}; use std::cmp::PartialEq; @@ -269,14 +270,14 @@ mod tests { #[test] fn test_tower_messages_empty_tlvs() { test_msg(Register { - pubkey: get_random_keypair().1, + pubkey: get_random_user_id(), appointment_slots: 4300, subscription_period: 4032, }); test_msg(SubscriptionDetails { appointment_max_size: 3032, start_block: 358943, - amount_msat: 41893, + amount_msat: None, invoice: None, signature: None, }); @@ -327,12 +328,12 @@ mod tests { test_msg(SubscriptionDetails { appointment_max_size: 4498, start_block: 4934503, - amount_msat: 891431, + amount_msat: Some(891431), invoice: Some(String::from( "lnbc100p1psj9jhxdqud3jxktt5w46x7unfv9kz6mn0v3jsnp4q0d3p2sfluzdx45...", )), signature: Some(String::from( - "sign: user_pubkey || appointment_max_size || start_block || amount_msat || invoice_id?", + "sign: user_pubkey || appointment_max_size || start_block || amount_msat? || invoice?", )), }); test_msg(AddUpdateAppointment { diff --git a/teos-common/src/lightning/ser_utils.rs b/teos-common/src/lightning/ser_utils.rs index 42b6400c..48664b36 100644 --- a/teos-common/src/lightning/ser_utils.rs +++ b/teos-common/src/lightning/ser_utils.rs @@ -8,7 +8,9 @@ */ use crate::appointment::{Locator, LOCATOR_LEN}; +use crate::UserId; +use bitcoin::secp256k1::constants::PUBLIC_KEY_SIZE; use lightning::io::{copy, sink, Error, ErrorKind, Read}; use lightning::ln::msgs::DecodeError; use lightning::util::ser::{MaybeReadable, Readable, Writeable, Writer}; @@ -36,6 +38,23 @@ impl Writeable for Locator { } } +// Deserialization for a UserId inside a lightning message. +impl Readable for UserId { + fn read(reader: &mut R) -> Result { + let mut buf = [0; PUBLIC_KEY_SIZE]; + reader.read_exact(&mut buf)?; + let user_id = Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue)?; + Ok(user_id) + } +} + +// Serialization for a UserId inside a lighting message. +impl Writeable for UserId { + fn write(&self, writer: &mut W) -> Result<(), Error> { + self.0.write(writer) + } +} + /// A read wrapper around a vector inside a lightning message. /// This wrapper mainly exists because we cannot implement LDK's (de)serialization traits /// for Vec (since neither the traits nor Vec are defined in our crate (the orphan rule)). From 19ff970bd69ab64353856f88a21bbc42e5e8e12c Mon Sep 17 00:00:00 2001 From: Omer Yacine Date: Sun, 20 Nov 2022 17:23:37 +0200 Subject: [PATCH 07/14] Reviews from sr-gi 2 --- teos-common/src/lightning/ser_utils.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/teos-common/src/lightning/ser_utils.rs b/teos-common/src/lightning/ser_utils.rs index 48664b36..c1252dba 100644 --- a/teos-common/src/lightning/ser_utils.rs +++ b/teos-common/src/lightning/ser_utils.rs @@ -26,8 +26,7 @@ impl Readable for Locator { fn read(reader: &mut R) -> Result { let mut buf = [0; LOCATOR_LEN]; reader.read_exact(&mut buf)?; - let locator = Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue)?; - Ok(locator) + Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue) } } @@ -43,8 +42,7 @@ impl Readable for UserId { fn read(reader: &mut R) -> Result { let mut buf = [0; PUBLIC_KEY_SIZE]; reader.read_exact(&mut buf)?; - let user_id = Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue)?; - Ok(user_id) + Self::from_slice(&buf).map_err(|_| DecodeError::InvalidValue) } } @@ -77,7 +75,7 @@ impl Readable for LightningVecReader { Ok(None) => {} // If we failed to read any bytes at all, we reached the end of our TLV // stream and have simply exhausted all entries. - Err(e) if e == DecodeError::ShortRead && !track_read.have_read => break, + Err(ref e) if e == &DecodeError::ShortRead && !track_read.have_read => break, Err(e) => return Err(e), } } From 3969d0203f3c24642f657e5a5b1f0a0b37c0e6fd Mon Sep 17 00:00:00 2001 From: meryacine Date: Tue, 2 Aug 2022 12:02:40 +0200 Subject: [PATCH 08/14] Add the lightning server message handler --- teos-common/src/lightning/convert.rs | 145 +++++++++++++++++++ teos-common/src/lightning/messages.rs | 27 ++-- teos-common/src/lightning/mod.rs | 1 + teos/src/api/lightning.rs | 191 ++++++++++++++++++++++++++ teos/src/api/mod.rs | 1 + 5 files changed, 352 insertions(+), 13 deletions(-) create mode 100644 teos-common/src/lightning/convert.rs create mode 100644 teos/src/api/lightning.rs diff --git a/teos-common/src/lightning/convert.rs b/teos-common/src/lightning/convert.rs new file mode 100644 index 00000000..47cb455b --- /dev/null +++ b/teos-common/src/lightning/convert.rs @@ -0,0 +1,145 @@ +//! A module that implements useful gRPC messages to lightning message conversions. + +use super::messages::*; +use crate::appointment::Locator; +use crate::constants::ENCRYPTED_BLOB_MAX_SIZE; +use crate::protos as msgs; + +use bitcoin::hashes::Hash; +use bitcoin::Txid; + +/// Conversions from individual messages to tower messages. +mod msg_to_tower_msg { + use super::*; + macro_rules! impl_from_msg { + ($msg: ident) => { + impl From<$msg> for TowerMessage { + fn from(m: $msg) -> TowerMessage { + TowerMessage::$msg(m) + } + } + }; + } + + impl_from_msg!(Register); + impl_from_msg!(SubscriptionDetails); + impl_from_msg!(AddUpdateAppointment); + impl_from_msg!(AppointmentAccepted); + impl_from_msg!(AppointmentRejected); + impl_from_msg!(GetAppointment); + impl_from_msg!(AppointmentData); + impl_from_msg!(TrackerData); + impl_from_msg!(AppointmentNotFound); + impl_from_msg!(GetSubscriptionInfo); + impl_from_msg!(SubscriptionInfo); +} + +// FIXME: There are a lot of `unwrap()`s in these conversions. We assume that the gRPC interface won't send invalid data. +// If the conversion panics this would crash the lightning server. + +/// Conversion from user requests to gRPC requests. +/// These are used by the tower when routing lightning requests to its internal gRPC server. +mod msg_to_grpc { + use super::*; + impl From for msgs::RegisterRequest { + fn from(r: Register) -> Self { + msgs::RegisterRequest { + user_id: r.pubkey.to_vec(), + } + } + } + + impl From for msgs::AddAppointmentRequest { + fn from(r: AddUpdateAppointment) -> Self { + let appointment = msgs::Appointment { + locator: r.locator.to_vec(), + encrypted_blob: r.encrypted_blob, + to_self_delay: r.to_self_delay.unwrap_or(42), + }; + + msgs::AddAppointmentRequest { + appointment: Some(appointment), + signature: r.signature, + } + } + } + + impl From for msgs::GetAppointmentRequest { + fn from(r: GetAppointment) -> Self { + msgs::GetAppointmentRequest { + locator: r.locator.to_vec(), + signature: r.signature, + } + } + } + + impl From for msgs::GetSubscriptionInfoRequest { + fn from(r: GetSubscriptionInfo) -> Self { + msgs::GetSubscriptionInfoRequest { + signature: r.signature, + } + } + } +} + +/// Conversion from gRPC responses to tower responses. +/// These are used by the tower when parsing internal gRPC server's responses. +mod grpc_to_tower_msg { + use super::*; + impl From for TowerMessage { + fn from(r: msgs::RegisterResponse) -> Self { + SubscriptionDetails { + appointment_max_size: ENCRYPTED_BLOB_MAX_SIZE as u16, + start_block: r.subscription_start, + amount_msat: None, + invoice: None, + signature: Some(r.subscription_signature), + } + .into() + } + } + + impl From for TowerMessage { + fn from(r: msgs::AddAppointmentResponse) -> Self { + AppointmentAccepted { + locator: Locator::from_slice(&r.locator).unwrap(), + start_block: r.start_block, + receipt_signature: Some(r.signature), + } + .into() + } + } + + impl From for TowerMessage { + fn from(r: msgs::GetAppointmentResponse) -> Self { + match r.appointment_data.unwrap().appointment_data.unwrap() { + msgs::appointment_data::AppointmentData::Appointment(a) => AppointmentData { + locator: Locator::from_slice(&a.locator).unwrap(), + encrypted_blob: a.encrypted_blob, + } + .into(), + msgs::appointment_data::AppointmentData::Tracker(t) => TrackerData { + dispute_txid: Txid::from_slice(&t.dispute_txid).unwrap(), + penalty_txid: Txid::from_slice(&t.penalty_txid).unwrap(), + penalty_rawtx: t.penalty_rawtx, + } + .into(), + } + } + } + + impl From for TowerMessage { + fn from(r: msgs::GetSubscriptionInfoResponse) -> Self { + SubscriptionInfo { + available_slots: r.available_slots, + subscription_expiry: r.subscription_expiry, + locators: r + .locators + .into_iter() + .map(|l| Locator::from_slice(&l).unwrap()) + .collect(), + } + .into() + } + } +} diff --git a/teos-common/src/lightning/messages.rs b/teos-common/src/lightning/messages.rs index d871f827..6a7c4024 100644 --- a/teos-common/src/lightning/messages.rs +++ b/teos-common/src/lightning/messages.rs @@ -15,7 +15,7 @@ use lightning::util::ser::{Writeable, Writer}; pub use crate::lightning::ser_utils::Type; /// The register message sent by the user to subscribe for the watching service. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Register { pub pubkey: UserId, pub appointment_slots: u32, @@ -24,7 +24,7 @@ pub struct Register { /// The subscription details message that is sent to the user after registering or toping up. /// This message is the response to the register message. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SubscriptionDetails { pub appointment_max_size: u16, pub start_block: u32, @@ -35,17 +35,18 @@ pub struct SubscriptionDetails { } /// The add/update appointment message sent by the user. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct AddUpdateAppointment { pub locator: Locator, pub encrypted_blob: Vec, pub signature: String, // Optional TLV. - pub to_self_delay: Option, + // FIXME: BOLT13 uses u64. + pub to_self_delay: Option, } /// The appointment accepted message that is sent after an accepted add/update appointment message. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct AppointmentAccepted { pub locator: Locator, pub start_block: u32, @@ -54,7 +55,7 @@ pub struct AppointmentAccepted { } /// The appointment rejected message that is sent if an add/update appointment message was rejected. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct AppointmentRejected { pub locator: Locator, pub rcode: u16, @@ -62,21 +63,21 @@ pub struct AppointmentRejected { } /// The get appointment message sent by the user to retrieve a previously sent appointment from the tower. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct GetAppointment { pub locator: Locator, pub signature: String, } /// The appointment data message sent by the tower after a get appointment message. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct AppointmentData { pub locator: Locator, pub encrypted_blob: Vec, } /// The tracker data message sent by the tower when the requested appointment has been acted upon. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TrackerData { pub dispute_txid: Txid, pub penalty_txid: Txid, @@ -85,19 +86,19 @@ pub struct TrackerData { /// The appointment not found message sent by the tower in response to a get appointment message /// whose locator didn't match any known appointment. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct AppointmentNotFound { pub locator: Locator, } /// The get subscription info message (a TEOS custom message, not a bolt13 one). -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct GetSubscriptionInfo { pub signature: String, } /// The subscription info message sent by the tower in response to get subscription info message. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SubscriptionInfo { pub available_slots: u32, pub subscription_expiry: u32, @@ -189,7 +190,7 @@ set_msg_type!(AppointmentNotFound, 48864); set_msg_type!(GetSubscriptionInfo, 48865); set_msg_type!(SubscriptionInfo, 48867); -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum TowerMessage { // Register messages Register(Register), diff --git a/teos-common/src/lightning/mod.rs b/teos-common/src/lightning/mod.rs index 5a788a7a..9a0671e1 100644 --- a/teos-common/src/lightning/mod.rs +++ b/teos-common/src/lightning/mod.rs @@ -1,3 +1,4 @@ +mod convert; pub mod messages; mod ser_macros; mod ser_utils; diff --git a/teos/src/api/lightning.rs b/teos/src/api/lightning.rs new file mode 100644 index 00000000..679d7047 --- /dev/null +++ b/teos/src/api/lightning.rs @@ -0,0 +1,191 @@ +//! Watchtower's Lightning interface. + +use bitcoin::secp256k1::PublicKey; +use tokio::runtime; + +use crate::protos::public_tower_services_client::PublicTowerServicesClient; +use tonic::transport::Channel; +use tonic::Code; + +use lightning::io; +use lightning::ln::msgs::{DecodeError, ErrorAction, LightningError, WarningMessage}; +use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::wire::CustomMessageReader; +use lightning::util::logger; +use lightning::util::ser::Readable; + +use std::mem; +use std::sync::Mutex; + +use teos_common::lightning::messages::*; +use teos_common::protos as common_msgs; + +/// A helper that returns an [`Err(LightningError)`] with the specified warning message. +fn warn_peer(msg_to_peer: &str, msg_to_log: &str) -> Result { + Err(LightningError { + err: msg_to_log.to_owned(), + action: ErrorAction::SendWarningMessage { + msg: WarningMessage { + // Zeros for channel id tells that the warning isn't channel specific. + channel_id: [0; 32], + data: msg_to_peer.to_owned(), + }, + log_level: logger::Level::Warn, + }, + }) +} + +/// A handler to handle the incoming [`TowerMessage`]s. +pub struct TowerMessageHandler { + /// A queue holding the response messages or errors the tower wants to send to its peers. + msg_queue: Mutex>, + // TODO: Will it make more sense using the watcher interface instead of the gRPC? + // since the watcher interface is not async and it does provide richer error codes. + /// A connection to the tower's internal gRPC API. + grpc_conn: PublicTowerServicesClient, + /// A tokio runtime handle to run gRPC async calls on. + handle: runtime::Handle, +} + +impl TowerMessageHandler { + fn new(grpc_conn: PublicTowerServicesClient, handle: runtime::Handle) -> Self { + Self { + msg_queue: Mutex::new(Vec::new()), + grpc_conn, + handle, + } + } + + fn handle_tower_message( + &self, + msg: TowerMessage, + peer: &PublicKey, + ) -> Result { + log::info!("Received {:?} from {}", msg, peer); + let mut grpc_conn = self.grpc_conn.clone(); + match msg { + TowerMessage::Register(msg) => { + let res = self + .handle + .block_on(grpc_conn.register(common_msgs::RegisterRequest::from(msg))); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) => warn_peer( + e.message(), + &format!("Failed registering {} because {}", peer, e.message()), + ), + } + } + TowerMessage::AddUpdateAppointment(msg) => { + let res = + self.handle + .block_on(grpc_conn.add_appointment( + common_msgs::AddAppointmentRequest::from(msg.clone()), + )); + match res { + Ok(r) => Ok(r.into_inner().into()), + // NOTE: The gRPC interface multiplexes the errors and doesn't let us know what they exactly + // were. Possible errors can be found [here](crate::watcher::AddAppointmentFailure). + Err(e) if e.code() == Code::Unauthenticated => Ok(AppointmentRejected { + locator: msg.locator, + rcode: Code::Unauthenticated as u16, + reason: e.message().into(), + } + .into()), + Err(e) if e.code() == Code::AlreadyExists => Ok(AppointmentRejected { + locator: msg.locator, + rcode: Code::AlreadyExists as u16, + reason: e.message().into(), + } + .into()), + Err(e) => warn_peer( + e.message(), + &format!( + "Failed accepting appointment from {} with locator {}", + peer, msg.locator + ), + ), + } + } + TowerMessage::GetAppointment(msg) => { + let res = + self.handle + .block_on(grpc_conn.get_appointment( + common_msgs::GetAppointmentRequest::from(msg.clone()), + )); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) if e.code() == Code::NotFound => Ok(AppointmentNotFound { + locator: msg.locator, + } + .into()), + Err(e) => warn_peer( + e.message(), + &format!( + "GetAppointment request from {} failed because {}", + peer, + e.message() + ), + ), + } + } + TowerMessage::GetSubscriptionInfo(msg) => { + let res = self.handle.block_on( + grpc_conn + .get_subscription_info(common_msgs::GetSubscriptionInfoRequest::from(msg)), + ); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) => warn_peer( + e.message(), + &format!( + "GetSubscriptionInfo request from {} failed because {}", + peer, + e.message() + ), + ), + } + } + // TODO: DeleteAppointment + // TowerMessageHandler as CustomMessageReader won't produce other than the above messages. + _ => unreachable!(), + } + } +} + +impl CustomMessageReader for TowerMessageHandler { + type CustomMessage = TowerMessage; + + fn read( + &self, + message_type: u16, + buffer: &mut R, + ) -> Result, DecodeError> { + match message_type { + Register::TYPE => Ok(Some(Register::read(buffer)?.into())), + AddUpdateAppointment::TYPE => Ok(Some(AddUpdateAppointment::read(buffer)?.into())), + GetAppointment::TYPE => Ok(Some(GetAppointment::read(buffer)?.into())), + GetSubscriptionInfo::TYPE => Ok(Some(GetSubscriptionInfo::read(buffer)?.into())), + // Unknown message. + _ => Ok(None), + } + } +} + +impl CustomMessageHandler for TowerMessageHandler { + fn handle_custom_message( + &self, + msg: TowerMessage, + sender_node_id: &PublicKey, + ) -> Result<(), LightningError> { + self.msg_queue.lock().unwrap().push(( + *sender_node_id, + self.handle_tower_message(msg, sender_node_id)?, + )); + Ok(()) + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, TowerMessage)> { + mem::take(&mut self.msg_queue.lock().unwrap()) + } +} diff --git a/teos/src/api/mod.rs b/teos/src/api/mod.rs index e901b046..76ad53d8 100644 --- a/teos/src/api/mod.rs +++ b/teos/src/api/mod.rs @@ -1,4 +1,5 @@ pub mod http; pub mod internal; +pub mod lightning; pub mod serde; pub mod tor; From 88bbb1289bd8437b9a0db65f393279b7073f76a1 Mon Sep 17 00:00:00 2001 From: meryacine Date: Thu, 4 Aug 2022 12:46:56 +0200 Subject: [PATCH 09/14] Adding the lightning server This commit adds the lightning server and its port configuration option. The peer manager in charge of the lightning server doesn't know about routing and lightning channels (yet?). It can only responde to the user custom messages defined in `teos_common::lightning::messages`. A \`block_in_place\` is used in \`handle_tower_message\` to synchronously block on grpc calls. --- teos/src/api/lightning.rs | 261 +++++++++++++++++++++++++------------- teos/src/config.rs | 24 +++- teos/src/main.rs | 23 +++- 3 files changed, 210 insertions(+), 98 deletions(-) diff --git a/teos/src/api/lightning.rs b/teos/src/api/lightning.rs index 679d7047..ed6c5be0 100644 --- a/teos/src/api/lightning.rs +++ b/teos/src/api/lightning.rs @@ -1,7 +1,7 @@ //! Watchtower's Lightning interface. -use bitcoin::secp256k1::PublicKey; -use tokio::runtime; +use bitcoin::secp256k1::{PublicKey, SecretKey}; +use triggered::Listener; use crate::protos::public_tower_services_client::PublicTowerServicesClient; use tonic::transport::Channel; @@ -9,17 +9,34 @@ use tonic::Code; use lightning::io; use lightning::ln::msgs::{DecodeError, ErrorAction, LightningError, WarningMessage}; -use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::peer_handler::{ + CustomMessageHandler, ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, + PeerManager, +}; use lightning::ln::wire::CustomMessageReader; -use lightning::util::logger; +use lightning::util::logger::{Level, Logger as LightningLogger, Record}; use lightning::util::ser::Readable; +use lightning_net_tokio::SocketDescriptor; + +use std::convert::TryInto; use std::mem; -use std::sync::Mutex; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use teos_common::cryptography::get_random_bytes; use teos_common::lightning::messages::*; use teos_common::protos as common_msgs; +// FIXME: Check if we can drop some Arcs here. +type TowerPeerManager = PeerManager< + SocketDescriptor, + Arc, // No channel message handler + Arc, // No routing message handler + Arc, + Arc, // Using our custom message handler +>; + /// A helper that returns an [`Err(LightningError)`] with the specified warning message. fn warn_peer(msg_to_peer: &str, msg_to_log: &str) -> Result { Err(LightningError { @@ -30,7 +47,7 @@ fn warn_peer(msg_to_peer: &str, msg_to_log: &str) -> Result>, // TODO: Will it make more sense using the watcher interface instead of the gRPC? // since the watcher interface is not async and it does provide richer error codes. - /// A connection to the tower's internal gRPC API. + /// A connection to the tower's public internal gRPC API. grpc_conn: PublicTowerServicesClient, /// A tokio runtime handle to run gRPC async calls on. - handle: runtime::Handle, + handle: tokio::runtime::Handle, } impl TowerMessageHandler { - fn new(grpc_conn: PublicTowerServicesClient, handle: runtime::Handle) -> Self { + fn new(grpc_conn: PublicTowerServicesClient, handle: tokio::runtime::Handle) -> Self { Self { msg_queue: Mutex::new(Vec::new()), grpc_conn, @@ -56,100 +73,105 @@ impl TowerMessageHandler { } } + /// Handles a tower request message by casting it to a gRPC message and send it to the + /// internal API. The API's response is then casted to a tower response and returned. + /// The argument `peer` is used for logging purposes only. fn handle_tower_message( &self, msg: TowerMessage, peer: &PublicKey, ) -> Result { - log::info!("Received {:?} from {}", msg, peer); - let mut grpc_conn = self.grpc_conn.clone(); - match msg { - TowerMessage::Register(msg) => { - let res = self - .handle - .block_on(grpc_conn.register(common_msgs::RegisterRequest::from(msg))); - match res { - Ok(r) => Ok(r.into_inner().into()), - Err(e) => warn_peer( - e.message(), - &format!("Failed registering {} because {}", peer, e.message()), - ), + tokio::task::block_in_place(|| { + log::info!("Received {:?} from {}", msg, peer); + let mut grpc_conn = self.grpc_conn.clone(); + match msg { + TowerMessage::Register(msg) => { + let res = self + .handle + .block_on(grpc_conn.register(common_msgs::RegisterRequest::from(msg))); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) => warn_peer( + e.message(), + &format!("Failed registering {} because {}", peer, e.message()), + ), + } } - } - TowerMessage::AddUpdateAppointment(msg) => { - let res = - self.handle - .block_on(grpc_conn.add_appointment( + TowerMessage::AddUpdateAppointment(msg) => { + let res = + self.handle.block_on(grpc_conn.add_appointment( common_msgs::AddAppointmentRequest::from(msg.clone()), )); - match res { - Ok(r) => Ok(r.into_inner().into()), - // NOTE: The gRPC interface multiplexes the errors and doesn't let us know what they exactly - // were. Possible errors can be found [here](crate::watcher::AddAppointmentFailure). - Err(e) if e.code() == Code::Unauthenticated => Ok(AppointmentRejected { - locator: msg.locator, - rcode: Code::Unauthenticated as u16, - reason: e.message().into(), - } - .into()), - Err(e) if e.code() == Code::AlreadyExists => Ok(AppointmentRejected { - locator: msg.locator, - rcode: Code::AlreadyExists as u16, - reason: e.message().into(), + match res { + Ok(r) => Ok(r.into_inner().into()), + // NOTE: The gRPC interface multiplexes the errors and doesn't let us know what they exactly + // were. Possible errors can be found [here](crate::watcher::AddAppointmentFailure). + Err(e) if e.code() == Code::Unauthenticated => Ok(AppointmentRejected { + locator: msg.locator, + rcode: Code::Unauthenticated as u16, + reason: e.message().into(), + } + .into()), + Err(e) if e.code() == Code::AlreadyExists => Ok(AppointmentRejected { + locator: msg.locator, + rcode: Code::AlreadyExists as u16, + reason: e.message().into(), + } + .into()), + Err(e) => { + warn_peer( + e.message(), + &format!( + "Failed accepting appointment from {} with locator {} because {}", + peer, msg.locator, e.message() + ), + ) + } } - .into()), - Err(e) => warn_peer( - e.message(), - &format!( - "Failed accepting appointment from {} with locator {}", - peer, msg.locator - ), - ), } - } - TowerMessage::GetAppointment(msg) => { - let res = - self.handle - .block_on(grpc_conn.get_appointment( + TowerMessage::GetAppointment(msg) => { + let res = + self.handle.block_on(grpc_conn.get_appointment( common_msgs::GetAppointmentRequest::from(msg.clone()), )); - match res { - Ok(r) => Ok(r.into_inner().into()), - Err(e) if e.code() == Code::NotFound => Ok(AppointmentNotFound { - locator: msg.locator, - } - .into()), - Err(e) => warn_peer( - e.message(), - &format!( - "GetAppointment request from {} failed because {}", - peer, - e.message() + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) if e.code() == Code::NotFound => Ok(AppointmentNotFound { + locator: msg.locator, + } + .into()), + Err(e) => warn_peer( + e.message(), + &format!( + "GetAppointment request from {} failed because {}", + peer, + e.message() + ), ), - ), + } } - } - TowerMessage::GetSubscriptionInfo(msg) => { - let res = self.handle.block_on( - grpc_conn - .get_subscription_info(common_msgs::GetSubscriptionInfoRequest::from(msg)), - ); - match res { - Ok(r) => Ok(r.into_inner().into()), - Err(e) => warn_peer( - e.message(), - &format!( - "GetSubscriptionInfo request from {} failed because {}", - peer, - e.message() + TowerMessage::GetSubscriptionInfo(msg) => { + let res = + self.handle.block_on(grpc_conn.get_subscription_info( + common_msgs::GetSubscriptionInfoRequest::from(msg), + )); + match res { + Ok(r) => Ok(r.into_inner().into()), + Err(e) => warn_peer( + e.message(), + &format!( + "GetSubscriptionInfo request from {} failed because {}", + peer, + e.message() + ), ), - ), + } } + // TODO: DeleteAppointment + // TowerMessageHandler as CustomMessageReader won't produce other than the above messages. + _ => unreachable!(), } - // TODO: DeleteAppointment - // TowerMessageHandler as CustomMessageReader won't produce other than the above messages. - _ => unreachable!(), - } + }) } } @@ -189,3 +211,70 @@ impl CustomMessageHandler for TowerMessageHandler { mem::take(&mut self.msg_queue.lock().unwrap()) } } + +/// A translation struct to translate LDK's logs to our logging system's logs. +pub struct Logger; + +impl LightningLogger for Logger { + fn log(&self, record: &Record) { + match record.level { + Level::Error => log::error!(target: record.module_path, "{}", record.args), + Level::Warn => log::warn!(target: record.module_path, "{}", record.args), + Level::Info => log::info!(target: record.module_path, "{}", record.args), + Level::Debug => log::debug!(target: record.module_path, "{}", record.args), + Level::Trace => log::trace!(target: record.module_path, "{}", record.args), + _ => {} + } + } +} + +pub async fn serve( + lightning_bind: SocketAddr, + grpc_bind: String, + shutdown_signal: Listener, + tower_sk: SecretKey, +) { + let grpc_conn = loop { + match PublicTowerServicesClient::connect(grpc_bind.clone()).await { + Ok(conn) => break conn, + Err(_) => { + log::error!("Cannot connect to the gRPC server. Retrying shortly"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + }; + let tower_message_handler = Arc::new(TowerMessageHandler::new( + grpc_conn, + tokio::runtime::Handle::current(), + )); + let message_handler = MessageHandler { + chan_handler: Arc::new(ErroringMessageHandler::new()), + route_handler: Arc::new(IgnoringMessageHandler {}), + }; + let ephemeral_bytes: [u8; 32] = get_random_bytes(32).try_into().unwrap(); + let peer_manager = Arc::new(TowerPeerManager::new( + message_handler, + tower_sk, + &ephemeral_bytes, + Arc::new(Logger), + tower_message_handler, + )); + // To suppress an issue similar to https://github.com/rust-lang/rust-clippy/issues/2928 + #[allow(clippy::expect_fun_call)] + let listener = tokio::net::TcpListener::bind(lightning_bind) + .await + .expect(&format!( + "Couldn't bind the lightning server to {}", + lightning_bind + )); + loop { + let tcp_stream = listener.accept().await.unwrap().0; + if shutdown_signal.is_triggered() { + return; + } + let peer_manager = peer_manager.clone(); + tokio::spawn(async move { + lightning_net_tokio::setup_inbound(peer_manager, tcp_stream.into_std().unwrap()).await; + }); + } +} diff --git a/teos/src/config.rs b/teos/src/config.rs index 54e7a996..bc1acac8 100644 --- a/teos/src/config.rs +++ b/teos/src/config.rs @@ -46,13 +46,17 @@ impl std::error::Error for ConfigError {} #[structopt(rename_all = "lowercase")] #[structopt(version = env!("CARGO_PKG_VERSION"), about = "The Eye of Satoshi - Lightning watchtower")] pub struct Opt { - /// Address teos HTTP(s) API will bind to [default: localhost] + /// Address teos API will bind to [default: localhost] #[structopt(long)] pub api_bind: Option, + /// Port teos Lightning API will bind to [default: 9815] + #[structopt(long)] + pub lightning_port: Option, + /// Port teos HTTP(s) API will bind to [default: 9814] #[structopt(long)] - pub api_port: Option, + pub http_port: Option, /// Address teos RPC server will bind to [default: localhost] #[structopt(long)] @@ -122,7 +126,8 @@ pub struct Opt { pub struct Config { // API pub api_bind: String, - pub api_port: u16, + pub lightning_port: u16, + pub http_port: u16, // RPC pub rpc_bind: String, @@ -163,8 +168,11 @@ impl Config { if options.api_bind.is_some() { self.api_bind = options.api_bind.unwrap(); } - if options.api_port.is_some() { - self.api_port = options.api_port.unwrap(); + if options.lightning_port.is_some() { + self.lightning_port = options.lightning_port.unwrap(); + } + if options.http_port.is_some() { + self.http_port = options.http_port.unwrap(); } if options.rpc_bind.is_some() { self.rpc_bind = options.rpc_bind.unwrap(); @@ -254,7 +262,8 @@ impl Default for Config { fn default() -> Self { Self { api_bind: "127.0.0.1".into(), - api_port: 9814, + http_port: 9814, + lightning_port: 9815, tor_support: false, tor_control_port: 9051, onion_hidden_service_port: 9814, @@ -288,7 +297,8 @@ mod tests { fn default() -> Self { Self { api_bind: None, - api_port: None, + http_port: None, + lightning_port: None, tor_support: false, tor_control_port: None, onion_hidden_service_port: None, diff --git a/teos/src/main.rs b/teos/src/main.rs index 19863109..5f5c6227 100644 --- a/teos/src/main.rs +++ b/teos/src/main.rs @@ -18,8 +18,9 @@ use lightning_block_sync::poll::{ }; use lightning_block_sync::{BlockSource, SpvClient, UnboundedCache}; +use teos::api; use teos::api::internal::InternalAPI; -use teos::api::{http, tor::TorAPI}; +use teos::api::tor::TorAPI; use teos::bitcoin_cli::BitcoindClient; use teos::carrier::Carrier; use teos::chain_monitor::ChainMonitor; @@ -250,6 +251,7 @@ async fn main() { let (shutdown_trigger, shutdown_signal_rpc_api) = triggered::trigger(); let shutdown_signal_internal_rpc_api = shutdown_signal_rpc_api.clone(); + let shutdown_signal_lightning = shutdown_signal_rpc_api.clone(); let shutdown_signal_http = shutdown_signal_rpc_api.clone(); let shutdown_signal_cm = shutdown_signal_rpc_api.clone(); let shutdown_signal_tor = shutdown_signal_rpc_api.clone(); @@ -274,12 +276,15 @@ async fn main() { log::info!("Bootstrap completed. Turning on interfaces"); // Build interfaces - let http_api_addr = format!("{}:{}", conf.api_bind, conf.api_port) + let lightning_api_addr = format!("{}:{}", conf.api_bind, conf.lightning_port) + .parse() + .unwrap(); + let http_api_addr = format!("{}:{}", conf.api_bind, conf.http_port) .parse() .unwrap(); let mut addresses = vec![msgs::NetworkAddress::from_ipv4( conf.api_bind.clone(), - conf.api_port, + conf.http_port, )]; // Create Tor endpoint if required @@ -293,7 +298,7 @@ async fn main() { .await; addresses.push(msgs::NetworkAddress::from_torv3( tor_api.get_onion_address(), - conf.api_port, + conf.onion_hidden_service_port, )); Some(tor_api) @@ -351,7 +356,14 @@ async fn main() { }); let (http_service_ready, ready_signal_http) = triggered::trigger(); - let http_api_task = task::spawn(http::serve( + let lightning_api_task = task::spawn(api::lightning::serve( + lightning_api_addr, + internal_rpc_api_uri.clone(), + shutdown_signal_lightning, + tower_sk, + )); + + let http_api_task = task::spawn(api::http::serve( http_api_addr, internal_rpc_api_uri, http_service_ready, @@ -382,6 +394,7 @@ async fn main() { chain_monitor.monitor_chain().await; // Wait until shutdown + lightning_api_task.await.unwrap(); http_api_task.await.unwrap(); private_api_task.await.unwrap(); public_api_task.await.unwrap(); From de3c7df371c509fe5cb4c70e8e2c4e1385eb80da Mon Sep 17 00:00:00 2001 From: meryacine Date: Sun, 7 Aug 2022 09:31:08 +0200 Subject: [PATCH 10/14] Refactor `teos::api::http` tests to reuse code for lightning tests --- teos/src/api/http.rs | 96 +++++++----------------------------------- teos/src/test_utils.rs | 44 +++++++++++++++++++ 2 files changed, 59 insertions(+), 81 deletions(-) diff --git a/teos/src/api/http.rs b/teos/src/api/http.rs index f35b9b20..5bfd95f0 100644 --- a/teos/src/api/http.rs +++ b/teos/src/api/http.rs @@ -316,13 +316,8 @@ mod test_helpers { use serde::de::DeserializeOwned; use serde_json::Value; - use std::sync::Arc; - use tokio::net::TcpListener; - use tonic::transport::Server; - use crate::api::internal::InternalAPI; - use crate::protos::public_tower_services_server::PublicTowerServicesServer; - use crate::test_utils::{create_api_with_config, ApiConfig, BitcoindStopper}; + use crate::test_utils::get_public_grpc_conn; pub(crate) enum RequestBody<'a> { Jsonify(&'a str), @@ -331,45 +326,12 @@ mod test_helpers { Body(&'a str), } - pub(crate) async fn run_tower_in_background_with_config( - api_config: ApiConfig, - ) -> (SocketAddr, Arc, BitcoindStopper) { - let (internal_rpc_api, bitcoind_stopper) = create_api_with_config(api_config).await; - let cloned = internal_rpc_api.clone(); - - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - tokio::spawn(async move { - Server::builder() - .add_service(PublicTowerServicesServer::new(internal_rpc_api)) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) - .await - .unwrap(); - }); - - (addr, cloned, bitcoind_stopper) - } - - pub(crate) async fn run_tower_in_background() -> (SocketAddr, BitcoindStopper) { - let (sock_addr, _, bitcoind_stopper) = - run_tower_in_background_with_config(ApiConfig::default()).await; - - (sock_addr, bitcoind_stopper) - } - pub(crate) async fn check_api_error( endpoint: Endpoint, body: RequestBody<'_>, server_addr: SocketAddr, ) -> (ApiError, StatusCode) { - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let req = match body { RequestBody::Json(j) => warp::test::request() @@ -406,13 +368,7 @@ mod test_helpers { B: Serialize, T: DeserializeOwned, { - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -427,9 +383,11 @@ mod test_helpers { #[cfg(test)] mod tests_failures { - use super::test_helpers::{check_api_error, run_tower_in_background, RequestBody}; + use super::test_helpers::{check_api_error, RequestBody}; use super::*; + use crate::test_utils::{get_public_grpc_conn, run_tower_in_background}; + use teos_common::test_utils::get_random_user_id; #[tokio::test] @@ -561,13 +519,7 @@ mod tests_failures { #[tokio::test] async fn test_empty_request_body() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -581,13 +533,7 @@ mod tests_failures { #[tokio::test] async fn test_payload_too_large() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -602,13 +548,7 @@ mod tests_failures { #[tokio::test] async fn test_wrong_endpoint() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .method("POST") @@ -622,13 +562,7 @@ mod tests_failures { #[tokio::test] async fn test_wrong_method() { let (server_addr, _s) = run_tower_in_background().await; - let grpc_conn = PublicTowerServicesClient::connect(format!( - "http://{}:{}", - server_addr.ip(), - server_addr.port() - )) - .await - .unwrap(); + let grpc_conn = get_public_grpc_conn(server_addr).await; let res = warp::test::request() .json(&"") @@ -641,14 +575,14 @@ mod tests_failures { #[cfg(test)] mod tests_methods { - use super::test_helpers::{ - check_api_error, request_to_api, run_tower_in_background, - run_tower_in_background_with_config, RequestBody, - }; + use super::test_helpers::{check_api_error, request_to_api, RequestBody}; use super::*; use crate::extended_appointment::UUID; - use crate::test_utils::{generate_dummy_appointment, ApiConfig, DURATION, SLOTS}; + use crate::test_utils::{ + generate_dummy_appointment, run_tower_in_background, run_tower_in_background_with_config, + ApiConfig, DURATION, SLOTS, + }; use teos_common::test_utils::get_random_user_id; use teos_common::{cryptography, UserId}; diff --git a/teos/src/test_utils.rs b/teos/src/test_utils.rs index b7959fd3..c88c2801 100644 --- a/teos/src/test_utils.rs +++ b/teos/src/test_utils.rs @@ -8,6 +8,7 @@ */ use rand::Rng; +use std::net::SocketAddr; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -15,6 +16,8 @@ use jsonrpc_http_server::jsonrpc_core::error::ErrorCode as JsonRpcErrorCode; use jsonrpc_http_server::jsonrpc_core::{Error as JsonRpcError, IoHandler, Params, Value}; use jsonrpc_http_server::{CloseHandle, Server, ServerBuilder}; +use tonic::transport; + use bitcoincore_rpc::{Auth, Client as BitcoindClient}; use bitcoin::blockdata::block::{Block, BlockHeader}; @@ -46,6 +49,8 @@ use crate::dbm::DBM; use crate::extended_appointment::{ExtendedAppointment, UUID}; use crate::gatekeeper::{Gatekeeper, UserInfo}; use crate::protos as msgs; +use crate::protos::public_tower_services_client::PublicTowerServicesClient; +use crate::protos::public_tower_services_server::PublicTowerServicesServer; use crate::responder::{ConfirmationStatus, Responder, TransactionTracker}; use crate::rpc_errors; use crate::watcher::{Breach, Watcher}; @@ -530,6 +535,45 @@ impl Drop for BitcoindStopper { } } +pub(crate) async fn run_tower_in_background_with_config( + api_config: ApiConfig, +) -> (SocketAddr, Arc, BitcoindStopper) { + let (internal_rpc_api, bitcoind_stopper) = create_api_with_config(api_config).await; + let cloned = internal_rpc_api.clone(); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + transport::Server::builder() + .add_service(PublicTowerServicesServer::new(internal_rpc_api)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await + .unwrap(); + }); + + (addr, cloned, bitcoind_stopper) +} + +pub(crate) async fn run_tower_in_background() -> (SocketAddr, BitcoindStopper) { + let (sock_addr, _, bitcoind_stopper) = + run_tower_in_background_with_config(ApiConfig::default()).await; + + (sock_addr, bitcoind_stopper) +} + +pub(crate) async fn get_public_grpc_conn( + server_addr: SocketAddr, +) -> PublicTowerServicesClient { + PublicTowerServicesClient::connect(format!( + "http://{}:{}", + server_addr.ip(), + server_addr.port() + )) + .await + .unwrap() +} + pub(crate) struct BitcoindMock { pub url: String, pub server: Server, From 697be71674350a5bbfa1a9eeeab3b879ebc259e1 Mon Sep 17 00:00:00 2001 From: meryacine Date: Sun, 7 Aug 2022 11:34:49 +0200 Subject: [PATCH 11/14] tests: Add a signing key getter method for Watcher --- teos/src/watcher.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/teos/src/watcher.rs b/teos/src/watcher.rs index a9672a12..a8f9a7a5 100644 --- a/teos/src/watcher.rs +++ b/teos/src/watcher.rs @@ -809,6 +809,10 @@ mod tests { self.responder .add_random_tracker(uuid, ConfirmationStatus::ConfirmedIn(100)) } + + pub(crate) fn get_signing_key(&self) -> SecretKey { + self.signing_key + } } async fn init_watcher(chain: &mut Blockchain) -> (Watcher, BitcoindStopper) { From 1857e12197ce8cf710943dca4a156e3ecad729ba Mon Sep 17 00:00:00 2001 From: meryacine Date: Wed, 10 Aug 2022 13:08:16 +0200 Subject: [PATCH 12/14] tests: Add lightning tower handler tests These tests are analogous to tests in `http::tests_methods` and they test the tower message handler logic only (not the tower message handler when inside a peer manager) --- teos/src/api/lightning.rs | 433 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 433 insertions(+) diff --git a/teos/src/api/lightning.rs b/teos/src/api/lightning.rs index ed6c5be0..79bea643 100644 --- a/teos/src/api/lightning.rs +++ b/teos/src/api/lightning.rs @@ -278,3 +278,436 @@ pub async fn serve( }); } } + +#[cfg(test)] +mod test_helpers { + use super::*; + + use crate::api::internal::InternalAPI; + use crate::test_utils::{ + get_public_grpc_conn, run_tower_in_background_with_config, ApiConfig, BitcoindStopper, + }; + + pub(crate) async fn get_tower_message_handler_with_config( + conf: ApiConfig, + ) -> (Arc, Arc, BitcoindStopper) { + let (server_addr, internal_api, bitcoind_stopper) = + run_tower_in_background_with_config(conf).await; + let grpc_conn = get_public_grpc_conn(server_addr).await; + let handle = tokio::runtime::Handle::current(); + ( + Arc::new(TowerMessageHandler::new(grpc_conn, handle)), + internal_api, + bitcoind_stopper, + ) + } + + pub(crate) async fn get_tower_message_handler( + ) -> (Arc, Arc, BitcoindStopper) { + get_tower_message_handler_with_config(ApiConfig::default()).await + } + + pub(crate) async fn request_to_tower_message_handler( + tower: &Arc, + msg: TowerMessage, + peer: PublicKey, + ) -> Result { + let tower = tower.clone(); + // Must use `spawn_blocking` because `handle_tower_message` uses `block_on`. + tokio::task::spawn_blocking(move || tower.handle_tower_message(msg, &peer)) + .await + .unwrap() + } +} + +#[cfg(test)] +mod message_handler_tests { + use super::test_helpers::*; + use super::*; + + use teos_common::cryptography::{get_random_keypair, sign}; + use teos_common::test_utils::{generate_random_appointment, get_random_user_id}; + use teos_common::UserId; + + use crate::extended_appointment::UUID; + use crate::test_utils::{ApiConfig, DURATION}; + + #[tokio::test] + async fn test_register() { + let (tower, _, _s) = get_tower_message_handler().await; + let user_id = get_random_user_id(); + let msg = Register { + pubkey: user_id, + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + assert!(matches! { + request_to_tower_message_handler(&tower, msg, user_id.0).await, + Ok(TowerMessage::SubscriptionDetails(SubscriptionDetails { + .. + })) + }) + } + + #[tokio::test] + async fn test_register_max_slots() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::new(u32::MAX, DURATION)).await; + let user_id = get_random_user_id(); + let msg: TowerMessage = Register { + pubkey: user_id, + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // First registration should go through. + assert!(matches!( + request_to_tower_message_handler(&tower, msg.clone(), user_id.0).await, + Ok(TowerMessage::SubscriptionDetails( + SubscriptionDetails { .. } + )) + )); + + // Second one should fail (maximum slots count reached). + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_id.0).await, + Err(LightningError { err, .. }) if err.contains("maximum slots") + )); + } + + #[tokio::test] + async fn test_register_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let user_id = get_random_user_id(); + let msg = Register { + pubkey: user_id, + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_id.0).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } + + #[tokio::test] + async fn test_add_appointment() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentAccepted( + AppointmentAccepted { locator, .. } + )) if locator == appointment.locator + )); + } + + #[tokio::test] + async fn test_add_appointment_non_registered() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentRejected( + AppointmentRejected { locator, rcode, .. } + )) if locator == appointment.locator && rcode == Code::Unauthenticated as u16 + )); + } + + #[tokio::test] + async fn test_add_appointment_already_triggered() { + let (tower, internal_api, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + // Add the appointment to the responder so it counts as triggered. + internal_api + .get_watcher() + .add_random_tracker_to_responder(UUID::new(appointment.locator, UserId(user_pk))); + + // Send the appointment to the tower and assert it rejects because of being already triggered. + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentRejected( + AppointmentRejected { locator, rcode, .. } + )) if locator == appointment.locator && rcode == Code::AlreadyExists as u16 + )); + } + + #[tokio::test] + async fn test_add_appointment_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let (user_sk, user_pk) = get_random_keypair(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob, + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } + + #[tokio::test] + async fn test_get_appointment() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign(&appointment.to_vec(), &user_sk).unwrap(); + let msg = AddUpdateAppointment { + locator: appointment.locator, + encrypted_blob: appointment.encrypted_blob.clone(), + signature, + to_self_delay: Some(appointment.to_self_delay), + } + .into(); + + // Send the appointment to the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + // Assert the tower has the appointment we just sent. + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentData(AppointmentData { + locator, encrypted_blob + })) if locator == appointment.locator && encrypted_blob == appointment.encrypted_blob + )); + } + + #[tokio::test] + async fn test_get_appointment_non_registered() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let appointment = generate_random_appointment(None); + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + // Assert the tower cannot authenticate us. + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("cannot be authenticated") + )); + } + + #[tokio::test] + async fn test_get_appointment_not_found() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + // Register with the tower. + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let appointment = generate_random_appointment(None); + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::AppointmentNotFound(AppointmentNotFound { + locator + })) if locator == appointment.locator + )); + } + + #[tokio::test] + async fn test_get_appointment_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let (user_sk, user_pk) = get_random_keypair(); + let appointment = generate_random_appointment(None); + let signature = sign( + format!("get appointment {}", appointment.locator).as_bytes(), + &user_sk, + ) + .unwrap(); + let msg = GetAppointment { + locator: appointment.locator, + signature, + } + .into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } + + #[tokio::test] + async fn test_get_subscription_info() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let msg = Register { + pubkey: UserId(user_pk), + // The tower doesn't use this info ATM. + appointment_slots: 4024, + subscription_period: 4002, + } + .into(); + + request_to_tower_message_handler(&tower, msg, user_pk) + .await + .unwrap(); + + let signature = sign(format!("get subscription info").as_bytes(), &user_sk).unwrap(); + let msg = GetSubscriptionInfo { signature }.into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Ok(TowerMessage::SubscriptionInfo(SubscriptionInfo { .. })) + )); + } + + #[tokio::test] + async fn test_get_subscription_info_non_registered() { + let (tower, _, _s) = get_tower_message_handler().await; + let (user_sk, user_pk) = get_random_keypair(); + let signature = sign(format!("get subscription info").as_bytes(), &user_sk).unwrap(); + let msg = GetSubscriptionInfo { signature }.into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("User not found") + )); + } + + #[tokio::test] + async fn test_get_subscription_info_service_unavailable() { + let (tower, _, _s) = + get_tower_message_handler_with_config(ApiConfig::default().bitcoind_unreachable()) + .await; + let (user_sk, user_pk) = get_random_keypair(); + let signature = sign(format!("get subscription info").as_bytes(), &user_sk).unwrap(); + let msg = GetSubscriptionInfo { signature }.into(); + + assert!(matches!( + request_to_tower_message_handler(&tower, msg, user_pk).await, + Err(LightningError { err, .. }) if err.contains("currently unavailable") + )); + } +} From a1073356facbdb8ab4178fce12d2d1cccbbb6d65 Mon Sep 17 00:00:00 2001 From: meryacine Date: Fri, 12 Aug 2022 09:11:26 +0200 Subject: [PATCH 13/14] tests: Add test utils for peer manager testing These test utils include: - A test lightning client: this simulates a client's peer manager that is trying to connect to the tower and send/receive tower messages. For a real client, this should be part of a bigger application (say a lightning node) and the custom message handler adds the capabilities of speaking with the tower over Lightning. - Some utils to launch a Lightning server and connect one peer manager to another - A simple proof of concept test --- Cargo.lock | 1 + teos/Cargo.toml | 1 + teos/src/api/lightning.rs | 247 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 249 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 703f3868..3b5818bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2996,6 +2996,7 @@ version = "0.2.0" dependencies = [ "bitcoin", "bitcoincore-rpc", + "futures", "hex", "home", "jsonrpc-http-server", diff --git a/teos/Cargo.toml b/teos/Cargo.toml index b062b467..81e4c3cd 100644 --- a/teos/Cargo.toml +++ b/teos/Cargo.toml @@ -47,6 +47,7 @@ teos-common = { path = "../teos-common" } tonic-build = "0.6" [dev-dependencies] +futures = "0.3.21" jsonrpc-http-server = "17.1.0" rand = "0.8.4" tempdir = "0.3.7" diff --git a/teos/src/api/lightning.rs b/teos/src/api/lightning.rs index 79bea643..5f1a2629 100644 --- a/teos/src/api/lightning.rs +++ b/teos/src/api/lightning.rs @@ -217,6 +217,14 @@ pub struct Logger; impl LightningLogger for Logger { fn log(&self, record: &Record) { + #[cfg(test)] + // Pass "-- --nocapture" flag to "cargo test" for this println to appear. + println!( + // "\x1B" stuff are terminal colors. Might not work in some terminals though. + "\x1B[42m{}\x1B[0m [\x1B[33m{}:{}\x1B[0m]: {}", + record.level, record.module_path, record.line, record.args + ); + #[cfg(not(test))] match record.level { Level::Error => log::error!(target: record.module_path, "{}", record.args), Level::Warn => log::warn!(target: record.module_path, "{}", record.args), @@ -279,15 +287,119 @@ pub async fn serve( } } +#[cfg(test)] +mod test_lightning_client { + use super::*; + + use std::collections::VecDeque; + + pub(crate) type TestClientPeerManager = PeerManager< + SocketDescriptor, + Arc, // No channel message handler + Arc, // No routing message handler + Arc, + Arc, // Using our custom message handler + >; + + pub(crate) struct TestClientMessageHandler { + msg_queue: Mutex>, + // A vector we store the received messages in to test whether the tower sent correct responses or not. + received_msgs: Mutex>, + } + + impl TestClientMessageHandler { + pub(crate) fn new() -> Self { + Self { + msg_queue: Mutex::new(Vec::new()), + received_msgs: Mutex::new(VecDeque::new()), + } + } + + /// Sends a tower message to `peer`. + /// This works by pushing the message to a pending messages queue and notifying the passed + /// `peer_manager` that there are some events to process. + /// + /// You should only pass the peer manager that is holding a reference of this `TestClientMessageHandler` + /// (`self`) as a custom message handler and not any other peer manager. + pub(crate) fn send_msg( + &self, + peer_manager: &TestClientPeerManager, + msg: TowerMessage, + peer: &PublicKey, + ) { + self.msg_queue.lock().unwrap().push((*peer, msg)); + // Let the peer manager process our pending message. + peer_manager.process_events(); + // The message queue must be empty after the peer manager has processed events. + assert!(self.msg_queue.lock().unwrap().is_empty()); + } + + pub(crate) fn received_msgs_count(&self) -> usize { + self.received_msgs.lock().unwrap().len() + } + + pub(crate) fn pop_oldest_received_msg(&self) -> TowerMessage { + self.received_msgs.lock().unwrap().pop_front().unwrap() + } + } + + impl CustomMessageReader for TestClientMessageHandler { + type CustomMessage = TowerMessage; + + fn read( + &self, + message_type: u16, + buffer: &mut R, + ) -> Result, DecodeError> { + match message_type { + Register::TYPE => Ok(Some(Register::read(buffer)?.into())), // A real client shouldn't have this + SubscriptionDetails::TYPE => Ok(Some(SubscriptionDetails::read(buffer)?.into())), + AddUpdateAppointment::TYPE => Ok(Some(AddUpdateAppointment::read(buffer)?.into())), // ,this + AppointmentAccepted::TYPE => Ok(Some(AppointmentAccepted::read(buffer)?.into())), + AppointmentRejected::TYPE => Ok(Some(AppointmentRejected::read(buffer)?.into())), + GetAppointment::TYPE => Ok(Some(GetAppointment::read(buffer)?.into())), // ,this + AppointmentData::TYPE => Ok(Some(AppointmentData::read(buffer)?.into())), + TrackerData::TYPE => Ok(Some(TrackerData::read(buffer)?.into())), + AppointmentNotFound::TYPE => Ok(Some(AppointmentNotFound::read(buffer)?.into())), + GetSubscriptionInfo::TYPE => Ok(Some(GetSubscriptionInfo::read(buffer)?.into())), // ,and this. + SubscriptionInfo::TYPE => Ok(Some(SubscriptionInfo::read(buffer)?.into())), + // Unknown message. + _ => Ok(None), + } + } + } + + impl CustomMessageHandler for TestClientMessageHandler { + fn handle_custom_message( + &self, + msg: TowerMessage, + _sender_node_id: &PublicKey, + ) -> Result<(), LightningError> { + self.received_msgs.lock().unwrap().push_back(msg); + Ok(()) + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, TowerMessage)> { + mem::take(&mut self.msg_queue.lock().unwrap()) + } + } +} + #[cfg(test)] mod test_helpers { + use super::test_lightning_client::*; use super::*; + use bitcoin::secp256k1::Secp256k1; + use teos_common::cryptography::get_random_keypair; + use crate::api::internal::InternalAPI; use crate::test_utils::{ get_public_grpc_conn, run_tower_in_background_with_config, ApiConfig, BitcoindStopper, }; + pub(crate) const WAIT_DURATION: tokio::time::Duration = tokio::time::Duration::from_millis(10); + pub(crate) async fn get_tower_message_handler_with_config( conf: ApiConfig, ) -> (Arc, Arc, BitcoindStopper) { @@ -318,6 +430,105 @@ mod test_helpers { .await .unwrap() } + + /// Spawns a tower and a Lightning server that accepts tower messages. + /// Note that the server might not be fully booted up after this function returns. + pub(crate) async fn run_lightning_tower_with_config( + conf: ApiConfig, + ) -> (SocketAddr, PublicKey, BitcoindStopper) { + let (server_addr, internal_api, bitcoind_stopper) = + run_tower_in_background_with_config(conf).await; + let lightning_bind = { + let unused_port = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + unused_port.local_addr().unwrap() + }; + let grpc_bind = format!("http://{}:{}", server_addr.ip(), server_addr.port()); + let (_, shutdown_signal) = triggered::trigger(); + let tower_sk = internal_api.get_watcher().get_signing_key(); + // To make the tests simple, we won't let the testers await on the task or hand them shutdown triggers. + let _ = tokio::task::spawn(serve(lightning_bind, grpc_bind, shutdown_signal, tower_sk)); + ( + lightning_bind, + PublicKey::from_secret_key(&Secp256k1::new(), &tower_sk), + bitcoind_stopper, + ) + } + + pub(crate) async fn run_lightning_tower() -> (SocketAddr, PublicKey, BitcoindStopper) { + run_lightning_tower_with_config(ApiConfig::default()).await + } + + pub(crate) fn get_test_client_peer_manager() -> ( + Arc, + Arc, + PublicKey, + ) { + let client_message_handler = Arc::new(TestClientMessageHandler::new()); + let (client_sk, client_pk) = get_random_keypair(); + let ephemeral_bytes: [u8; 32] = get_random_bytes(32).try_into().unwrap(); + ( + Arc::new(TestClientPeerManager::new( + MessageHandler { + chan_handler: Arc::new(ErroringMessageHandler::new()), + route_handler: Arc::new(IgnoringMessageHandler {}), + }, + client_sk, + &ephemeral_bytes, + Arc::new(Logger), + client_message_handler.clone(), + )), + client_message_handler, + client_pk, + ) + } + + /// Connects `client_peer_manager` to another peer manager at `tower_addr`. + /// It keeps trying indefinitely till a connection is successful. + pub(crate) async fn connect_to_tower( + client_peer_manager: Arc, + tower_addr: SocketAddr, + tower_pk: PublicKey, + ) { + // From https://lightningdevkit.org/payments/connecting_peers/ + loop { + match lightning_net_tokio::connect_outbound( + client_peer_manager.clone(), + tower_pk, + tower_addr, + ) + .await + { + Some(connection_closed_future) => { + let mut connection_closed_future = Box::pin(connection_closed_future); + loop { + // Make sure the connection is still established. + match futures::poll!(&mut connection_closed_future) { + std::task::Poll::Ready(_) => { + panic!( + "{}@{} disconnected before handshake completed", + tower_pk, tower_addr + ); + } + std::task::Poll::Pending => {} + } + // Wait for the handshake to complete. + match client_peer_manager + .get_peer_node_ids() + .iter() + .find(|id| **id == tower_pk) + { + Some(_) => return, + None => tokio::time::sleep(WAIT_DURATION).await, + } + } + } + None => { + // The server takes some time to boot up. Let's wait a little bit. + tokio::time::sleep(WAIT_DURATION).await; + } + } + } + } } #[cfg(test)] @@ -711,3 +922,39 @@ mod message_handler_tests { )); } } + +#[cfg(test)] +mod peer_manager_tests { + use super::test_helpers::*; + use super::*; + + use teos_common::UserId; + + // Needs to be "multi_thread" because we "block_in_place" without using "spawn_blocking". + #[tokio::test(flavor = "multi_thread")] + async fn simple_test() { + let (tower_addr, tower_pk, _s) = run_lightning_tower().await; + let (client_peer_manager, client_messenger, client_pk) = get_test_client_peer_manager(); + connect_to_tower(client_peer_manager.clone(), tower_addr, tower_pk).await; + + let msg = Register { + pubkey: UserId(client_pk), + appointment_slots: 8778, + subscription_period: 6726, + } + .into(); + + // Send the register message to the tower. + client_messenger.send_msg(&client_peer_manager, msg, &tower_pk); + // And wait till we get a response. + while client_messenger.received_msgs_count() != 1 { + tokio::time::sleep(WAIT_DURATION).await; + } + + let received_msg = client_messenger.pop_oldest_received_msg(); + assert!(matches!( + received_msg, + TowerMessage::SubscriptionDetails(SubscriptionDetails { .. }) + )); + } +} From 5206d34248b1371a6453f335f758b5d0e75cd4d4 Mon Sep 17 00:00:00 2001 From: meryacine Date: Sun, 14 Aug 2022 09:08:25 +0200 Subject: [PATCH 14/14] Fix: Run `lightning-net-tokio` on a different runtime This fixes a deadlock issue that appears with too many concurrent requtests. This solution was taken from devrandom's issue: lightningdevkit/rust-lightning #1367 Another solution that would probably work is that lightning net tokio wraps our sync calls with `tokio::task::spawn_blocking(|| sync_call()).await.unwarp()`, but not sure how would this affect performace of other users with no async code requirements. Note that peer_manager_tests now don't need to be anotated with `flavor = "multi_thread"`, that's because the runtime we block_on (inside lightning net tokio) is the artificial runtime created in `api::lightning::server` which is already "multi_thread" --- teos/src/api/lightning.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/teos/src/api/lightning.rs b/teos/src/api/lightning.rs index 5f1a2629..a2bba6b9 100644 --- a/teos/src/api/lightning.rs +++ b/teos/src/api/lightning.rs @@ -275,13 +275,19 @@ pub async fn serve( "Couldn't bind the lightning server to {}", lightning_bind )); + // A tokio runtime handle to run lightning net tokio on. This is to fix a deadlock issue similar + // to https://github.com/lightningdevkit/rust-lightning/issues/1367 which appears with too many + // concurrent requests to the server. + let ldk_handle = Box::leak(Box::new(tokio::runtime::Runtime::new().unwrap())) + .handle() + .clone(); loop { let tcp_stream = listener.accept().await.unwrap().0; if shutdown_signal.is_triggered() { return; } let peer_manager = peer_manager.clone(); - tokio::spawn(async move { + ldk_handle.spawn(async move { lightning_net_tokio::setup_inbound(peer_manager, tcp_stream.into_std().unwrap()).await; }); } @@ -930,8 +936,7 @@ mod peer_manager_tests { use teos_common::UserId; - // Needs to be "multi_thread" because we "block_in_place" without using "spawn_blocking". - #[tokio::test(flavor = "multi_thread")] + #[tokio::test] async fn simple_test() { let (tower_addr, tower_pk, _s) = run_lightning_tower().await; let (client_peer_manager, client_messenger, client_pk) = get_test_client_peer_manager();