From 938abd1d6903c4a78e18105f3c8f8991eaafe2bd Mon Sep 17 00:00:00 2001 From: Reisen Date: Sat, 2 Sep 2023 11:00:17 +0000 Subject: [PATCH] feat(hermes): add tracing --- contract_manager/scripts/bench.ts | 75 ++++++++++++++ hermes/Cargo.lock | 76 ++++++++++++++- hermes/Cargo.toml | 71 +++++++------- hermes/src/api.rs | 7 +- hermes/src/api/ws.rs | 26 ++--- hermes/src/main.rs | 23 ++++- hermes/src/network/p2p.go | 12 ++- hermes/src/network/p2p.rs | 111 ++++++++++++++++++--- hermes/src/network/pythnet.rs | 49 +++++----- hermes/src/store.rs | 61 ++++++++---- hermes/src/store/proof/wormhole_merkle.rs | 28 +++--- hermes/src/store/types.rs | 4 +- hermes/src/trace.rs | 113 ++++++++++++++++++++++ 13 files changed, 524 insertions(+), 132 deletions(-) create mode 100644 contract_manager/scripts/bench.ts create mode 100644 hermes/src/trace.rs diff --git a/contract_manager/scripts/bench.ts b/contract_manager/scripts/bench.ts new file mode 100644 index 000000000..075601552 --- /dev/null +++ b/contract_manager/scripts/bench.ts @@ -0,0 +1,75 @@ +import yargs from "yargs"; +import { hideBin } from "yargs/helpers"; +import { DefaultStore } from "../src"; +import { PriceFeed, PriceServiceConnection } from "@pythnetwork/price-service-client"; +import { ethers } from "ethers"; +import { PrivateKey, toPrivateKey } from "../lib"; +import {sleep} from "pyth_relay/lib/helpers"; + +const parser = yargs(hideBin(process.argv)) + .scriptName("update_pricefeed.ts") + .usage( + "Usage: $0" + ) + .options({}); + +async function main() { + const mainnetFeedIds = [ + "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", + "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace", + "63f341689d98a12ef60a5cff1d7f85c70a9e17bf1575f0e7c0b2512d48b1c8b3", + "7677dd124dee46cfcd46ff03cf405fb0ed94b1f49efbea3444aadbda939a7ad3", + "14eb6f846b84f37c841ce7a52a38706e54966df84b3a09cc40499b164af05672", + "fa2d39b681f3cef5fa87432a8dbd05113917fffb1b6829a37395c88396522a4e", + "7a5bc1d2b56ad029048cd63964b3ad2776eadf812edc1a43a31406cb54bff592", + "abe4f2b264560a397f38eec024369356e5c1ea4f7aab94729369f144b3d97779", + "5fcf71143bb70d41af4fa9aa1287e2efd3c5911cee59f909f915c9f61baacb1e", + "b82449fd728133488d2d41131cffe763f9c1693b73c544d9ef6aaa371060dd25", + ]; + + let feedIds: string[] = []; + feedIds = [mainnetFeedIds[0]]; + const defaultEndpoint = "https://xc-mainnet.pyth.network"; + const hermesEndpoint = "http://127.0.0.1:7575"; + + const priceService = new PriceServiceConnection(defaultEndpoint, {verbose: true}); + const hermes = new PriceServiceConnection(hermesEndpoint, {verbose: true}); + + const origUpdates: PriceFeed[] = []; + const hermesUpdates: PriceFeed[] = []; + + let priceServiceSum = 0; + let priceServiceCount = 0; + + priceService.subscribePriceFeedUpdates(feedIds, (feed) => { + const publishTime = feed.getPriceUnchecked().publishTime; + // const now = (new Date()).getTime(); + // const diff = now / 1000 - publishTime; + const receivedTime = feed.getMetadata()?.priceServiceReceiveTime!; + const diff = receivedTime - publishTime; + + priceServiceSum += diff; + priceServiceCount += 1; + + console.log(`p: ${priceServiceSum/priceServiceCount} ${diff} ${feed.getPriceUnchecked().price} ${JSON.stringify(feed.getMetadata())}`) + origUpdates.push(feed); + }) + + let hermesSum = 0; + let hermesCount = 0; + + hermes.subscribePriceFeedUpdates(feedIds, (feed) => { + const publishTime = feed.getPriceUnchecked().publishTime; + // const now = (new Date()).getTime(); + // const diff = now / 1000 - publishTime; + const receivedTime = feed.getMetadata()?.priceServiceReceiveTime!; + const diff = receivedTime - publishTime; + + hermesSum += diff; + hermesCount += 1; + console.log(`h: ${hermesSum/hermesCount} ${diff} ${feed.getPriceUnchecked().price} ${JSON.stringify(feed.getMetadata())}`) + hermesUpdates.push(feed); + }) +} + +main(); diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index a31b1e080..3a4e88e8e 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -835,9 +835,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.26" +version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" +checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f" dependencies = [ "android-tzdata", "iana-time-zone", @@ -846,7 +846,7 @@ dependencies = [ "serde", "time 0.1.45", "wasm-bindgen", - "winapi", + "windows-targets 0.48.1", ] [[package]] @@ -1772,6 +1772,7 @@ dependencies = [ "base64 0.21.2", "borsh 0.10.3", "byteorder", + "chrono", "dashmap", "derive_more", "env_logger 0.10.0", @@ -1801,6 +1802,8 @@ dependencies = [ "strum", "tokio", "tower-http", + "tracing", + "tracing-subscriber", "utoipa", "utoipa-swagger-ui", "wormhole-sdk", @@ -3226,6 +3229,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.2.1" @@ -3494,6 +3507,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "owning_ref" version = "0.4.1" @@ -4798,6 +4817,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "shell-words" version = "1.1.0" @@ -5737,6 +5765,16 @@ dependencies = [ "syn 2.0.26", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.1.45" @@ -6027,6 +6065,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -6293,6 +6357,12 @@ dependencies = [ "zip", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.4.1" diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index 2a1829268..f95a13f99 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -4,40 +4,43 @@ version = "0.1.13" edition = "2021" [dependencies] -anyhow = { version = "1.0.69" } -axum = { version = "0.6.20", features = ["json", "ws", "macros"] } -axum-macros = { version = "0.3.8" } -base64 = { version = "0.21.0" } -borsh = { version = "0.10.3" } -byteorder = { version = "1.4.3" } -dashmap = { version = "5.4.0" } -derive_more = { version = "0.99.17" } -env_logger = { version = "0.10.0" } -futures = { version = "0.3.28" } -hex = { version = "0.4.3" } -humantime = { version = "2.1.0" } -lazy_static = { version = "1.4.0" } -libc = { version = "0.2.140" } -log = { version = "0.4.17" } -mock_instant = { version = "0.3.1", features = ["sync"] } -prometheus-client = { version = "0.21.1" } -pyth-sdk = { version = "0.8.0" } -pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] } -rand = { version = "0.8.5" } -reqwest = { version = "0.11.14", features = ["blocking", "json"] } -secp256k1 = { version = "0.27.0", features = ["rand", "recovery", "serde"] } -serde = { version = "1.0.152", features = ["derive"] } -serde_json = { version = "1.0.93" } -serde_qs = { version = "0.12.0", features = ["axum"] } -serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" } -sha3 = { version = "0.10.4" } -structopt = { version = "0.3.26" } -strum = { version = "0.24.1", features = ["derive"] } -tokio = { version = "1.26.0", features = ["full"] } -tower-http = { version = "0.4.0", features = ["cors"] } -utoipa = { version = "3.4.0", features = ["axum_extras"] } -utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] } -wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" } +anyhow = { version = "1.0.69" } +axum = { version = "0.6.20", features = ["json", "ws", "macros"] } +axum-macros = { version = "0.3.8" } +base64 = { version = "0.21.0" } +borsh = { version = "0.10.3" } +byteorder = { version = "1.4.3" } +chrono = { version = "0.4.28" } +dashmap = { version = "5.4.0" } +derive_more = { version = "0.99.17" } +env_logger = { version = "0.10.0" } +futures = { version = "0.3.28" } +hex = { version = "0.4.3" } +humantime = { version = "2.1.0" } +lazy_static = { version = "1.4.0" } +libc = { version = "0.2.140" } +log = { version = "0.4.17" } +mock_instant = { version = "0.3.1", features = ["sync"] } +prometheus-client = { version = "0.21.1" } +pyth-sdk = { version = "0.8.0" } +pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] } +rand = { version = "0.8.5" } +reqwest = { version = "0.11.14", features = ["blocking", "json"] } +secp256k1 = { version = "0.27.0", features = ["rand", "recovery", "serde"] } +serde = { version = "1.0.152", features = ["derive"] } +serde_json = { version = "1.0.93" } +serde_qs = { version = "0.12.0", features = ["axum"] } +serde_wormhole = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" } +sha3 = { version = "0.10.4" } +structopt = { version = "0.3.26" } +strum = { version = "0.24.1", features = ["derive"] } +tokio = { version = "1.26.0", features = ["full"] } +tower-http = { version = "0.4.0", features = ["cors"] } +tracing = { version = "0.1.37", features = ["log"] } +tracing-subscriber = { version = "0.3.17" } +utoipa = { version = "3.4.0", features = ["axum_extras"] } +utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] } +wormhole-sdk = { git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.17.1" } # Setup LibP2P. Unfortunately the dependencies required by libp2p are shared # with the dependencies required by many Solana components. This means that we diff --git a/hermes/src/api.rs b/hermes/src/api.rs index 38c0d22d0..297b49665 100644 --- a/hermes/src/api.rs +++ b/hermes/src/api.rs @@ -47,8 +47,9 @@ impl State { /// /// Currently this is based on Axum due to the simplicity and strong ecosystem support for the /// packages they are based on (tokio & hyper). +#[tracing::instrument(skip(opts, store, update_rx))] pub async fn run(opts: RunOptions, store: Arc, mut update_rx: Receiver<()>) -> Result<()> { - log::info!("Starting RPC server on {}", opts.api_addr); + tracing::info!(endpoint = %opts.api_addr, "Starting RPC Server."); #[derive(OpenApi)] #[openapi( @@ -109,7 +110,7 @@ pub async fn run(opts: RunOptions, store: Arc, mut update_rx: Receiver<() // Causes a full application shutdown if an error occurs, we can't recover from this so // we just quit. if update_rx.recv().await.is_none() { - log::error!("Failed to receive update from store."); + tracing::error!("Failed to receive update from store."); crate::SHOULD_EXIT.store(true, Ordering::Release); break; } @@ -117,7 +118,7 @@ pub async fn run(opts: RunOptions, store: Arc, mut update_rx: Receiver<() notify_updates(state.ws.clone()).await; } - log::info!("Shutting down websocket updates...") + tracing::info!("Shutting down websocket updates...") }); // Binds the axum's server to the configured address and port. This is a blocking call and will diff --git a/hermes/src/api/ws.rs b/hermes/src/api/ws.rs index 8fde69eff..84a7d1892 100644 --- a/hermes/src/api/ws.rs +++ b/hermes/src/api/ws.rs @@ -61,10 +61,11 @@ pub async fn ws_route_handler( ws.on_upgrade(|socket| websocket_handler(socket, state)) } +#[tracing::instrument(skip(stream, state))] async fn websocket_handler(stream: WebSocket, state: super::State) { let ws_state = state.ws.clone(); let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst); - log::debug!("New websocket connection, assigning id: {}", id); + tracing::debug!(id, "New Websocket Connection"); let (notify_sender, notify_receiver) = mpsc::channel::<()>(NOTIFICATIONS_CHAN_LEN); let (sender, receiver) = stream.split(); @@ -112,10 +113,11 @@ impl Subscriber { } } + #[tracing::instrument(skip(self))] pub async fn run(&mut self) { while !self.closed { if let Err(e) = self.handle_next().await { - log::debug!("Subscriber {}: Error handling next message: {}", self.id, e); + tracing::debug!(subscriber = self.id, error = ?e, "Error Handling Subscriber Message."); break; } } @@ -179,33 +181,32 @@ impl Subscriber { Ok(()) } + #[tracing::instrument(skip(self, message))] async fn handle_client_message(&mut self, message: Message) -> Result<()> { + // Track Pong's to make sure the client is still connected. + if let Message::Pong(_) = message { + self.responded_to_ping = true; + } + let maybe_client_message = match message { Message::Close(_) => { // Closing the connection. We don't remove it from the subscribers // list, instead when the Subscriber struct is dropped the channel // to subscribers list will be closed and it will eventually get // removed. - log::trace!("Subscriber {} closed connection", self.id); + tracing::trace!(id = self.id, "Subscriber Closed Connection."); // Send the close message to gracefully shut down the connection // Otherwise the client might get an abnormal Websocket closure // error. self.sender.close().await?; - self.closed = true; return Ok(()); } Message::Text(text) => serde_json::from_str::(&text), Message::Binary(data) => serde_json::from_slice::(&data), - Message::Ping(_) => { - // Axum will send Pong automatically - return Ok(()); - } - Message::Pong(_) => { - self.responded_to_ping = true; - return Ok(()); - } + Message::Ping(_) => return Ok(()), // Axum handles pong automatically + Message::Pong(_) => return Ok(()), // No need to do anything else. }; match maybe_client_message { @@ -222,6 +223,7 @@ impl Subscriber { .await?; return Ok(()); } + Ok(ClientMessage::Subscribe { ids, verbose, diff --git a/hermes/src/main.rs b/hermes/src/main.rs index 9b553d1a5..3a02bf878 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -9,6 +9,7 @@ use { std::sync::atomic::AtomicBool, structopt::StructOpt, tokio::spawn, + tracing_subscriber::Layer, }; mod api; @@ -17,6 +18,7 @@ mod doc_examples; mod macros; mod network; mod store; +mod trace; // A static exit flag to indicate to running threads that we're shutting down. This is used to // gracefully shutdown the application. @@ -27,14 +29,15 @@ mod store; pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); /// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin. +#[tracing::instrument] async fn init() -> Result<()> { - log::info!("Initializing Hermes..."); + tracing::info!("Initializing Hermes..."); // Parse the command line arguments with StructOpt, will exit automatically on `--help` or // with invalid arguments. match config::Options::from_args() { config::Options::Run(opts) => { - log::info!("Starting hermes service..."); + tracing::info!("Starting hermes service..."); // The update channel is used to send store update notifications to the public API. let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000); @@ -45,8 +48,9 @@ async fn init() -> Result<()> { // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We // also send off any notifications needed to close off any waiting tasks. spawn(async move { + tracing::info!("Registered shutdown signal handler..."); tokio::signal::ctrl_c().await.unwrap(); - log::info!("Shut down signal received, waiting for tasks..."); + tracing::info!("Shut down signal received, waiting for tasks..."); SHOULD_EXIT.store(true, std::sync::atomic::Ordering::Release); let _ = update_tx.send(()).await; }); @@ -70,9 +74,22 @@ async fn init() -> Result<()> { } #[tokio::main] +#[tracing::instrument] async fn main() -> Result<()> { env_logger::init(); + // Initialize a Tracing Subscriber + tracing::subscriber::set_global_default( + trace::LatencyLayer::new().with_subscriber( + tracing_subscriber::fmt() + .compact() + .with_file(false) + .with_line_number(true) + .with_thread_ids(true) + .finish(), + ), + )?; + // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE // should be set to 1 for this otherwise it will only print the top-level error. if let Err(result) = init().await { diff --git a/hermes/src/network/p2p.go b/hermes/src/network/p2p.go index 80b6496fa..606bf9717 100644 --- a/hermes/src/network/p2p.go +++ b/hermes/src/network/p2p.go @@ -51,7 +51,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" - libp2pquicreuse "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" + // libp2pquicreuse "github.com/libp2p/go-libp2p/p2p/transport/quicreuse" ) //export RegisterObservationCallback @@ -154,7 +154,7 @@ func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, li // iptables -A INPUT -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT // // Which is a standard rule in many firewall configurations (RELATED is the key flag). - libp2p.QUICReuse(libp2pquicreuse.NewConnManager, libp2pquicreuse.DisableReuseport()), + // libp2p.QUICReuse(libp2pquicreuse.NewConnManager, libp2pquicreuse.DisableReuseport()), libp2p.Transport(libp2pquic.NewTransport), libp2p.ConnectionManager(mgr), libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { @@ -189,7 +189,11 @@ func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, li defer h.Close() topic := fmt.Sprintf("%s/%s", networkID, "broadcast") - ps, err := pubsub.NewGossipSub(ctx, h) + ps, err := pubsub.NewGossipSub( + ctx, + h, + pubsub.WithValidateQueueSize(1024), + ) if err != nil { err := fmt.Errorf("Failed to create Pubsub: %w", err) fmt.Println(err) @@ -205,7 +209,7 @@ func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, li defer th.Close() - sub, err := th.Subscribe() + sub, err := th.Subscribe(pubsub.WithBufferSize(1024)) if err != nil { err := fmt.Errorf("Failed to subscribe topic: %w", err) fmt.Println(err) diff --git a/hermes/src/network/p2p.rs b/hermes/src/network/p2p.rs index 018922cf8..639e54689 100644 --- a/hermes/src/network/p2p.rs +++ b/hermes/src/network/p2p.rs @@ -19,6 +19,10 @@ use { }, anyhow::Result, libp2p::Multiaddr, + pythnet_sdk::wire::v1::{ + WormholeMessage, + WormholePayload, + }, std::{ ffi::{ c_char, @@ -34,6 +38,10 @@ use { Mutex, }, }, + wormhole_sdk::{ + Address, + Chain, + }, }; extern "C" { @@ -53,7 +61,32 @@ pub struct ObservationC { pub vaa_len: usize, } -pub type Observation = Vec; +/// A wrapper around a VAA observed from Wormhole. +/// +/// This wrapper tracks a Span that allows tracking the VAA through the system. This Span is +/// expected to be tied to the `proxy` Span and so logging will always be traced back to the +/// associated `proxy` Span regardless of where in the system it is being used. +#[derive(Clone, Debug)] +pub struct Vaa { + pub span: tracing::Span, + pub data: Vec, +} + +// Allow PartialEq on Vaa that ignores the Span. +impl PartialEq for Vaa { + fn eq(&self, other: &Self) -> bool { + self.data == other.data + } +} + +/// Deref to &[u8] so we can ignore the wrapper when passing it to the store. +impl std::ops::Deref for Vaa { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.data + } +} // A Static Channel to pipe the `Observation` from the callback into the local Rust handler for // observation messages. It has to be static for now because there's no way to capture state in @@ -61,8 +94,8 @@ pub type Observation = Vec; // TODO: Move this channel to the module level that spawns the services lazy_static::lazy_static! { pub static ref OBSERVATIONS: ( - Mutex>, - Mutex>, + Mutex>, + Mutex>, ) = { let (tx, rc) = std::sync::mpsc::channel(); (Mutex::new(tx), Mutex::new(rc)) @@ -72,20 +105,60 @@ lazy_static::lazy_static! { /// This function is passed as a callback to the Go libp2p runtime, it passes observations back and /// acts as a proxy forwarding these observations into our main loop. #[no_mangle] +#[tracing::instrument(skip(o))] extern "C" fn proxy(o: ObservationC) { // Create a fixed slice from the pointer and length. let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned(); - // The chances of the mutex getting poisioned is very low and if it happens - // there is no way for us to recover from it. + + // Deserialize VAA to check Creation Time + let deserialized_vaa = { + serde_wormhole::from_slice::>(&vaa) + .map_err(|e| { + tracing::error!(error = ?e, "Failed to deserialize VAA."); + }) + .ok() + } + .unwrap(); + + if deserialized_vaa.emitter_chain != Chain::Pythnet + || deserialized_vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS) + { + return; // Ignore VAA from other emitters + } + + // Get the slot from the VAA. + let slot = match WormholeMessage::try_from_bytes(deserialized_vaa.payload) + .unwrap() + .payload + { + WormholePayload::Merkle(proof) => proof.slot, + }; + + // Create a Span tied to the Span of the curent proxy. + let span = tracing::span!( + parent: tracing::Span::current(), + tracing::Level::INFO, + "Observation", + slot = slot, + ); + + // Find the observation time for said VAA (which is a unix timestamp) and serialize as a ISO 8601 string. + let observed_time = deserialized_vaa.timestamp; + let observed_time = chrono::NaiveDateTime::from_timestamp_opt(observed_time as i64, 0).unwrap(); + let observed_time = observed_time.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string(); + span.in_scope(|| tracing::info!(vaa_timestamp = observed_time, "Observed VAA")); + + // The chances of the mutex getting poisioned is very low and if it happens there is no way for + // us to recover from it. if OBSERVATIONS .0 .lock() .map_err(|_| ()) - .and_then(|tx| tx.send(vaa).map_err(|_| ())) + .and_then(|tx| tx.send(Vaa { span, data: vaa }).map_err(|_| ())) .is_err() { - log::error!("Failed to lock p2p observation channel or to send observation."); crate::SHOULD_EXIT.store(true, Ordering::Release); + tracing::error!("Failed to lock p2p observation channel or to send observation."); } } @@ -94,6 +167,7 @@ extern "C" fn proxy(o: ObservationC) { /// TODO: handle_message should be capable of handling more than just Observations. But we don't /// have our own P2P network, we pass it in to keep the code structure and read directly from the /// OBSERVATIONS channel in the RPC for now. +#[tracing::instrument(skip(wh_bootstrap_addrs, wh_listen_addrs))] pub fn bootstrap( network_id: String, wh_bootstrap_addrs: Vec, @@ -124,12 +198,16 @@ pub fn bootstrap( wh_listen_addrs_cstr.as_ptr(), ); } + + tracing::info!("Registered observation callback."); + Ok(()) } // Spawn's the P2P layer as a separate thread via Go. +#[tracing::instrument(skip(opts, store))] pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { - log::info!("Starting P2P server on {:?}", opts.wh_listen_addrs); + tracing::info!(listeners = ?opts.wh_listen_addrs, "Starting P2P Server"); std::thread::spawn(|| { if bootstrap( @@ -139,7 +217,7 @@ pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { ) .is_err() { - log::error!("Failed to bootstrap P2P server."); + tracing::error!("Failed to bootstrap P2P server."); crate::SHOULD_EXIT.store(true, Ordering::Release); } }); @@ -148,14 +226,14 @@ pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { // Listen in the background for new VAA's from the p2p layer // and update the state accordingly. while !crate::SHOULD_EXIT.load(Ordering::Acquire) { - let vaa_bytes = tokio::task::spawn_blocking(|| { + let vaa = tokio::task::spawn_blocking(|| { let observation = OBSERVATIONS.1.lock(); let observation = match observation { Ok(observation) => observation, Err(e) => { // This should never happen, but if it does, we want to panic and crash // as it is not recoverable. - log::error!("Failed to lock p2p observation channel: {e}"); + tracing::error!(error = ?e, "Failed to lock p2p observation channel."); crate::SHOULD_EXIT.store(true, Ordering::Release); return Err(anyhow::anyhow!("Failed to lock p2p observation channel")); } @@ -166,7 +244,7 @@ pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { Err(e) => { // This should never happen, but if it does, we want to shutdown the // application as it is unrecoverable. - log::error!("Failed to receive p2p observation: {e}"); + tracing::error!(error = ?e, "Failed to receive p2p observation."); crate::SHOULD_EXIT.store(true, Ordering::Release); Err(anyhow::anyhow!("Failed to receive p2p observation.")) } @@ -174,15 +252,18 @@ pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { }) .await??; + vaa.span + .in_scope(|| tracing::info!("Received VAA from P2P layer.")); + let store = store.clone(); tokio::spawn(async move { - if let Err(e) = store.store_update(Update::Vaa(vaa_bytes)).await { - log::error!("Failed to process VAA: {:?}", e); + if let Err(e) = store.store_update(Update::Vaa(vaa)).await { + tracing::error!(error = ?e, "Failed to process VAA."); } }); } - log::info!("Shutting down P2P server..."); + tracing::info!("Shutting down P2P server..."); Ok::<(), anyhow::Error>(()) }); diff --git a/hermes/src/network/pythnet.rs b/hermes/src/network/pythnet.rs index aa7fe7326..ef17e2ab5 100644 --- a/hermes/src/network/pythnet.rs +++ b/hermes/src/network/pythnet.rs @@ -156,7 +156,7 @@ pub async fn run(store: Arc, pythnet_ws_endpoint: String) -> Result<()> { let account: Account = match update.value.account.decode() { Some(account) => account, None => { - log::error!("Failed to decode account from update: {:?}", update); + tracing::error!(?update, "Failed to decode account from update."); continue; } }; @@ -179,20 +179,20 @@ pub async fn run(store: Arc, pythnet_ws_endpoint: String) -> Result<()> { .store_update(Update::AccumulatorMessages(accumulator_messages)) .await { - log::error!("Failed to store accumulator messages: {:?}", err); + tracing::error!(error = ?err, "Failed to store accumulator messages."); } }); } else { - log::error!( - "Failed to verify the messages public key: {:?} != {:?}", - candidate, - update.value.pubkey + tracing::error!( + ?candidate, + ?update.value.pubkey, + "Failed to verify message public keys.", ); } } Err(err) => { - log::error!("Failed to parse AccumulatorMessages: {:?}", err); + tracing::error!(error = ?err, "Failed to parse AccumulatorMessages."); } }; } @@ -222,10 +222,10 @@ async fn fetch_existing_guardian_sets( let current = fetch_guardian_set(&client, wormhole_contract_addr, bridge.guardian_set_index).await?; - log::info!( - "Retrieved Current GuardianSet ({}): {}", - bridge.guardian_set_index, - current + tracing::info!( + guardian_set_index = bridge.guardian_set_index, + %current, + "Retrieved Current GuardianSet.", ); store @@ -242,10 +242,10 @@ async fn fetch_existing_guardian_sets( ) .await?; - log::info!( - "Retrieved Previous GuardianSet ({}): {}", - bridge.guardian_set_index - 1, - previous + tracing::info!( + previous_guardian_set_index = bridge.guardian_set_index - 1, + %previous, + "Retrieved Previous GuardianSet.", ); store @@ -256,10 +256,11 @@ async fn fetch_existing_guardian_sets( Ok(()) } +#[tracing::instrument(skip(opts, store))] pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { - log::info!( - "Starting Pythnet listener using {}", - opts.pythnet_ws_endpoint + tracing::info!( + endpoint = opts.pythnet_ws_endpoint, + "Started Pythnet Listener." ); fetch_existing_guardian_sets( @@ -277,17 +278,15 @@ pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { let current_time = Instant::now(); if let Err(ref e) = run(store.clone(), pythnet_ws_endpoint.clone()).await { - log::error!("Error in Pythnet network listener: {:?}", e); + tracing::error!(error = ?e, "Error in Pythnet network listener."); if current_time.elapsed() < Duration::from_secs(30) { - log::error!( - "Pythnet network listener restarting too quickly. Sleeping for 1s" - ); + tracing::error!("Pythnet listener restarting too quickly. Sleep 1s."); tokio::time::sleep(Duration::from_secs(1)).await; } } } - log::info!("Shutting down Pythnet listener..."); + tracing::info!("Shutting down Pythnet listener..."); }) }; @@ -315,12 +314,12 @@ pub async fn spawn(opts: RunOptions, store: Arc) -> Result<()> { { Ok(_) => {} Err(err) => { - log::error!("Failed to poll for new guardian sets: {:?}", err); + tracing::error!(error = ?err, "Failed to poll for new guardian sets.") } } } - log::info!("Shutting down Pythnet guardian set poller..."); + tracing::info!("Shutting down Pythnet guardian set poller..."); }) }; diff --git a/hermes/src/store.rs b/hermes/src/store.rs index 4f9302a8d..5a837dfa2 100644 --- a/hermes/src/store.rs +++ b/hermes/src/store.rs @@ -73,11 +73,7 @@ use { mpsc::Sender, RwLock, }, - wormhole_sdk::{ - Address, - Chain, - Vaa, - }, + wormhole_sdk::Vaa, }; pub mod proof; @@ -118,20 +114,15 @@ impl Store { } /// Stores the update data in the store + #[tracing::instrument(skip(self, update))] pub async fn store_update(&self, update: Update) -> Result<()> { // The slot that the update is originating from. It should be available // in all the updates. let slot = match update { - Update::Vaa(vaa_bytes) => { + Update::Vaa(update_vaa) => { // FIXME: Move to wormhole.rs let vaa = - serde_wormhole::from_slice::>(&vaa_bytes)?; - - if vaa.emitter_chain != Chain::Pythnet - || vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS) - { - return Ok(()); // Ignore VAA from other emitters - } + serde_wormhole::from_slice::>(&update_vaa)?; if self.observed_vaa_seqs.read().await.contains(&vaa.sequence) { return Ok(()); // Ignore VAA if we have already seen it @@ -142,13 +133,16 @@ impl Store { let vaa = match vaa { Ok(vaa) => vaa, Err(err) => { - log::info!("Ignoring invalid VAA: {:?}", err); + tracing::warn!(error = ?err, "Ignoring invalid VAA."); return Ok(()); } }; { let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await; + if observed_vaa_seqs.contains(&vaa.sequence) { + return Ok(()); // Ignore VAA if we have already seen it + } observed_vaa_seqs.insert(vaa.sequence); while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { observed_vaa_seqs.pop_first(); @@ -157,16 +151,35 @@ impl Store { match WormholeMessage::try_from_bytes(vaa.payload)?.payload { WormholePayload::Merkle(proof) => { - log::info!("Storing merkle proof for slot {:?}", proof.slot,); - store_wormhole_merkle_verified_message(self, proof.clone(), vaa_bytes) - .await?; + update_vaa.span.in_scope(|| { + tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof."); + }); + + store_wormhole_merkle_verified_message( + self, + proof.clone(), + update_vaa.to_owned(), + ) + .await?; + proof.slot } } } + Update::AccumulatorMessages(accumulator_messages) => { let slot = accumulator_messages.slot; - log::info!("Storing accumulator messages for slot {:?}.", slot,); + if let Some(state) = self.storage.fetch_wormhole_merkle_state(slot).await? { + state.vaa.span.in_scope(|| { + tracing::info!( + slot = slot, + "Storing Accumulator Messages (existing Proof)." + ); + }); + } else { + tracing::info!(slot = slot, "Storing Accumulator Messages."); + } + self.storage .store_accumulator_messages(accumulator_messages) .await?; @@ -185,6 +198,10 @@ impl Store { _ => return Ok(()), }; + wormhole_merkle_state.vaa.span.in_scope(|| { + tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update."); + }); + // Once the accumulator reaches a complete state for a specific slot // we can build the message states self.build_message_states(accumulator_messages, wormhole_merkle_state) @@ -200,6 +217,7 @@ impl Store { Ok(()) } + #[tracing::instrument(skip(self, accumulator_messages, wormhole_merkle_state))] async fn build_message_states( &self, accumulator_messages: AccumulatorMessages, @@ -232,7 +250,7 @@ impl Store { }) .collect::>>()?; - log::info!("Message states len: {:?}", message_states.len()); + tracing::info!(len = message_states.len(), "Storing Message States."); self.storage.store_message_states(message_states).await?; @@ -385,7 +403,10 @@ mod test { payload: serde_wormhole::RawMessage::new(wormhole_message.as_ref()), }; - updates.push(Update::Vaa(serde_wormhole::to_vec(&vaa).unwrap())); + updates.push(Update::Vaa(crate::network::p2p::Vaa { + span: tracing::Span::current(), + data: serde_wormhole::to_vec(&vaa).unwrap(), + })); updates } diff --git a/hermes/src/store/proof/wormhole_merkle.rs b/hermes/src/store/proof/wormhole_merkle.rs index 24f277988..dfa5e768f 100644 --- a/hermes/src/store/proof/wormhole_merkle.rs +++ b/hermes/src/store/proof/wormhole_merkle.rs @@ -1,8 +1,11 @@ use { - crate::store::{ - storage::MessageState, - types::AccumulatorMessages, - Store, + crate::{ + network::p2p::Vaa, + store::{ + storage::MessageState, + types::AccumulatorMessages, + Store, + }, }, anyhow::{ anyhow, @@ -36,26 +39,23 @@ pub const MAX_MESSAGE_IN_SINGLE_UPDATE_DATA: usize = 255; #[derive(Clone, PartialEq, Debug)] pub struct WormholeMerkleState { pub root: WormholeMerkleRoot, - pub vaa: Vec, + pub vaa: Vaa, } #[derive(Clone, PartialEq, Debug)] pub struct WormholeMerkleMessageProof { - pub vaa: Vec, pub proof: MerklePath, + pub vaa: Vaa, } pub async fn store_wormhole_merkle_verified_message( store: &Store, root: WormholeMerkleRoot, - vaa_bytes: Vec, + vaa: Vaa, ) -> Result<()> { store .storage - .store_wormhole_merkle_state(WormholeMerkleState { - root, - vaa: vaa_bytes, - }) + .store_wormhole_merkle_state(WormholeMerkleState { root, vaa }) .await?; Ok(()) } @@ -108,9 +108,13 @@ pub fn construct_update_data(mut message_states: Vec<&MessageState>) -> Result(&AccumulatorUpdateData::new( Proof::WormholeMerkle { - vaa: vaa.into(), + vaa: (*vaa).to_owned().into(), updates: messages .iter() .map(|message| { diff --git a/hermes/src/store/types.rs b/hermes/src/store/types.rs index 015723169..901a5015a 100644 --- a/hermes/src/store/types.rs +++ b/hermes/src/store/types.rs @@ -1,5 +1,6 @@ use { super::proof::wormhole_merkle::WormholeMerkleMessageProof, + crate::network::p2p::Vaa, borsh::BorshDeserialize, pythnet_sdk::messages::PriceFeedMessage, }; @@ -44,8 +45,9 @@ impl AccumulatorMessages { } } +#[derive(Debug)] pub enum Update { - Vaa(Vec), + Vaa(Vaa), AccumulatorMessages(AccumulatorMessages), } diff --git a/hermes/src/trace.rs b/hermes/src/trace.rs new file mode 100644 index 000000000..b38d4d570 --- /dev/null +++ b/hermes/src/trace.rs @@ -0,0 +1,113 @@ +use { + std::{ + fmt, + sync::{ + Arc, + Mutex, + RwLock, + }, + }, + tracing::{ + field::{ + Field, + Visit, + }, + span::{ + Attributes, + Id, + Record, + Span, + }, + Event, + Subscriber, + }, + tracing_subscriber::{ + layer::Context, + registry::LookupSpan, + Layer, + }, +}; + + +pub struct LatencyLayer { + // Store an Id to slot mapping so we know what slot a Span ID relates to. + span_id_to_slot: Arc>>, +} + +impl LatencyLayer { + pub fn new() -> Self { + Self { + span_id_to_slot: Arc::new(RwLock::new(std::collections::HashMap::new())), + } + } +} + +impl Layer for LatencyLayer { + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + struct FieldVisitor { + slot: Option, + } + + impl tracing::field::Visit for FieldVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + // If there is a field named `slot`, we look up the current span Id and if + // there is an Id associated to slot mapping, we print it. + if field.name() == "slot" { + self.slot = Some(format!("{:?}", value)); + } + } + } + + let mut visitor = FieldVisitor { slot: None }; + + // Get the attributes associated with the span. + event.record(&mut visitor); + + let slot = match visitor.slot { + Some(slot) => slot, + None => { + let current_span = ctx.current_span(); + let span_id = current_span.id(); + if let Some(span_id) = span_id { + if let Some(slot) = self.span_id_to_slot.read().unwrap().get(&span_id) { + slot.clone() + } else { + return; + } + } else { + return; + } + } + }; + + println!("event slot = {:?}", slot); + } + + fn on_new_span( + &self, + span: &tracing::span::Attributes<'_>, + id: &tracing::span::Id, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + struct FieldVisitor<'r> { + layer: &'r LatencyLayer, + id: &'r tracing::span::Id, + } + + impl<'r> tracing::field::Visit for FieldVisitor<'r> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + // When the field name is `slot` we can store it. + if field.name() == "slot" { + self.layer + .span_id_to_slot + .write() + .unwrap() + .insert(self.id.clone(), format!("{:?}", value)); + } + } + } + + // Get the attributes associated with the span. + span.record(&mut FieldVisitor { layer: self, id }); + } +}