Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Oct 10, 2023
1 parent 36aee48 commit 3bb4440
Show file tree
Hide file tree
Showing 45 changed files with 525 additions and 841 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 12 additions & 22 deletions common/src/allocations/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: Apache-2.0
use crate::types::SubgraphDeploymentID;
use alloy_primitives::Address;
use anyhow::Result;
use ethers::signers::coins_bip39::English;
Expand All @@ -10,8 +9,6 @@ use ethers_core::types::U256;
use serde::Deserialize;
use serde::Deserializer;

use crate::types::SubgraphDeploymentID;

pub mod monitor;

#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -76,7 +73,6 @@ impl<'d> Deserialize<'d> for Allocation {
}

let outer = Outer::deserialize(deserializer)?;

Ok(Allocation {
id: outer.id,
status: AllocationStatus::Null,
Expand Down Expand Up @@ -112,7 +108,6 @@ pub fn derive_key_pair(
.join("/"),
);
derivation_path.push_str(format!("/{}", index).as_str());

Ok(MnemonicBuilder::<English>::default()
.derivation_path(&derivation_path)
.expect("Valid derivation path")
Expand All @@ -121,11 +116,11 @@ pub fn derive_key_pair(
}

pub fn allocation_signer(indexer_mnemonic: &str, allocation: &Allocation) -> Result<SigningKey> {
// Guess the allocation index by enumerating all indexes in the
// range [0, 100] and checking for a match
// Guess the allocation index by enumerating all indexes in the range [0, 100] and
// checking for a match
for i in 0..100 {
// The allocation was either created at the epoch it intended to or one
// epoch later. So try both both.
// The allocation was either created at the epoch it intended to or one epoch
// later. So try both both.
for created_at_epoch in [allocation.created_at_epoch, allocation.created_at_epoch - 1] {
let allocation_wallet = derive_key_pair(
indexer_mnemonic,
Expand All @@ -146,11 +141,9 @@ pub fn allocation_signer(indexer_mnemonic: &str, allocation: &Allocation) -> Res

#[cfg(test)]
mod test {
use std::str::FromStr;

use crate::prelude::SubgraphDeploymentID;

use super::*;
use crate::prelude::SubgraphDeploymentID;
use std::str::FromStr;

const INDEXER_OPERATOR_MNEMONIC: &str = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about";

Expand All @@ -161,26 +154,25 @@ mod test {
INDEXER_OPERATOR_MNEMONIC,
953,
&SubgraphDeploymentID::new(
"0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a"
"0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a",
)
.unwrap(),
0
0,
)
.unwrap()
.address()
.as_fixed_bytes(),
Address::from_str("0xfa44c72b753a66591f241c7dc04e8178c30e13af").unwrap()
);

assert_eq!(
derive_key_pair(
INDEXER_OPERATOR_MNEMONIC,
940,
&SubgraphDeploymentID::new(
"0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a"
"0xbbde25a2c85f55b53b7698b9476610c3d1202d88870e66502ab0076b7218f98a",
)
.unwrap(),
2
2,
)
.unwrap()
.address()
Expand All @@ -192,7 +184,6 @@ mod test {
#[test]
fn test_allocation_signer() {
// Note that we use `derive_key_pair` to derive the private key

let allocation = Allocation {
id: Address::from_str("0xa171cd12c3dde7eb8fe7717a0bcd06f3ffa65658").unwrap(),
status: AllocationStatus::Null,
Expand Down Expand Up @@ -233,7 +224,6 @@ mod test {
#[test]
fn test_allocation_signer_error() {
// Note that because allocation will try 200 derivations paths, this is a slow test

let allocation = Allocation {
// Purposefully wrong address
id: Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(),
Expand Down
80 changes: 26 additions & 54 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
// Copyright 2023-, GraphOps and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::sync::Arc;

// Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: Apache-2.0
use crate::prelude::{Allocation, NetworkSubgraph};
use alloy_primitives::Address;
use anyhow::Result;
use log::{info, warn};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch::{Receiver, Sender};
use tokio::sync::RwLock;

use crate::prelude::{Allocation, NetworkSubgraph};

#[derive(Debug)]
struct AllocationMonitorInner {
network_subgraph: NetworkSubgraph,
Expand Down Expand Up @@ -40,7 +36,6 @@ impl AllocationMonitor {
) -> Result<Self> {
// These are used to ping subscribers when the allocations are updated
let (watch_sender, watch_receiver) = tokio::sync::watch::channel(());

let inner = Arc::new(AllocationMonitorInner {
network_subgraph,
indexer_address,
Expand All @@ -50,16 +45,13 @@ impl AllocationMonitor {
watch_sender,
watch_receiver,
});

let inner_clone = inner.clone();

let monitor = AllocationMonitor {
_monitor_handle: Arc::new(tokio::spawn(async move {
AllocationMonitor::monitor_loop(&inner_clone).await.unwrap();
})),
inner,
};

Ok(monitor)
}

Expand All @@ -77,7 +69,9 @@ impl AllocationMonitor {
}
"#
.to_string(),
Some(serde_json::json!({ "id": graph_network_id })),
Some(serde_json::json!({
"id": graph_network_id
})),
)
.await
.map_err(|e| {
Expand All @@ -86,7 +80,6 @@ impl AllocationMonitor {
e
)
})?;

res.get("data")
.and_then(|d| d.get("graphNetwork"))
.and_then(|d| d.get("currentEpoch"))
Expand All @@ -102,8 +95,8 @@ impl AllocationMonitor {
closed_at_epoch_threshold: u64,
) -> Result<HashMap<Address, Allocation>> {
let mut res = network_subgraph
.network_query(
r#"
.network_query(
r#"
query allocations($indexer: ID!, $closedAtEpochThreshold: Int!) {
indexer(id: $indexer) {
activeAllocations: totalAllocations(
Expand Down Expand Up @@ -151,47 +144,43 @@ impl AllocationMonitor {
}
}
"#
.to_string(),
Some(serde_json::json!({ "indexer": indexer_address, "closedAtEpochThreshold": closed_at_epoch_threshold })),
)
.await
.to_string(),
Some(serde_json::json!({
"indexer": indexer_address,
"closedAtEpochThreshold": closed_at_epoch_threshold
})),
)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to fetch current allocations from network subgraph: {}",
e
)
})?;

let indexer_json = res
.get_mut("data")
.and_then(|d| d.get_mut("indexer"))
.ok_or_else(|| anyhow::anyhow!("No data / indexer not found on chain",))?;

.ok_or_else(|| anyhow::anyhow!("No data / indexer not found on chain"))?;
let active_allocations_json =
indexer_json.get_mut("activeAllocations").ok_or_else(|| {
anyhow::anyhow!("Failed to parse active allocations from network subgraph",)
anyhow::anyhow!("Failed to parse active allocations from network subgraph")
})?;
let active_allocations: Vec<Allocation> =
serde_json::from_value(active_allocations_json.take())?;
let mut eligible_allocations: HashMap<Address, Allocation> =
HashMap::from_iter(active_allocations.into_iter().map(|a| (a.id, a)));

let recently_closed_allocations_json =
indexer_json
.get_mut("recentlyClosedAllocations")
.ok_or_else(|| {
anyhow::anyhow!(
"Failed to parse recently closed allocations from network subgraph",
)
})?;
let recently_closed_allocations_json = indexer_json
.get_mut("recentlyClosedAllocations")
.ok_or_else(|| {
anyhow::anyhow!("Failed to parse recently closed allocations from network subgraph")
})?;
let recently_closed_allocations: Vec<Allocation> =
serde_json::from_value(recently_closed_allocations_json.take())?;
eligible_allocations.extend(
recently_closed_allocations
.into_iter()
.map(move |a| (a.id, a)),
);

Ok(eligible_allocations)
}

Expand Down Expand Up @@ -230,7 +219,6 @@ impl AllocationMonitor {
);
}
}

info!(
"Eligible allocations: {}",
inner
Expand All @@ -249,7 +237,6 @@ impl AllocationMonitor {
.collect::<Vec<String>>()
.join(", ")
);

tokio::time::sleep(tokio::time::Duration::from_millis(inner.interval_ms)).await;
}
}
Expand All @@ -275,21 +262,17 @@ impl AllocationMonitor {

#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::NetworkSubgraph;
use crate::test_vectors;
use std::str::FromStr;

use test_log::test;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

use crate::prelude::NetworkSubgraph;
use crate::test_vectors;

use super::*;

#[test(tokio::test)]
async fn test_current_epoch() {
let mock_server = MockServer::start().await;

let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint(
&mock_server.uri(),
test_vectors::NETWORK_SUBGRAPH_ID,
Expand All @@ -299,7 +282,6 @@ mod tests {
Some(test_vectors::NETWORK_SUBGRAPH_ID),
network_subgraph_endpoint.as_ref(),
);

let mock = Mock::given(method("POST"))
.and(path(
"/subgraphs/id/".to_string() + test_vectors::NETWORK_SUBGRAPH_ID,
Expand All @@ -316,22 +298,17 @@ mod tests {
"#,
"application/json",
));

mock_server.register(mock).await;

let epoch = AllocationMonitor::current_epoch(&network_subgraph, 1)
.await
.unwrap();

assert_eq!(epoch, 896419);
}

#[test(tokio::test)]
async fn test_current_eligible_allocations() {
let indexer_address = Address::from_str(test_vectors::INDEXER_ADDRESS).unwrap();

let mock_server = MockServer::start().await;

let network_subgraph_endpoint = NetworkSubgraph::local_deployment_endpoint(
&mock_server.uri(),
test_vectors::NETWORK_SUBGRAPH_ID,
Expand All @@ -341,7 +318,6 @@ mod tests {
Some(test_vectors::NETWORK_SUBGRAPH_ID),
network_subgraph_endpoint.as_ref(),
);

let mock = Mock::given(method("POST"))
.and(path(
"/subgraphs/id/".to_string() + test_vectors::NETWORK_SUBGRAPH_ID,
Expand All @@ -350,17 +326,14 @@ mod tests {
ResponseTemplate::new(200)
.set_body_raw(test_vectors::ALLOCATIONS_QUERY_RESPONSE, "application/json"),
);

mock_server.register(mock).await;

let allocations = AllocationMonitor::current_eligible_allocations(
&network_subgraph,
&indexer_address,
940,
)
.await
.unwrap();

assert_eq!(allocations, test_vectors::expected_eligible_allocations())
}

Expand All @@ -373,7 +346,6 @@ mod tests {
let network_subgraph_id =
std::env::var("NETWORK_SUBGRAPH_ID").expect("NETWORK_SUBGRAPH_ID not set");
let indexer_address = std::env::var("INDEXER_ADDRESS").expect("INDEXER_ADDRESS not set");

let network_subgraph_endpoint =
NetworkSubgraph::local_deployment_endpoint(&graph_node_url, &network_subgraph_id);
let network_subgraph = NetworkSubgraph::new(
Expand Down
Loading

0 comments on commit 3bb4440

Please sign in to comment.