diff --git a/.circleci/config.yml b/.circleci/config.yml index 9489d8c092..d87c2f5f8e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -121,7 +121,7 @@ commands: echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list echo "deb https://deb.nodesource.com/node_10.x/ trusty main" | sudo tee /etc/apt/sources.list.d/node_10.list sudo apt-get update - sudo apt-get install -y nodejs=10.* yarn libzmq3-dev + sudo apt-get install -y nodejs=10.* yarn install_rust: steps: - run: diff --git a/Cargo.lock b/Cargo.lock index 31bd520036..eb37459988 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -411,7 +411,6 @@ name = "btsieve" version = "0.1.0" dependencies = [ "bitcoin_support 0.1.0", - "bitcoincore-rpc 0.8.0-rc1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -445,7 +444,6 @@ dependencies = [ "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "warp 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", - "zmq-rs 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4637,24 +4635,6 @@ dependencies = [ "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "zmq-ffi" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "zmq-rs" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", - "zmq-ffi 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - [metadata] "checksum adler32 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e522997b529f05601e05166c07ed17789691f562762c7f3b987263d2dedee5c" "checksum aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d2e5b0458ea3beae0d1d8c0f3946564f8e10f90646cf78c06b4351052058d1ee" @@ -5120,5 +5100,3 @@ dependencies = [ "checksum zeroize 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "45af6a010d13e4cf5b54c94ba5a2b2eba5596b9e46bf5875612d332a1f2b3f86" "checksum zeroize_derive 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "080616bd0e31f36095288bb0acdf1f78ef02c2fa15527d7e993f2a6c7591643e" "checksum zip 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "e7341988e4535c60882d5e5f0b7ad0a9a56b080ade8bdb5527cb512f7b2180e0" -"checksum zmq-ffi 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c623b43007493e02be41621aed654933b53aaed7e1859530c5c10d0720ca270c" -"checksum zmq-rs 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "bb0116ad9aadee6a05bfdedf9c31b4c5d9b5a7511c080ffc79567f3211412eb9" diff --git a/README.md b/README.md index 2a19ca794a..c589155d6f 100644 --- a/README.md +++ b/README.md @@ -29,10 +29,7 @@ See [#626](https://github.com/comit-network/comit-rs/issues/626) for tracking. ## Setup build environment 1. Install `rustup`: `curl https://sh.rustup.rs -sSf | sh` -2. Install libzmq: - - Ubuntu/Debian: `apt install libzmq3-dev` - - Mac ([Homebrew](https://brew.sh/)) `brew install zeromq` -3. Install OpenSSL: +2. Install OpenSSL: - Ubuntu/Debian: `apt install libssl-dev pkg-config` ## Build & Run diff --git a/api_tests/lib/bitcoin.ts b/api_tests/lib/bitcoin.ts index 5a78462708..27ac1a957a 100644 --- a/api_tests/lib/bitcoin.ts +++ b/api_tests/lib/bitcoin.ts @@ -14,11 +14,11 @@ import sb from "satoshi-bitcoin"; import { test_rng } from "./util"; export interface BitcoinNodeConfig { + network: string; username: string; password: string; host: string; rpcPort: number; - zmqPort: number; } interface GetBlockchainInfoResponse { @@ -79,7 +79,7 @@ function createBitcoinRpcClient(btcConfig?: BitcoinNodeConfig) { if (!bitcoinRpcClient || btcConfig !== bitcoinConfig) { bitcoinRpcClient = new BitcoinRpcClient({ - network: "regtest", + network: btcConfig.network, port: btcConfig.rpcPort, host: btcConfig.host, username: btcConfig.username, diff --git a/api_tests/lib/config.ts b/api_tests/lib/config.ts index e48c36d8cf..651006d581 100644 --- a/api_tests/lib/config.ts +++ b/api_tests/lib/config.ts @@ -22,13 +22,7 @@ export interface CndConfigFile { interface BtsieveBitcoin { node_url: string; - zmq_endpoint: string; - authentication: { - basic: { - node_username: string; - node_password: string; - }; - }; + network: string; } interface BtsieveEthereum { @@ -143,14 +137,8 @@ export function btsieveBitcoinConfig( nodeConfig: BitcoinNodeConfig ): BtsieveBitcoin { return { - authentication: { - basic: { - node_password: nodeConfig.password, - node_username: nodeConfig.username, - }, - }, node_url: `http://${nodeConfig.host}:${nodeConfig.rpcPort}`, - zmq_endpoint: `tcp://${nodeConfig.host}:${nodeConfig.zmqPort}`, + network: nodeConfig.network, }; } diff --git a/api_tests/lib/ledger_runner.ts b/api_tests/lib/ledger_runner.ts index 2fc53f7365..c36b749b85 100644 --- a/api_tests/lib/ledger_runner.ts +++ b/api_tests/lib/ledger_runner.ts @@ -108,9 +108,9 @@ export class LedgerRunner { const [, password] = result.output.split(":"); return { + network: "regtest", host: container.getContainerIpAddress(), rpcPort: container.getMappedPort(18443), - zmqPort: container.getMappedPort(28332), username: "__cookie__", password, }; @@ -162,11 +162,10 @@ async function startBitcoinContainer(): Promise { "-rpcbind=0.0.0.0:18443", "-rpcallowip=0.0.0.0/0", "-debug=1", - "-zmqpubrawblock=tcp://*:28332", - "-zmqpubrawtx=tcp://*:28333", "-acceptnonstdtxn=0", + "-rest", ]) - .withExposedPorts(18443, 28332) + .withExposedPorts(18443) .withWaitStrategy(new LogWaitStrategy("Flushed wallet.dat")) .start(); } diff --git a/btsieve/Cargo.toml b/btsieve/Cargo.toml index a568e12fe1..c4734035cd 100644 --- a/btsieve/Cargo.toml +++ b/btsieve/Cargo.toml @@ -7,7 +7,6 @@ version = "0.1.0" [dependencies] bitcoin_support = { path = "../internal/bitcoin_support" } -bitcoincore-rpc = "0.8.0-rc1" byteorder = "1.3" chrono = { version = "0.4", features = ["serde"] } config = { version = "0.9", features = ["toml"] } @@ -34,7 +33,6 @@ structopt = "0.3" tokio = "0.1" url = { version = "2.1", features = ["serde"] } warp = { version = "0.1", default-features = false } -zmq-rs = "0.1" [dev-dependencies] env_logger = "0.6" diff --git a/btsieve/config/bitcoin_basicauth.toml b/btsieve/config/bitcoin_basicauth.toml deleted file mode 100644 index cd37ce83d9..0000000000 --- a/btsieve/config/bitcoin_basicauth.toml +++ /dev/null @@ -1,12 +0,0 @@ -[bitcoin] -node_url = "http://localhost:18443" -zmq_endpoint = "tcp://127.0.0.1:28332" - -[bitcoin.authentication.basic] -node_username = "Satoshi" -node_password = "Nakamoto" - -[http_api] -address_bind="0.0.0.0" -port_bind=8080 -external_url="http://localhost:8080/" diff --git a/btsieve/config/bitcoin_cookieauth.toml b/btsieve/config/bitcoin_cookieauth.toml deleted file mode 100644 index 476dab5691..0000000000 --- a/btsieve/config/bitcoin_cookieauth.toml +++ /dev/null @@ -1,11 +0,0 @@ -[bitcoin] -node_url = "http://localhost:18443" -zmq_endpoint = "tcp://127.0.0.1:28332" - -[bitcoin.authentication.cookie] -file_path = "/home/bitcoin/.bitcoin/regtest/.cookie" - -[http_api] -address_bind="0.0.0.0" -port_bind=8080 -external_url="http://localhost:8080/" diff --git a/btsieve/config/bitcoin_only.toml b/btsieve/config/bitcoin_only.toml index 11e1c2836b..23ba63ac06 100644 --- a/btsieve/config/bitcoin_only.toml +++ b/btsieve/config/bitcoin_only.toml @@ -1,10 +1,6 @@ [bitcoin] +network = "regtest" node_url = "http://localhost:18443" -zmq_endpoint = "tcp://127.0.0.1:28332" - -[bitcoin.authentication.basic] -node_username = "bitcoin" -node_password = "54pLR_f7-G6is32LP-7nbhzZSbJs_2zSATtZV_r05yg=" [http_api] address_bind="0.0.0.0" diff --git a/btsieve/config/btsieve.toml b/btsieve/config/btsieve.toml index e700cfaf18..2babf64e11 100644 --- a/btsieve/config/btsieve.toml +++ b/btsieve/config/btsieve.toml @@ -1,10 +1,6 @@ [bitcoin] +network = "testnet" node_url = "http://localhost:18443" -zmq_endpoint = "tcp://127.0.0.1:28332" - -[bitcoin.authentication.basic] -node_username = "bitcoin" -node_password = "54pLR_f7-G6is32LP-7nbhzZSbJs_2zSATtZV_r05yg=" [ethereum] node_url = "http://localhost:8545" diff --git a/btsieve/src/bitcoin/bitcoind_http_blocksource.rs b/btsieve/src/bitcoin/bitcoind_http_blocksource.rs new file mode 100644 index 0000000000..19f9ccad2d --- /dev/null +++ b/btsieve/src/bitcoin/bitcoind_http_blocksource.rs @@ -0,0 +1,169 @@ +use crate::blocksource::{self, BlockSource}; +use bitcoin_support::{deserialize, Block, Network, Transaction}; +use futures::{Future, Stream}; +use reqwest::r#async::Client; +use serde::Deserialize; +use std::time::Duration; +use tokio::timer::Interval; + +#[derive(Deserialize)] +struct ChainInfo { + bestblockhash: String, +} + +#[derive(Debug)] +pub enum Error { + Reqwest(reqwest::Error), + Hex(hex::FromHexError), + BlockDeserialization(bitcoin_support::consensus::encode::Error), + TransactionDeserialization(String), +} + +#[derive(Clone)] +pub struct BitcoindHttpBlockSource { + network: Network, + base_url: String, + client: Client, +} + +impl BitcoindHttpBlockSource { + pub fn new(url: String, network: Network) -> Self { + Self { + network, + base_url: url, + client: Client::new(), + } + } + + fn latest_block(&self) -> impl Future + Send + 'static { + let cloned_self = self.clone(); + + self.latest_block_hash() + .and_then(move |latest_block_hash| cloned_self.block_by_hash(latest_block_hash)) + } + + fn latest_block_hash(&self) -> impl Future + Send + 'static { + let bitcoind_blockchain_info_url = format!("{}/rest/chaininfo.json", self.base_url); + + self.client + .get(bitcoind_blockchain_info_url.as_str()) + .send() + .map_err(|e| { + log::error!("Error when sending request to bitcoind"); + Error::Reqwest(e) + }) + .and_then(move |mut response| { + response.json::().map_err(|e| { + log::error!("Error when deserialising the response from bitcoind"); + Error::Reqwest(e) + }) + }) + .map(move |blockchain_info| blockchain_info.bestblockhash) + } + + fn block_by_hash( + &self, + block_hash: String, + ) -> impl Future + Send + 'static { + let raw_block_by_hash_url = format!("{}/rest/block/{}.hex", self.base_url, block_hash); + + self.client + .get(raw_block_by_hash_url.as_str()) + .send() + .map_err(Error::Reqwest) + .and_then(|mut response| response.text().map_err(Error::Reqwest)) + .and_then(move |mut response_text| { + response_text = response_text.as_str().trim().to_string(); + hex::decode(response_text).map_err(Error::Hex) + }) + .and_then(|bytes| deserialize(bytes.as_ref()).map_err(Error::BlockDeserialization)) + .map(move |block| { + log::trace!("Got {:?}", block); + block + }) + } + + pub fn transaction_by_hash( + &self, + transaction_hash: String, + ) -> impl Future + Send + 'static { + let raw_transaction_by_hash_url = + format!("{}/rest/tx/{}.hex", self.base_url, transaction_hash); + + self.client + .get(raw_transaction_by_hash_url.as_str()) + .send() + .map_err(Error::Reqwest) + .and_then(|mut response| response.text().map_err(Error::Reqwest)) + .and_then(move |mut response_text| { + response_text = response_text.as_str().trim().to_string(); + hex::decode(response_text).map_err(Error::Hex) + }) + .and_then(|bytes| { + deserialize(bytes.as_ref()).map_err(|e| { + log::error!( + "Got new transaction but failed to deserialize it because {:?}", + e + ); + Error::TransactionDeserialization(format!( + "Failed to deserialize the response from bitcoind into a transaction: {}", + e + )) + }) + }) + .inspect(move |transaction| { + log::debug!("Fetched transaction {:?}", transaction); + }) + } +} + +impl BlockSource for BitcoindHttpBlockSource { + type Block = Block; + type Error = Error; + + fn blocks( + &self, + ) -> Box> + Send> { + // The Bitcoin blockchain has a mining interval of about 10 minutes. + // The poll interval is configured to once every 2 minutes for mainnet and + // testnet so we don't have to wait to long to see a new block. + let poll_interval = match self.network { + Network::Mainnet => 120_000, + Network::Testnet => 120_000, + Network::Regtest => 300, + }; + + log::info!(target: "bitcoin::blocksource", "polling for new blocks from bitcoind on {} every {} seconds", self.network, poll_interval); + + let cloned_self = self.clone(); + + let stream = Interval::new_interval(Duration::from_millis(poll_interval)) + .map_err(blocksource::Error::Timer) + .and_then(move |_| { + cloned_self + .latest_block() + .map(Some) + .or_else(|error| { + match error { + Error::Reqwest(e) => { + log::warn!(target: "bitcoin::blocksource", "reqwest error encountered during polling: {:?}", e); + Ok(None) + } + Error::Hex(e) => { + log::warn!(target: "bitcoin::blocksource", "hex-decode error encountered during polling: {:?}", e); + Ok(None) + } + Error::BlockDeserialization(e) => { + log::warn!(target: "bitcoin::blocksource", "block-deserialization error encountered during polling: {:?}", e); + Ok(None) + } + _ => Err(error) + } + }) + .map_err(blocksource::Error::Source) + }) + .filter_map(|maybe_block| maybe_block); + + Box::new(stream) + } +} diff --git a/btsieve/src/bitcoin/bitcoind_zmq_listener.rs b/btsieve/src/bitcoin/bitcoind_zmq_listener.rs deleted file mode 100644 index 90f2b6bd84..0000000000 --- a/btsieve/src/bitcoin/bitcoind_zmq_listener.rs +++ /dev/null @@ -1,59 +0,0 @@ -use bitcoin_support::{deserialize, Block}; -use futures::sync::mpsc::{self, UnboundedReceiver}; -use std::thread; -use zmq_rs::{self as zmq, Context, Socket}; - -pub fn bitcoin_block_listener(endpoint: &str) -> Result, zmq::Error> { - let context = Context::new()?; - let mut socket = context.socket(zmq::SUB)?; - - socket.set_subscribe(b"rawblock")?; - socket.connect(endpoint)?; - - log::info!( - "Connecting to {} to subscribe to new Bitcoin blocks over ZeroMQ", - socket.get_last_endpoint().unwrap() - ); - - let (state_sender, state_receiver) = mpsc::unbounded(); - - thread::spawn(move || { - // we need this to keep the context alive - let _context = context; - - loop { - let result = receive_block(&mut socket); - - if let Ok(Some(block)) = result { - let _ = state_sender.unbounded_send(block); - } - } - }); - Ok(state_receiver) -} - -fn receive_block(socket: &mut Socket) -> Result, zmq::Error> { - let bytes = socket.recv_bytes(zmq::SNDMORE)?; - let bytes: &[u8] = bytes.as_ref(); - - match bytes { - b"rawblock" => { - let bytes = socket.recv_bytes(zmq::SNDMORE)?; - - match deserialize(bytes.as_ref()) { - Ok(block) => { - log::trace!("Got {:?}", block); - Ok(Some(block)) - } - Err(e) => { - log::error!("Got new block but failed to deserialize it because {:?}", e); - Ok(None) - } - } - } - _ => { - log::error!("Unhandled message: {:?}", bytes); - Ok(None) - } - } -} diff --git a/btsieve/src/bitcoin/mod.rs b/btsieve/src/bitcoin/mod.rs index b4ba78cc49..dc916b7c5b 100644 --- a/btsieve/src/bitcoin/mod.rs +++ b/btsieve/src/bitcoin/mod.rs @@ -1,26 +1,28 @@ -pub mod bitcoind_zmq_listener; +pub mod bitcoind_http_blocksource; pub mod block_processor; pub mod blockchain_info_bitcoin_http_blocksource; pub mod queries; pub use self::{block_processor::check_transaction_queries, queries::TransactionQuery}; use crate::{Bitcoin, Blockchain}; -use bitcoin_support::Block; +use bitcoin_support::{BitcoinHash, Block}; impl Blockchain for Bitcoin { fn add_block(&mut self, block: Block) { + let block_hash = block.bitcoin_hash(); if self.0.nodes.contains(&block) { - return log::warn!("Block already known {:?} ", block); + return log::warn!("Block already known {:?} ", block_hash); } + + log::debug!("Retrieved block {:?} ", block_hash); match self.find_predecessor(&block) { Some(_prev) => { - self.0.vertices.push(( - block.clone().header.prev_blockhash, - block.clone().header.merkle_root, - )); + self.0 + .vertices + .push((block.clone().header.prev_blockhash, block_hash)); } None => { - log::warn!("Could not find previous block for {:?} ", block); + log::warn!("Could not find previous block for {:?} ", block_hash); } } self.0.nodes.push(block); @@ -31,17 +33,17 @@ impl Blockchain for Bitcoin { } fn find_predecessor(&self, block: &Block) -> Option<&Block> { - self.0 - .nodes - .iter() - .find(|b| b.header.merkle_root.eq(&block.header.prev_blockhash)) + self.0.nodes.iter().find(|b| { + let block_hash = b.bitcoin_hash(); + block_hash.eq(&block.header.prev_blockhash) + }) } } #[cfg(test)] mod test { use super::*; - use bitcoin_support::{Block, BlockHeader, FromHex, Sha256dHash}; + use bitcoin_support::{deserialize, Block, BlockHeader, FromHex, Sha256dHash}; use spectral::{option::OptionAssertions, *}; fn new_block(prev_blockhash: Sha256dHash, merkle_root: Sha256dHash) -> Block { @@ -115,27 +117,11 @@ mod test { fn add_block_and_find_predecessor() { let mut bitcoin_chain = Bitcoin::default(); - let block1 = new_block( - Sha256dHash::from_hex( - "0000000000000000000000000000000000000000000000000000000000000001", - ) - .unwrap(), - Sha256dHash::from_hex( - "0000000000000000000000000000000000000000000000000000000000000002", - ) - .unwrap(), - ); + let block1_hex = "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f916ab5835dbf97d3848f79b2bf9ca535ee8bafa7a83328448279d15cc8e9b7662450785dffff7f200100000001020000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0200f2052a0100000023210384e9f396a3ed6e0013927e4387d687437e354878354bbd4fd0098ff7d6fb81eeac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf90120000000000000000000000000000000000000000000000000000000000000000000000000"; + let block2_hex = "0000002037eb36655ef7cf9ff058b37acfde115f6824d711d33577c7bfffe2960fecab77269b50d009000a42a6c201b71ffcea8455ef538b972f788ef5c13b8c6b0f5e7f2550785dffff7f200200000001020000000001010000000000000000000000000000000000000000000000000000000000000000ffffffff03520101ffffffff0200f2052a0100000023210357693690036e8831f2c1cc588eae3d09b0ae58e1fd4aad6e29c96f3467ca85d0ac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf90120000000000000000000000000000000000000000000000000000000000000000000000000"; - let block2 = new_block( - Sha256dHash::from_hex( - "0000000000000000000000000000000000000000000000000000000000000002", - ) - .unwrap(), - Sha256dHash::from_hex( - "0000000000000000000000000000000000000000000000000000000000000003", - ) - .unwrap(), - ); + let block1: Block = deserialize(hex::decode(block1_hex).unwrap().as_ref()).unwrap(); + let block2: Block = deserialize(hex::decode(block2_hex).unwrap().as_ref()).unwrap(); assert_that(&bitcoin_chain.size()).is_equal_to(&0); bitcoin_chain.add_block(block1.clone()); diff --git a/btsieve/src/bitcoin/queries/mod.rs b/btsieve/src/bitcoin/queries/mod.rs index 2b901bd099..040acdd724 100644 --- a/btsieve/src/bitcoin/queries/mod.rs +++ b/btsieve/src/bitcoin/queries/mod.rs @@ -1,19 +1,12 @@ pub mod transaction; pub use self::transaction::TransactionQuery; -use bitcoin_support::{FromHex, Sha256dHash, Transaction}; +use bitcoin_support::Transaction; use serde::Serialize; #[derive(Serialize, Debug)] #[serde(untagged)] pub enum PayloadKind { - Id { id: Sha256dHash }, + Id { id: String }, Transaction { transaction: Transaction }, } - -fn to_sha256d_hash>(id: S) -> Option { - let id = id.as_ref(); - Sha256dHash::from_hex(id) - .map_err(|e| log::warn!("skipping {} because it is invalid hex: {:?}", id, e)) - .ok() -} diff --git a/btsieve/src/bitcoin/queries/transaction.rs b/btsieve/src/bitcoin/queries/transaction.rs index 8dd6560754..93a95821f0 100644 --- a/btsieve/src/bitcoin/queries/transaction.rs +++ b/btsieve/src/bitcoin/queries/transaction.rs @@ -1,13 +1,16 @@ use crate::{ - bitcoin::queries::{to_sha256d_hash, PayloadKind}, + bitcoin::{bitcoind_http_blocksource::BitcoindHttpBlockSource, queries::PayloadKind}, query_result_repository::QueryResult, route_factory::{Error, QueryType, ToHttpPayload}, }; use bitcoin_support::{ - Address, OutPoint, SpendsFrom, SpendsFromWith, SpendsTo, SpendsWith, Transaction, TransactionId, + Address, OutPoint, SpendsFrom, SpendsFromWith, SpendsTo, SpendsWith, Transaction, }; -use bitcoincore_rpc::RpcApi; use derivative::Derivative; +use futures::{ + future::Future, + stream::{FuturesOrdered, Stream}, +}; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Default, Debug, Eq, PartialEq)] @@ -33,48 +36,43 @@ pub enum ReturnAs { } impl ToHttpPayload for QueryResult { - type Client = bitcoincore_rpc::Client; + type Client = BitcoindHttpBlockSource; type Item = PayloadKind; fn to_http_payload( &self, return_as: &ReturnAs, - client: &bitcoincore_rpc::Client, - ) -> Result, Error> { + client: &BitcoindHttpBlockSource, + ) -> Box, Error = Error> + Send + 'static> { // Close over some local variables for easier usage of the method - let to_payload = |id: TransactionId| to_payload(client, return_as, id); + let to_payload = |id: String| to_payload(client, return_as, id); - self.0 - .iter() - .filter_map(to_sha256d_hash) + let future = self + .0 + .clone() + .into_iter() .map(to_payload) - // .collect for Vec> transforms it into - // Result, Error> returning the first Error or the whole - // collection. - // We want this because the the Error means something is wrong with the connection to - // our bitcoin node and skipping is unlikely to help since the next call will fail as - // well. That is why we simply fail the whole function. - .collect() + .collect::>() + .collect(); + + Box::new(future) } } fn to_payload( - client: &bitcoincore_rpc::Client, + client: &BitcoindHttpBlockSource, return_as: &ReturnAs, - id: TransactionId, -) -> Result { + id: String, +) -> Box + Send> { + log::info!("Request for transaction {:?}", id); match return_as { - ReturnAs::TransactionId => Ok(PayloadKind::Id { id }), - ReturnAs::Transaction => { - match client - .get_raw_transaction_verbose(&id, None) - .map(|result| result.transaction()) - { - Ok(Ok(transaction)) => Ok(PayloadKind::Transaction { transaction }), - Ok(Err(e)) => Err(Error::BitcoinRpc(e.into())), - Err(e) => Err(Error::BitcoinRpc(e)), - } - } + ReturnAs::TransactionId => Box::new(futures::future::ok(PayloadKind::Id { id })), + ReturnAs::Transaction => Box::new( + client + .transaction_by_hash(id) + .map(|transaction| PayloadKind::Transaction { transaction }) + .map_err(Error::BitcoindHttp), + ), } } diff --git a/btsieve/src/ethereum/mod.rs b/btsieve/src/ethereum/mod.rs index 1ec6514709..7c24899933 100644 --- a/btsieve/src/ethereum/mod.rs +++ b/btsieve/src/ethereum/mod.rs @@ -17,7 +17,7 @@ impl Blockchain> for Ethereum { } Some(current_hash) => { if self.0.nodes.contains(&block) { - return log::warn!("Block already known {:?} ", block); + return log::warn!("Block already known {:?} ", block.hash); } match self.find_predecessor(&block) { Some(_prev) => { @@ -26,7 +26,7 @@ impl Blockchain> for Ethereum { .push((block.clone().parent_hash, current_hash)); } None => { - log::warn!("Could not find previous block for {:?} ", block); + log::warn!("Could not find previous block for {:?} ", block.hash); } } self.0.nodes.push(block); diff --git a/btsieve/src/ethereum/queries/event.rs b/btsieve/src/ethereum/queries/event.rs index 85d823b959..c76a95ca6a 100644 --- a/btsieve/src/ethereum/queries/event.rs +++ b/btsieve/src/ethereum/queries/event.rs @@ -136,16 +136,18 @@ impl ToHttpPayload for QueryResult { &self, return_as: &ReturnAs, client: &Web3, - ) -> Result, Error> { + ) -> Box, Error = Error> + Send + 'static> { let to_payload = |transaction_id: H256| to_payload(client, transaction_id, return_as); - self.0 + let future = self + .0 .iter() .filter_map(to_h256) .map(to_payload) .collect::>() - .collect() - .wait() + .collect(); + + Box::new(future) } } @@ -153,7 +155,7 @@ fn to_payload( client: &Web3, transaction_id: H256, return_as: &ReturnAs, -) -> Box> { +) -> Box + Send> { let tx_future = create_transaction_future(client, transaction_id); let receipt_future = create_receipt_future(client, transaction_id); diff --git a/btsieve/src/ethereum/queries/transaction.rs b/btsieve/src/ethereum/queries/transaction.rs index ff4cd831d7..eb40597b7d 100644 --- a/btsieve/src/ethereum/queries/transaction.rs +++ b/btsieve/src/ethereum/queries/transaction.rs @@ -84,16 +84,18 @@ impl ToHttpPayload for QueryResult { &self, return_as: &ReturnAs, client: &Web3, - ) -> Result, Error> { + ) -> Box, Error = Error> + Send + 'static> { let to_payload = |transaction_id: H256| to_payload(client, transaction_id, return_as); - self.0 + let future = self + .0 .iter() .filter_map(to_h256) .map(to_payload) .collect::>() - .collect() - .wait() + .collect(); + + Box::new(future) } } @@ -101,7 +103,7 @@ fn to_payload( client: &Web3, transaction_id: H256, return_as: &ReturnAs, -) -> Box> { +) -> Box + Send> { match return_as { ReturnAs::Transaction => Box::new( create_transaction_future(client, transaction_id) diff --git a/btsieve/src/main.rs b/btsieve/src/main.rs index c80ce1461d..2ca57e6893 100644 --- a/btsieve/src/main.rs +++ b/btsieve/src/main.rs @@ -1,10 +1,8 @@ #![warn(unused_extern_crates, missing_debug_implementations, rust_2018_idioms)] #![forbid(unsafe_code)] -use bitcoin_support::Network as BitcoinNetwork; -use bitcoincore_rpc::RpcApi; use btsieve::{ - bitcoin::{self, bitcoind_zmq_listener::bitcoin_block_listener}, + bitcoin::{self, bitcoind_http_blocksource::BitcoindHttpBlockSource}, blocksource::BlockSource, create_bitcoin_stub_endpoints, create_ethereum_stub_endpoints, ethereum::{self, web3_http_blocksource::Web3HttpBlockSource}, @@ -135,50 +133,36 @@ fn create_bitcoin_routes( let mut bitcoin_chain = Bitcoin::default(); - let bitcoin_rpc_client = bitcoincore_rpc::Client::new( - settings.node_url.to_string(), - settings.authentication.into(), - ) - .map_err(|e| { - log::debug!("failed to create bitcoincore_rpc::Client: {:?}", e); - Error::ConnectionError { - ledger: "bitcoin".to_owned(), - } - })?; - let blockchain_info = get_bitcoin_info(&bitcoin_rpc_client)?; - log::info!("Connected to Bitcoin: {:?}.", blockchain_info); - let network = blockchain_info - .chain - .parse::() - .map_err(|_| Error::UnknownLedgerVersion { - network: blockchain_info.chain, - ledger: "bitcoin".to_string(), - })? - .into(); - - log::trace!("Setting up bitcoin routes to {:?}.", network); - - log::info!("Connect BitcoinZmqListener to {}.", settings.zmq_endpoint); + let block_source = Arc::new(BitcoindHttpBlockSource::new( + settings.node_url, + settings.network, + )); + log::trace!("Setting up bitcoin routes to {:?}", settings.network); { let transaction_query_repository = Arc::clone(&transaction_query_repository); let transaction_query_result_repository = Arc::clone(&transaction_query_result_repository); - let blocks = bitcoin_block_listener(settings.zmq_endpoint.as_str()) - .expect("Should return a Bitcoind received for Blocks"); - - let bitcoin_processor = blocks.for_each(move |block| { - bitcoin_chain.add_block(block.clone()); + let bitcoin_block_processor = block_source + .clone() + .blocks() + .map_err(|e| log::warn!(target: "bitcoin", "error fetching latest block {:?}", e)) + .for_each(move |block| { + bitcoin_chain.add_block(block.clone()); - bitcoin::check_transaction_queries(transaction_query_repository.clone(), block.clone()) - .for_each(|QueryMatch(id, block_id)| { - transaction_query_result_repository.add_result(id.0, block_id); + bitcoin::check_transaction_queries( + transaction_query_repository.clone(), + block.clone(), + ) + .for_each(|QueryMatch(id, transaction_id)| { + transaction_query_result_repository.add_result(id.0, transaction_id); }); - Ok(()) - }); - runtime.spawn(bitcoin_processor); + Ok(()) + }); + + runtime.spawn(bitcoin_block_processor); } let ledger_name = "bitcoin"; @@ -187,9 +171,9 @@ fn create_bitcoin_routes( route_factory::create_endpoints::( transaction_query_repository, transaction_query_result_repository, - Arc::from(bitcoin_rpc_client), + block_source, ledger_name, - network, + settings.network.into(), ); Ok(transaction_routes.boxed()) @@ -283,20 +267,6 @@ fn create_ethereum_routes( Ok((transaction_routes.or(bloom_routes).boxed(), event_loop)) } -fn get_bitcoin_info( - client: &bitcoincore_rpc::Client, -) -> Result { - client.get_blockchain_info().map_err(|error| { - log::error!( - "Could not retrieve network version from ledger Bitcoin: {:?}", - error - ); - Error::ConnectionError { - ledger: String::from("Bitcoin"), - } - }) -} - fn get_ethereum_info(client: Arc>) -> Result { let network = client.net().version().wait()?; log::trace!("Connected to ethereum {:?}", network); diff --git a/btsieve/src/route_factory.rs b/btsieve/src/route_factory.rs index 3465fb99e0..930c7bb7f6 100644 --- a/btsieve/src/route_factory.rs +++ b/btsieve/src/route_factory.rs @@ -1,10 +1,12 @@ use crate::{ + bitcoin::bitcoind_http_blocksource, query_repository::QueryRepository, query_result_repository::{QueryResult, QueryResultRepository}, routes::{self, HttpApiProblemStdError}, web3, }; use ethereum_support::H256; +use futures::Future; use routes::Error as RouteError; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{fmt::Debug, sync::Arc}; @@ -15,7 +17,7 @@ pub const MAX_QUERY_ID_LENGTH: usize = 100; #[derive(Debug)] pub enum Error { - BitcoinRpc(bitcoincore_rpc::Error), + BitcoindHttp(bitcoind_http_blocksource::Error), Web3(web3::Error), MissingTransaction(H256), } @@ -32,7 +34,7 @@ pub trait ToHttpPayload { &self, return_as: &R, client: &Self::Client, - ) -> Result, Error>; + ) -> Box, Error = Error> + Send>; } #[derive(Deserialize, Serialize, Default, Debug, Eq, PartialEq, Hash)] @@ -80,7 +82,7 @@ pub fn create_ethereum_stub_endpoints( } pub fn create_endpoints< - R, + R: Sync, Q: QueryType + DeserializeOwned + Serialize + Debug + Send + Eq + 'static, QR: QueryRepository, QRR: QueryResultRepository, diff --git a/btsieve/src/routes.rs b/btsieve/src/routes.rs index 6fe1934a51..9191144db2 100644 --- a/btsieve/src/routes.rs +++ b/btsieve/src/routes.rs @@ -3,6 +3,7 @@ use crate::{ query_result_repository::{QueryResult, QueryResultRepository}, route_factory::{QueryParams, ToHttpPayload, MAX_QUERY_ID_LENGTH}, }; +use futures::{Future, IntoFuture}; use http::StatusCode; use http_api_problem::HttpApiProblem; use serde::{Deserialize, Serialize}; @@ -112,7 +113,7 @@ pub fn create_query, C: 'static + Send + Sync>( #[allow(clippy::needless_pass_by_value)] pub fn retrieve_query< - R: Debug + Default, + R: Debug + Default + Sync + 'static, Q: Serialize + Send + Debug, QR: QueryRepository, QRR: QueryResultRepository, @@ -124,7 +125,7 @@ pub fn retrieve_query< query_result_repository: Arc, id: String, query_params: QueryParams, -) -> Result +) -> impl Future where for<'de> R: Deserialize<'de>, QueryResult: ToHttpPayload, @@ -132,22 +133,27 @@ where query_repository .get(id.clone()) .ok_or(Error::QueryNotFound) - .and_then(|query| { - query_result_repository - .get(id.clone()) - .unwrap_or_default() - .to_http_payload(&query_params.return_as, &client) - .map(|matches| RetrieveQueryResponse { query, matches }) - .map(|response| warp::reply::json(&response)) - .map_err(|e| { - log::error!( - "failed to transform result for query {} to payload {:?}: {:?}.", - id, - query_params.return_as, - e - ); - Error::TransformToPayload - }) + .into_future() + .and_then({ + let id = id.clone(); + let query_result_repository = Arc::clone(&query_result_repository); + move |query| { + query_result_repository + .get(id.clone()) + .unwrap_or_default() + .to_http_payload(&query_params.return_as, &client) + .map(|matches| RetrieveQueryResponse { query, matches }) + .map(|response| warp::reply::json(&response)) + .map_err(move |e| { + log::error!( + "failed to transform result for query {} to payload {:?}: {:?}.", + id, + query_params.return_as, + e + ); + Error::TransformToPayload + }) + } }) .map_err(|e| { warp::reject::custom(HttpApiProblemStdError { diff --git a/btsieve/src/settings/mod.rs b/btsieve/src/settings/mod.rs index 910eb33669..77527c439b 100644 --- a/btsieve/src/settings/mod.rs +++ b/btsieve/src/settings/mod.rs @@ -1,13 +1,10 @@ mod serde_log; +use bitcoin_support::Network; use config::{Config, ConfigError, File}; use log::LevelFilter; use serde::Deserialize; -use std::{ - ffi::OsStr, - net::IpAddr, - path::{Path, PathBuf}, -}; +use std::{ffi::OsStr, net::IpAddr, path::Path}; #[derive(Debug, Deserialize, Clone)] pub struct Settings { @@ -34,36 +31,10 @@ pub struct HttpApi { pub port_bind: u16, } -#[derive(Debug, Deserialize, Clone, PartialEq)] -pub enum BitcoinAuth { - Cookie { - file_path: String, - }, - Basic { - node_username: String, - node_password: String, - }, -} - -impl From for bitcoincore_rpc::Auth { - fn from(bitcoin_auth: BitcoinAuth) -> Self { - match bitcoin_auth { - BitcoinAuth::Basic { - node_username, - node_password, - } => bitcoincore_rpc::Auth::UserPass(node_username, node_password), - BitcoinAuth::Cookie { file_path } => { - bitcoincore_rpc::Auth::CookieFile(PathBuf::from(file_path)) - } - } - } -} - #[derive(Debug, Deserialize, Clone)] pub struct Bitcoin { - pub zmq_endpoint: String, - pub node_url: url::Url, - pub authentication: BitcoinAuth, + pub network: Network, + pub node_url: String, } #[derive(Debug, Deserialize, Clone)] @@ -143,37 +114,4 @@ mod tests { Ok(()) } - - #[test] - fn can_read_config_with_bitcoin_cookie_authentication() { - let settings = Settings::read("./config/bitcoin_cookieauth.toml"); - - let cookie_authentication = BitcoinAuth::Cookie { - file_path: "/home/bitcoin/.bitcoin/regtest/.cookie".to_owned(), - }; - - assert_that(&settings) - .is_ok() - .map(|s| &s.bitcoin) - .is_some() - .map(|b| &b.authentication) - .is_equal_to(cookie_authentication); - } - - #[test] - fn can_read_config_with_bitcoin_basic_authentication() { - let settings = Settings::read("./config/bitcoin_basicauth.toml"); - - let basic_authentication = BitcoinAuth::Basic { - node_username: "Satoshi".to_owned(), - node_password: "Nakamoto".to_owned(), - }; - - assert_that(&settings) - .is_ok() - .map(|s| &s.bitcoin) - .is_some() - .map(|b| &b.authentication) - .is_equal_to(basic_authentication); - } } diff --git a/cnd/src/swap_protocols/rfc003/bitcoin/htlc_events.rs b/cnd/src/swap_protocols/rfc003/bitcoin/htlc_events.rs index 4ae067f912..ea60f56d79 100644 --- a/cnd/src/swap_protocols/rfc003/bitcoin/htlc_events.rs +++ b/cnd/src/swap_protocols/rfc003/bitcoin/htlc_events.rs @@ -74,6 +74,7 @@ impl HtlcEvents for Arc { let refunded_query = self .create(BitcoinQuery::refund_htlc(htlc_deployment.location)) + .inspect(|query_id| log::debug!("Refund query id {:?}", query_id)) .map_err(rfc003::Error::Btsieve); refunded_query @@ -89,6 +90,7 @@ impl HtlcEvents for Arc { let query_bitcoin = Arc::clone(&self); let redeemed_query = self .create(BitcoinQuery::redeem_htlc(htlc_deployment.location)) + .inspect(|query_id| log::debug!("Redeem query id {:?}", query_id)) .map_err(rfc003::Error::Btsieve); redeemed_query.and_then(move |query_id| { diff --git a/internal/bitcoin_support/src/lib.rs b/internal/bitcoin_support/src/lib.rs index 45d9f2f160..0b55e12fbe 100644 --- a/internal/bitcoin_support/src/lib.rs +++ b/internal/bitcoin_support/src/lib.rs @@ -9,7 +9,7 @@ pub use bitcoin::{ script::{self, Script}, transaction::{OutPoint, SigHashType, Transaction, TxIn, TxOut}, }, - consensus::{deserialize, encode::serialize_hex, serialize}, + consensus::{self, deserialize, encode::serialize_hex, serialize}, hashes::{hash160::Hash as Hash160, hex::FromHex, sha256d::Hash as Sha256dHash, Hash}, secp256k1, util::{