Skip to content

Commit

Permalink
Fix: substream datasources missing in TriggerFilter (#5001)
Browse files Browse the repository at this point in the history
* tests: test substreams triggerfilter build

* core: fix substreams filter not included when building triggerfilter
  • Loading branch information
incrypto32 authored Nov 24, 2023
1 parent e4ddcaf commit aeb4f4e
Show file tree
Hide file tree
Showing 23 changed files with 1,428 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ pub struct TriggerFilter {
pub(crate) data_sources_len: u8,
}

#[cfg(debug_assertions)]
impl TriggerFilter {
pub fn modules(&self) -> &Option<Modules> {
&self.modules
}

pub fn module_name(&self) -> &str {
&self.module_name
}

pub fn start_block(&self) -> &Option<BlockNumber> {
&self.start_block
}

pub fn data_sources_len(&self) -> u8 {
self.data_sources_len
}
}

// TriggerFilter should bypass all triggers and just rely on block since all the data received
// should already have been processed.
impl blockchain::TriggerFilter<Chain> for TriggerFilter {
Expand Down
19 changes: 19 additions & 0 deletions core/src/subgraph/context/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ where
C: Blockchain,
T: RuntimeHostBuilder<C>,
{
/// All onchain data sources that are part of this subgraph. This includes data sources
/// that are included in the subgraph manifest and dynamic data sources.
pub fn onchain_data_sources(&self) -> impl Iterator<Item = &C::DataSource> + Clone {
let host_data_sources = self
.hosts()
.iter()
.filter_map(|h| h.data_source().as_onchain());

// Datasources that are defined in the subgraph manifest but does not correspond to any host
// in the subgraph. Currently these are only substreams data sources.
let substreams_data_sources = self
.data_sources
.iter()
.filter(|ds| ds.runtime().is_none())
.filter_map(|ds| ds.as_onchain());

host_data_sources.chain(substreams_data_sources)
}

/// Create a new subgraph instance from the given manifest and data sources.
/// `data_sources` must contain all data sources declared in the manifest + all dynamic data sources.
pub fn from_manifest(
Expand Down
10 changes: 6 additions & 4 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ where
return C::TriggerFilter::from_data_sources(
self.ctx
.instance()
.hosts()
.iter()
.filter_map(|h| h.data_source().as_onchain())
// Filter out data sources that have reached their end block if the block is final.
.onchain_data_sources()
.filter(end_block_filter),
);
}
Expand Down Expand Up @@ -158,6 +155,11 @@ where
filter
}

#[cfg(debug_assertions)]
pub fn build_filter_for_test(&self) -> C::TriggerFilter {
self.build_filter()
}

pub async fn run(self) -> Result<(), Error> {
self.run_inner(false).await.map(|_| ())
}
Expand Down
1 change: 1 addition & 0 deletions tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async-stream = "0.3.5"
futures = { version = "0.3", features = ["compat"] }
graph = { path = "../graph" }
graph-chain-ethereum = { path = "../chain/ethereum" }
graph-chain-substreams= {path = "../chain/substreams"}
graph-node = { path = "../node" }
graph-core = { path = "../core" }
graph-graphql = { path = "../graphql" }
Expand Down
Binary file not shown.
4 changes: 4 additions & 0 deletions tests/runner-tests/substreams/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
target/
.idea
src/pb/
node_modules/
Loading

0 comments on commit aeb4f4e

Please sign in to comment.