diff --git a/.github/actions/ci-rust-setup/action.yml b/.github/actions/ci-rust-setup/action.yml new file mode 100644 index 00000000..03b548bb --- /dev/null +++ b/.github/actions/ci-rust-setup/action.yml @@ -0,0 +1,50 @@ +name: CI Rust Setup +description: 'Sets up the environment for Rust jobs during CI workflow' + +inputs: + cache-name: + description: 'Name of cache artifacts (same name is same cache key) empty to disable cache' + required: false + targets: + description: 'A comma separated list of extra targets you want to install' + required: false + components: + description: 'A comma separated list of extra components you want to install' + required: false + toolchain: + description: 'The toolchain to use. If not specified, the rust-toolchain file will be used' + required: false + +runs: + using: composite + steps: + - name: Get toolchain from input OR rust-toolchain file + id: gettoolchain + shell: bash + run: |- + RUST_TOOLCHAIN="${{ inputs.toolchain }}" + if [ ! -f rust-toolchain ] && [ -z "${RUST_TOOLCHAIN}" ]; then + echo "***ERROR*** NEED toolchain INPUT OR rust-toolchain FILE IN ROOT OF REPOSITORY" >&2 + exit 1 + fi + if [ -z "${RUST_TOOLCHAIN}" ]; then + RUST_TOOLCHAIN="$(cat rust-toolchain)" + fi + echo "toolchain=\"${RUST_TOOLCHAIN}\"" >> $GITHUB_OUTPUT + - name: Install ${{ steps.gettoolchain.outputs.toolchain }} Rust toolchain + id: toolchain + # Commit date is Sep 19, 2023 + uses: dtolnay/rust-toolchain@439cf607258077187679211f12aa6f19af4a0af7 + with: + toolchain: ${{ steps.gettoolchain.outputs.toolchain }} + targets: ${{ inputs.targets }} + components: ${{ inputs.components }} + - name: Cache dependencies + uses: actions/cache@v3 + if: inputs.cache-name != '' + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ inputs.cache-name }}-${{ steps.toolchain.outputs.cachekey }}-${{ hashFiles('**/Cargo.lock') }} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f96b4b4..c9ef33c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,16 +13,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - id: toolchain - uses: dtolnay/rust-toolchain@1.70 - - name: Cache dependencies - uses: actions/cache@v3 + - name: Setup Rust + uses: './.github/actions/ci-rust-setup' with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-${{ steps.toolchain.outputs.cachekey }}-${{ hashFiles('**/Cargo.lock') }} + cache-name: dev - run: cargo check --all-features fmt: @@ -30,8 +24,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - id: toolchain - uses: dtolnay/rust-toolchain@1.70 + - name: Setup Rust + uses: './.github/actions/ci-rust-setup' with: components: rustfmt - run: cargo fmt --all -- --check @@ -41,17 +35,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - id: toolchain - uses: dtolnay/rust-toolchain@1.70 - - name: Cache dependencies - uses: actions/cache@v3 - with: # test cache key is different (adding test cfg is a re-compile) - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-test-${{ steps.toolchain.outputs.cachekey }}-${{ hashFiles('**/Cargo.lock') }} - - run: cargo test --package electrs --lib --all-features + - name: Setup Rust + uses: './.github/actions/ci-rust-setup' + with: + cache-name: test + - run: cargo test --lib --all-features clippy: name: Linter @@ -67,17 +55,10 @@ jobs: ] steps: - uses: actions/checkout@v3 - - id: toolchain - uses: dtolnay/rust-toolchain@1.70 + - 
name: Setup Rust + uses: './.github/actions/ci-rust-setup' with: + cache-name: dev components: clippy - - name: Cache dependencies - uses: actions/cache@v3 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-${{ steps.toolchain.outputs.cachekey }}-${{ hashFiles('**/Cargo.lock') }} - name: Clippy with Features = ${{ matrix.features }} run: cargo clippy ${{ matrix.features }} -- -D warnings diff --git a/.github/workflows/on-tag.yml b/.github/workflows/on-tag.yml new file mode 100644 index 00000000..f3e200e0 --- /dev/null +++ b/.github/workflows/on-tag.yml @@ -0,0 +1,97 @@ +name: Docker build on tag +env: + DOCKER_CLI_EXPERIMENTAL: enabled + TAG_FMT: "^refs/tags/(((.?[0-9]+){3,4}))$" + DOCKER_BUILDKIT: 0 + COMPOSE_DOCKER_CLI_BUILD: 0 + +on: + push: + tags: + - v[0-9]+.[0-9]+.[0-9]+ + - v[0-9]+.[0-9]+.[0-9]+-* + +permissions: + contents: read + +jobs: + build: + runs-on: ubuntu-latest + timeout-minutes: 120 + name: Build and push to DockerHub + steps: + # Workaround based on JonasAlfredsson/docker-on-tmpfs@v1.0.1 + - name: Replace the current swap file + shell: bash + run: | + sudo swapoff /mnt/swapfile + sudo rm -v /mnt/swapfile + sudo fallocate -l 13G /mnt/swapfile + sudo chmod 600 /mnt/swapfile + sudo mkswap /mnt/swapfile + sudo swapon /mnt/swapfile + + - name: Show current memory and swap status + shell: bash + run: | + sudo free -h + echo + sudo swapon --show + + - name: Mount a tmpfs over /var/lib/docker + shell: bash + run: | + if [ ! -d "/var/lib/docker" ]; then + echo "Directory '/var/lib/docker' not found" + exit 1 + fi + sudo mount -t tmpfs -o size=10G tmpfs /var/lib/docker + sudo systemctl restart docker + sudo df -h | grep docker + + - name: Set env variables + run: echo "TAG=${GITHUB_REF/refs\/tags\//}" >> $GITHUB_ENV + + - name: Show set environment variables + run: | + printf " TAG: %s\n" "$TAG" + + - name: Add SHORT_SHA env property with commit short sha + run: echo "SHORT_SHA=`echo ${GITHUB_SHA} | cut -c1-8`" >> $GITHUB_ENV + + - name: Login to Docker for building + run: echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin + + - name: Checkout project + uses: actions/checkout@v3 + + # - name: Set up QEMU + # uses: docker/setup-qemu-action@v3 + # id: qemu + + - name: Setup Docker buildx action + uses: docker/setup-buildx-action@v3 + id: buildx + + - name: Available platforms + run: echo ${{ steps.buildx.outputs.platforms }} + + - name: Cache Docker layers + uses: actions/cache@v3 + id: cache + with: + path: /tmp/.buildx-cache + key: ${{ runner.os }}-buildx + restore-keys: | + ${{ runner.os }}-buildx + + - name: Run Docker buildx against tag + run: | + docker buildx build \ + --cache-from "type=local,src=/tmp/.buildx-cache" \ + --cache-to "type=local,dest=/tmp/.buildx-cache" \ + --platform linux/amd64 \ + --tag ${{ secrets.DOCKER_HUB_USER }}/electrs:$TAG \ + --tag ${{ secrets.DOCKER_HUB_USER }}/electrs:latest \ + --output "type=registry" . 
\ + --build-arg commitHash=$SHORT_SHA diff --git a/Cargo.lock b/Cargo.lock index 23b2bb57..db6d368a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -397,49 +397,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" -[[package]] -name = "electrs" -version = "3.0.0-dev" -dependencies = [ - "arrayref", - "base64 0.13.0", - "bincode", - "bitcoin 0.28.0", - "bounded-vec-deque", - "clap", - "crossbeam-channel", - "dirs", - "electrum-client", - "elements", - "error-chain", - "glob", - "hex", - "hyper", - "hyperlocal", - "itertools", - "lazy_static", - "libc", - "log", - "num_cpus", - "page_size", - "prometheus", - "rayon", - "rocksdb", - "serde", - "serde_derive", - "serde_json", - "sha2", - "signal-hook", - "socket2", - "stderrlog", - "sysconf", - "tempfile", - "time 0.3.9", - "tiny_http", - "tokio", - "url", -] - [[package]] name = "electrum-client" version = "0.8.0" @@ -863,6 +820,49 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "mempool-electrs" +version = "3.0.0-dev" +dependencies = [ + "arrayref", + "base64 0.13.0", + "bincode", + "bitcoin 0.28.0", + "bounded-vec-deque", + "clap", + "crossbeam-channel", + "dirs", + "electrum-client", + "elements", + "error-chain", + "glob", + "hex", + "hyper", + "hyperlocal", + "itertools", + "lazy_static", + "libc", + "log", + "num_cpus", + "page_size", + "prometheus", + "rayon", + "rocksdb", + "serde", + "serde_derive", + "serde_json", + "sha2", + "signal-hook", + "socket2", + "stderrlog", + "sysconf", + "tempfile", + "time 0.3.9", + "tiny_http", + "tokio", + "url", +] + [[package]] name = "minimal-lexical" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 5872d6ba..58c5b579 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,20 +1,28 @@ [package] -name = "electrs" +name = "mempool-electrs" version = "3.0.0-dev" -authors = ["Roman Zeyde "] +authors = [ + "Roman Zeyde ", + "Nadav Ivgi ", + "wiz ", + "junderw " +] description = "An efficient re-implementation of Electrum Server in Rust" license = "MIT" homepage = "https://github.com/mempool/electrs" repository = "https://github.com/mempool/electrs" +publish = false keywords = ["bitcoin", "electrum", "server", "index", "database"] -documentation = "https://docs.rs/electrs/" readme = "README.md" edition = "2018" +[lib] +name = "electrs" + [features] default = [] -liquid = [ "elements" ] -electrum-discovery = [ "electrum-client"] +liquid = ["elements"] +electrum-discovery = ["electrum-client"] [dependencies] arrayref = "0.3.6" diff --git a/README.md b/README.md index 5cf685ff..d684dc48 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ -# Esplora - Electrs backend API +# Mempool - Electrs backend API -A block chain index engine and HTTP API written in Rust based on [romanz/electrs](https://github.com/romanz/electrs). +A block chain index engine and HTTP API written in Rust based on [romanz/electrs](https://github.com/romanz/electrs) and [Blockstream/electrs](https://github.com/Blockstream/electrs). -Used as the backend for the [Esplora block explorer](https://github.com/Blockstream/esplora) powering [blockstream.info](https://blockstream.info/). +Used as the backend for the [mempool block explorer](https://github.com/mempool/mempool) powering [mempool.space](https://mempool.space/). -API documentation [is available here](https://github.com/blockstream/esplora/blob/master/API.md). +API documentation [is available here](https://mempool.space/docs/api/rest). 
Documentation for the database schema and indexing process [is available here](doc/schema.md). @@ -13,8 +13,8 @@ Documentation for the database schema and indexing process [is available here](d Install Rust, Bitcoin Core (no `txindex` needed) and the `clang` and `cmake` packages, then: ```bash -$ git clone https://github.com/blockstream/electrs && cd electrs -$ git checkout new-index +$ git clone https://github.com/mempool/electrs && cd electrs +$ git checkout mempool $ cargo run --release --bin electrs -- -vvvv --daemon-dir ~/.bitcoin # Or for liquid: @@ -24,11 +24,9 @@ $ cargo run --features liquid --release --bin electrs -- -vvvv --network liquid See [electrs's original documentation](https://github.com/romanz/electrs/blob/master/doc/usage.md) for more detailed instructions. Note that our indexes are incompatible with electrs's and has to be created separately. -The indexes require 610GB of storage after running compaction (as of June 2020), but you'll need to have +The indexes require 1.3TB of storage after running compaction (as of October 2023), but you'll need to have free space of about double that available during the index compaction process. -Creating the indexes should take a few hours on a beefy machine with SSD. - -To deploy with Docker, follow the [instructions here](https://github.com/Blockstream/esplora#how-to-build-the-docker-image). +Creating the indexes should take a few hours on a beefy machine with high speed NVMe SSD(s). ### Light mode @@ -78,7 +76,7 @@ Additional options with the `electrum-discovery` feature: - `--electrum-hosts ` - a json map of the public hosts where the electrum server is reachable, in the [`server.features` format](https://electrumx.readthedocs.io/en/latest/protocol-methods.html#server.features). - `--electrum-announce` - announce the electrum server on the electrum p2p server discovery network. -See `$ cargo run --release --bin electrs -- --help` for the full list of options. +See `$ cargo run --bin electrs -- --help` for the full list of options. 
## License diff --git a/rust-toolchain b/rust-toolchain new file mode 100644 index 00000000..bfe79d0b --- /dev/null +++ b/rust-toolchain @@ -0,0 +1 @@ +1.70 diff --git a/scripts/checks.sh b/scripts/checks.sh index aeda1261..a80db52e 100755 --- a/scripts/checks.sh +++ b/scripts/checks.sh @@ -59,4 +59,4 @@ cargo clippy $@ -q -F electrum-discovery,liquid TESTNAME="Running cargo test with all features" echo "$TESTNAME" -cargo test $@ -q --package electrs --lib --all-features +cargo test $@ -q --lib --all-features diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 68fa47ff..0c649355 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -50,6 +50,7 @@ fn run_server(config: Arc) -> Result<()> { config.daemon_rpc_addr, config.cookie_getter(), config.network_type, + config.magic, signal.clone(), &metrics, )?); @@ -74,7 +75,18 @@ fn run_server(config: Arc) -> Result<()> { &metrics, Arc::clone(&config), ))); - mempool.write().unwrap().update(&daemon)?; + loop { + match Mempool::update(&mempool, &daemon) { + Ok(_) => break, + Err(e) => { + warn!( + "Error performing initial mempool update, trying again in 5 seconds: {}", + e.display_chain() + ); + signal.wait(Duration::from_secs(5), false)?; + } + } + } #[cfg(feature = "liquid")] let asset_db = config.asset_db_path.as_ref().map(|db_dir| { @@ -93,7 +105,7 @@ fn run_server(config: Arc) -> Result<()> { )); // TODO: configuration for which servers to start - let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query)); + let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query), &metrics); let electrum_server = ElectrumRPC::start(Arc::clone(&config), Arc::clone(&query), &metrics); if let Some(ref precache_file) = config.precache_scripts { @@ -107,7 +119,7 @@ fn run_server(config: Arc) -> Result<()> { } loop { - if let Err(err) = signal.wait(Duration::from_millis(500), true) { + if let Err(err) = signal.wait(Duration::from_millis(config.main_loop_delay), true) { info!("stopping server: {}", err); electrs::util::spawn_thread("shutdown-thread-checker", || { @@ -136,7 +148,13 @@ fn run_server(config: Arc) -> Result<()> { }; // Update mempool - mempool.write().unwrap().update(&daemon)?; + if let Err(e) = Mempool::update(&mempool, &daemon) { + // Log the error if the result is an Err + warn!( + "Error updating mempool, skipping mempool update: {}", + e.display_chain() + ); + } // Update subscribed clients electrum_server.notify(); diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 55cb6797..5b38561f 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -35,6 +35,7 @@ fn main() { config.daemon_rpc_addr, config.cookie_getter(), config.network_type, + config.magic, signal, &metrics, ) diff --git a/src/chain.rs b/src/chain.rs index de726186..8abf9a4a 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + #[cfg(not(feature = "liquid"))] // use regular Bitcoin data structures pub use bitcoin::{ blockdata::{opcodes, script, witness::Witness}, @@ -32,6 +34,8 @@ pub enum Network { #[cfg(not(feature = "liquid"))] Testnet, #[cfg(not(feature = "liquid"))] + Testnet4, + #[cfg(not(feature = "liquid"))] Regtest, #[cfg(not(feature = "liquid"))] Signet, @@ -129,22 +133,25 @@ pub fn genesis_hash(network: Network) -> BlockHash { return liquid_genesis_hash(network); } -pub fn bitcoin_genesis_hash(network: BNetwork) -> bitcoin::BlockHash { +pub fn bitcoin_genesis_hash(network: Network) -> bitcoin::BlockHash { lazy_static! 
{ static ref BITCOIN_GENESIS: bitcoin::BlockHash = genesis_block(BNetwork::Bitcoin).block_hash(); static ref TESTNET_GENESIS: bitcoin::BlockHash = genesis_block(BNetwork::Testnet).block_hash(); + static ref TESTNET4_GENESIS: bitcoin::BlockHash = + BlockHash::from_str("00000000da84f2bafbbc53dee25a72ae507ff4914b867c565be350b0da8bf043").unwrap(); static ref REGTEST_GENESIS: bitcoin::BlockHash = genesis_block(BNetwork::Regtest).block_hash(); static ref SIGNET_GENESIS: bitcoin::BlockHash = genesis_block(BNetwork::Signet).block_hash(); } match network { - BNetwork::Bitcoin => *BITCOIN_GENESIS, - BNetwork::Testnet => *TESTNET_GENESIS, - BNetwork::Regtest => *REGTEST_GENESIS, - BNetwork::Signet => *SIGNET_GENESIS, + Network::Bitcoin => *BITCOIN_GENESIS, + Network::Testnet => *TESTNET_GENESIS, + Network::Testnet4 => *TESTNET4_GENESIS, + Network::Regtest => *REGTEST_GENESIS, + Network::Signet => *SIGNET_GENESIS, } } @@ -174,6 +181,8 @@ impl From<&str> for Network { #[cfg(not(feature = "liquid"))] "testnet" => Network::Testnet, #[cfg(not(feature = "liquid"))] + "testnet4" => Network::Testnet4, + #[cfg(not(feature = "liquid"))] "regtest" => Network::Regtest, #[cfg(not(feature = "liquid"))] "signet" => Network::Signet, @@ -196,6 +205,7 @@ impl From for BNetwork { match network { Network::Bitcoin => BNetwork::Bitcoin, Network::Testnet => BNetwork::Testnet, + Network::Testnet4 => BNetwork::Testnet, Network::Regtest => BNetwork::Regtest, Network::Signet => BNetwork::Signet, } diff --git a/src/config.rs b/src/config.rs index ff81d4ad..a5e903ce 100644 --- a/src/config.rs +++ b/src/config.rs @@ -33,6 +33,7 @@ pub struct Config { // See below for the documentation of each field: pub log: stderrlog::StdErrLog, pub network_type: Network, + pub magic: Option, pub db_path: PathBuf, pub daemon_dir: PathBuf, pub blocks_dir: PathBuf, @@ -45,6 +46,7 @@ pub struct Config { pub monitoring_addr: SocketAddr, pub jsonrpc_import: bool, pub light_mode: bool, + pub main_loop_delay: u64, pub address_search: bool, pub index_unspendables: bool, pub cors: Option, @@ -58,6 +60,9 @@ pub struct Config { pub rest_default_block_limit: usize, pub rest_default_chain_txs_per_page: usize, pub rest_default_max_mempool_txs: usize, + pub rest_default_max_address_summary_txs: usize, + pub rest_max_mempool_page_size: usize, + pub rest_max_mempool_txid_page_size: usize, #[cfg(feature = "liquid")] pub parent_network: BNetwork, @@ -85,7 +90,7 @@ impl Config { pub fn from_args() -> Config { let network_help = format!("Select network type ({})", Network::names().join(", ")); - let args = App::new("Electrum Rust Server") + let args = App::new("Mempool Electrum Rust Server") .version(crate_version!()) .arg( Arg::with_name("version") @@ -133,6 +138,12 @@ impl Config { .help(&network_help) .takes_value(true), ) + .arg( + Arg::with_name("magic") + .long("magic") + .default_value("") + .takes_value(true), + ) .arg( Arg::with_name("electrum_rpc_addr") .long("electrum-rpc-addr") @@ -167,6 +178,12 @@ impl Config { .long("lightmode") .help("Enable light mode for reduced storage") ) + .arg( + Arg::with_name("main_loop_delay") + .long("main-loop-delay") + .help("The number of milliseconds the main loop will wait between loops. 
(Can be shortened with SIGUSR1)") + .default_value("500") + ) .arg( Arg::with_name("address_search") .long("address-search") @@ -231,6 +248,24 @@ impl Config { .help("The default number of mempool transactions returned by the txs endpoints.") .default_value("50") ) + .arg( + Arg::with_name("rest_default_max_address_summary_txs") + .long("rest-default-max-address-summary-txs") + .help("The default number of transactions returned by the address summary endpoints.") + .default_value("5000") + ) + .arg( + Arg::with_name("rest_max_mempool_page_size") + .long("rest-max-mempool-page-size") + .help("The maximum number of transactions returned by the paginated /internal/mempool/txs endpoint.") + .default_value("1000") + ) + .arg( + Arg::with_name("rest_max_mempool_txid_page_size") + .long("rest-max-mempool-txid-page-size") + .help("The maximum number of transactions returned by the paginated /mempool/txids/page endpoint.") + .default_value("10000") + ) .arg( Arg::with_name("electrum_txs_limit") .long("electrum-txs-limit") @@ -300,6 +335,10 @@ impl Config { let network_name = m.value_of("network").unwrap_or("mainnet"); let network_type = Network::from(network_name); + let magic: Option = m + .value_of("magic") + .filter(|s| !s.is_empty()) + .map(|s| u32::from_str_radix(s, 16).expect("invalid network magic")); let db_dir = Path::new(m.value_of("db_dir").unwrap_or("./db")); let db_path = db_dir.join(network_name); @@ -325,6 +364,8 @@ impl Config { Network::Regtest => 18443, #[cfg(not(feature = "liquid"))] Network::Signet => 38332, + #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 48332, #[cfg(feature = "liquid")] Network::Liquid => 7041, @@ -337,6 +378,8 @@ impl Config { #[cfg(not(feature = "liquid"))] Network::Testnet => 60001, #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 40001, + #[cfg(not(feature = "liquid"))] Network::Regtest => 60401, #[cfg(not(feature = "liquid"))] Network::Signet => 60601, @@ -357,6 +400,8 @@ impl Config { Network::Regtest => 3002, #[cfg(not(feature = "liquid"))] Network::Signet => 3003, + #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 3004, #[cfg(feature = "liquid")] Network::Liquid => 3000, @@ -373,6 +418,8 @@ impl Config { #[cfg(not(feature = "liquid"))] Network::Regtest => 24224, #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 44224, + #[cfg(not(feature = "liquid"))] Network::Signet => 54224, #[cfg(feature = "liquid")] @@ -421,6 +468,8 @@ impl Config { #[cfg(not(feature = "liquid"))] Network::Testnet => daemon_dir.push("testnet3"), #[cfg(not(feature = "liquid"))] + Network::Testnet4 => daemon_dir.push("testnet4"), + #[cfg(not(feature = "liquid"))] Network::Regtest => daemon_dir.push("regtest"), #[cfg(not(feature = "liquid"))] Network::Signet => daemon_dir.push("signet"), @@ -458,6 +507,7 @@ impl Config { let config = Config { log, network_type, + magic, db_path, daemon_dir, blocks_dir, @@ -484,8 +534,20 @@ impl Config { "rest_default_max_mempool_txs", usize ), + rest_default_max_address_summary_txs: value_t_or_exit!( + m, + "rest_default_max_address_summary_txs", + usize + ), + rest_max_mempool_page_size: value_t_or_exit!(m, "rest_max_mempool_page_size", usize), + rest_max_mempool_txid_page_size: value_t_or_exit!( + m, + "rest_max_mempool_txid_page_size", + usize + ), jsonrpc_import: m.is_present("jsonrpc_import"), light_mode: m.is_present("light_mode"), + main_loop_delay: value_t_or_exit!(m, "main_loop_delay", u64), address_search: m.is_present("address_search"), index_unspendables: m.is_present("index_unspendables"), cors: 
m.value_of("cors").map(|s| s.to_string()), diff --git a/src/daemon.rs b/src/daemon.rs index 0be3a8ff..b8bde690 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -117,6 +117,26 @@ struct NetworkInfo { relayfee: f64, // in BTC/kB } +#[derive(Serialize, Deserialize, Debug)] +struct MempoolFees { + base: f64, + #[serde(rename = "effective-feerate")] + effective_feerate: f64, + #[serde(rename = "effective-includes")] + effective_includes: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct MempoolAcceptResult { + txid: String, + wtxid: String, + allowed: Option, + vsize: Option, + fees: Option, + #[serde(rename = "reject-reason")] + reject_reason: Option, +} + pub trait CookieGetter: Send + Sync { fn get(&self) -> Result>; } @@ -264,6 +284,7 @@ pub struct Daemon { daemon_dir: PathBuf, blocks_dir: PathBuf, network: Network, + magic: Option, conn: Mutex, message_id: Counter, // for monotonic JSONRPC 'id' signal: Waiter, @@ -280,6 +301,7 @@ impl Daemon { daemon_rpc_addr: SocketAddr, cookie_getter: Arc, network: Network, + magic: Option, signal: Waiter, metrics: &Metrics, ) -> Result { @@ -287,6 +309,7 @@ impl Daemon { daemon_dir, blocks_dir, network, + magic, conn: Mutex::new(Connection::new( daemon_rpc_addr, cookie_getter, @@ -321,10 +344,10 @@ impl Daemon { let mempool = daemon.getmempoolinfo()?; let ibd_done = if network.is_regtest() { - info.blocks == 0 && info.headers == 0 + info.blocks == info.headers } else { - false - } || !info.initialblockdownload.unwrap_or(false); + !info.initialblockdownload.unwrap_or(false) + }; if mempool.loaded && ibd_done && info.blocks == info.headers { break; @@ -347,6 +370,7 @@ impl Daemon { daemon_dir: self.daemon_dir.clone(), blocks_dir: self.blocks_dir.clone(), network: self.network, + magic: self.magic, conn: Mutex::new(self.conn.lock().unwrap().reconnect()?), message_id: Counter::new(), signal: self.signal.clone(), @@ -367,7 +391,7 @@ impl Daemon { } pub fn magic(&self) -> u32 { - self.network.magic() + self.magic.unwrap_or_else(|| self.network.magic()) } fn call_jsonrpc(&self, method: &str, request: &Value) -> Result { @@ -387,19 +411,46 @@ impl Daemon { Ok(result) } - fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn handle_request_batch( + &self, + method: &str, + params_list: &[Value], + failure_threshold: f64, + ) -> Result> { let id = self.message_id.next(); let chunks = params_list .iter() .map(|params| json!({"method": method, "params": params, "id": id})) .chunks(50_000); // Max Amount of batched requests let mut results = vec![]; + let total_requests = params_list.len(); + let mut failed_requests: u64 = 0; + let threshold = (failure_threshold * total_requests as f64).round() as u64; + let mut n = 0; + for chunk in &chunks { let reqs = chunk.collect(); let mut replies = self.call_jsonrpc(method, &reqs)?; if let Some(replies_vec) = replies.as_array_mut() { for reply in replies_vec { - results.push(parse_jsonrpc_reply(reply.take(), method, id)?) 
+ n += 1; + match parse_jsonrpc_reply(reply.take(), method, id) { + Ok(parsed_reply) => results.push(parsed_reply), + Err(e) => { + failed_requests += 1; + warn!( + "batch request {} {}/{} failed: {}", + method, + n, + total_requests, + e.to_string() + ); + // abort and return the last error once a threshold number of requests have failed + if failed_requests > threshold { + return Err(e); + } + } + } } } else { bail!("non-array replies: {:?}", replies); @@ -409,9 +460,14 @@ impl Daemon { Ok(results) } - fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn retry_request_batch( + &self, + method: &str, + params_list: &[Value], + failure_threshold: f64, + ) -> Result> { loop { - match self.handle_request_batch(method, params_list) { + match self.handle_request_batch(method, params_list, failure_threshold) { Err(Error(ErrorKind::Connection(msg), _)) => { warn!("reconnecting to bitcoind: {}", msg); self.signal.wait(Duration::from_secs(3), false)?; @@ -425,13 +481,13 @@ impl Daemon { } fn request(&self, method: &str, params: Value) -> Result { - let mut values = self.retry_request_batch(method, &[params])?; + let mut values = self.retry_request_batch(method, &[params], 0.0)?; assert_eq!(values.len(), 1); Ok(values.remove(0)) } fn requests(&self, method: &str, params_list: &[Value]) -> Result> { - self.retry_request_batch(method, params_list) + self.retry_request_batch(method, params_list, 0.0) } // bitcoind JSONRPC API: @@ -506,13 +562,12 @@ impl Daemon { .iter() .map(|txhash| json!([txhash.to_hex(), /*verbose=*/ false])) .collect(); - - let values = self.requests("getrawtransaction", ¶ms_list)?; + let values = self.retry_request_batch("getrawtransaction", ¶ms_list, 0.25)?; let mut txs = vec![]; for value in values { txs.push(tx_from_value(value)?); } - assert_eq!(txhashes.len(), txs.len()); + // missing transactions are skipped, so the number of txs returned may be less than the number of txids requested Ok(txs) } @@ -551,6 +606,20 @@ impl Daemon { .chain_err(|| "failed to parse txid") } + pub fn test_mempool_accept( + &self, + txhex: Vec, + maxfeerate: Option, + ) -> Result> { + let params = match maxfeerate { + Some(rate) => json!([txhex, format!("{:.8}", rate)]), + None => json!([txhex]), + }; + let result = self.request("testmempoolaccept", params)?; + serde_json::from_value::>(result) + .chain_err(|| "invalid testmempoolaccept reply") + } + // Get estimated feerates for the provided confirmation targets using a batch RPC request // Missing estimates are logged but do not cause a failure, whatever is available is returned #[allow(clippy::float_cmp)] diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 1b44ee99..ae427cd2 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -636,7 +636,10 @@ fn get_history( ) -> Result)>> { // to avoid silently trunacting history entries, ask for one extra more than the limit and fail if it exists let history_txids = query.history_txids(scripthash, txs_limit + 1); - ensure!(history_txids.len() <= txs_limit, ErrorKind::TooPopular); + ensure!( + history_txids.len() <= txs_limit, + ErrorKind::TooManyTxs(txs_limit) + ); Ok(history_txids) } diff --git a/src/errors.rs b/src/errors.rs index cec50cce..48274fbb 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -14,9 +14,14 @@ error_chain! { display("Iterrupted by signal {}", sig) } - TooPopular { - description("Too many history entries") - display("Too many history entries") + TooManyUtxos(limit: usize) { + description("Too many unspent transaction outputs. 
Contact support to raise limits.") + display("Too many unspent transaction outputs (>{}). Contact support to raise limits.", limit) + } + + TooManyTxs(limit: usize) { + description("Too many history transactions. Contact support to raise limits.") + display("Too many history transactions (>{}). Contact support to raise limits.", limit) } #[cfg(feature = "electrum-discovery")] diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index ccd50fe1..8a94962f 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -9,7 +9,7 @@ use elements::{encode::serialize, AssetId}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::iter::FromIterator; use std::ops::Bound::{Excluded, Unbounded}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid}; @@ -288,6 +288,24 @@ impl Mempool { self.txstore.keys().collect() } + // Get n txids after the given txid in the mempool + pub fn txids_page(&self, n: usize, start: Option) -> Vec<&Txid> { + let _timer = self + .latency + .with_label_values(&["txids_page"]) + .start_timer(); + let start_bound = match start { + Some(txid) => Excluded(txid), + None => Unbounded, + }; + + self.txstore + .range((start_bound, Unbounded)) + .take(n) + .map(|(k, _v)| k) + .collect() + } + // Get all txs in the mempool pub fn txs(&self) -> Vec { let _timer = self.latency.with_label_values(&["txs"]).start_timer(); @@ -296,7 +314,7 @@ impl Mempool { // Get n txs after the given txid in the mempool pub fn txs_page(&self, n: usize, start: Option) -> Vec { - let _timer = self.latency.with_label_values(&["txs"]).start_timer(); + let _timer = self.latency.with_label_values(&["txs_page"]).start_timer(); let mut page = Vec::with_capacity(n); let start_bound = match start { Some(txid) => Excluded(txid), @@ -325,46 +343,67 @@ impl Mempool { &self.backlog_stats.0 } - pub fn update(&mut self, daemon: &Daemon) -> Result<()> { - let _timer = self.latency.with_label_values(&["update"]).start_timer(); - let new_txids = daemon + pub fn unique_txids(&self) -> HashSet { + return HashSet::from_iter(self.txstore.keys().cloned()); + } + + pub fn update(mempool: &RwLock, daemon: &Daemon) -> Result<()> { + // 1. Start the metrics timer and get the current mempool txids + // [LOCK] Takes read lock for whole scope. + let (_timer, old_txids) = { + let mempool = mempool.read().unwrap(); + ( + mempool.latency.with_label_values(&["update"]).start_timer(), + mempool.unique_txids(), + ) + }; + + // 2. Get all the mempool txids from the RPC. + // [LOCK] No lock taken. Wait for RPC request. Get lists of remove/add txes. + let all_txids = daemon .getmempooltxids() .chain_err(|| "failed to update mempool from daemon")?; - let old_txids = HashSet::from_iter(self.txstore.keys().cloned()); - let to_remove: HashSet<&Txid> = old_txids.difference(&new_txids).collect(); - - // Download and add new transactions from bitcoind's mempool - let txids: Vec<&Txid> = new_txids.difference(&old_txids).collect(); - let to_add = match daemon.gettransactions(&txids) { - Ok(txs) => txs, - Err(err) => { - warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF - return Ok(()); // keep the mempool until next update() + let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect(); + let txids_to_add: Vec<&Txid> = all_txids.difference(&old_txids).collect(); + + // 3. Remove missing transactions. 
Even if we are unable to download new transactions from + // the daemon, we still want to remove the transactions that are no longer in the mempool. + // [LOCK] Write lock is released at the end of the call to remove(). + mempool.write().unwrap().remove(txids_to_remove); + + // 4. Download the new transactions from the daemon's mempool + // [LOCK] No lock taken, waiting for RPC response. + let txs_to_add = daemon + .gettransactions(&txids_to_add) + .chain_err(|| format!("failed to get {} transactions", txids_to_add.len()))?; + + // 4. Update local mempool to match daemon's state + // [LOCK] Takes Write lock for whole scope. + { + let mut mempool = mempool.write().unwrap(); + // Add new transactions + if txs_to_add.len() > mempool.add(txs_to_add) { + debug!("Mempool update added less transactions than expected"); } - }; - // Add new transactions - if to_add.len() > self.add(to_add) { - debug!("Mempool update added less transactions than expected"); - } - // Remove missing transactions - self.remove(to_remove); - self.count - .with_label_values(&["txs"]) - .set(self.txstore.len() as f64); + mempool + .count + .with_label_values(&["txs"]) + .set(mempool.txstore.len() as f64); + + // Update cached backlog stats (if expired) + if mempool.backlog_stats.1.elapsed() + > Duration::from_secs(mempool.config.mempool_backlog_stats_ttl) + { + let _timer = mempool + .latency + .with_label_values(&["update_backlog_stats"]) + .start_timer(); + mempool.backlog_stats = (BacklogStats::new(&mempool.feeinfo), Instant::now()); + } - // Update cached backlog stats (if expired) - if self.backlog_stats.1.elapsed() - > Duration::from_secs(self.config.mempool_backlog_stats_ttl) - { - let _timer = self - .latency - .with_label_values(&["update_backlog_stats"]) - .start_timer(); - self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now()); + Ok(()) } - - Ok(()) } pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> { @@ -400,8 +439,12 @@ impl Mempool { // Phase 1: add to txstore for tx in txs { let txid = tx.txid(); - txids.push(txid); - self.txstore.insert(txid, tx); + // Only push if it doesn't already exist. + // This is important now that update doesn't lock during + // the entire function body. 
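The restructured `Mempool::update` above holds the `RwLock` only for the initial snapshot and for the steps that mutate the store, never across the daemon RPC round-trips. A minimal sketch of that pattern, with a plain `HashSet<u64>` standing in for the transaction store and an illustrative `fetch_current` closure standing in for the RPC call:

```rust
use std::collections::HashSet;
use std::sync::RwLock;

// `fetch_current` stands in for the slow daemon round-trip; no lock is held while it runs.
fn refresh(store: &RwLock<HashSet<u64>>, fetch_current: impl Fn() -> HashSet<u64>) {
    // 1. Snapshot the current contents under a short-lived read lock.
    let old: HashSet<u64> = store.read().unwrap().clone();

    // 2. Talk to the daemon with no lock held, so readers are never blocked on I/O.
    let current = fetch_current();

    // 3. Reacquire the lock and reconcile. Other writers may have run in between,
    //    so removals and insertions must tolerate already-applied entries.
    let mut guard = store.write().unwrap();
    for gone in old.difference(&current) {
        guard.remove(gone);
    }
    for added in current.difference(&old) {
        guard.insert(*added);
    }
}
```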
+ if self.txstore.insert(txid, tx).is_none() { + txids.push(txid); + } } // Phase 2: index history and spend edges (some txos can be missing) diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 3003a256..3e314fd1 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; use crate::chain::{Network, OutPoint, Transaction, TxOut, Txid}; use crate::config::Config; -use crate::daemon::Daemon; +use crate::daemon::{Daemon, MempoolAcceptResult}; use crate::errors::*; use crate::new_index::{ChainQuery, Mempool, ScriptStats, SpendingInput, Utxo}; use crate::util::{is_spendable, BlockId, Bytes, TransactionStatus}; @@ -87,6 +87,14 @@ impl Query { Ok(txid) } + pub fn test_mempool_accept( + &self, + txhex: Vec, + maxfeerate: Option, + ) -> Result> { + self.daemon.test_mempool_accept(txhex, maxfeerate) + } + pub fn utxo(&self, scripthash: &[u8]) -> Result> { let mut utxos = self.chain.utxo( scripthash, diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 77564200..00ee3e89 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -329,7 +329,12 @@ impl Indexer { .added_blockhashes .write() .unwrap() - .extend(blocks.iter().map(|b| b.entry.hash())); + .extend(blocks.iter().map(|b| { + if b.entry.height() % 10_000 == 0 { + info!("Tx indexing is up to height={}", b.entry.height()); + } + b.entry.hash() + })); } fn index(&self, blocks: &[BlockEntry]) { @@ -342,6 +347,9 @@ impl Indexer { let _timer = self.start_timer("index_process"); let added_blockhashes = self.store.added_blockhashes.read().unwrap(); for b in blocks { + if b.entry.height() % 10_000 == 0 { + info!("History indexing is up to height={}", b.entry.height()); + } let blockhash = b.entry.hash(); // TODO: replace by lookup into txstore_db? 
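`ChainQuery::summary`, added further below, folds per-output funding and spending history rows into one net value change per transaction, caps the number of distinct transactions, and sorts the result newest-first. A minimal sketch of that collation and ordering, using a simplified row type with illustrative names:

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct Summary { txid: u64, value: i64, height: u32 }

// rows: (txid, signed value delta, confirmation height), iterated newest history first.
fn collate(rows: &[(u64, i64, u32)], limit: usize) -> Vec<Summary> {
    let mut map: HashMap<u64, Summary> = HashMap::new();
    for &(txid, delta, height) in rows {
        // Stop at the first row that would introduce a transaction beyond the limit.
        if !map.contains_key(&txid) && map.len() == limit {
            break;
        }
        map.entry(txid)
            .and_modify(|s| s.value = s.value.saturating_add(delta))
            .or_insert(Summary { txid, value: delta, height });
    }
    let mut out: Vec<Summary> = map.into_values().collect();
    // Newest block first; ties broken by ascending value, mirroring the patch.
    out.sort_by(|a, b| {
        if a.height == b.height {
            a.value.cmp(&b.value)
        } else {
            b.height.cmp(&a.height)
        }
    });
    out
}
```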
if !added_blockhashes.contains(blockhash) { @@ -504,6 +512,108 @@ impl ChainQuery { ) } + pub fn summary( + &self, + scripthash: &[u8], + last_seen_txid: Option<&Txid>, + limit: usize, + ) -> Vec { + // scripthash lookup + self._summary(b'H', scripthash, last_seen_txid, limit) + } + + fn _summary( + &self, + code: u8, + hash: &[u8], + last_seen_txid: Option<&Txid>, + limit: usize, + ) -> Vec { + let _timer_scan = self.start_timer("address_summary"); + let rows = self + .history_iter_scan_reverse(code, hash) + .map(TxHistoryRow::from_row) + .map(|row| (row.get_txid(), row.key.txinfo)) + .skip_while(|(txid, _)| { + // skip until we reach the last_seen_txid + last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) + }) + .skip_while(|(txid, _)| { + // skip the last_seen_txid itself + last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid == txid) + }) + .filter_map(|(txid, info)| { + self.tx_confirming_block(&txid) + .map(|b| (txid, info, b.height, b.time)) + }); + + // collate utxo funding/spending events by transaction + let mut map: HashMap = HashMap::new(); + for (txid, info, height, time) in rows { + if !map.contains_key(&txid) && map.len() == limit { + break; + } + match info { + #[cfg(not(feature = "liquid"))] + TxHistoryInfo::Funding(info) => { + map.entry(txid) + .and_modify(|tx| { + tx.value = tx.value.saturating_add(info.value.try_into().unwrap_or(0)) + }) + .or_insert(TxHistorySummary { + txid, + value: info.value.try_into().unwrap_or(0), + height, + time, + }); + } + #[cfg(not(feature = "liquid"))] + TxHistoryInfo::Spending(info) => { + map.entry(txid) + .and_modify(|tx| { + tx.value = tx.value.saturating_sub(info.value.try_into().unwrap_or(0)) + }) + .or_insert(TxHistorySummary { + txid, + value: 0_i64.saturating_sub(info.value.try_into().unwrap_or(0)), + height, + time, + }); + } + #[cfg(feature = "liquid")] + TxHistoryInfo::Funding(_info) => { + map.entry(txid).or_insert(TxHistorySummary { + txid, + value: 0, + height, + time, + }); + } + #[cfg(feature = "liquid")] + TxHistoryInfo::Spending(_info) => { + map.entry(txid).or_insert(TxHistorySummary { + txid, + value: 0, + height, + time, + }); + } + #[cfg(feature = "liquid")] + _ => {} + } + } + + let mut tx_summaries = map.into_values().collect::>(); + tx_summaries.sort_by(|a, b| { + if a.height == b.height { + a.value.cmp(&b.value) + } else { + b.height.cmp(&a.height) + } + }); + tx_summaries + } + pub fn history( &self, scripthash: &[u8], @@ -671,7 +781,7 @@ impl ChainQuery { // abort if the utxo set size excedees the limit at any point in time if utxos.len() > limit { - bail!(ErrorKind::TooPopular) + bail!(ErrorKind::TooManyUtxos(limit)) } } @@ -1087,11 +1197,23 @@ fn lookup_txos( outpoints: &BTreeSet, allow_missing: bool, ) -> HashMap { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(16) // we need to saturate SSD IOPS - .thread_name(|i| format!("lookup-txo-{}", i)) - .build() - .unwrap(); + let mut loop_count = 10; + let pool = loop { + match rayon::ThreadPoolBuilder::new() + .num_threads(16) // we need to saturate SSD IOPS + .thread_name(|i| format!("lookup-txo-{}", i)) + .build() + { + Ok(pool) => break pool, + Err(e) => { + if loop_count == 0 { + panic!("schema::lookup_txos failed to create a ThreadPool: {}", e); + } + std::thread::sleep(std::time::Duration::from_millis(50)); + loop_count -= 1; + } + } + }; pool.install(|| { outpoints .par_iter() @@ -1553,6 +1675,14 @@ impl TxHistoryInfo { } } +#[derive(Serialize, Deserialize)] +pub struct TxHistorySummary { + txid: Txid, + height: usize, 
+ value: i64, + time: u32, +} + #[derive(Serialize, Deserialize)] struct TxEdgeKey { code: u8, diff --git a/src/rest.rs b/src/rest.rs index 56567393..2e061b11 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -1,6 +1,7 @@ use crate::chain::{address, BlockHash, Network, OutPoint, Script, Transaction, TxIn, TxOut, Txid}; use crate::config::{Config, VERSION_STRING}; use crate::errors; +use crate::metrics::Metrics; use crate::new_index::{compute_script_hash, Query, SpendingInput, Utxo}; use crate::util::{ create_socket, electrum_merkle, extract_tx_prevouts, full_hash, get_innerscripts, get_tx_fee, @@ -17,10 +18,11 @@ use bitcoin::hashes::Error as HashError; use hex::{self, FromHexError}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Response, Server, StatusCode}; +use prometheus::{HistogramOpts, HistogramVec}; use tokio::sync::oneshot; use hyperlocal::UnixServerExt; -use std::fs; +use std::{cmp, fs}; #[cfg(feature = "liquid")] use { crate::elements::{peg::PegoutValue, AssetSorting, IssuanceValue}, @@ -51,6 +53,9 @@ const TTL_SHORT: u32 = 10; // ttl for volatie resources const TTL_MEMPOOL_RECENT: u32 = 5; // ttl for GET /mempool/recent const CONF_FINAL: usize = 10; // reorgs deeper than this are considered unlikely +// internal api prefix +const INTERNAL_PREFIX: &str = "internal"; + #[derive(Serialize, Deserialize)] struct BlockValue { id: String, @@ -549,7 +554,12 @@ fn prepare_txs( } #[tokio::main] -async fn run_server(config: Arc, query: Arc, rx: oneshot::Receiver<()>) { +async fn run_server( + config: Arc, + query: Arc, + rx: oneshot::Receiver<()>, + metric: HistogramVec, +) { let addr = &config.http_addr; let socket_file = &config.http_socket_file; @@ -559,31 +569,36 @@ async fn run_server(config: Arc, query: Arc, rx: oneshot::Receive let make_service_fn_inn = || { let query = Arc::clone(&query); let config = Arc::clone(&config); + let metric = metric.clone(); async move { Ok::<_, hyper::Error>(service_fn(move |req| { let query = Arc::clone(&query); let config = Arc::clone(&config); + let timer = metric.with_label_values(&["all_methods"]).start_timer(); async move { let method = req.method().clone(); let uri = req.uri().clone(); let body = hyper::body::to_bytes(req.into_body()).await?; - let mut resp = handle_request(method, uri, body, &query, &config) - .unwrap_or_else(|err| { - warn!("{:?}", err); - Response::builder() - .status(err.0) - .header("Content-Type", "text/plain") - .header("X-Powered-By", &**VERSION_STRING) - .body(Body::from(err.1)) - .unwrap() - }); + let mut resp = tokio::task::block_in_place(|| { + handle_request(method, uri, body, &query, &config) + }) + .unwrap_or_else(|err| { + warn!("{:?}", err); + Response::builder() + .status(err.0) + .header("Content-Type", "text/plain") + .header("X-Powered-By", &**VERSION_STRING) + .body(Body::from(err.1)) + .unwrap() + }); if let Some(ref origins) = config.cors { resp.headers_mut() .insert("Access-Control-Allow-Origin", origins.parse().unwrap()); } + timer.observe_duration(); Ok::<_, hyper::Error>(resp) } })) @@ -630,13 +645,17 @@ async fn run_server(config: Arc, query: Arc, rx: oneshot::Receive } } -pub fn start(config: Arc, query: Arc) -> Handle { +pub fn start(config: Arc, query: Arc, metrics: &Metrics) -> Handle { let (tx, rx) = oneshot::channel::<()>(); + let response_timer = metrics.histogram_vec( + HistogramOpts::new("electrs_rest_api", "Electrs REST API response timings"), + &["method"], + ); Handle { tx, thread: crate::util::spawn_thread("rest-server", move || { - run_server(config, query, 
rx); + run_server(config, query, rx, response_timer); }), } } @@ -726,17 +745,19 @@ fn handle_request( .ok_or_else(|| HttpError::not_found("Block not found".to_string()))?; json_response(txids, TTL_LONG) } - (&Method::GET, Some(&"block"), Some(hash), Some(&"txs"), None, None) => { + (&Method::GET, Some(&INTERNAL_PREFIX), Some(&"block"), Some(hash), Some(&"txs"), None) => { let hash = BlockHash::from_hex(hash)?; + let block_id = query.chain().blockid_by_hash(&hash); let txs = query .chain() .get_block_txs(&hash) .ok_or_else(|| HttpError::not_found("Block not found".to_string()))? .into_iter() - .map(|tx| (tx, None)) + .map(|tx| (tx, block_id.clone())) .collect(); - json_response(prepare_txs(txs, query, config), TTL_SHORT) + let ttl = ttl_by_depth(block_id.map(|b| b.height), query); + json_response(prepare_txs(txs, query, config), ttl) } (&Method::GET, Some(&"block"), Some(hash), Some(&"header"), None, None) => { let hash = BlockHash::from_hex(hash)?; @@ -936,6 +957,38 @@ fn handle_request( json_response(prepare_txs(txs, query, config), TTL_SHORT) } + ( + &Method::GET, + Some(script_type @ &"address"), + Some(script_str), + Some(&"txs"), + Some(&"summary"), + last_seen_txid, + ) + | ( + &Method::GET, + Some(script_type @ &"scripthash"), + Some(script_str), + Some(&"txs"), + Some(&"summary"), + last_seen_txid, + ) => { + let script_hash = to_scripthash(script_type, script_str, config.network_type)?; + let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok()); + let max_txs = cmp::min( + config.rest_default_max_address_summary_txs, + query_params + .get("max_txs") + .and_then(|s| s.parse::().ok()) + .unwrap_or(config.rest_default_max_address_summary_txs), + ); + + let summary = query + .chain() + .summary(&script_hash[..], last_seen_txid.as_ref(), max_txs); + + json_response(summary, TTL_SHORT) + } ( &Method::GET, Some(script_type @ &"address"), @@ -1020,6 +1073,29 @@ fn handle_request( json_response(tx.remove(0), ttl) } } + (&Method::POST, Some(&INTERNAL_PREFIX), Some(&"txs"), None, None, None) => { + let txid_strings: Vec = + serde_json::from_slice(&body).map_err(|err| HttpError::from(err.to_string()))?; + + match txid_strings + .into_iter() + .map(|txid| Txid::from_hex(&txid)) + .collect::, _>>() + { + Ok(txids) => { + let txs: Vec<(Transaction, Option)> = txids + .iter() + .filter_map(|txid| { + query + .lookup_txn(txid) + .map(|tx| (tx, query.chain().tx_confirming_block(txid))) + }) + .collect(); + json_response(prepare_txs(txs, query, config), 0) + } + Err(err) => http_message(StatusCode::BAD_REQUEST, err.to_string(), 0), + } + } (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"hex"), None, None) | (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"raw"), None, None) => { let hash = Txid::from_hex(hash)?; @@ -1126,6 +1202,48 @@ fn handle_request( .map_err(|err| HttpError::from(err.description().to_string()))?; http_message(StatusCode::OK, txid.to_hex(), 0) } + (&Method::POST, Some(&"txs"), Some(&"test"), None, None, None) => { + let txhexes: Vec = + serde_json::from_str(String::from_utf8(body.to_vec())?.as_str())?; + + if txhexes.len() > 25 { + Result::Err(HttpError::from( + "Exceeded maximum of 25 transactions".to_string(), + ))? 
+ } + + let maxfeerate = query_params + .get("maxfeerate") + .map(|s| { + s.parse::() + .map_err(|_| HttpError::from("Invalid maxfeerate".to_string())) + }) + .transpose()?; + + // pre-checks + txhexes.iter().enumerate().try_for_each(|(index, txhex)| { + // each transaction must be of reasonable size (more than 60 bytes, within 400kWU standardness limit) + if !(120..800_000).contains(&txhex.len()) { + Result::Err(HttpError::from(format!( + "Invalid transaction size for item {}", + index + ))) + } else { + // must be a valid hex string + Vec::::from_hex(txhex) + .map_err(|_| { + HttpError::from(format!("Invalid transaction hex for item {}", index)) + }) + .map(|_| ()) + } + })?; + + let result = query + .test_mempool_accept(txhexes, maxfeerate) + .map_err(|err| HttpError::from(err.description().to_string()))?; + + json_response(result, TTL_SHORT) + } (&Method::GET, Some(&"txs"), Some(&"outspends"), None, None, None) => { let txid_strings: Vec<&str> = query_params .get("txids") @@ -1158,6 +1276,69 @@ fn handle_request( json_response(spends, TTL_SHORT) } + ( + &Method::POST, + Some(&INTERNAL_PREFIX), + Some(&"txs"), + Some(&"outspends"), + Some(&"by-txid"), + None, + ) => { + let txid_strings: Vec = + serde_json::from_slice(&body).map_err(|err| HttpError::from(err.to_string()))?; + + let spends: Vec> = txid_strings + .into_iter() + .map(|txid_str| { + Txid::from_hex(&txid_str) + .ok() + .and_then(|txid| query.lookup_txn(&txid)) + .map_or_else(Vec::new, |tx| { + query + .lookup_tx_spends(tx) + .into_iter() + .map(|spend| { + spend.map_or_else(SpendingValue::default, SpendingValue::from) + }) + .collect() + }) + }) + .collect(); + + json_response(spends, TTL_SHORT) + } + ( + &Method::POST, + Some(&INTERNAL_PREFIX), + Some(&"txs"), + Some(&"outspends"), + Some(&"by-outpoint"), + None, + ) => { + let outpoint_strings: Vec = + serde_json::from_slice(&body).map_err(|err| HttpError::from(err.to_string()))?; + + let spends: Vec = outpoint_strings + .into_iter() + .map(|outpoint_str| { + let mut parts = outpoint_str.split(':'); + let hash_part = parts.next(); + let index_part = parts.next(); + + if let (Some(hash), Some(index)) = (hash_part, index_part) { + if let (Ok(txid), Ok(vout)) = (Txid::from_hex(hash), index.parse::()) { + let outpoint = OutPoint { txid, vout }; + return query + .lookup_spend(&outpoint) + .map_or_else(SpendingValue::default, SpendingValue::from); + } + } + SpendingValue::default() + }) + .collect(); + + json_response(spends, TTL_SHORT) + } (&Method::GET, Some(&"mempool"), None, None, None, None) => { json_response(query.mempool().backlog_stats(), TTL_SHORT) @@ -1165,7 +1346,25 @@ fn handle_request( (&Method::GET, Some(&"mempool"), Some(&"txids"), None, None, None) => { json_response(query.mempool().txids(), TTL_SHORT) } - (&Method::GET, Some(&"mempool"), Some(&"txs"), Some(&"all"), None, None) => { + (&Method::GET, Some(&"mempool"), Some(&"txids"), Some(&"page"), last_seen_txid, None) => { + let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok()); + let max_txs = query_params + .get("max_txs") + .and_then(|s| s.parse::().ok()) + .unwrap_or(config.rest_max_mempool_txid_page_size); + json_response( + query.mempool().txids_page(max_txs, last_seen_txid), + TTL_SHORT, + ) + } + ( + &Method::GET, + Some(&INTERNAL_PREFIX), + Some(&"mempool"), + Some(&"txs"), + Some(&"all"), + None, + ) => { let txs = query .mempool() .txs() @@ -1175,7 +1374,7 @@ fn handle_request( json_response(prepare_txs(txs, query, config), TTL_SHORT) } - (&Method::POST, Some(&"mempool"), 
Some(&"txs"), None, None, None) => { + (&Method::POST, Some(&INTERNAL_PREFIX), Some(&"mempool"), Some(&"txs"), None, None) => { let txid_strings: Vec = serde_json::from_slice(&body).map_err(|err| HttpError::from(err.to_string()))?; @@ -1198,11 +1397,22 @@ fn handle_request( Err(err) => http_message(StatusCode::BAD_REQUEST, err.to_string(), 0), } } - (&Method::GET, Some(&"mempool"), Some(&"txs"), last_seen_txid, None, None) => { + ( + &Method::GET, + Some(&INTERNAL_PREFIX), + Some(&"mempool"), + Some(&"txs"), + last_seen_txid, + None, + ) => { let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok()); + let max_txs = query_params + .get("max_txs") + .and_then(|s| s.parse::().ok()) + .unwrap_or(config.rest_max_mempool_page_size); let txs = query .mempool() - .txs_page(10_000, last_seen_txid) + .txs_page(max_txs, last_seen_txid) .into_iter() .map(|tx| (tx, None)) .collect(); @@ -1458,7 +1668,7 @@ fn address_to_scripthash(addr: &str, network: Network) -> Result