Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework generation of the VID #5737

Open
wants to merge 25 commits into
base: zoran/rename-structs
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ where
logger,
);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(key, entity, block.number)?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
Expand Down
5 changes: 4 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ async fn update_proof_of_indexing(
key: EntityKey,
digest: Bytes,
block_time: BlockTime,
block: BlockNumber,
) -> Result<(), Error> {
let digest_name = entity_cache.schema.poi_digest();
let mut data = vec![
Expand All @@ -1617,11 +1618,12 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi)
entity_cache.set(key, poi, block)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");

let block_number = proof_of_indexing.get_block();
let mut proof_of_indexing = proof_of_indexing.take();

for (causality_region, stream) in proof_of_indexing.drain() {
Expand Down Expand Up @@ -1657,6 +1659,7 @@ async fn update_proof_of_indexing(
entity_key,
updated_proof_of_indexing,
block_time,
block_number,
)?;
}

Expand Down
50 changes: 38 additions & 12 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::components::store::{self as s, Entity, EntityOperation};
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::data::store::{EntityV, EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::prelude::ENV_VARS;
use crate::schema::{EntityKey, InputSchema};
use crate::util::intern::Error as InternError;
Expand All @@ -17,6 +17,10 @@ use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError};

pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>;

// Number of VIDs that are reserved ourside of the generated ones here.
// Currently only 1 for POIs is used, but lets reserve a few more.
const RESERVED_VIDS: u32 = 100;

/// The scope in which the `EntityCache` should perform a `get` operation
pub enum GetScope {
/// Get from all previously stored entities in the store
Expand All @@ -29,8 +33,8 @@ pub enum GetScope {
#[derive(Debug, Clone)]
enum EntityOp {
Remove,
Update(Entity),
Overwrite(Entity),
Update(EntityV),
Overwrite(EntityV),
}

impl EntityOp {
Expand All @@ -41,7 +45,7 @@ impl EntityOp {
use EntityOp::*;
match (self, entity) {
(Remove, _) => Ok(None),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new)),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new.e)),
(Update(updates), Some(entity)) => {
let mut e = entity.borrow().clone();
e.merge_remove_null_fields(updates)?;
Expand All @@ -65,7 +69,7 @@ impl EntityOp {
match self {
// This is how `Overwrite` is constructed, by accumulating `Update` onto `Remove`.
Remove => *self = Overwrite(update),
Update(current) | Overwrite(current) => current.merge(update),
Update(current) | Overwrite(current) => current.e.merge(update.e),
}
}
}
Expand Down Expand Up @@ -105,6 +109,10 @@ pub struct EntityCache {
/// generated IDs, the `EntityCache` needs to be newly instantiated for
/// each block
seq: u32,

// Sequence number of the next VID value for this block. The value written
// in the database consist of a block number and this SEQ number.
pub vid_seq: u32,
}

impl Debug for EntityCache {
Expand Down Expand Up @@ -132,6 +140,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

Expand All @@ -152,6 +161,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

Expand Down Expand Up @@ -278,9 +288,9 @@ impl EntityCache {
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
if query.matches(key, &entity.e) =>
{
Ok(Some(entity.clone()))
Ok(Some(entity.e.clone()))
}
EntityOp::Remove => Ok(None),
_ => Ok(None),
Expand Down Expand Up @@ -349,10 +359,20 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
pub fn set(
&mut self,
key: EntityKey,
entity: Entity,
block: BlockNumber,
) -> Result<(), anyhow::Error> {
// check the validate for derived fields
let is_valid = entity.validate(&key).is_ok();

// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let entity = EntityV::new(entity, vid);

self.entity_op(key.clone(), EntityOp::Update(entity));

// The updates we were given are not valid by themselves; force a
Expand Down Expand Up @@ -458,19 +478,22 @@ impl EntityCache {
// Entity was created
(None, EntityOp::Update(mut updates))
| (None, EntityOp::Overwrite(mut updates)) => {
updates.remove_null_fields();
let data = Arc::new(updates);
let vid = updates.vid;
updates.e.remove_null_fields();
let data = Arc::new(updates.e.clone());
self.current.insert(key.clone(), Some(data.cheap_clone()));
Some(Insert {
key,
data,
block,
end: None,
vid,
})
}
// Entity may have been changed
(Some(current), EntityOp::Update(updates)) => {
let mut data = current.as_ref().clone();
let vid = updates.vid;
data.merge_remove_null_fields(updates)
.map_err(|e| key.unknown_attribute(e))?;
let data = Arc::new(data);
Expand All @@ -481,21 +504,24 @@ impl EntityCache {
data,
block,
end: None,
vid,
})
} else {
None
}
}
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.clone()));
let vid = data.vid;
let data = Arc::new(data.e.clone());
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
data,
block,
end: None,
vid,
})
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::constraint_violation;
use crate::data::store::scalar::Bytes;
use crate::data::store::{Id, IdList, Value};
use crate::data::store::{EntityV, Id, IdList, Value};
use crate::data::value::Word;
use crate::data_source::CausalityRegion;
use crate::derive::CheapClone;
Expand Down Expand Up @@ -829,7 +829,7 @@ where
pub enum EntityOperation {
/// Locates the entity specified by `key` and sets its attributes according to the contents of
/// `data`. If no entity exists with this key, creates a new entity.
Set { key: EntityKey, data: Entity },
Set { key: EntityKey, data: EntityV },

/// Removes an entity with the specified key, if one exists.
Remove { key: EntityKey },
Expand Down
20 changes: 18 additions & 2 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ pub enum EntityModification {
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Update the entity by overwriting it
Overwrite {
key: EntityKey,
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Remove the entity
Remove { key: EntityKey, block: BlockNumber },
Expand All @@ -67,6 +69,7 @@ pub struct EntityWrite<'a> {
// The end of the block range for which this write is valid. The value
// of `end` itself is not included in the range
pub end: Option<BlockNumber>,
pub vid: i64,
}

impl std::fmt::Display for EntityWrite<'_> {
Expand All @@ -89,24 +92,28 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> {
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),
EntityModification::Overwrite {
key,
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: &data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),

EntityModification::Remove { .. } => Err(()),
Expand Down Expand Up @@ -213,11 +220,13 @@ impl EntityModification {
data,
block,
end,
vid,
} => Ok(Insert {
key,
data,
block,
end,
vid,
}),
Remove { key, .. } => {
return Err(constraint_violation!(
Expand Down Expand Up @@ -271,21 +280,23 @@ impl EntityModification {
}

impl EntityModification {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Insert {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
EntityModification::Overwrite {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

Expand Down Expand Up @@ -1017,31 +1028,36 @@ mod test {

let value = value.clone();
let key = THING_TYPE.parse_key("one").unwrap();
let vid = 0;
match value {
Ins(block) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: None,
vid,
},
Ovw(block) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: None,
vid,
},
Rem(block) => EntityModification::Remove { key, block },
InsC(block, end) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: Some(end),
vid,
},
OvwC(block, end) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
block,
end: Some(end),
vid,
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/proof_of_indexing/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ impl ProofOfIndexing {
pub fn take(self) -> HashMap<Id, BlockEventStream> {
self.per_causality_region
}

pub fn get_block(&self) -> BlockNumber {
self.block_number
}
}

pub struct ProofOfIndexingFinisher {
Expand Down
Loading
Loading