Skip to content

Commit d71040d

Browse files
committed
query tracing
1 parent 8bc4645 commit d71040d

File tree

19 files changed

+752
-39
lines changed

19 files changed

+752
-39
lines changed

Cargo.lock

+36-16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+16-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,15 @@ bs58 = "0.5.1"
3434
chrono = "0.4.38"
3535
clap = { version = "4.5.4", features = ["derive", "env"] }
3636
derivative = "2.2.0"
37-
diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid", "i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
37+
diesel = { version = "2.2.4", features = [
38+
"postgres",
39+
"serde_json",
40+
"numeric",
41+
"r2d2",
42+
"chrono",
43+
"uuid",
44+
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
45+
] }
3846
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
3947
diesel-dynamic-schema = { version = "0.2.1", features = ["postgres"] }
4048
diesel_derives = "2.1.4"
@@ -56,19 +64,24 @@ serde_derive = "1.0.125"
5664
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
5765
serde_regex = "1.1.0"
5866
serde_yaml = "0.9.21"
59-
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
67+
slog = { version = "2.7.0", features = [
68+
"release_max_level_trace",
69+
"max_level_trace",
70+
] }
6071
sqlparser = "0.46.0"
6172
strum = { version = "0.26", features = ["derive"] }
6273
syn = { version = "2.0.66", features = ["full"] }
6374
test-store = { path = "./store/test-store" }
6475
thiserror = "1.0.25"
65-
tokio = { version = "1.38.0", features = ["full"] }
76+
tokio = { version = "1", features = ["full"] }
6677
tonic = { version = "0.11.0", features = ["tls-roots", "gzip"] }
6778
tonic-build = { version = "0.11.0", features = ["prost"] }
6879
tower-http = { version = "0.5.2", features = ["cors"] }
6980
wasmparser = "0.118.1"
7081
wasmtime = "15.0.1"
7182

83+
indexer-watcher = { git = "https://github.com/graphprotocol/indexer-rs", tag = "indexer-service-rs-v1.4.2" }
84+
7285
# Incremental compilation on Rust 1.58 causes an ICE on build. As soon as graph node builds again, these can be removed.
7386
[profile.test]
7487
incremental = false

graph/Cargo.toml

+7-10
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ atomic_refcell = "0.1.13"
1212
# We require this precise version of bigdecimal. Updating to later versions
1313
# has caused PoI differences; if you update this version, you will need to
1414
# make sure that it does not cause PoI changes
15-
old_bigdecimal = { version = "=0.1.2", features = ["serde"], package = "bigdecimal" }
15+
old_bigdecimal = { version = "=0.1.2", features = [
16+
"serde",
17+
], package = "bigdecimal" }
1618
bytes = "1.0.1"
1719
bs58 = { workspace = true }
1820
cid = "0.11.1"
@@ -64,14 +66,7 @@ slog-envlogger = "2.1.0"
6466
slog-term = "2.7.0"
6567
petgraph = "0.6.5"
6668
tiny-keccak = "1.5.0"
67-
tokio = { version = "1.38.0", features = [
68-
"time",
69-
"sync",
70-
"macros",
71-
"test-util",
72-
"rt-multi-thread",
73-
"parking_lot",
74-
] }
69+
tokio.workspace = true
7570
tokio-stream = { version = "0.1.15", features = ["sync"] }
7671
tokio-retry = "0.3.0"
7772
toml = "0.8.8"
@@ -92,11 +87,13 @@ defer = "0.2"
9287
# Our fork contains patches to make some fields optional for Celo and Fantom compatibility.
9388
# Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`.
9489
web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = [
95-
"arbitrary_precision", "test"
90+
"arbitrary_precision",
91+
"test",
9692
] }
9793
serde_plain = "1.0.2"
9894
csv = "1.3.0"
9995
object_store = { version = "0.11.0", features = ["gcp"] }
96+
indexer-watcher.workspace = true
10097

10198
[dev-dependencies]
10299
clap.workspace = true

graph/build.rs

+6
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,10 @@ fn main() {
2525
.out_dir("src/substreams_rpc")
2626
.compile(&["proto/substreams-rpc.proto"], &["proto"])
2727
.expect("Failed to compile Substreams RPC proto(s)");
28+
29+
tonic_build::configure()
30+
.out_dir("src/grpc/pb")
31+
.include_file("mod.rs")
32+
.compile(&["proto/tracing.proto"], &["proto"])
33+
.expect("Failed to compile Tracing proto(s)");
2834
}

graph/proto/tracing.proto

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
syntax = "proto3";
2+
3+
package graph.tracing.v1;
4+
5+
service Stream {
6+
rpc QueryTrace(Request) returns (stream Trace);
7+
}
8+
9+
message Request {
10+
int32 deployment_id = 1;
11+
}
12+
13+
message Trace {
14+
int32 deployment_id = 1;
15+
string query = 2;
16+
uint64 duration_millis = 3;
17+
}

