diff --git a/Cargo.toml b/Cargo.toml index 4658131..e855ccd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,15 +13,7 @@ rust-version = "1.63.0" [dependencies] bdk_wallet = { version = "1.0.0" } -kyoto-cbf = { version = "0.6.0", default-features = false, features = ["dns", "database"] } -tracing = { version = "0.1", optional = true } -tracing-subscriber = { version = "0.3", optional = true } - -[features] -default = ["events", "callbacks", "trace"] -trace = ["tracing", "tracing-subscriber"] -callbacks = [] -events = [] +kyoto-cbf = { version = "0.8.0", default-features = false, features = ["dns", "database"] } [dev-dependencies] tokio = { version = "1.37", features = ["full"], default-features = false } @@ -33,9 +25,4 @@ tracing-subscriber = { version = "0.3" } [[example]] -name = "callbacks" -required-features = ["trace", "callbacks"] - -[[example]] -name = "events" -required-features = ["events"] +name = "example" diff --git a/README.md b/README.md index 190bd01..a0449d5 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,6 @@ client/server relationship. Esplora and Electrum offer _proactive_ APIs, in that In the case of running a node as a background process, the developer experience is far more _reactive_, in that the node may emit any number of events, and the application may respond to them. BDK-Kyoto curates these events into structures that are easily handled by BDK APIs, making integration of compact block filters easily understood. -Developers are free to use [`bdk_wallet`](https://docs.rs/bdk_wallet/latest/bdk_wallet/), or only primitives found in [`bdk_core`](https://docs.rs/bdk_core/latest/bdk_core/) and [`bdk_chain`](https://docs.rs/bdk_chain/latest/bdk_chain/). - ## License Licensed under either of @@ -18,4 +16,4 @@ Licensed under either of * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or ) * MIT license ([LICENSE-MIT](LICENSE-MIT) or ) -at your option. \ No newline at end of file +at your option. diff --git a/examples/callbacks.rs b/examples/callbacks.rs deleted file mode 100644 index cc0ccfa..0000000 --- a/examples/callbacks.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::net::{IpAddr, Ipv4Addr}; - -use bdk_kyoto::builder::{LightClientBuilder, ServiceFlags, TrustedPeer}; -use bdk_kyoto::logger::TraceLogger; -use bdk_kyoto::{EventSenderExt, LightClient}; -use bdk_wallet::bitcoin::Network; -use bdk_wallet::{KeychainKind, Wallet}; - -/// Peer address whitelist -const PEERS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100))]; - -/* Sync a bdk wallet */ - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; - let change_desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; - - let peers = PEERS - .iter() - .map(|ip| { - let mut peer = TrustedPeer::from_ip(*ip); - peer.set_services(ServiceFlags::P2P_V2); - peer - }) - .collect(); - - let mut wallet = Wallet::create(desc, change_desc) - .network(Network::Signet) - .lookahead(30) - .create_wallet_no_persist()?; - - // The light client builder handles the logic of inserting the SPKs - let LightClient { - sender, - mut receiver, - node, - } = LightClientBuilder::new() - .scan_after(170_000) - .peers(peers) - .build(&wallet) - .unwrap(); - - tokio::task::spawn(async move { node.run().await }); - - // Print logs to the console using the `tracing` crate - let logger = TraceLogger::new()?; - - tracing::info!( - "Balance before sync: {} sats", - wallet.balance().total().to_sat() - ); - - // Sync and apply updates. We can do this a continual loop while the "application" is running. - // Often this would occur on a separate thread than the underlying application user interface. - loop { - if let Some(update) = receiver.update(&logger).await { - wallet.apply_update(update)?; - tracing::info!("Tx count: {}", wallet.transactions().count()); - tracing::info!("Balance: {}", wallet.balance().total().to_sat()); - let last_revealed = wallet.derivation_index(KeychainKind::External); - tracing::info!("Last revealed External: {:?}", last_revealed); - tracing::info!( - "Last revealed Internal: {:?}", - wallet.derivation_index(KeychainKind::Internal) - ); - tracing::info!("Local chain tip: {}", wallet.local_chain().tip().height()); - let next = wallet.reveal_next_address(KeychainKind::External).address; - tracing::info!("Next receiving address: {next}"); - tracing::info!( - "Broadcast minimum fee rate: {}", - receiver.broadcast_minimum() - ); - sender.add_revealed_scripts(&wallet).await?; - } - } -} diff --git a/examples/events.rs b/examples/events.rs deleted file mode 100644 index def5108..0000000 --- a/examples/events.rs +++ /dev/null @@ -1,52 +0,0 @@ -use bdk_kyoto::builder::LightClientBuilder; -use bdk_kyoto::{Event, LightClient, LogLevel}; -use bdk_wallet::bitcoin::Network; -use bdk_wallet::Wallet; - -/* Sync a bdk wallet using events*/ - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; - let change_desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; - - let mut wallet = Wallet::create(desc, change_desc) - .network(Network::Signet) - .lookahead(30) - .create_wallet_no_persist()?; - - // The light client builder handles the logic of inserting the SPKs - let LightClient { - sender: _, - mut receiver, - node, - } = LightClientBuilder::new() - .scan_after(170_000) - .build(&wallet) - .unwrap(); - - tokio::task::spawn(async move { node.run().await }); - - loop { - if let Some(event) = receiver.next_event(LogLevel::Info).await { - match event { - Event::Log(log) => println!("INFO: {log}"), - Event::Warning(warning) => println!("WARNING: {warning}"), - Event::ScanResponse(full_scan_result) => { - wallet.apply_update(full_scan_result).unwrap(); - println!( - "INFO: Balance in BTC: {}", - wallet.balance().total().to_btc() - ); - } - Event::PeersFound => println!("INFO: Connected to all necessary peers."), - Event::TxSent(txid) => println!("INFO: Broadcast transaction: {txid}"), - Event::TxFailed(failure_payload) => { - println!("WARNING: Transaction failed to broadcast: {failure_payload:?}") - } - Event::StateChange(node_state) => println!("NEW TASK: {node_state}"), - Event::BlocksDisconnected(_) => {} - } - } - } -} diff --git a/examples/example.rs b/examples/example.rs new file mode 100644 index 0000000..d5fcd30 --- /dev/null +++ b/examples/example.rs @@ -0,0 +1,85 @@ +use std::net::{IpAddr, Ipv4Addr}; + +use bdk_kyoto::builder::{LightClientBuilder, ServiceFlags, TrustedPeer}; +use bdk_kyoto::{LightClient, RequesterExt}; +use bdk_wallet::bitcoin::Network; +use bdk_wallet::{KeychainKind, Wallet}; +use tokio::select; + +/// Peer address whitelist +const PEERS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100))]; + +/* Sync a bdk wallet */ + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; + let change_desc = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; + + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber)?; + + let peers = PEERS + .iter() + .map(|ip| { + let mut peer = TrustedPeer::from_ip(*ip); + peer.set_services(ServiceFlags::P2P_V2); + peer + }) + .collect(); + + let mut wallet = Wallet::create(desc, change_desc) + .network(Network::Signet) + .lookahead(30) + .create_wallet_no_persist()?; + + // The light client builder handles the logic of inserting the SPKs + let LightClient { + requester, + mut log_subscriber, + mut warning_subscriber, + mut update_subscriber, + node, + } = LightClientBuilder::new() + .scan_after(170_000) + .peers(peers) + .build(&wallet) + .unwrap(); + + tokio::task::spawn(async move { node.run().await }); + + // Sync and apply updates. We can do this a continual loop while the "application" is running. + // Often this would occur on a separate thread than the underlying application user interface. + loop { + select! { + update = update_subscriber.update() => { + if let Some(update) = update { + wallet.apply_update(update)?; + tracing::info!("Tx count: {}", wallet.transactions().count()); + tracing::info!("Balance: {}", wallet.balance().total().to_sat()); + let last_revealed = wallet.derivation_index(KeychainKind::External); + tracing::info!("Last revealed External: {:?}", last_revealed); + tracing::info!( + "Last revealed Internal: {:?}", + wallet.derivation_index(KeychainKind::Internal) + ); + tracing::info!("Local chain tip: {}", wallet.local_chain().tip().height()); + let next = wallet.reveal_next_address(KeychainKind::External).address; + tracing::info!("Next receiving address: {next}"); + let fee_filter = requester.broadcast_min_feerate().await.unwrap(); + tracing::info!( + "Broadcast minimum fee rate: {:#}", + fee_filter + ); + requester.add_revealed_scripts(&wallet).await?; + } + }, + log = log_subscriber.next_log() => { + tracing::info!("{log}") + } + warn = warning_subscriber.next_warning() => { + tracing::warn!("{warn}") + } + } + } +} diff --git a/src/builder.rs b/src/builder.rs index 60f3fdf..a2c5363 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -14,7 +14,6 @@ //! use bdk_wallet::Wallet; //! use bdk_wallet::bitcoin::Network; //! use bdk_kyoto::builder::{LightClientBuilder, TrustedPeer}; -//! use bdk_kyoto::logger::PrintLogger; //! use bdk_kyoto::LightClient; //! //! #[tokio::main] @@ -30,7 +29,7 @@ //! .network(Network::Signet) //! .create_wallet_no_persist()?; //! -//! let LightClient { sender, receiver, node } = LightClientBuilder::new() +//! let LightClient { requester, log_subscriber, warning_subscriber, update_subscriber, node } = LightClientBuilder::new() //! // When recovering a user's wallet, specify a height to start at //! .scan_after(200_000) //! // A node may handle mutliple connections @@ -55,7 +54,7 @@ pub use kyoto::{ TrustedPeer, }; -use crate::{EventReceiver, LightClient, WalletExt}; +use crate::{LightClient, LogSubscriber, UpdateSubscriber, WalletExt, WarningSubscriber}; const RECOMMENDED_PEERS: u8 = 2; @@ -156,15 +155,22 @@ impl LightClientBuilder { let (node, kyoto_client) = node_builder .add_scripts(wallet.peek_revealed_plus_lookahead().collect()) .build_node()?; - let (sender, receiver) = kyoto_client.split(); - let event_receiver = EventReceiver::from_index( + let kyoto::Client { + requester, + log_rx, + warn_rx, + event_rx, + } = kyoto_client; + let update_subscriber = UpdateSubscriber::from_index( wallet.local_chain().tip(), wallet.spk_index().clone(), - receiver, + event_rx, )?; Ok(LightClient { - sender, - receiver: event_receiver, + requester, + log_subscriber: LogSubscriber::new(log_rx), + warning_subscriber: WarningSubscriber::new(warn_rx), + update_subscriber, node, }) } diff --git a/src/lib.rs b/src/lib.rs index 42e4f85..db311b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,7 +25,6 @@ //! use bdk_wallet::Wallet; //! use bdk_wallet::bitcoin::Network; //! use bdk_kyoto::builder::LightClientBuilder; -//! use bdk_kyoto::logger::PrintLogger; //! use bdk_kyoto::LightClient; //! //! #[tokio::main] @@ -34,61 +33,34 @@ //! .network(Network::Signet) //! .create_wallet_no_persist()?; //! -//! let LightClient { sender, mut receiver, node } = LightClientBuilder::new().build(&wallet)?; +//! let LightClient { +//! requester, +//! log_subscriber: _, +//! warning_subscriber: _, +//! mut update_subscriber, +//! node +//! } = LightClientBuilder::new().build(&wallet)?; //! //! tokio::task::spawn(async move { node.run().await }); //! -//! let logger = PrintLogger::new(); //! loop { -//! if let Some(update) = receiver.update(&logger).await { +//! if let Some(update) = update_subscriber.update().await { //! wallet.apply_update(update)?; //! return Ok(()); //! } //! } //! } //! ``` -//! -//! It may be preferable to use events instead of defining a trait. To do so, -//! the workflow for building the node remains the same. -//! -//! ```no_run -//! # const RECEIVE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/0/*)"; -//! # const CHANGE: &str = "tr([7d94197e/86'/1'/0']tpubDCyQVJj8KzjiQsFjmb3KwECVXPvMwvAxxZGCP9XmWSopmjW3bCV3wD7TgxrUhiGSueDS1MU5X1Vb1YjYcp8jitXc5fXfdC1z68hDDEyKRNr/1/*)"; -//! # use bdk_kyoto::builder::LightClientBuilder; -//! # use bdk_kyoto::{Event, LogLevel, LightClient}; -//! # use bdk_wallet::bitcoin::Network; -//! # use bdk_wallet::Wallet; -//! #[tokio::main] -//! async fn main() -> anyhow::Result<()> { -//! let mut wallet = Wallet::create(RECEIVE, CHANGE) -//! .network(Network::Signet) -//! .create_wallet_no_persist()?; -//! -//! let LightClient { sender, mut receiver, node } = LightClientBuilder::new().build(&wallet)?; -//! -//! tokio::task::spawn(async move { node.run().await }); -//! -//! loop { -//! if let Some(event) = receiver.next_event(LogLevel::Info).await { -//! match event { -//! Event::ScanResponse(full_scan_result) => { -//! wallet.apply_update(full_scan_result).unwrap(); -//! }, -//! _ => (), -//! } -//! } -//! } -//! } -//! ``` #![warn(missing_docs)] -use core::fmt; use core::{future::Future, pin::Pin}; use std::collections::BTreeMap; use std::collections::HashSet; type FutureResult<'a, T, E> = Pin> + Send + 'a>>; +pub use bdk_wallet::chain::local_chain::MissingGenesisError; + use bdk_wallet::chain::{ keychain_txout::KeychainTxOutIndex, local_chain::{self, CheckPoint, LocalChain}, @@ -96,35 +68,35 @@ use bdk_wallet::chain::{ IndexedTxGraph, }; use bdk_wallet::chain::{ConfirmationBlockTime, TxUpdate}; - -pub use bdk_wallet::chain::bitcoin::FeeRate; -pub use bdk_wallet::chain::local_chain::MissingGenesisError; +use bdk_wallet::KeychainKind; pub extern crate kyoto; -use bdk_wallet::KeychainKind; - pub use kyoto::core::builder::NodeDefault; -#[cfg(feature = "events")] -pub use kyoto::{DisconnectedHeader, FailurePayload}; - -pub use kyoto::ClientSender as EventSender; -use kyoto::{IndexedBlock, NodeMessage, RejectReason}; +pub use kyoto::FeeRate; +pub use kyoto::Requester; +pub use kyoto::{DisconnectedHeader, RejectPayload}; pub use kyoto::{ - NodeState, Receiver, ScriptBuf, SyncUpdate, TxBroadcast, TxBroadcastPolicy, Txid, Warning, + Log, NodeState, ScriptBuf, SyncUpdate, TxBroadcast, TxBroadcastPolicy, Txid, Warning, }; +use kyoto::Receiver; +use kyoto::UnboundedReceiver; +use kyoto::{Event, IndexedBlock}; + pub mod builder; -#[cfg(feature = "callbacks")] -pub mod logger; #[derive(Debug)] /// A node and associated structs to send and receive events to and from the node. pub struct LightClient { /// Send events to a running node (i.e. broadcast a transaction). - pub sender: EventSender, + pub requester: Requester, + /// Receive logs from the node as it runs. + pub log_subscriber: LogSubscriber, + /// Receive warnings from the node as it runs. + pub warning_subscriber: WarningSubscriber, /// Receive wallet updates from a node. - pub receiver: EventReceiver, + pub update_subscriber: UpdateSubscriber, /// The underlying node that must be run to fetch blocks from peers. pub node: NodeDefault, } @@ -132,29 +104,26 @@ pub struct LightClient { /// Interpret events from a node that is running to apply /// updates to an underlying wallet. #[derive(Debug)] -pub struct EventReceiver { +pub struct UpdateSubscriber { // channel receiver - receiver: kyoto::Receiver, + receiver: UnboundedReceiver, // changes to local chain chain: local_chain::LocalChain, // receive graph graph: IndexedTxGraph>, - // the network minimum to broadcast a transaction - min_broadcast_fee: FeeRate, } -impl EventReceiver { +impl UpdateSubscriber { /// Build a light client event handler from a [`KeychainTxOutIndex`] and [`CheckPoint`]. pub(crate) fn from_index( cp: CheckPoint, index: KeychainTxOutIndex, - receiver: Receiver, + receiver: UnboundedReceiver, ) -> Result { Ok(Self { receiver, chain: LocalChain::from_tip(cp)?, graph: IndexedTxGraph::new(index.clone()), - min_broadcast_fee: FeeRate::BROADCAST_MIN, }) } @@ -165,27 +134,22 @@ impl EventReceiver { /// A reference to a [`NodeEventHandler`] is required, which handles events emitted from a /// running node. Production applications should define how the application handles /// these events and displays them to end users. - #[cfg(feature = "callbacks")] - pub async fn update( - &mut self, - logger: &dyn NodeEventHandler, - ) -> Option> { + pub async fn update(&mut self) -> Option> { let mut chain_changeset = BTreeMap::new(); - while let Ok(message) = self.receiver.recv().await { - self.log(&message, logger); + while let Some(message) = self.receiver.recv().await { match message { - NodeMessage::Block(IndexedBlock { height, block }) => { + Event::Block(IndexedBlock { height, block }) => { let hash = block.header.block_hash(); chain_changeset.insert(height, Some(hash)); let _ = self.graph.apply_block_relevant(&block, height); } - NodeMessage::BlocksDisconnected(headers) => { + Event::BlocksDisconnected(headers) => { for header in headers { let height = header.height; chain_changeset.insert(height, None); } } - NodeMessage::Synced(SyncUpdate { + Event::Synced(SyncUpdate { tip, recent_history, }) => { @@ -201,12 +165,6 @@ impl EventReceiver { }); break; } - NodeMessage::FeeFilter(fee_filter) => { - if self.min_broadcast_fee < fee_filter { - self.min_broadcast_fee = fee_filter; - } - } - _ => (), } } self.chain @@ -215,37 +173,6 @@ impl EventReceiver { Some(self.get_scan_response()) } - // Send dialogs to an arbitrary logger - #[cfg(feature = "callbacks")] - fn log(&self, message: &NodeMessage, logger: &dyn NodeEventHandler) { - match message { - NodeMessage::Dialog(d) => logger.dialog(d.clone()), - NodeMessage::Warning(w) => logger.warning(w.clone()), - NodeMessage::StateChange(s) => logger.state_changed(*s), - NodeMessage::Block(b) => { - let hash = b.block.header.block_hash(); - logger.dialog(format!("Applying Block: {hash}")); - } - NodeMessage::Synced(SyncUpdate { - tip, - recent_history: _, - }) => { - logger.synced(tip.height); - } - NodeMessage::BlocksDisconnected(headers) => { - logger.blocks_disconnected(headers.iter().map(|dc| dc.height).collect()); - } - NodeMessage::TxSent(t) => { - logger.tx_sent(*t); - } - NodeMessage::TxBroadcastFailure(r) => { - logger.tx_failed(r.txid, r.reason.map(|reason| reason.into_string())) - } - NodeMessage::ConnectionsMet => logger.connections_met(), - _ => (), - } - } - // When the client is believed to have synced to the chain tip of most work, // we can return a wallet update. fn get_scan_response(&mut self) -> FullScanResponse { @@ -259,159 +186,48 @@ impl EventReceiver { chain_update: Some(self.chain.tip()), } } +} - /// Wait for the next event from the client. If no event is ready, - /// `None` will be returned. Otherwise, the event will be `Some(..)`. - /// - /// Blocks will be processed while waiting for the next event of relevance. - /// When the node is fully synced to the chain of all connected peers, - /// an update for the provided keychain or underlying wallet will be returned. - /// - /// Informational messages on the node operation may be filtered out with - /// [`LogLevel::Warning`], which will only emit warnings when called. - #[cfg(feature = "events")] - pub async fn next_event(&mut self, log_level: LogLevel) -> Option { - while let Ok(message) = self.receiver.recv().await { - match message { - NodeMessage::Dialog(log) => { - if matches!(log_level, LogLevel::Info) { - return Some(Event::Log(log)); - } - } - NodeMessage::Warning(warning) => return Some(Event::Warning(warning)), - NodeMessage::StateChange(node_state) => { - return Some(Event::StateChange(node_state)) - } - NodeMessage::ConnectionsMet => return Some(Event::PeersFound), - NodeMessage::Block(IndexedBlock { height, block }) => { - // This is weird but I'm having problems doing things differently. - let mut chain_changeset = BTreeMap::new(); - chain_changeset.insert(height, Some(block.block_hash())); - self.chain - .apply_changeset(&local_chain::ChangeSet::from(chain_changeset)) - .expect("chain initialized with genesis"); - let _ = self.graph.apply_block_relevant(&block, height); - if matches!(log_level, LogLevel::Info) { - return Some(Event::Log(format!( - "Applied block {} to keychain", - block.block_hash() - ))); - } - } - NodeMessage::Synced(SyncUpdate { - tip: _, - recent_history, - }) => { - let mut chain_changeset = BTreeMap::new(); - recent_history.into_iter().for_each(|(height, header)| { - chain_changeset.insert(height, Some(header.block_hash())); - }); - self.chain - .apply_changeset(&local_chain::ChangeSet::from(chain_changeset)) - .expect("chain was initialized with genesis"); - let result = self.get_scan_response(); - return Some(Event::ScanResponse(result)); - } - NodeMessage::BlocksDisconnected(headers) => { - let mut chain_changeset = BTreeMap::new(); - for dc in &headers { - let height = dc.height; - chain_changeset.insert(height, None); - } - self.chain - .apply_changeset(&local_chain::ChangeSet::from(chain_changeset)) - .expect("chain was initialized with genesis."); - return Some(Event::BlocksDisconnected(headers)); - } - NodeMessage::TxSent(txid) => { - return Some(Event::TxSent(txid)); - } - NodeMessage::TxBroadcastFailure(failure_payload) => { - return Some(Event::TxFailed(failure_payload)); - } - NodeMessage::FeeFilter(fee_filter) => { - if self.min_broadcast_fee < fee_filter { - self.min_broadcast_fee = fee_filter; - } - } - _ => continue, - } - } - None +/// Receive logs from the node as it runs to drive user interface changes. +#[derive(Debug)] +pub struct LogSubscriber { + receiver: Receiver, +} + +impl LogSubscriber { + pub(crate) fn new(receiver: Receiver) -> Self { + Self { receiver } } - /// The minimum fee required for a transaction to propagate to the connected peers. - pub fn broadcast_minimum(&self) -> FeeRate { - self.min_broadcast_fee + /// Wait until the node emits a log for an indeterminant amount of time. + pub async fn next_log(&mut self) -> Log { + loop { + if let Some(log) = self.receiver.recv().await { + return log; + } + } } } -/// Handle dialog and state changes from a node with some arbitrary behavior. -/// The primary purpose of this trait is not to respond to events by persisting changes, -/// or acting on the underlying wallet. Instead, this trait should be used to drive changes in user -/// interface behavior or keep a simple log. Relevant events that effect on the wallet are handled -/// automatically in [`EventReceiver::update`](EventReceiver). -#[cfg(feature = "callbacks")] -pub trait NodeEventHandler: Send + Sync + fmt::Debug + 'static { - /// Make use of some message the node has sent. - fn dialog(&self, dialog: String); - /// Make use of some warning the node has sent. - fn warning(&self, warning: Warning); - /// Handle a change in the node's state. - fn state_changed(&self, state: NodeState); - /// The required number of connections for the node was met. - fn connections_met(&self); - /// A transaction was broadcast to at least one peer. - fn tx_sent(&self, txid: Txid); - /// A transaction was rejected or failed to broadcast. - fn tx_failed(&self, txid: Txid, reject_reason: Option); - /// A list of block heights were reorganized - fn blocks_disconnected(&self, blocks: Vec); - /// The node has synced to the height of the connected peers. - fn synced(&self, tip: u32); +/// Receive wanrings from the node to act on or to drive user interface changes +#[derive(Debug)] +pub struct WarningSubscriber { + receiver: UnboundedReceiver, } -/// Events emitted by a node that may be used by a wallet or application. -#[cfg(feature = "events")] -pub enum Event { - /// Information about the current node process. - Log(String), - /// Warnings emitted by the node that may effect sync times or node operation. - Warning(Warning), - /// All required connnections have been met. - PeersFound, - /// A transaction was broadcast. - TxSent(Txid), - /// A transaction failed to broadcast or was rejected. - TxFailed(FailurePayload), - /// The node is performing a new task. - StateChange(NodeState), - /// A result after scanning compact block filters to the tip of the chain. - /// - /// ## Note - /// - /// This event will be emitted every time a new block is found while the node - /// is running and is connected to peers. - ScanResponse(FullScanResponse), - /// Blocks were reorganized from the chain of most work. - /// - /// ## Note - /// - /// No action is required from the developer, as these events are already - /// handled within the [`EventReceiver`]. This event is to inform the user of - /// such an event. - BlocksDisconnected(Vec), -} +impl WarningSubscriber { + pub(crate) fn new(receiver: UnboundedReceiver) -> Self { + Self { receiver } + } -/// Filter [`Event`] by a specified level. [`LogLevel::Info`] will pass -/// through both [`Event::Log`] and [`Event::Warning`]. [`LogLevel::Warning`] -/// will omit [`Event::Log`] events. -#[cfg(feature = "events")] -pub enum LogLevel { - /// Receive info messages and warnings. - Info, - /// Omit info messages and only receive warnings. - Warning, + /// Wait until the node emits a warning for an indeterminant amount of time. + pub async fn next_warning(&mut self) -> Warning { + loop { + if let Some(warning) = self.receiver.recv().await { + return warning; + } + } + } } /// Extend the functionality of [`Wallet`](bdk_wallet) for interoperablility @@ -436,8 +252,8 @@ impl WalletExt for bdk_wallet::Wallet { } } -/// Extend the [`EventSender`] functionality to work conveniently with a [`Wallet`](bdk_wallet). -pub trait EventSenderExt { +/// Extend the [`Requester`] functionality to work conveniently with a [`Wallet`](bdk_wallet). +pub trait RequesterExt { /// Add all revealed scripts to the node to monitor. fn add_revealed_scripts<'a>( &'a self, @@ -445,19 +261,19 @@ pub trait EventSenderExt { ) -> FutureResult<'a, (), kyoto::ClientError>; } -impl EventSenderExt for EventSender { +impl RequesterExt for Requester { fn add_revealed_scripts<'a>( &'a self, wallet: &'a bdk_wallet::Wallet, ) -> FutureResult<'a, (), kyoto::ClientError> { async fn _add_revealed( - sender: &EventSender, + requester: &Requester, wallet: &bdk_wallet::Wallet, ) -> Result<(), kyoto::ClientError> { for keychain in [KeychainKind::External, KeychainKind::Internal] { let scripts = wallet.spk_index().revealed_keychain_spks(keychain); for (_, script) in scripts { - sender.add_script(script).await?; + requester.add_script(script).await?; } } Ok(()) @@ -465,23 +281,3 @@ impl EventSenderExt for EventSender { Box::pin(_add_revealed(self, wallet)) } } - -trait StringExt { - fn into_string(self) -> String; -} - -impl StringExt for RejectReason { - fn into_string(self) -> String { - let message = match self { - RejectReason::Malformed => "Message could not be decoded.", - RejectReason::Invalid => "Transaction was invalid for some reason.", - RejectReason::Obsolete => "Client version is no longer supported.", - RejectReason::Duplicate => "Duplicate version message received.", - RejectReason::NonStandard => "Transaction was nonstandard.", - RejectReason::Dust => "One or more outputs are below the dust threshold.", - RejectReason::Fee => "Transaction does not have enough fee to be mined.", - RejectReason::Checkpoint => "Inconsistent with compiled checkpoint.", - }; - message.into() - } -} diff --git a/src/logger.rs b/src/logger.rs deleted file mode 100644 index ced8129..0000000 --- a/src/logger.rs +++ /dev/null @@ -1,157 +0,0 @@ -//! Handle messages from a node. -//! -//! # Examples -//! -//! For quick iteration and debugging, the [`PrintLogger`] responds to node events by simply -//! printing the display to the console. -//! -//! ```rust -//! #[cfg(feature = "callbacks")] -//! use bdk_kyoto::logger::PrintLogger; -//! use bdk_kyoto::Warning; -//! use bdk_kyoto::NodeEventHandler; -//! -//! let logger = PrintLogger::new(); -//! logger.dialog("The node is running".into()); -//! logger.warning(Warning::PeerTimedOut); -//! ``` -//! -//! For a more descriptive console log, the `trace` feature may be used. -//! -//! ```rust -//! use bdk_kyoto::logger::TraceLogger; -//! use bdk_kyoto::Warning; -//! use bdk_kyoto::NodeEventHandler; -//! -//! let logger = TraceLogger::new().unwrap(); -//! logger.dialog("The node is running".into()); -//! logger.warning(Warning::PeerTimedOut); -//! ``` -//! -//! For production applications, a custom implementation of [`NodeEventHandler`] should be -//! implemented. An example of a good applciation logger should implement user interface behavior -//! and potentially save information to a file. - -use std::fmt::Debug; - -use kyoto::NodeState; -use kyoto::Txid; -use kyoto::Warning; -#[cfg(feature = "trace")] -use tracing::subscriber::SetGlobalDefaultError; - -use crate::NodeEventHandler; - -/// Print messages from the node to the console -#[derive(Default, Debug)] -pub struct PrintLogger {} - -impl PrintLogger { - /// Build a new print logger - pub fn new() -> Self { - Self {} - } -} - -impl NodeEventHandler for PrintLogger { - fn dialog(&self, dialog: String) { - println!("{dialog}"); - } - - fn warning(&self, warning: Warning) { - println!("{warning}"); - } - - fn state_changed(&self, state: NodeState) { - println!("State change: {state}"); - } - - fn tx_sent(&self, txid: Txid) { - println!("Transaction sent: {txid}"); - } - - fn tx_failed(&self, txid: Txid, _reject_reason: Option) { - println!("Transaction failed: {txid}"); - } - - fn blocks_disconnected(&self, blocks: Vec) { - for block in blocks { - println!("Block {block} was reorganized"); - } - } - - fn synced(&self, tip: u32) { - println!("Synced to tip {tip}"); - } - - fn connections_met(&self) { - println!("Required connections met"); - } -} - -/// Print messages from the node to the console using [`tracing`]. -#[cfg(feature = "trace")] -#[derive(Default, Debug)] -pub struct TraceLogger {} - -#[cfg(feature = "trace")] -impl TraceLogger { - /// Build a new trace logger. This constructor will initialize the [`tracing::subscriber`] globally. - /// - /// ## Errors - /// - /// If [`TraceLogger::new`] has already been called. - pub fn new() -> Result { - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber)?; - Ok(Self {}) - } -} - -#[cfg(feature = "trace")] -impl NodeEventHandler for TraceLogger { - fn dialog(&self, dialog: String) { - tracing::info!("{dialog}") - } - - fn warning(&self, warning: Warning) { - tracing::warn!("{warning}") - } - - fn state_changed(&self, state: NodeState) { - tracing::info!("State change: {state}") - } - - fn tx_sent(&self, txid: Txid) { - tracing::info!("Transaction sent: {txid}") - } - - fn tx_failed(&self, txid: Txid, _reject_reason: Option) { - tracing::info!("Transaction failed: {txid}") - } - - fn blocks_disconnected(&self, blocks: Vec) { - for block in blocks { - tracing::warn!("Block {block} was reorganized"); - } - } - - fn synced(&self, tip: u32) { - tracing::info!("Synced to height: {tip}") - } - - fn connections_met(&self) { - tracing::info!("Required connections met") - } -} - -impl NodeEventHandler for () { - fn dialog(&self, _dialog: String) {} - fn warning(&self, _warning: Warning) {} - fn state_changed(&self, _state: NodeState) {} - fn connections_met(&self) {} - fn tx_sent(&self, _txid: Txid) {} - fn tx_failed(&self, _txid: Txid, _reject_reason: Option) {} - fn blocks_disconnected(&self, _blocks: Vec) {} - fn synced(&self, _tip: u32) {} -} diff --git a/tests/client.rs b/tests/client.rs index fe1558c..ecb7426 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -8,8 +8,6 @@ use tokio::time; use bdk_kyoto::builder::LightClientBuilder; use bdk_kyoto::builder::TrustedPeer; -#[cfg(feature = "callbacks")] -use bdk_kyoto::logger::PrintLogger; use bdk_testenv::bitcoincore_rpc::RpcApi; use bdk_testenv::bitcoind; use bdk_testenv::TestEnv; @@ -59,7 +57,6 @@ fn init_node( } #[tokio::test] -#[cfg(feature = "callbacks")] async fn update_returns_blockchain_data() -> anyhow::Result<()> { let env = testenv()?; @@ -78,9 +75,10 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> { // build node/client let tempdir = tempfile::tempdir()?.path().join("kyoto-data"); let LightClient { - sender, - mut receiver, + requester, + mut update_subscriber, node, + .. } = init_node(&env, &wallet, tempdir)?; // mine blocks @@ -95,9 +93,11 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> { // run node task::spawn(async move { node.run().await }); - let logger = PrintLogger::new(); // get update - let res = receiver.update(&logger).await.expect("should have update"); + let res = update_subscriber + .update() + .await + .expect("should have update"); let FullScanResponse { tx_update, chain_update, @@ -120,13 +120,12 @@ async fn update_returns_blockchain_data() -> anyhow::Result<()> { [(KeychainKind::External, index)].into() ); - sender.shutdown().await?; + requester.shutdown().await?; Ok(()) } #[tokio::test] -#[cfg(feature = "callbacks")] async fn update_handles_reorg() -> anyhow::Result<()> { let env = testenv()?; @@ -137,9 +136,10 @@ async fn update_handles_reorg() -> anyhow::Result<()> { let tempdir = tempfile::tempdir()?.path().join("kyoto-data"); let LightClient { - sender, - mut receiver, + requester, + mut update_subscriber, node, + .. } = init_node(&env, &wallet, tempdir)?; // mine blocks @@ -160,8 +160,10 @@ async fn update_handles_reorg() -> anyhow::Result<()> { task::spawn(async move { node.run().await }); // get update - let logger = PrintLogger::new(); - let res = receiver.update(&logger).await.expect("should have update"); + let res = update_subscriber + .update() + .await + .expect("should have update"); let (anchor, anchor_txid) = *res.tx_update.anchors.iter().next().unwrap(); assert_eq!(anchor.block_id.hash, blockhash); assert_eq!(anchor_txid, txid); @@ -174,7 +176,10 @@ async fn update_handles_reorg() -> anyhow::Result<()> { wait_for_height(&env, 103).await?; // expect tx to confirm at same height but different blockhash - let res = receiver.update(&logger).await.expect("should have update"); + let res = update_subscriber + .update() + .await + .expect("should have update"); let (anchor, anchor_txid) = *res.tx_update.anchors.iter().next().unwrap(); assert_eq!(anchor_txid, txid); assert_eq!(anchor.block_id.height, 102); @@ -182,13 +187,12 @@ async fn update_handles_reorg() -> anyhow::Result<()> { assert_eq!(anchor.block_id.hash, new_blockhash); wallet.apply_update(res).unwrap(); - sender.shutdown().await?; + requester.shutdown().await?; Ok(()) } #[tokio::test] -#[cfg(feature = "callbacks")] async fn update_handles_dormant_wallet() -> anyhow::Result<()> { let env = testenv()?; @@ -199,9 +203,10 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { let tempdir = tempfile::tempdir()?.path().join("kyoto-data"); let LightClient { - sender, - mut receiver, + requester, + mut update_subscriber, node, + .. } = init_node(&env, &wallet, tempdir.clone())?; // mine blocks @@ -222,15 +227,17 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { task::spawn(async move { node.run().await }); // get update - let logger = PrintLogger::new(); - let res = receiver.update(&logger).await.expect("should have update"); + let res = update_subscriber + .update() + .await + .expect("should have update"); let (anchor, anchor_txid) = *res.tx_update.anchors.iter().next().unwrap(); assert_eq!(anchor.block_id.hash, blockhash); assert_eq!(anchor_txid, txid); wallet.apply_update(res).unwrap(); // shut down then reorg - sender.shutdown().await?; + requester.shutdown().await?; let hashes = env.reorg(1)?; // 102 let new_blockhash = hashes[0]; @@ -238,15 +245,19 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { wait_for_height(&env, 122).await?; let LightClient { - sender, - mut receiver, + requester, + mut update_subscriber, node, + .. } = init_node(&env, &wallet, tempdir)?; task::spawn(async move { node.run().await }); // expect tx to confirm at same height but different blockhash - let res = receiver.update(&logger).await.expect("should have update"); + let res = update_subscriber + .update() + .await + .expect("should have update"); let (anchor, anchor_txid) = *res.tx_update.anchors.iter().next().unwrap(); assert_eq!(anchor_txid, txid); assert_eq!(anchor.block_id.height, 102); @@ -254,7 +265,7 @@ async fn update_handles_dormant_wallet() -> anyhow::Result<()> { assert_eq!(anchor.block_id.hash, new_blockhash); wallet.apply_update(res).unwrap(); - sender.shutdown().await?; + requester.shutdown().await?; Ok(()) }