Skip to content

Commit

Permalink
graph, runtime: Support subgraph entity operation detection in compos…
Browse files Browse the repository at this point in the history
…ed subgraphs
  • Loading branch information
incrypto32 committed Dec 5, 2024
1 parent ca7824e commit b0b8283
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 46 deletions.
32 changes: 13 additions & 19 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,13 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {

fn create_subgraph_trigger_from_entities(
filter: &SubgraphFilter,
entities: &Vec<EntityWithType>,
entities: Vec<EntityWithType>,
) -> Vec<subgraph::TriggerData> {
entities
.iter()
.map(|e| subgraph::TriggerData {
.into_iter()
.map(|entity| subgraph::TriggerData {
source: filter.subgraph.clone(),
entity: e.entity.clone(),
entity_type: e.entity_type.as_str().to_string(),
entity,
})
.collect()
}
Expand All @@ -369,25 +368,20 @@ async fn create_subgraph_triggers<C: Blockchain>(
logger: Logger,
blocks: Vec<C::Block>,
filter: &SubgraphFilter,
entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
let logger_clone = logger.cheap_clone();

let blocks: Vec<BlockWithTriggers<C>> = blocks
.into_iter()
.map(|block| {
let block_number = block.number();
match entities.get(&block_number) {
Some(e) => {
let trigger_data = create_subgraph_trigger_from_entities(filter, e);
BlockWithTriggers::new_with_subgraph_triggers(
block,
trigger_data,
&logger_clone,
)
}
None => BlockWithTriggers::new_with_subgraph_triggers(block, vec![], &logger_clone),
}
let trigger_data = entities
.remove(&block_number)
.map(|e| create_subgraph_trigger_from_entities(filter, e))
.unwrap_or_else(Vec::new);

BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger_clone)
})
.collect();

Expand Down Expand Up @@ -429,14 +423,14 @@ async fn scan_subgraph_triggers<C: Blockchain>(
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntitySubgraphOperation {
Create,
Modify,
Delete,
}

#[derive(Debug)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EntityWithType {
pub entity_op: EntitySubgraphOperation,
pub entity_type: EntityType,
Expand Down
24 changes: 10 additions & 14 deletions graph/src/data_source/subgraph.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::{
blockchain::{Block, Blockchain},
components::{
link_resolver::LinkResolver,
store::{BlockNumber, Entity},
},
blockchain::{block_stream::EntityWithType, Block, Blockchain},
components::{link_resolver::LinkResolver, store::BlockNumber},
data::{subgraph::SPEC_VERSION_1_3_0, value::Word},
data_source,
prelude::{DataSourceContext, DeploymentHash, Link},
Expand Down Expand Up @@ -76,7 +73,7 @@ impl DataSource {
}

let trigger_ref = self.mapping.handlers.iter().find_map(|handler| {
if handler.entity != trigger.entity_type {
if handler.entity != trigger.entity_type() {
return None;
}

Expand Down Expand Up @@ -281,17 +278,16 @@ impl UnresolvedDataSourceTemplate {
#[derive(Clone, PartialEq, Eq)]
pub struct TriggerData {
pub source: DeploymentHash,
pub entity: Entity,
pub entity_type: String,
pub entity: EntityWithType,
}

impl TriggerData {
pub fn new(source: DeploymentHash, entity: Entity, entity_type: String) -> Self {
Self {
source,
entity,
entity_type,
}
pub fn new(source: DeploymentHash, entity: EntityWithType) -> Self {
Self { source, entity }
}

pub fn entity_type(&self) -> &str {
self.entity.entity_type.as_str()
}
}

Expand Down
5 changes: 5 additions & 0 deletions graph/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ pub enum IndexForAscTypeId {
// ...
// LastStarknetType = 4499,


// Subgraph Data Source types
AscEntityTrigger = 4500,


// Reserved discriminant space for a future blockchain type IDs: [4,500, 5,499]
//
// Generated with the following shell script:
Expand Down
2 changes: 1 addition & 1 deletion runtime/wasm/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ToAscPtr for subgraph::TriggerData {
heap: &mut H,
gas: &GasCounter,
) -> Result<AscPtr<()>, HostExportError> {
asc_new(heap, &self.entity.sorted_ref(), gas).map(|ptr| ptr.erase())
asc_new(heap, &self.entity, gas).map(|ptr| ptr.erase())
}
}

Expand Down
45 changes: 44 additions & 1 deletion runtime/wasm/src/to_from/external.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use ethabi;

use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
use graph::data::store::scalar::Timestamp;
use graph::data::value::Word;
use graph::prelude::{BigDecimal, BigInt};
use graph::runtime::gas::GasCounter;
use graph::runtime::{
asc_get, asc_new, AscIndexId, AscPtr, AscType, AscValue, HostExportError, ToAscObj,
asc_get, asc_new, AscIndexId, AscPtr, AscType, AscValue, HostExportError, IndexForAscTypeId,
ToAscObj,
};
use graph::{data::store, runtime::DeterministicHostError};
use graph::{prelude::serde_json, runtime::FromAscObj};
use graph::{prelude::web3::types as web3, runtime::AscHeap};
use graph_runtime_derive::AscType;

use crate::asc_abi::class::*;

Expand Down Expand Up @@ -463,3 +466,43 @@ where
})
}
}

#[derive(Debug, Clone, Eq, PartialEq, AscType)]
pub enum AscSubgraphEntityOp {
Create,
Modify,
Delete,
}

#[derive(AscType)]
pub struct AscEntityTrigger {
pub entity_op: AscSubgraphEntityOp,
pub entity_type: AscPtr<AscString>,
pub entity: AscPtr<AscEntity>,
pub vid: i64,
}

impl ToAscObj<AscEntityTrigger> for EntityWithType {
fn to_asc_obj<H: AscHeap + ?Sized>(
&self,
heap: &mut H,
gas: &GasCounter,
) -> Result<AscEntityTrigger, HostExportError> {
let entity_op = match self.entity_op {
EntitySubgraphOperation::Create => AscSubgraphEntityOp::Create,
EntitySubgraphOperation::Modify => AscSubgraphEntityOp::Modify,
EntitySubgraphOperation::Delete => AscSubgraphEntityOp::Delete,
};

Ok(AscEntityTrigger {
entity_op,
entity_type: asc_new(heap, &self.entity_type.as_str(), gas)?,
entity: asc_new(heap, &self.entity.sorted_ref(), gas)?,
vid: self.vid,
})
}
}

impl AscIndexId for AscEntityTrigger {
const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::AscEntityTrigger;
}
12 changes: 11 additions & 1 deletion tests/integration-tests/subgraph-data-sources/src/mapping.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import { Entity, log } from '@graphprotocol/graph-ts';
import { MirrorBlock } from '../generated/schema';

export function handleEntity(blockEntity: Entity): void {
export class EntityTrigger {
constructor(
public entityOp: u32,
public entityType: string,
public entity: Entity,
public vid: i64,
) {}
}

export function handleEntity(trigger: EntityTrigger): void {
let blockEntity = trigger.entity;
let blockNumber = blockEntity.getBigInt('number');
let blockHash = blockEntity.getBytes('hash');
let id = blockEntity.getString('id');
Expand Down
33 changes: 31 additions & 2 deletions tests/runner-tests/subgraph-data-sources/src/mapping.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
import { Entity, log } from '@graphprotocol/graph-ts';

export function handleBlock(content: Entity): void {
let stringContent = content.getString('val');
export const SubgraphEntityOpCreate: u32 = 0;
export const SubgraphEntityOpModify: u32 = 1;
export const SubgraphEntityOpDelete: u32 = 2;

export class EntityTrigger {
constructor(
public entityOp: u32,
public entityType: string,
public entity: Entity,
public vid: i64,
) {}
}

export function handleBlock(content: EntityTrigger): void {
let stringContent = content.entity.getString('val');
log.info('Content: {}', [stringContent]);
log.info('EntityOp: {}', [content.entityOp.toString()]);

switch (content.entityOp) {
case SubgraphEntityOpCreate: {
log.info('Entity created: {}', [content.entityType]);
break
}
case SubgraphEntityOpModify: {
log.info('Entity modified: {}', [content.entityType]);
break;
}
case SubgraphEntityOpDelete: {
log.info('Entity deleted: {}', [content.entityType]);
break;
}
}
}
19 changes: 13 additions & 6 deletions tests/src/fixture/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use super::{
test_ptr, CommonChainConfig, MutexBlockStreamBuilder, NoopAdapterSelector,
NoopRuntimeAdapterBuilder, StaticBlockRefetcher, StaticStreamBuilder, Stores, TestChain,
};
use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
use graph::blockchain::client::ChainClient;
use graph::blockchain::{BlockPtr, Trigger, TriggersAdapterSelector};
use graph::cheap_clone::CheapClone;
use graph::data_source::subgraph;
use graph::prelude::ethabi::ethereum_types::H256;
use graph::prelude::web3::types::{Address, Log, Transaction, H160};
use graph::prelude::{ethabi, tiny_keccak, DeploymentHash, Entity, LightEthereumBlock, ENV_VARS};
use graph::schema::EntityType;
use graph::{blockchain::block_stream::BlockWithTriggers, prelude::ethabi::ethereum_types::U64};
use graph_chain_ethereum::network::EthereumNetworkAdapters;
use graph_chain_ethereum::trigger::LogRef;
Expand Down Expand Up @@ -164,15 +166,20 @@ pub fn push_test_subgraph_trigger(
block: &mut BlockWithTriggers<Chain>,
source: DeploymentHash,
entity: Entity,
entity_type: &str,
entity_type: EntityType,
entity_op: EntitySubgraphOperation,
vid: i64,
) {
let entity = EntityWithType {
entity: entity,
entity_type: entity_type,
entity_op: entity_op,
vid,
};

block
.trigger_data
.push(Trigger::Subgraph(subgraph::TriggerData {
source,
entity: entity,
entity_type: entity_type.to_string(),
}));
.push(Trigger::Subgraph(subgraph::TriggerData { source, entity }));
}

pub fn push_test_command(
Expand Down
9 changes: 7 additions & 2 deletions tests/tests/runner_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use std::time::Duration;

use assert_json_diff::assert_json_eq;
use graph::blockchain::block_stream::BlockWithTriggers;
use graph::blockchain::block_stream::{BlockWithTriggers, EntitySubgraphOperation};
use graph::blockchain::{Block, BlockPtr, Blockchain};
use graph::data::store::scalar::Bytes;
use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth};
Expand Down Expand Up @@ -1109,14 +1109,19 @@ async fn subgraph_data_sources() {
])
.unwrap();

let entity_type = schema.entity_type("User").unwrap();

let blocks = {
let block_0 = genesis();
let mut block_1 = empty_block(block_0.ptr(), test_ptr(1));

push_test_subgraph_trigger(
&mut block_1,
DeploymentHash::new("QmRFXhvyvbm4z5Lo7z2mN9Ckmo623uuB2jJYbRmAXgYKXJ").unwrap(),
entity,
"User",
entity_type,
EntitySubgraphOperation::Create,
1,
);

let block_2 = empty_block(block_1.ptr(), test_ptr(2));
Expand Down

0 comments on commit b0b8283

Please sign in to comment.