graph/src/components/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ pub mod metrics;
6060
/// Components dealing with versioning
6161
pub mod versions;
6262

63+
pub mod tracing;
64+
6365
/// A component that receives events of type `T`.
6466
pub trait EventConsumer<E> {
6567
/// Get the event sink.

graph/src/components/tracing.rs

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
2+
3+
use tokio::sync::{mpsc, watch::Receiver, RwLock};
4+
5+
use super::store::DeploymentId;
6+
7+
const DEFAULT_BUFFER_SIZE: usize = 100;
8+
9+
#[derive(Debug, Clone)]
10+
pub struct Subscriptions<T> {
11+
inner: Arc<RwLock<HashMap<DeploymentId, mpsc::Sender<T>>>>,
12+
}
13+
14+
impl<T> Default for Subscriptions<T> {
15+
fn default() -> Self {
16+
Self {
17+
inner: Arc::new(RwLock::new(HashMap::new())),
18+
}
19+
}
20+
}
21+
22+
pub type SubscriptionsWatcher<T> = Receiver<Subscriptions<T>>;
23+
24+
/// A control structure for managing tracing subscriptions.
25+
#[derive(Debug)]
26+
pub struct TracingControl<T> {
27+
watcher: Receiver<HashMap<DeploymentId, mpsc::Sender<T>>>,
28+
subscriptions: Subscriptions<T>,
29+
default_buffer_size: usize,
30+
}
31+
32+
impl<T: Send + Clone + 'static> Default for TracingControl<T> {
33+
fn default() -> Self {
34+
let subscriptions = Subscriptions::default();
35+
let subs = subscriptions.clone();
36+
let watcher = std::thread::spawn(move || {
37+
let runtime = tokio::runtime::Builder::new_current_thread()
38+
.enable_all()
39+
.build()
40+
.unwrap();
41+
runtime.block_on(indexer_watcher::new_watcher(
42+
Duration::from_secs(30),
43+
move || {
44+
let subs = subs.clone();
45+
46+
async move { Ok(subs.inner.read().await.clone()) }
47+
},
48+
))
49+
})
50+
.join()
51+
.unwrap()
52+
.unwrap();
53+
54+
Self {
55+
subscriptions,
56+
default_buffer_size: DEFAULT_BUFFER_SIZE,
57+
watcher,
58+
}
59+
}
60+
}
61+
62+
impl<T: Send + Clone + 'static> TracingControl<T> {
63+
pub fn new(default_buffer_size: Option<usize>) -> Self {
64+
Self {
65+
default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
66+
..Default::default()
67+
}
68+
}
69+
70+
pub fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
71+
self.watcher
72+
.borrow()
73+
.get(&key)
74+
.cloned()
75+
.filter(|sender| !sender.is_closed())
76+
}
77+
78+
pub async fn subscribe_with_chan_size(
79+
&self,
80+
key: DeploymentId,
81+
buffer_size: usize,
82+
) -> mpsc::Receiver<T> {
83+
let (tx, rx) = mpsc::channel(buffer_size);
84+
let mut guard = self.subscriptions.inner.write().await;
85+
guard.insert(key, tx);
86+
87+
rx
88+
}
89+
90+
/// Creates a new subscription for a given deployment ID. If a subscription already
91+
/// exists, it will be replaced.
92+
pub async fn subscribe(&self, key: DeploymentId) -> mpsc::Receiver<T> {
93+
self.subscribe_with_chan_size(key, self.default_buffer_size)
94+
.await
95+
}
96+
}
97+
98+
#[cfg(test)]
99+
mod test {
100+
101+
use super::*;
102+
use std::sync::Arc;
103+
104+
#[tokio::test]
105+
async fn test_tracing_control() {
106+
let control: TracingControl<()> = TracingControl::default();
107+
let control = Arc::new(control);
108+
109+
// produce before subscription
110+
let tx = control.producer(DeploymentId(123));
111+
assert!(tx.is_none());
112+
113+
// drop the subscription
114+
let rx = control.subscribe(DeploymentId(123));
115+
drop(rx);
116+
117+
// check subscription is none because channel is closed
118+
let tx = control.producer(DeploymentId(123));
119+
assert!(tx.is_none());
120+
121+
// re-create subscription
122+
let _rx = control.subscribe(DeploymentId(123));
123+
// check old subscription was replaced
124+
let tx = control.producer(DeploymentId(123));
125+
assert!(!tx.unwrap().is_closed())
126+
}
127+
}

0 commit comments

Comments
 (0)