diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 3e6dafcb2f0..4dd8d5f74ac 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -237,7 +237,7 @@ where logger, ); - state.entity_cache.set(key, entity)?; + state.entity_cache.set(key, entity, block.number)?; } ParsedChanges::Delete(entity_key) => { let entity_type = entity_key.entity_type.cheap_clone(); diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 02774452f9a..daf1c4cf6da 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -1603,6 +1603,7 @@ async fn update_proof_of_indexing( key: EntityKey, digest: Bytes, block_time: BlockTime, + block: BlockNumber, ) -> Result<(), Error> { let digest_name = entity_cache.schema.poi_digest(); let mut data = vec![ @@ -1617,11 +1618,12 @@ async fn update_proof_of_indexing( data.push((entity_cache.schema.poi_block_time(), block_time)); } let poi = entity_cache.make_entity(data)?; - entity_cache.set(key, poi) + entity_cache.set(key, poi, block) } let _section_guard = stopwatch.start_section("update_proof_of_indexing"); + let block_number = proof_of_indexing.get_block(); let mut proof_of_indexing = proof_of_indexing.take(); for (causality_region, stream) in proof_of_indexing.drain() { @@ -1657,6 +1659,7 @@ async fn update_proof_of_indexing( entity_key, updated_proof_of_indexing, block_time, + block_number, )?; } diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index dfaae80f76a..2bb4c7b1791 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -17,6 +17,10 @@ use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError}; pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>; +// Number of VIDs that are reserved outside of the ones generated here. +// Currently only 1 for POIs is used, but let's reserve a few more. +const RESERVED_VIDS: u32 = 100; + /// The scope in which the `EntityCache` should perform a `get` operation pub enum GetScope { /// Get from all previously stored entities in the store @@ -105,6 +109,10 @@ pub struct EntityCache { /// generated IDs, the `EntityCache` needs to be newly instantiated for /// each block seq: u32, + + // Sequence number of the next VID value for this block. The value written + // to the database consists of a block number and this sequence number. + pub vid_seq: u32, } impl Debug for EntityCache { @@ -132,6 +140,7 @@ impl EntityCache { schema: store.input_schema(), store, seq: 0, + vid_seq: RESERVED_VIDS, } } @@ -152,6 +161,7 @@ impl EntityCache { schema: store.input_schema(), store, seq: 0, + vid_seq: RESERVED_VIDS, } } @@ -278,7 +288,7 @@ impl EntityCache { ) -> Result<Option<Entity>, anyhow::Error> { match op { EntityOp::Update(entity) | EntityOp::Overwrite(entity) - if query.matches(key, entity) => + if query.matches(key, &entity) => { Ok(Some(entity.clone())) } @@ -349,10 +359,23 @@ impl EntityCache { /// with existing data. The entity will be validated against the /// subgraph schema, and any errors will result in an `Err` being /// returned.
- pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> { + pub fn set( + &mut self, + key: EntityKey, + entity: Entity, + block: BlockNumber, + ) -> Result<(), anyhow::Error> { // check the validity of derived fields let is_valid = entity.validate(&key).is_ok(); + // The next VID is based on the block number and a sequence within the block + let vid = ((block as i64) << 32) + self.vid_seq as i64; + self.vid_seq += 1; + let mut entity = entity; + let old_vid = entity.set_vid(vid).expect("the vid should be set"); + // Make sure that there was no VID previously set for this entity. + assert!(old_vid.is_none()); + self.entity_op(key.clone(), EntityOp::Update(entity)); // The updates we were given are not valid by themselves; force a @@ -489,7 +512,7 @@ impl EntityCache { // Entity was removed and then updated, so it will be overwritten (Some(current), EntityOp::Overwrite(data)) => { let data = Arc::new(data); - self.current.insert(key.clone(), Some(data.clone())); + self.current.insert(key.clone(), Some(data.cheap_clone())); if current != data { Some(Overwrite { key, diff --git a/graph/src/components/subgraph/proof_of_indexing/online.rs b/graph/src/components/subgraph/proof_of_indexing/online.rs index f90fac969cf..d47f08b0a8f 100644 --- a/graph/src/components/subgraph/proof_of_indexing/online.rs +++ b/graph/src/components/subgraph/proof_of_indexing/online.rs @@ -242,6 +242,10 @@ impl ProofOfIndexing { pub fn take(self) -> HashMap<String, BlockEventStream> { self.per_causality_region } + + pub fn get_block(&self) -> BlockNumber { + self.block_number + } } pub struct ProofOfIndexingFinisher { diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index 33d9286ceec..b16f6a2866d 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -910,6 +910,29 @@ impl Entity { Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value") } + /// Returns the VID of this entity. Panics if the VID is missing or of a type + /// other than i64. + pub fn vid(&self) -> i64 { + self.get("vid") + .expect("the vid is set") + .as_int8() + .expect("the vid is set to a valid value") + } + + /// Sets the VID of the entity. The previous one is returned. + pub fn set_vid(&mut self, value: i64) -> Result<Option<Value>, InternError> { + self.0.insert("vid", value.into()) + } + + /// Sets the VID if it's not already set. Should be used only for tests. + #[cfg(debug_assertions)] + pub fn set_vid_if_empty(&mut self) { + let vid = self.get("vid"); + if vid.is_none() { + let _ = self.set_vid(100).expect("the vid should be set"); + } + } + /// Merges an entity update `update` into this entity. /// /// If a key exists in both entities, the value from `update` is chosen. diff --git a/graph/src/data/subgraph/api_version.rs b/graph/src/data/subgraph/api_version.rs index 43ee639007c..dcfdb329476 100644 --- a/graph/src/data/subgraph/api_version.rs +++ b/graph/src/data/subgraph/api_version.rs @@ -55,10 +55,12 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0); pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0); // Enables subgraphs as datasource +// Changes the way the VID field is generated. It used to be autoincrement; now it's +// based on the block number and the sequence of entities within the block.
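For illustration, a minimal standalone sketch of the packing that `EntityCache::set` above performs; the helper names `pack_vid`, `block_of` and `seq_of` are illustrative and not part of this diff:

fn pack_vid(block: i32, seq: u32) -> i64 {
    // Block number in the high 32 bits, per-block sequence in the low 32 bits,
    // mirroring `((block as i64) << 32) + self.vid_seq as i64` above.
    ((block as i64) << 32) + seq as i64
}

fn block_of(vid: i64) -> i32 {
    (vid >> 32) as i32
}

fn seq_of(vid: i64) -> u32 {
    (vid & 0xffff_ffff) as u32
}

fn main() {
    // The first user-supplied entity of block 7: the sequence starts at
    // RESERVED_VIDS (100), keeping sequence values below 100 in every block
    // reserved (per the comment above, only one of them, for PoIs, is
    // currently used).
    let vid = pack_vid(7, 100);
    assert_eq!(vid, 0x0000_0007_0000_0064);
    assert_eq!((block_of(vid), seq_of(vid)), (7, 100));
}

Because the block number occupies the high bits, ordering rows by vid still reproduces insertion order across blocks, which is what the old bigserial column guaranteed implicitly.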
pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0); // The latest spec version available -pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0; +pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_3_0; pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2); diff --git a/graph/src/schema/input/mod.rs b/graph/src/schema/input/mod.rs index 84897299785..440873f47d6 100644 --- a/graph/src/schema/input/mod.rs +++ b/graph/src/schema/input/mod.rs @@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$"; const POI_DIGEST: &str = "digest"; /// The name of the PoI attribute for storing the block time const POI_BLOCK_TIME: &str = "blockTime"; +const VID_FIELD: &str = "vid"; pub mod kw { pub const ENTITY: &str = "entity"; @@ -1597,6 +1598,8 @@ fn atom_pool(document: &s::Document) -> AtomPool { pool.intern(POI_DIGEST); pool.intern(POI_BLOCK_TIME); + pool.intern(VID_FIELD); + for definition in &document.definitions { match definition { s::Definition::TypeDefinition(typedef) => match typedef { diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index 91895e07725..b926562e0d4 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -476,13 +476,13 @@ async fn test_ipfs_block() { // The user_data value we use with calls to ipfs_map const USER_DATA: &str = "user_data"; -fn make_thing(id: &str, value: &str) -> (String, EntityModification) { +fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) { const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }"; lazy_static! { static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter"); static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap(); } - let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA }; + let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid }; let key = THING_TYPE.parse_key(id).unwrap(); ( format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value), @@ -552,8 +552,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) { let subgraph_id = "ipfsMap"; // Try it with two valid objects - let (str1, thing1) = make_thing("one", "eins"); - let (str2, thing2) = make_thing("two", "zwei"); + let (str1, thing1) = make_thing("one", "eins", 100); + let (str2, thing2) = make_thing("two", "zwei", 100); let ops = run_ipfs_map( ipfs.clone(), subgraph_id, @@ -1022,8 +1022,8 @@ async fn test_entity_store(api_version: Version) { let schema = store.input_schema(&deployment.hash).unwrap(); - let alex = entity! { schema => id: "alex", name: "Alex" }; - let steve = entity! { schema => id: "steve", name: "Steve" }; + let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64 }; + let steve = entity! 
{ schema => id: "steve", name: "Steve", vid: 1i64 }; let user_type = schema.entity_type("User").unwrap(); test_store::insert_entities( &deployment, diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 4d050db23de..3de534b9069 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -350,7 +350,7 @@ impl HostExports { state.metrics.track_entity_write(&entity_type, &entity); - state.entity_cache.set(key, entity)?; + state.entity_cache.set(key, entity, block)?; Ok(()) } diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index aa3aefd3561..38a7ac13ff7 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -116,12 +116,18 @@ impl Table { Ok(cols) } + let vid_type = if !self.object.is_object_type() { + "bigserial" + } else { + "bigint" + }; + if self.immutable { writeln!( out, " create table {qname} ( - {vid} bigserial primary key, + {vid} {vid_type} primary key, {block} int not null,\n\ {cols}, unique({id}) @@ -129,6 +135,7 @@ impl Table { qname = self.qualified_name, cols = columns_ddl(self)?, vid = VID_COLUMN, + vid_type = vid_type, block = BLOCK_COLUMN, id = self.primary_key().name ) @@ -137,13 +144,14 @@ impl Table { out, r#" create table {qname} ( - {vid} bigserial primary key, + {vid} {vid_type} primary key, {block_range} int4range not null, {cols} );"#, qname = self.qualified_name, cols = columns_ddl(self)?, vid = VID_COLUMN, + vid_type = vid_type, block_range = BLOCK_RANGE_COLUMN )?; diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index b7f9b44afac..86e9f232d49 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -384,7 +384,7 @@ create type sgd0815."size" as enum ('large', 'medium', 'small'); create table "sgd0815"."thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "big_thing" text not null @@ -405,7 +405,7 @@ create index attr_0_1_thing_big_thing create table "sgd0815"."scalar" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "bool" boolean, @@ -444,7 +444,7 @@ create index attr_1_7_scalar_color create table "sgd0815"."file_thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, causality_region int not null, "id" text not null @@ -469,7 +469,7 @@ create type sgd0815."size" as enum ('large', 'medium', 'small'); create table "sgd0815"."thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "big_thing" text not null @@ -490,7 +490,7 @@ create index attr_0_1_thing_big_thing create table "sgd0815"."scalar" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "bool" boolean, @@ -515,7 +515,7 @@ create index attr_1_0_scalar_id create table "sgd0815"."file_thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, causality_region int not null, "id" text not null @@ -575,7 +575,7 @@ type SongStat @entity { played: Int! 
}"#; const MUSIC_DDL: &str = r#"create table "sgd0815"."musician" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, @@ -598,7 +598,7 @@ create index attr_0_2_musician_main_band on "sgd0815"."musician" using gist("main_band", block_range); create table "sgd0815"."band" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, @@ -618,8 +618,8 @@ create index attr_1_1_band_name on "sgd0815"."band" using btree(left("name", 256)); create table "sgd0815"."song" ( - vid bigserial primary key, - block$ int not null, + vid bigint primary key, + block$ int not null, "id" text not null, "title" text not null, "written_by" text not null, @@ -634,7 +634,7 @@ create index attr_2_1_song_written_by on "sgd0815"."song" using btree("written_by", block$); create table "sgd0815"."song_stat" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "played" int4 not null @@ -676,7 +676,7 @@ type Habitat @entity { }"#; const FOREST_DDL: &str = r#"create table "sgd0815"."animal" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "forest" text @@ -695,8 +695,8 @@ create index attr_0_1_animal_forest on "sgd0815"."animal" using gist("forest", block_range); create table "sgd0815"."forest" ( - vid bigserial primary key, - block_range int4range not null, + vid bigint primary key, + block_range int4range not null, "id" text not null ); alter table "sgd0815"."forest" @@ -711,7 +711,7 @@ create index attr_1_0_forest_id on "sgd0815"."forest" using btree("id"); create table "sgd0815"."habitat" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "most_common" text not null, @@ -763,7 +763,7 @@ type Habitat @entity { }"#; const FULLTEXT_DDL: &str = r#"create table "sgd0815"."animal" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "name" text not null, @@ -791,7 +791,7 @@ create index attr_0_4_animal_search on "sgd0815"."animal" using gin("search"); create table "sgd0815"."forest" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null ); @@ -808,7 +808,7 @@ create index attr_1_0_forest_id on "sgd0815"."forest" using btree("id"); create table "sgd0815"."habitat" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "most_common" text not null, @@ -843,7 +843,7 @@ enum Orientation { const FORWARD_ENUM_SQL: &str = r#"create type sgd0815."orientation" as enum ('DOWN', 'UP'); create table "sgd0815"."thing" ( - vid bigserial primary key, + vid bigint primary key, block_range int4range not null, "id" text not null, "orientation" "sgd0815"."orientation" not null @@ -880,8 +880,8 @@ type Stats @aggregation(intervals: ["hour", "day"], source: "Data") { const TS_SQL: &str = r#" create table "sgd0815"."data" ( - vid bigserial primary key, - block$ int not null, + vid bigint primary key, + block$ int not null, "id" int8 not null, "timestamp" timestamptz not null, "amount" numeric not null, @@ -896,7 +896,7 @@ create index attr_0_1_data_amount create table "sgd0815"."stats_hour" ( vid bigserial primary key, - block$ int not null, + block$ int not null, "id" int8 not null, "timestamp" timestamptz not null, "volume" numeric not null, @@ -971,9 +971,9 @@ const 
LIFETIME_GQL: &str = r#" const LIFETIME_SQL: &str = r#" create table "sgd0815"."data" ( - vid bigserial primary key, - block$ int not null, -"id" int8 not null, + vid bigint primary key, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_1" int4 not null, "group_2" int4 not null, @@ -993,8 +993,8 @@ on "sgd0815"."data" using btree("amount"); create table "sgd0815"."stats_1_hour" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "volume" numeric not null, unique(id) @@ -1009,8 +1009,8 @@ on "sgd0815"."stats_1_hour" using btree("volume"); create table "sgd0815"."stats_1_day" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "volume" numeric not null, unique(id) @@ -1025,8 +1025,8 @@ on "sgd0815"."stats_1_day" using btree("volume"); create table "sgd0815"."stats_2_hour" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_1" int4 not null, "volume" numeric not null, @@ -1045,8 +1045,8 @@ on "sgd0815"."stats_2_hour"(group_1, timestamp); create table "sgd0815"."stats_2_day" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_1" int4 not null, "volume" numeric not null, @@ -1065,8 +1065,8 @@ on "sgd0815"."stats_2_day"(group_1, timestamp); create table "sgd0815"."stats_3_hour" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_2" int4 not null, "group_1" int4 not null, @@ -1088,8 +1088,8 @@ on "sgd0815"."stats_3_hour"(group_2, group_1, timestamp); create table "sgd0815"."stats_3_day" ( vid bigserial primary key, - block$ int not null, -"id" int8 not null, + block$ int not null, + "id" int8 not null, "timestamp" timestamptz not null, "group_2" int4 not null, "group_1" int4 not null, diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 6b5fcdc6940..39337d2a485 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -17,12 +17,7 @@ use graph::{ }; use itertools::Itertools; -use crate::{ - catalog, - copy::AdaptiveBatchSize, - deployment, - relational::{Table, VID_COLUMN}, -}; +use crate::{catalog, copy::AdaptiveBatchSize, deployment, relational::Table}; use super::{Catalog, Layout, Namespace}; @@ -73,7 +68,6 @@ struct TablePair { // has the same name as `src` but is in a different namespace dst: Arc, src_nsp: Namespace, - dst_nsp: Namespace, } impl TablePair { @@ -100,12 +94,7 @@ impl TablePair { } conn.batch_execute(&query)?; - Ok(TablePair { - src, - dst, - src_nsp, - dst_nsp, - }) + Ok(TablePair { src, dst, src_nsp }) } /// Copy all entity versions visible between `earliest_block` and @@ -239,10 +228,6 @@ impl TablePair { let src_qname = &self.src.qualified_name; let dst_qname = &self.dst.qualified_name; let src_nsp = &self.src_nsp; - let dst_nsp = &self.dst_nsp; - - let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name); - let mut query = String::new(); // What we are about to do would get blocked by autovacuum on our @@ -252,13 +237,6 @@ impl TablePair { "src" => src_nsp.as_str(), "error" => e.to_string()); } - // Make sure the vid sequence - // 
continues from where it was - writeln!( - query, - "select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));" - )?; - writeln!(query, "drop table {src_qname};")?; writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?; conn.transaction(|conn| conn.batch_execute(&query))?; diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 0462bca9e13..08127dff2c5 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -539,7 +539,14 @@ impl EntityData { // table column; those will be things like the // block_range that `select *` pulls in but that we // don't care about here - if let Some(column) = table.column(&SqlName::verbatim(key)) { + if key == "vid" { + // VID is not in the input schema but we need it, so deserialize it too + match T::Value::from_column_value(&ColumnType::Int8, json) { + Ok(value) if value.is_null() => None, + Ok(value) => Some(Ok((Word::from("vid"), value))), + Err(e) => Some(Err(e)), + } + } else if let Some(column) = table.column(&SqlName::verbatim(key)) { match T::Value::from_column_value(&column.column_type, json) { Ok(value) if value.is_null() => None, Ok(value) => Some(Ok((Word::from(column.field.to_string()), value))), @@ -2413,6 +2420,7 @@ struct InsertRow<'a> { values: Vec>, br_value: BlockRangeValue, causality_region: CausalityRegion, + vid: i64, } impl<'a> InsertRow<'a> { @@ -2449,10 +2457,12 @@ impl<'a> InsertRow<'a> { } let br_value = BlockRangeValue::new(table, row.block, row.end); let causality_region = row.causality_region; + let vid = row.entity.vid(); Ok(Self { values, br_value, causality_region, + vid, }) } } @@ -2563,6 +2573,7 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(CAUSALITY_REGION_COLUMN); }; + out.push_sql(", vid"); out.push_sql(") values\n"); for (i, row) in self.rows.iter().enumerate() { @@ -2580,6 +2591,8 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(", "); out.push_bind_param::(&row.causality_region)?; }; + out.push_sql(", "); + out.push_bind_param::(&row.vid)?; out.push_sql(")"); } @@ -5097,6 +5110,7 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { out.push_sql(", "); out.push_sql(CAUSALITY_REGION_COLUMN); }; + out.push_sql(", vid"); out.push_sql(")\nselect "); for column in &self.columns { @@ -5162,6 +5176,8 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { )); } } + out.push_sql(", vid"); + out.push_sql(" from "); out.push_sql(self.src.qualified_name.as_str()); out.push_sql(" where vid >= "); diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 2921d375286..5475a29db2c 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -422,12 +422,13 @@ pub async fn insert_entities( deployment: &DeploymentLocator, entities: Vec<(EntityType, Entity)>, ) -> Result<(), StoreError> { - let insert_ops = entities - .into_iter() - .map(|(entity_type, data)| EntityOperation::Set { + let insert_ops = entities.into_iter().map(|(entity_type, mut data)| { + data.set_vid_if_empty(); + EntityOperation::Set { key: entity_type.key(data.id()), data, - }); + } + }); transact_entity_operations( &SUBGRAPH_STORE, diff --git a/store/test-store/tests/core/interfaces.rs b/store/test-store/tests/core/interfaces.rs index 78eb2fda390..a4fc8314665 100644 --- a/store/test-store/tests/core/interfaces.rs +++ b/store/test-store/tests/core/interfaces.rs @@ -201,9 +201,9 @@ async fn reference_interface_derived() { let query = "query { events { id transaction { id } } }"; - 
let buy = ("BuyEvent", entity! { schema => id: "buy" }); - let sell1 = ("SellEvent", entity! { schema => id: "sell1" }); - let sell2 = ("SellEvent", entity! { schema => id: "sell2" }); + let buy = ("BuyEvent", entity! { schema => id: "buy", vid: 0i64 }); + let sell1 = ("SellEvent", entity! { schema => id: "sell1", vid: 1i64 }); + let sell2 = ("SellEvent", entity! { schema => id: "sell2", vid: 2i64 }); let gift = ( "GiftEvent", entity! { schema => id: "gift", transaction: "txn" }, @@ -278,11 +278,11 @@ async fn follow_interface_reference() { let parent = ( "Animal", - entity! { schema => id: "parent", legs: 4, parent: Value::Null }, + entity! { schema => id: "parent", legs: 4, parent: Value::Null, vid: 0i64}, ); let child = ( "Animal", - entity! { schema => id: "child", legs: 3, parent: "parent" }, + entity! { schema => id: "child", legs: 3, parent: "parent" , vid: 1i64}, ); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) @@ -459,16 +459,16 @@ async fn interface_inline_fragment_with_subquery() { "; let schema = InputSchema::raw(document, subgraph_id); - let mama_cow = ("Parent", entity! { schema => id: "mama_cow" }); + let mama_cow = ("Parent", entity! { schema => id: "mama_cow", vid: 0i64 }); let cow = ( "Animal", - entity! { schema => id: "1", name: "cow", legs: 4, parent: "mama_cow" }, + entity! { schema => id: "1", name: "cow", legs: 4, parent: "mama_cow", vid: 0i64 }, ); - let mama_bird = ("Parent", entity! { schema => id: "mama_bird" }); + let mama_bird = ("Parent", entity! { schema => id: "mama_bird", vid: 1i64 }); let bird = ( "Bird", - entity! { schema => id: "2", airspeed: 5, legs: 2, parent: "mama_bird" }, + entity! { schema => id: "2", airspeed: 5, legs: 2, parent: "mama_bird", vid: 1i64 }, ); let query = "query { leggeds(orderBy: legs) { legs ... on Bird { airspeed parent { id } } } }"; @@ -545,11 +545,11 @@ async fn alias() { let parent = ( "Animal", - entity! { schema => id: "parent", legs: 4, parent: Value::Null }, + entity! { schema => id: "parent", legs: 4, parent: Value::Null, vid: 0i64 }, ); let child = ( "Animal", - entity! { schema => id: "child", legs: 3, parent: "parent" }, + entity! { schema => id: "child", legs: 3, parent: "parent", vid: 1i64 }, ); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) @@ -608,9 +608,15 @@ async fn fragments_dont_panic() { "; // The panic manifests if two parents exist. - let parent = ("Parent", entity! { schema => id: "p", child: "c" }); - let parent2 = ("Parent", entity! { schema => id: "p2", child: Value::Null }); - let child = ("Child", entity! { schema => id:"c" }); + let parent = ( + "Parent", + entity! { schema => id: "p", child: "c", vid: 0i64 }, + ); + let parent2 = ( + "Parent", + entity! { schema => id: "p2", child: Value::Null, vid: 1i64 }, + ); + let child = ("Child", entity! { schema => id:"c", vid: 2i64 }); let res = insert_and_query(subgraph_id, document, vec![parent, parent2, child], query) .await @@ -668,12 +674,15 @@ async fn fragments_dont_duplicate_data() { "; // This bug manifests if two parents exist. - let parent = ("Parent", entity! { schema => id: "p", children: vec!["c"] }); + let parent = ( + "Parent", + entity! { schema => id: "p", children: vec!["c"], vid: 0i64 }, + ); let parent2 = ( "Parent", - entity! { schema => id: "b", children: Vec::::new() }, + entity! { schema => id: "b", children: Vec::::new(), vid: 1i64 }, ); - let child = ("Child", entity! { schema => id:"c" }); + let child = ("Child", entity! 
{ schema => id:"c", vid: 2i64 }); let res = insert_and_query(subgraph_id, document, vec![parent, parent2, child], query) .await @@ -721,11 +730,11 @@ async fn redundant_fields() { let parent = ( "Animal", - entity! { schema => id: "parent", parent: Value::Null }, + entity! { schema => id: "parent", parent: Value::Null, vid: 0i64 }, ); let child = ( "Animal", - entity! { schema => id: "child", parent: "parent" }, + entity! { schema => id: "child", parent: "parent", vid: 1i64 }, ); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) @@ -783,8 +792,11 @@ async fn fragments_merge_selections() { } "; - let parent = ("Parent", entity! { schema => id: "p", children: vec!["c"] }); - let child = ("Child", entity! { schema => id: "c", foo: 1 }); + let parent = ( + "Parent", + entity! { schema => id: "p", children: vec!["c"], vid: 0i64 }, + ); + let child = ("Child", entity! { schema => id: "c", foo: 1, vid: 1i64 }); let res = insert_and_query(subgraph_id, document, vec![parent, child], query) .await @@ -1081,11 +1093,11 @@ async fn enums() { let entities = vec![ ( "Trajectory", - entity! { schema => id: "1", direction: "EAST", meters: 10 }, + entity! { schema => id: "1", direction: "EAST", meters: 10, vid: 0i64 }, ), ( "Trajectory", - entity! { schema => id: "2", direction: "NORTH", meters: 15 }, + entity! { schema => id: "2", direction: "NORTH", meters: 15, vid: 1i64 }, ), ]; let query = "query { trajectories { id, direction, meters } }"; @@ -1134,15 +1146,15 @@ async fn enum_list_filters() { let entities = vec![ ( "Trajectory", - entity! { schema => id: "1", direction: "EAST", meters: 10 }, + entity! { schema => id: "1", direction: "EAST", meters: 10, vid: 0i64 }, ), ( "Trajectory", - entity! { schema => id: "2", direction: "NORTH", meters: 15 }, + entity! { schema => id: "2", direction: "NORTH", meters: 15, vid: 1i64 }, ), ( "Trajectory", - entity! { schema => id: "3", direction: "WEST", meters: 20 }, + entity! { schema => id: "3", direction: "WEST", meters: 20, vid: 2i64 }, ), ]; @@ -1327,8 +1339,8 @@ async fn derived_interface_bytes() { let entities = vec![ ("Pool", entity! { schema => id: b("0xf001") }), - ("Sell", entity! { schema => id: b("0xc0"), pool: "0xf001"}), - ("Buy", entity! { schema => id: b("0xb0"), pool: "0xf001"}), + ("Sell", entity! { schema => id: b("0xc0"), pool: "0xf001" }), + ("Buy", entity! { schema => id: b("0xb0"), pool: "0xf001" }), ]; let res = insert_and_query(subgraph_id, document, entities, query) diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index d7ebb30785c..ab3cc5ed02e 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -207,16 +207,21 @@ fn insert_modifications() { let store = Arc::new(store); let mut cache = EntityCache::new(store); - let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" }; + let mut mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" }; let mogwai_key = make_band_key("mogwai"); - cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap(); + cache + .set(mogwai_key.clone(), mogwai_data.clone(), 0) + .unwrap(); - let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" }; + let mut sigurros_data = entity! 
{ SCHEMA => id: "sigurros", name: "Sigur Ros" }; let sigurros_key = make_band_key("sigurros"); cache - .set(sigurros_key.clone(), sigurros_data.clone()) + .set(sigurros_key.clone(), sigurros_data.clone(), 0) .unwrap(); + mogwai_data.set_vid(100).unwrap(); + sigurros_data.set_vid(101).unwrap(); + let result = cache.as_modifications(0); assert_eq!( sort_by_entity_key(result.unwrap().modifications), @@ -251,16 +256,21 @@ fn overwrite_modifications() { let store = Arc::new(store); let mut cache = EntityCache::new(store); - let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }; + let mut mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }; let mogwai_key = make_band_key("mogwai"); - cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap(); + cache + .set(mogwai_key.clone(), mogwai_data.clone(), 0) + .unwrap(); - let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994 }; + let mut sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994}; let sigurros_key = make_band_key("sigurros"); cache - .set(sigurros_key.clone(), sigurros_data.clone()) + .set(sigurros_key.clone(), sigurros_data.clone(), 0) .unwrap(); + mogwai_data.set_vid(100).unwrap(); + sigurros_data.set_vid(101).unwrap(); + let result = cache.as_modifications(0); assert_eq!( sort_by_entity_key(result.unwrap().modifications), @@ -289,12 +299,12 @@ fn consecutive_modifications() { let update_data = entity! { SCHEMA => id: "mogwai", founded: 1995, label: "Rock Action Records" }; let update_key = make_band_key("mogwai"); - cache.set(update_key, update_data).unwrap(); + cache.set(update_key, update_data, 0).unwrap(); // Then, just reset the "label". let update_data = entity! { SCHEMA => id: "mogwai", label: Value::Null }; let update_key = make_band_key("mogwai"); - cache.set(update_key.clone(), update_data).unwrap(); + cache.set(update_key.clone(), update_data, 0).unwrap(); // We expect a single overwrite modification for the above that leaves "id" // and "name" untouched, sets "founded" and removes the "label" field. @@ -303,8 +313,8 @@ fn consecutive_modifications() { sort_by_entity_key(result.unwrap().modifications), sort_by_entity_key(vec![EntityModification::overwrite( update_key, - entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }, - 0 + entity! 
{ SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995, vid: 101i64 }, + 0, )]) ); } @@ -428,17 +438,17 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator .unwrap(); // 1 account 3 wallets - let test_entity_1 = create_account_entity("1", "Johnton", "tonofjohn@email.com", 67_i32); + let test_entity_1 = create_account_entity("1", "Johnton", "tonofjohn@email.com", 67_i32, 1); let id_one = WALLET_TYPE.parse_id("1").unwrap(); - let wallet_entity_1 = create_wallet_operation("1", &id_one, 67_i32); - let wallet_entity_2 = create_wallet_operation("2", &id_one, 92_i32); - let wallet_entity_3 = create_wallet_operation("3", &id_one, 192_i32); + let wallet_entity_1 = create_wallet_operation("1", &id_one, 67_i32, 1); + let wallet_entity_2 = create_wallet_operation("2", &id_one, 92_i32, 2); + let wallet_entity_3 = create_wallet_operation("3", &id_one, 192_i32, 3); // 1 account 1 wallet - let test_entity_2 = create_account_entity("2", "Cindini", "dinici@email.com", 42_i32); + let test_entity_2 = create_account_entity("2", "Cindini", "dinici@email.com", 42_i32, 2); let id_two = WALLET_TYPE.parse_id("2").unwrap(); - let wallet_entity_4 = create_wallet_operation("4", &id_two, 32_i32); + let wallet_entity_4 = create_wallet_operation("4", &id_two, 32_i32, 4); // 1 account 0 wallets - let test_entity_3 = create_account_entity("3", "Shaqueeena", "queensha@email.com", 28_i32); + let test_entity_3 = create_account_entity("3", "Shaqueeena", "queensha@email.com", 28_i32, 3); transact_entity_operations( &store, &deployment, @@ -458,9 +468,9 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator deployment } -fn create_account_entity(id: &str, name: &str, email: &str, age: i32) -> EntityOperation { +fn create_account_entity(id: &str, name: &str, email: &str, age: i32, vid: i64) -> EntityOperation { let test_entity = - entity! { LOAD_RELATED_SUBGRAPH => id: id, name: name, email: email, age: age }; + entity! { LOAD_RELATED_SUBGRAPH => id: id, name: name, email: email, age: age, vid: vid}; EntityOperation::Set { key: ACCOUNT_TYPE.parse_key(id).unwrap(), @@ -468,12 +478,18 @@ fn create_account_entity(id: &str, name: &str, email: &str, age: i32) -> EntityO } } -fn create_wallet_entity(id: &str, account_id: &Id, balance: i32) -> Entity { +fn create_wallet_entity(id: &str, account_id: &Id, balance: i32, vid: i64) -> Entity { let account_id = Value::from(account_id.clone()); - entity! { LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance } + entity! { LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance, vid: vid} } -fn create_wallet_operation(id: &str, account_id: &Id, balance: i32) -> EntityOperation { - let test_wallet = create_wallet_entity(id, account_id, balance); + +fn create_wallet_entity_no_vid(id: &str, account_id: &Id, balance: i32) -> Entity { + let account_id = Value::from(account_id.clone()); + entity! 
{ LOAD_RELATED_SUBGRAPH => id: id, account: account_id, balance: balance} +} + +fn create_wallet_operation(id: &str, account_id: &Id, balance: i32, vid: i64) -> EntityOperation { + let test_wallet = create_wallet_entity(id, account_id, balance, vid); EntityOperation::Set { key: WALLET_TYPE.parse_key(id).unwrap(), data: test_wallet, @@ -491,9 +507,9 @@ fn check_for_account_with_multiple_wallets() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("1", &account_id, 67_i32); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_1 = create_wallet_entity("1", &account_id, 67_i32, 1); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -511,7 +527,7 @@ fn check_for_account_with_single_wallet() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("4", &account_id, 32_i32); + let wallet_1 = create_wallet_entity("4", &account_id, 32_i32, 4); let expeted_vec = vec![wallet_1]; assert_eq!(result, expeted_vec); @@ -577,8 +593,8 @@ fn check_for_insert_async_store() { run_store_test(|mut cache, store, deployment, _writable| async move { let account_id = ACCOUNT_TYPE.parse_id("2").unwrap(); // insert a new wallet - let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32); - let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32); + let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32, 12); + let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32, 13); transact_entity_operations( &store, @@ -595,9 +611,9 @@ fn check_for_insert_async_store() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("4", &account_id, 32_i32); - let wallet_2 = create_wallet_entity("5", &account_id, 79_i32); - let wallet_3 = create_wallet_entity("6", &account_id, 200_i32); + let wallet_1 = create_wallet_entity("4", &account_id, 32_i32, 4); + let wallet_2 = create_wallet_entity("5", &account_id, 79_i32, 12); + let wallet_3 = create_wallet_entity("6", &account_id, 200_i32, 13); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -608,8 +624,8 @@ fn check_for_insert_async_not_related() { run_store_test(|mut cache, store, deployment, _writable| async move { let account_id = ACCOUNT_TYPE.parse_id("2").unwrap(); // insert a new wallet - let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32); - let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32); + let wallet_entity_5 = create_wallet_operation("5", &account_id, 79_i32, 5); + let wallet_entity_6 = create_wallet_operation("6", &account_id, 200_i32, 6); transact_entity_operations( &store, @@ -627,9 +643,9 @@ fn check_for_insert_async_not_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_1 = create_wallet_entity("1", &account_id, 67_i32); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_1 = create_wallet_entity("1", &account_id, 67_i32, 1); + let wallet_2 = 
create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -641,7 +657,7 @@ fn check_for_update_async_related() { run_store_test(|mut cache, store, deployment, writable| async move { let entity_key = WALLET_TYPE.parse_key("1").unwrap(); let account_id = entity_key.entity_id.clone(); - let wallet_entity_update = create_wallet_operation("1", &account_id, 79_i32); + let wallet_entity_update = create_wallet_operation("1", &account_id, 79_i32, 11); let new_data = match wallet_entity_update { EntityOperation::Set { ref data, .. } => data.clone(), @@ -665,8 +681,8 @@ fn check_for_update_async_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![new_data, wallet_2, wallet_3]; assert_eq!(result, expeted_vec); @@ -695,40 +711,43 @@ fn check_for_delete_async_related() { causality_region: CausalityRegion::ONCHAIN, }; let result = cache.load_related(&request).unwrap(); - let wallet_2 = create_wallet_entity("2", &account_id, 92_i32); - let wallet_3 = create_wallet_entity("3", &account_id, 192_i32); + let wallet_2 = create_wallet_entity("2", &account_id, 92_i32, 2); + let wallet_3 = create_wallet_entity("3", &account_id, 192_i32, 3); let expeted_vec = vec![wallet_2, wallet_3]; assert_eq!(result, expeted_vec); }); } - #[test] fn scoped_get() { run_store_test(|mut cache, _store, _deployment, _writable| async move { // Key for an existing entity that is in the store let account1 = ACCOUNT_TYPE.parse_id("1").unwrap(); let key1 = WALLET_TYPE.parse_key("1").unwrap(); - let wallet1 = create_wallet_entity("1", &account1, 67); + let wallet1 = create_wallet_entity_no_vid("1", &account1, 67); // Create a new entity that is not in the store let account5 = ACCOUNT_TYPE.parse_id("5").unwrap(); - let wallet5 = create_wallet_entity("5", &account5, 100); + let mut wallet5 = create_wallet_entity_no_vid("5", &account5, 100); let key5 = WALLET_TYPE.parse_key("5").unwrap(); - cache.set(key5.clone(), wallet5.clone()).unwrap(); + cache.set(key5.clone(), wallet5.clone(), 0).unwrap(); + wallet5.set_vid(100).unwrap(); // For the new entity, we can retrieve it with either scope let act5 = cache.get(&key5, GetScope::InBlock).unwrap(); assert_eq!(Some(&wallet5), act5.as_ref().map(|e| e.as_ref())); let act5 = cache.get(&key5, GetScope::Store).unwrap(); assert_eq!(Some(&wallet5), act5.as_ref().map(|e| e.as_ref())); + let mut wallet1a = wallet1.clone(); + wallet1a.set_vid(1).unwrap(); // For an entity in the store, we can not get it `InBlock` but with // `Store` let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); assert_eq!(None, act1); let act1 = cache.get(&key1, GetScope::Store).unwrap(); - assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); + assert_eq!(Some(&wallet1a), act1.as_ref().map(|e| e.as_ref())); + // Even after reading from the store, the entity is not visible with // `InBlock` let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); @@ -736,11 +755,13 @@ fn scoped_get() { // But if it gets updated, it becomes visible with either scope let mut wallet1 = wallet1; wallet1.set("balance", 70).unwrap(); - 
cache.set(key1.clone(), wallet1.clone()).unwrap(); + cache.set(key1.clone(), wallet1.clone(), 0).unwrap(); + wallet1a = wallet1; + wallet1a.set_vid(101).unwrap(); let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); - assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); + assert_eq!(Some(&wallet1a), act1.as_ref().map(|e| e.as_ref())); let act1 = cache.get(&key1, GetScope::Store).unwrap(); - assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); + assert_eq!(Some(&wallet1a), act1.as_ref().map(|e| e.as_ref())); }) } @@ -783,6 +804,6 @@ fn no_interface_mods() { let entity = entity! { LOAD_RELATED_SUBGRAPH => id: "1", balance: 100 }; - cache.set(key, entity).unwrap_err(); + cache.set(key, entity, 0).unwrap_err(); }) } diff --git a/store/test-store/tests/graphql/query.rs b/store/test-store/tests/graphql/query.rs index 08ad26ef9b9..d7e7dec8f55 100644 --- a/store/test-store/tests/graphql/query.rs +++ b/store/test-store/tests/graphql/query.rs @@ -424,9 +424,12 @@ async fn insert_test_entities( .into_iter() .map(|(typename, entities)| { let entity_type = schema.entity_type(typename).unwrap(); - entities.into_iter().map(move |data| EntityOperation::Set { - key: entity_type.key(data.id()), - data, + entities.into_iter().map(move |mut data| { + data.set_vid_if_empty(); + EntityOperation::Set { + key: entity_type.key(data.id()), + data, + } }) }) .flatten() @@ -468,115 +471,118 @@ async fn insert_test_entities( ( "Musician", vec![ - entity! { is => id: "m1", name: "John", mainBand: "b1", bands: vec!["b1", "b2"], favoriteCount: 10, birthDate: timestamp.clone() }, - entity! { is => id: "m2", name: "Lisa", mainBand: "b1", bands: vec!["b1"], favoriteCount: 100, birthDate: timestamp.clone() }, + entity! { is => id: "m1", name: "John", mainBand: "b1", bands: vec!["b1", "b2"], favoriteCount: 10, birthDate: timestamp.clone(), vid: 0i64 }, + entity! { is => id: "m2", name: "Lisa", mainBand: "b1", bands: vec!["b1"], favoriteCount: 100, birthDate: timestamp.clone(), vid: 1i64 }, ], ), - ("Publisher", vec![entity! { is => id: pub1 }]), + ("Publisher", vec![entity! { is => id: pub1, vid: 0i64 }]), ( "Band", vec![ - entity! { is => id: "b1", name: "The Musicians", originalSongs: vec![s[1], s[2]] }, - entity! { is => id: "b2", name: "The Amateurs", originalSongs: vec![s[1], s[3], s[4]] }, + entity! { is => id: "b1", name: "The Musicians", originalSongs: vec![s[1], s[2]], vid: 0i64 }, + entity! { is => id: "b2", name: "The Amateurs", originalSongs: vec![s[1], s[3], s[4]], vid: 1i64 }, ], ), ( "Song", vec![ - entity! { is => id: s[1], sid: "s1", title: "Cheesy Tune", publisher: pub1, writtenBy: "m1", media: vec![md[1], md[2]] }, - entity! { is => id: s[2], sid: "s2", title: "Rock Tune", publisher: pub1, writtenBy: "m2", media: vec![md[3], md[4]] }, - entity! { is => id: s[3], sid: "s3", title: "Pop Tune", publisher: pub1, writtenBy: "m1", media: vec![md[5]] }, - entity! { is => id: s[4], sid: "s4", title: "Folk Tune", publisher: pub1, writtenBy: "m3", media: vec![md[6]] }, + entity! { is => id: s[1], sid: "s1", title: "Cheesy Tune", publisher: pub1, writtenBy: "m1", media: vec![md[1], md[2]], vid: 0i64 }, + entity! { is => id: s[2], sid: "s2", title: "Rock Tune", publisher: pub1, writtenBy: "m2", media: vec![md[3], md[4]], vid: 1i64 }, + entity! { is => id: s[3], sid: "s3", title: "Pop Tune", publisher: pub1, writtenBy: "m1", media: vec![md[5]], vid: 2i64 }, + entity! 
{ is => id: s[4], sid: "s4", title: "Folk Tune", publisher: pub1, writtenBy: "m3", media: vec![md[6]], vid: 3i64 }, ], ), ( "User", vec![ - entity! { is => id: "u1", name: "User 1", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r3" }, + entity! { is => id: "u1", name: "User 1", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r3", vid: 0i64 }, ], ), ( "SongStat", vec![ - entity! { is => id: s[1], played: 10 }, - entity! { is => id: s[2], played: 15 }, + entity! { is => id: s[1], played: 10, vid: 0i64 }, + entity! { is => id: s[2], played: 15, vid: 1i64 }, ], ), ( "BandReview", vec![ - entity! { is => id: "r1", body: "Bad musicians", band: "b1", author: "u1" }, - entity! { is => id: "r2", body: "Good amateurs", band: "b2", author: "u2" }, - entity! { is => id: "r5", body: "Very Bad musicians", band: "b1", author: "u3" }, + entity! { is => id: "r1", body: "Bad musicians", band: "b1", author: "u1", vid: 0i64 }, + entity! { is => id: "r2", body: "Good amateurs", band: "b2", author: "u2", vid: 1i64 }, + entity! { is => id: "r5", body: "Very Bad musicians", band: "b1", author: "u3", vid: 2i64 }, ], ), ( "SongReview", vec![ - entity! { is => id: "r3", body: "Bad", song: s[2], author: "u1" }, - entity! { is => id: "r4", body: "Good", song: s[3], author: "u2" }, - entity! { is => id: "r6", body: "Very Bad", song: s[2], author: "u3" }, + entity! { is => id: "r3", body: "Bad", song: s[2], author: "u1", vid: 0i64 }, + entity! { is => id: "r4", body: "Good", song: s[3], author: "u2", vid: 1i64 }, + entity! { is => id: "r6", body: "Very Bad", song: s[2], author: "u3", vid: 2i64 }, ], ), ( "User", vec![ - entity! { is => id: "u1", name: "Baden", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r1" }, - entity! { is => id: "u2", name: "Goodwill", latestSongReview: "r4", latestBandReview: "r2", latestReview: "r2" }, + entity! { is => id: "u1", name: "Baden", latestSongReview: "r3", latestBandReview: "r1", latestReview: "r1", vid: 0i64 }, + entity! { is => id: "u2", name: "Goodwill", latestSongReview: "r4", latestBandReview: "r2", latestReview: "r2", vid: 1i64 }, ], ), ( "AnonymousUser", vec![ - entity! { is => id: "u3", name: "Anonymous 3", latestSongReview: "r6", latestBandReview: "r5", latestReview: "r5" }, + entity! { is => id: "u3", name: "Anonymous 3", latestSongReview: "r6", latestBandReview: "r5", latestReview: "r5", vid: 0i64 }, ], ), ( "Photo", vec![ - entity! { is => id: md[1], title: "Cheesy Tune Single Cover", author: "u1" }, - entity! { is => id: md[3], title: "Rock Tune Single Cover", author: "u1" }, - entity! { is => id: md[5], title: "Pop Tune Single Cover", author: "u1" }, + entity! { is => id: md[1], title: "Cheesy Tune Single Cover", author: "u1", vid: 0i64 }, + entity! { is => id: md[3], title: "Rock Tune Single Cover", author: "u1", vid: 1i64 }, + entity! { is => id: md[5], title: "Pop Tune Single Cover", author: "u1", vid: 2i64 }, ], ), ( "Video", vec![ - entity! { is => id: md[2], title: "Cheesy Tune Music Video", author: "u2" }, - entity! { is => id: md[4], title: "Rock Tune Music Video", author: "u2" }, - entity! { is => id: md[6], title: "Folk Tune Music Video", author: "u2" }, + entity! { is => id: md[2], title: "Cheesy Tune Music Video", author: "u2", vid: 0i64 }, + entity! { is => id: md[4], title: "Rock Tune Music Video", author: "u2", vid: 1i64 }, + entity! { is => id: md[6], title: "Folk Tune Music Video", author: "u2", vid: 2i64 }, ], ), ( "Album", - vec![entity! 
{ is => id: "rl1", title: "Pop and Folk", songs: vec![s[3], s[4]] }], + vec![ + entity! { is => id: "rl1", title: "Pop and Folk", songs: vec![s[3], s[4]], vid: 0i64 }, + ], ), ( "Single", vec![ - entity! { is => id: "rl2", title: "Rock", songs: vec![s[2]] }, - entity! { is => id: "rl3", title: "Cheesy", songs: vec![s[1]] }, - entity! { is => id: "rl4", title: "Silence", songs: Vec::::new() }, + entity! { is => id: "rl2", title: "Rock", songs: vec![s[2]], vid: 0i64 }, + entity! { is => id: "rl3", title: "Cheesy", songs: vec![s[1]], vid: 1i64 }, + entity! { is => id: "rl4", title: "Silence", songs: Vec::::new(), vid: 2i64 }, ], ), ( "Plays", vec![ - entity! { is => id: 1i64, timestamp: ts0, song: s[1], user: "u1"}, - entity! { is => id: 2i64, timestamp: ts0, song: s[1], user: "u2"}, - entity! { is => id: 3i64, timestamp: ts0, song: s[2], user: "u1"}, - entity! { is => id: 4i64, timestamp: ts0, song: s[1], user: "u1"}, - entity! { is => id: 5i64, timestamp: ts0, song: s[1], user: "u1"}, + entity! { is => id: 1i64, timestamp: ts0, song: s[1], user: "u1", vid: 0i64 }, + entity! { is => id: 2i64, timestamp: ts0, song: s[1], user: "u2", vid: 1i64 }, + entity! { is => id: 3i64, timestamp: ts0, song: s[2], user: "u1", vid: 2i64 }, + entity! { is => id: 4i64, timestamp: ts0, song: s[1], user: "u1", vid: 3i64 }, + entity! { is => id: 5i64, timestamp: ts0, song: s[1], user: "u1", vid: 4i64 }, ], ), ]; + let entities0 = insert_ops(&manifest.schema, entities0); let entities1 = vec![( "Musician", vec![ - entity! { is => id: "m3", name: "Tom", mainBand: "b2", bands: vec!["b1", "b2"], favoriteCount: 5, birthDate: timestamp.clone() }, - entity! { is => id: "m4", name: "Valerie", bands: Vec::::new(), favoriteCount: 20, birthDate: timestamp.clone() }, + entity! { is => id: "m3", name: "Tom", mainBand: "b2", bands: vec!["b1", "b2"], favoriteCount: 5, birthDate: timestamp.clone(), vid: 2i64 }, + entity! { is => id: "m4", name: "Valerie", bands: Vec::::new(), favoriteCount: 20, birthDate: timestamp.clone(), vid: 3i64 }, ], )]; let entities1 = insert_ops(&manifest.schema, entities1); diff --git a/store/test-store/tests/postgres/aggregation.rs b/store/test-store/tests/postgres/aggregation.rs index 432bc685a62..b131cb4a323 100644 --- a/store/test-store/tests/postgres/aggregation.rs +++ b/store/test-store/tests/postgres/aggregation.rs @@ -79,9 +79,10 @@ pub async fn insert( let schema = ReadStore::input_schema(store); let ops = entities .into_iter() - .map(|data| { + .map(|mut data| { let data_type = schema.entity_type("Data").unwrap(); let key = data_type.key(data.id()); + data.set_vid_if_empty(); EntityOperation::Set { data, key } }) .collect(); @@ -125,8 +126,8 @@ async fn insert_test_data(store: Arc, deployment: DeploymentL let ts64 = TIMES[0]; let entities = vec![ - entity! { schema => id: 1i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(1), amount: bd(10) }, - entity! { schema => id: 2i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(1), amount: bd(1) }, + entity! { schema => id: 1i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(1), amount: bd(10), vid: 11i64 }, + entity! { schema => id: 2i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(1), amount: bd(1), vid: 12i64 }, ]; insert(&store, &deployment, BLOCKS[0].clone(), TIMES[0], entities) @@ -135,8 +136,8 @@ async fn insert_test_data(store: Arc, deployment: DeploymentL let ts64 = TIMES[1]; let entities = vec![ - entity! { schema => id: 11i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(2), amount: bd(2) }, - entity! 
{ schema => id: 12i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(2), amount: bd(20) },
+        entity! { schema => id: 11i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(2), amount: bd(2), vid: 21i64 },
+        entity! { schema => id: 12i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(2), amount: bd(20), vid: 22i64 },
     ];
     insert(&store, &deployment, BLOCKS[1].clone(), TIMES[1], entities)
         .await
@@ -144,8 +145,8 @@ async fn insert_test_data(store: Arc, deployment: DeploymentL

     let ts64 = TIMES[2];
     let entities = vec![
-        entity! { schema => id: 21i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(3), amount: bd(30) },
-        entity! { schema => id: 22i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(3), amount: bd(3) },
+        entity! { schema => id: 21i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(3), amount: bd(30), vid: 31i64 },
+        entity! { schema => id: 22i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(3), amount: bd(3), vid: 32i64 },
     ];
     insert(&store, &deployment, BLOCKS[2].clone(), TIMES[2], entities)
         .await
@@ -153,8 +154,8 @@

     let ts64 = TIMES[3];
     let entities = vec![
-        entity! { schema => id: 31i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(4), amount: bd(4) },
-        entity! { schema => id: 32i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(4), amount: bd(40) },
+        entity! { schema => id: 31i64, timestamp: ts64, token: TOKEN1.clone(), price: bd(4), amount: bd(4), vid: 41i64 },
+        entity! { schema => id: 32i64, timestamp: ts64, token: TOKEN2.clone(), price: bd(4), amount: bd(40), vid: 42i64 },
     ];
     insert(&store, &deployment, BLOCKS[3].clone(), TIMES[3], entities)
         .await
@@ -173,10 +174,10 @@ fn stats_hour(schema: &InputSchema) -> Vec<Vec<Entity>> {
     let block2 = vec![
         entity! { schema => id: 11i64, timestamp: ts2, token: TOKEN1.clone(), sum: bd(3), sum_sq: bd(5), max: bd(10), first: bd(10), last: bd(2),
-        value: bd(14), totalValue: bd(14) },
+        value: bd(14), totalValue: bd(14), vid: 1i64 },
         entity! { schema => id: 12i64, timestamp: ts2, token: TOKEN2.clone(), sum: bd(3), sum_sq: bd(5), max: bd(20), first: bd(1), last: bd(20),
-        value: bd(41), totalValue: bd(41) },
+        value: bd(41), totalValue: bd(41), vid: 2i64 },
     ];

     let ts3 = BlockTime::since_epoch(3600, 0);
@@ -186,10 +187,10 @@ fn stats_hour(schema: &InputSchema) -> Vec<Vec<Entity>> {
     let mut v2 = vec![
         entity! { schema => id: 21i64, timestamp: ts3, token: TOKEN1.clone(), sum: bd(3), sum_sq: bd(9), max: bd(30), first: bd(30), last: bd(30),
-        value: bd(90), totalValue: bd(104) },
+        value: bd(90), totalValue: bd(104), vid: 3i64 },
         entity! { schema => id: 22i64, timestamp: ts3, token: TOKEN2.clone(), sum: bd(3), sum_sq: bd(9), max: bd(3), first: bd(3), last: bd(3),
-        value: bd(9), totalValue: bd(50)},
+        value: bd(9), totalValue: bd(50), vid: 4i64 },
     ];
     v1.append(&mut v2);
     v1
diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs
index 88f77c45b97..0d741b1b26c 100644
--- a/store/test-store/tests/postgres/graft.rs
+++ b/store/test-store/tests/postgres/graft.rs
@@ -175,6 +175,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         184.4,
         false,
         None,
+        0,
     );
     transact_entity_operations(&store, &deployment, BLOCKS[0].clone(), vec![test_entity_1])
         .await
@@ -189,6 +190,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         159.1,
         true,
         Some("red"),
+        1,
     );
     let test_entity_3_1 = create_test_entity(
         "3",
@@ -199,6 +201,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         111.7,
         false,
         Some("blue"),
+        2,
     );
     transact_entity_operations(
         &store,
@@ -218,6 +221,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         111.7,
         false,
         None,
+        3,
     );
     transact_entity_operations(
         &store,
@@ -241,6 +245,7 @@ fn create_test_entity(
     weight: f64,
     coffee: bool,
     favorite_color: Option<&str>,
+    vid: i64,
 ) -> EntityOperation {
     let bin_name = scalar::Bytes::from_str(&hex::encode(name)).unwrap();
     let test_entity = entity! { TEST_SUBGRAPH_SCHEMA =>
@@ -252,7 +257,8 @@ fn create_test_entity(
         seconds_age: age * 31557600,
         weight: Value::BigDecimal(weight.into()),
         coffee: coffee,
-        favorite_color: favorite_color
+        favorite_color: favorite_color,
+        vid: vid,
     };

     let entity_type = TEST_SUBGRAPH_SCHEMA.entity_type(entity_type).unwrap();
@@ -324,6 +330,7 @@ async fn check_graft(

     // Make our own entries for block 2
     shaq.set("email", "shaq@gmail.com").unwrap();
+    let _ = shaq.set_vid(3);
     let op = EntityOperation::Set {
         key: user_type.parse_key("3").unwrap(),
         data: shaq,
@@ -601,6 +608,7 @@ fn prune() {
         157.1,
         true,
         Some("red"),
+        4,
     );
     transact_and_wait(&store, &src, BLOCKS[5].clone(), vec![user2])
         .await
diff --git a/store/test-store/tests/postgres/relational.rs b/store/test-store/tests/postgres/relational.rs
index fe366b34509..a02778df675 100644
--- a/store/test-store/tests/postgres/relational.rs
+++ b/store/test-store/tests/postgres/relational.rs
@@ -205,11 +205,13 @@ lazy_static! {
             bigInt: big_int.clone(),
             bigIntArray: vec![big_int.clone(), (big_int + 1.into())],
             color: "yellow",
+            vid: 0i64,
         }
     };
     static ref EMPTY_NULLABLESTRINGS_ENTITY: Entity = {
         entity! { THINGS_SCHEMA =>
             id: "one",
+            vid: 0i64,
         }
     };
     static ref SCALAR_TYPE: EntityType = THINGS_SCHEMA.entity_type("Scalar").unwrap();
@@ -318,6 +320,7 @@ fn insert_user_entity(
     drinks: Option<Vec<&str>>,
     visits: i64,
     block: BlockNumber,
+    vid: i64,
 ) {
     let user = make_user(
         &layout.input_schema,
@@ -330,6 +333,7 @@ fn insert_user_entity(
         favorite_color,
         drinks,
         visits,
+        vid,
     );

     insert_entity_at(conn, layout, entity_type, vec![user], block);
@@ -346,6 +350,7 @@ fn make_user(
     favorite_color: Option<&str>,
     drinks: Option<Vec<&str>>,
     visits: i64,
+    vid: i64,
 ) -> Entity {
     let favorite_color = favorite_color
         .map(|s| Value::String(s.to_owned()))
@@ -361,7 +366,8 @@ fn make_user(
         weight: BigDecimal::from(weight),
         coffee: coffee,
         favorite_color: favorite_color,
-        visits: visits
+        visits: visits,
+        vid: vid,
     };
     if let Some(drinks) = drinks {
         user.insert("drinks", drinks.into()).unwrap();
@@ -384,6 +390,7 @@ fn insert_users(conn: &mut PgConnection, layout: &Layout) {
         None,
         60,
         0,
+        0,
     );
     insert_user_entity(
         conn,
@@ -399,6 +406,7 @@ fn insert_users(conn: &mut PgConnection, layout: &Layout) {
         Some(vec!["beer", "wine"]),
         50,
         0,
+        1,
     );
     insert_user_entity(
         conn,
@@ -414,6 +422,7 @@ fn insert_users(conn: &mut PgConnection, layout: &Layout) {
         Some(vec!["coffee", "tea"]),
         22,
         0,
+        2,
     );
 }

@@ -431,6 +440,7 @@ fn update_user_entity(
     drinks: Option<Vec<&str>>,
     visits: i64,
     block: BlockNumber,
+    vid: i64,
 ) {
     let user = make_user(
         &layout.input_schema,
@@ -443,6 +453,7 @@ fn update_user_entity(
         favorite_color,
         drinks,
         visits,
+        vid,
     );
     update_entity_at(conn, layout, entity_type, vec![user], block);
 }
@@ -454,17 +465,19 @@ fn insert_pet(
     id: &str,
     name: &str,
     block: BlockNumber,
+    vid: i64,
 ) {
     let pet = entity! { layout.input_schema =>
         id: id,
-        name: name
+        name: name,
+        vid: vid,
     };
     insert_entity_at(conn, layout, entity_type, vec![pet], block);
 }

 fn insert_pets(conn: &mut PgConnection, layout: &Layout) {
-    insert_pet(conn, layout, &*DOG_TYPE, "pluto", "Pluto", 0);
-    insert_pet(conn, layout, &*CAT_TYPE, "garfield", "Garfield", 0);
+    insert_pet(conn, layout, &*DOG_TYPE, "pluto", "Pluto", 0, 0);
+    insert_pet(conn, layout, &*CAT_TYPE, "garfield", "Garfield", 0, 1);
 }

 fn create_schema(conn: &mut PgConnection) -> Layout {
@@ -597,6 +610,7 @@ fn update() {
         entity.set("string", "updated").unwrap();
         entity.remove("strings");
         entity.set("bool", Value::Null).unwrap();
+        entity.set("vid", 1i64).unwrap();
         let key = SCALAR_TYPE.key(entity.id());
         let entity_type = layout.input_schema.entity_type("Scalar").unwrap();
@@ -624,8 +638,10 @@ fn update_many() {
         let mut one = SCALAR_ENTITY.clone();
         let mut two = SCALAR_ENTITY.clone();
         two.set("id", "two").unwrap();
+        two.set("vid", 1i64).unwrap();
         let mut three = SCALAR_ENTITY.clone();
         three.set("id", "three").unwrap();
+        three.set("vid", 2i64).unwrap();
         insert_entity(
             conn,
             layout,
@@ -647,6 +663,10 @@ fn update_many() {
         three.remove("strings");
         three.set("color", "red").unwrap();

+        one.set("vid", 3i64).unwrap();
+        two.set("vid", 4i64).unwrap();
+        three.set("vid", 5i64).unwrap();
+
         // generate keys
         let entity_type = layout.input_schema.entity_type("Scalar").unwrap();
         let keys: Vec<EntityKey> = ["one", "two", "three"]
@@ -713,10 +733,13 @@ fn serialize_bigdecimal() {
         // Update with overwrite
         let mut entity = SCALAR_ENTITY.clone();
+        let mut vid = 1i64;
         for d in &["50", "50.00", "5000", "0.5000", "0.050", "0.5", "0.05"] {
             let d = BigDecimal::from_str(d).unwrap();
             entity.set("bigDecimal", d).unwrap();
+            entity.set("vid", vid).unwrap();
+            vid += 1;

             let key = SCALAR_TYPE.key(entity.id());
             let entity_type = layout.input_schema.entity_type("Scalar").unwrap();
@@ -761,6 +784,7 @@ fn delete() {
         insert_entity(conn, layout, &*SCALAR_TYPE, vec![SCALAR_ENTITY.clone()]);
         let mut two = SCALAR_ENTITY.clone();
         two.set("id", "two").unwrap();
+        two.set("vid", 1i64).unwrap();
         insert_entity(conn, layout, &*SCALAR_TYPE, vec![two]);

         // Delete where nothing is getting deleted
@@ -795,8 +819,10 @@ fn insert_many_and_delete_many() {
         let one = SCALAR_ENTITY.clone();
         let mut two = SCALAR_ENTITY.clone();
         two.set("id", "two").unwrap();
+        two.set("vid", 1i64).unwrap();
         let mut three = SCALAR_ENTITY.clone();
         three.set("id", "three").unwrap();
+        three.set("vid", 2i64).unwrap();
         insert_entity(conn, layout, &*SCALAR_TYPE, vec![one, two, three]);

         // confidence test: there should be 3 scalar entities in store right now
@@ -877,6 +903,7 @@ fn conflicting_entity() {
         cat: &str,
         dog: &str,
         ferret: &str,
+        vid: i64,
     ) {
         let conflicting =
             |conn: &mut PgConnection, entity_type: &EntityType, types: Vec<&EntityType>| {
@@ -902,7 +929,7 @@ fn conflicting_entity() {
         let dog_type = layout.input_schema.entity_type(dog).unwrap();
         let ferret_type = layout.input_schema.entity_type(ferret).unwrap();

-        let fred = entity! { layout.input_schema => id: id.clone(), name: id.clone() };
+        let fred = entity! { layout.input_schema => id: id.clone(), name: id.clone(), vid: vid };
         insert_entity(conn, layout, &cat_type, vec![fred]);

         // If we wanted to create Fred the dog, which is forbidden, we'd run this:
@@ -916,10 +943,10 @@ fn conflicting_entity() {
     run_test(|mut conn, layout| {
         let id = Value::String("fred".to_string());
-        check(&mut conn, layout, id, "Cat", "Dog", "Ferret");
+        check(&mut conn, layout, id, "Cat", "Dog", "Ferret", 0);

         let id = Value::Bytes(scalar::Bytes::from_str("0xf1ed").unwrap());
-        check(&mut conn, layout, id, "ByteCat", "ByteDog", "ByteFerret");
+        check(&mut conn, layout, id, "ByteCat", "ByteDog", "ByteFerret", 1);
     })
 }

@@ -931,7 +958,8 @@ fn revert_block() {
     let set_fred = |conn: &mut PgConnection, name, block| {
         let fred = entity! { layout.input_schema =>
             id: id,
-            name: name
+            name: name,
+            vid: block as i64,
         };
         if block == 0 {
             insert_entity_at(conn, layout, &*CAT_TYPE, vec![fred], block);
@@ -971,6 +999,7 @@ fn revert_block() {
             let marty = entity! { layout.input_schema =>
                 id: id,
                 order: block,
+                vid: (block + 10) as i64
             };
             insert_entity_at(conn, layout, &*MINK_TYPE, vec![marty], block);
         }
@@ -1049,6 +1078,7 @@ impl<'a> QueryChecker<'a> {
             None,
             23,
             0,
+            3,
         );
         insert_pets(conn, layout);

@@ -1161,6 +1191,7 @@ fn check_block_finds() {
             None,
             55,
             1,
+            4,
         );

         checker
@@ -1703,10 +1734,10 @@ struct FilterChecker<'a> {
 impl<'a> FilterChecker<'a> {
     fn new(conn: &'a mut PgConnection, layout: &'a Layout) -> Self {
         let (a1, a2, a2b, a3) = ferrets();
-        insert_pet(conn, layout, &*FERRET_TYPE, "a1", &a1, 0);
-        insert_pet(conn, layout, &*FERRET_TYPE, "a2", &a2, 0);
-        insert_pet(conn, layout, &*FERRET_TYPE, "a2b", &a2b, 0);
-        insert_pet(conn, layout, &*FERRET_TYPE, "a3", &a3, 0);
+        insert_pet(conn, layout, &*FERRET_TYPE, "a1", &a1, 0, 0);
+        insert_pet(conn, layout, &*FERRET_TYPE, "a2", &a2, 0, 1);
+        insert_pet(conn, layout, &*FERRET_TYPE, "a2b", &a2b, 0, 2);
+        insert_pet(conn, layout, &*FERRET_TYPE, "a3", &a3, 0, 3);

         Self { conn, layout }
     }
@@ -1850,7 +1881,8 @@ fn check_filters() {
             &*FERRET_TYPE,
             vec![entity! { layout.input_schema =>
                 id: "a1",
-                name: "Test"
+                name: "Test",
+                vid: 5i64
             }],
             1,
         );
diff --git a/store/test-store/tests/postgres/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs
index b7b8f36b7d7..3f4bd88c8d8 100644
--- a/store/test-store/tests/postgres/relational_bytes.rs
+++ b/store/test-store/tests/postgres/relational_bytes.rs
@@ -57,6 +57,7 @@ lazy_static! {
     static ref BEEF_ENTITY: Entity = entity! { THINGS_SCHEMA =>
         id: scalar::Bytes::from_str("deadbeef").unwrap(),
         name: "Beef",
+        vid: 0i64
     };
     static ref NAMESPACE: Namespace = Namespace::new("sgd0815".to_string()).unwrap();
     static ref THING_TYPE: EntityType = THINGS_SCHEMA.entity_type("Thing").unwrap();
@@ -128,14 +129,15 @@ fn insert_entity(conn: &mut PgConnection, layout: &Layout, entity_type: &str, en
     layout.insert(conn, &group, &MOCK_STOPWATCH).expect(&errmsg);
 }

-fn insert_thing(conn: &mut PgConnection, layout: &Layout, id: &str, name: &str) {
+fn insert_thing(conn: &mut PgConnection, layout: &Layout, id: &str, name: &str, vid: i64) {
     insert_entity(
         conn,
         layout,
         "Thing",
         entity! { layout.input_schema =>
             id: id,
-            name: name
+            name: name,
+            vid: vid,
         },
     );
 }
@@ -155,12 +157,6 @@ fn create_schema(conn: &mut PgConnection) -> Layout {
     .expect("Failed to create relational schema")
 }

-fn scrub(entity: &Entity) -> Entity {
-    let mut scrubbed = entity.clone();
-    scrubbed.remove_null_fields();
-    scrubbed
-}
-
 macro_rules! assert_entity_eq {
     ($left:expr, $right:expr) => {{
         let (left, right) = (&($left), &($right));
@@ -265,11 +261,11 @@ fn find() {
         const ID: &str = "deadbeef";
         const NAME: &str = "Beef";
-        insert_thing(&mut conn, layout, ID, NAME);
+        insert_thing(&mut conn, layout, ID, NAME, 0);

         // Happy path: find existing entity
         let entity = find_entity(conn, layout, ID).unwrap();
-        assert_entity_eq!(scrub(&BEEF_ENTITY), entity);
+        assert_entity_eq!(BEEF_ENTITY.clone(), entity);
         assert!(CausalityRegion::from_entity(&entity) == CausalityRegion::ONCHAIN);

         // Find non-existing entity
@@ -285,8 +281,8 @@ fn find_many() {
         const NAME: &str = "Beef";
         const ID2: &str = "0xdeadbeef02";
         const NAME2: &str = "Moo";
-        insert_thing(&mut conn, layout, ID, NAME);
-        insert_thing(&mut conn, layout, ID2, NAME2);
+        insert_thing(&mut conn, layout, ID, NAME, 0);
+        insert_thing(&mut conn, layout, ID2, NAME2, 1);

         let mut id_map = BTreeMap::default();
         let ids = IdList::try_from_iter(
@@ -318,6 +314,7 @@ fn update() {
         // Update the entity
         let mut entity = BEEF_ENTITY.clone();
         entity.set("name", "Moo").unwrap();
+        entity.set("vid", 1i64).unwrap();
         let key = THING_TYPE.key(entity.id());

         let entity_id = entity.id();
@@ -345,6 +342,7 @@ fn delete() {
         insert_entity(&mut conn, layout, "Thing", BEEF_ENTITY.clone());
         let mut two = BEEF_ENTITY.clone();
         two.set("id", TWO_ID).unwrap();
+        two.set("vid", 1i64).unwrap();
         insert_entity(&mut conn, layout, "Thing", two);

         // Delete where nothing is getting deleted
@@ -392,29 +390,34 @@ fn make_thing_tree(conn: &mut PgConnection, layout: &Layout) -> (Entity, Entity,
     let root = entity! { layout.input_schema =>
         id: ROOT,
         name: "root",
-        children: vec!["babe01", "babe02"]
+        children: vec!["babe01", "babe02"],
+        vid: 0i64,
     };
     let child1 = entity! { layout.input_schema =>
         id: CHILD1,
         name: "child1",
         parent: "dead00",
-        children: vec![GRANDCHILD1]
+        children: vec![GRANDCHILD1],
+        vid: 1i64,
     };
     let child2 = entity! { layout.input_schema =>
         id: CHILD2,
         name: "child2",
         parent: "dead00",
-        children: vec![GRANDCHILD1]
+        children: vec![GRANDCHILD1],
+        vid: 2i64,
     };
     let grand_child1 = entity! { layout.input_schema =>
         id: GRANDCHILD1,
         name: "grandchild1",
-        parent: CHILD1
+        parent: CHILD1,
+        vid: 3i64,
     };
     let grand_child2 = entity! { layout.input_schema =>
         id: GRANDCHILD2,
         name: "grandchild2",
-        parent: CHILD2
+        parent: CHILD2,
+        vid: 4i64,
     };

     insert_entity(conn, layout, "Thing", root.clone());
diff --git a/store/test-store/tests/postgres/store.rs b/store/test-store/tests/postgres/store.rs
index 6605c39b51d..be7f3cf550b 100644
--- a/store/test-store/tests/postgres/store.rs
+++ b/store/test-store/tests/postgres/store.rs
@@ -201,6 +201,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         184.4,
         false,
         None,
+        0,
     );
     transact_entity_operations(
         &store,
@@ -220,6 +221,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         159.1,
         true,
         Some("red"),
+        1,
     );
     let test_entity_3_1 = create_test_entity(
         "3",
@@ -230,6 +232,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         111.7,
         false,
         Some("blue"),
+        2,
     );
     transact_entity_operations(
         &store,
@@ -249,6 +252,7 @@ async fn insert_test_data(store: Arc) -> DeploymentLocator
         111.7,
         false,
         None,
+        3,
     );
     transact_and_wait(
         &store,
@@ -272,6 +276,7 @@ fn create_test_entity(
     weight: f64,
     coffee: bool,
     favorite_color: Option<&str>,
+    vid: i64,
 ) -> EntityOperation {
     let bin_name = scalar::Bytes::from_str(&hex::encode(name)).unwrap();
     let test_entity = entity! { TEST_SUBGRAPH_SCHEMA =>
@@ -284,6 +289,7 @@ fn create_test_entity(
         weight: Value::BigDecimal(weight.into()),
         coffee: coffee,
         favorite_color: favorite_color,
+        vid: vid,
     };

     EntityOperation::Set {
@@ -352,6 +358,7 @@ fn get_entity_1() {
             seconds_age: Value::BigInt(BigInt::from(2114359200)),
             weight: Value::BigDecimal(184.4.into()),
             coffee: false,
+            vid: 0i64
         };

         // "favorite_color" was set to `Null` earlier and should be absent
@@ -377,6 +384,7 @@ fn get_entity_3() {
             seconds_age: Value::BigInt(BigInt::from(883612800)),
             weight: Value::BigDecimal(111.7.into()),
             coffee: false,
+            vid: 3_i64,
         };

         // "favorite_color" was set to `Null` earlier and should be absent
@@ -398,6 +406,7 @@ fn insert_entity() {
             111.7,
             true,
             Some("green"),
+            5,
         );
         let count = get_entity_count(store.clone(), &deployment.hash);
         transact_and_wait(
@@ -429,6 +438,7 @@ fn update_existing() {
             111.7,
             true,
             Some("green"),
+            6,
         );
         let mut new_data = match op {
             EntityOperation::Set { ref data, .. } => data.clone(),
@@ -467,7 +477,8 @@ fn partially_update_existing() {
         let entity_key = USER_TYPE.parse_key("1").unwrap();
         let schema = writable.input_schema();

-        let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null };
+        let partial_entity =
+            entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 11i64 };

         let original_entity = writable
             .get(&entity_key)
@@ -1077,7 +1088,8 @@ fn revert_block_with_partial_update() {
         let entity_key = USER_TYPE.parse_key("1").unwrap();
         let schema = writable.input_schema();

-        let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null };
+        let partial_entity =
+            entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 5i64 };

         let original_entity = writable.get(&entity_key).unwrap().expect("missing entity");
@@ -1088,7 +1100,7 @@ fn revert_block_with_partial_update() {
             TEST_BLOCK_3_PTR.clone(),
             vec![EntityOperation::Set {
                 key: entity_key.clone(),
-                data: partial_entity.clone(),
+                data: partial_entity,
             }],
         )
         .await
@@ -1172,7 +1184,8 @@ fn revert_block_with_dynamic_data_source_operations() {
         // Create operations to add a user
         let user_key = USER_TYPE.parse_key("1").unwrap();

-        let partial_entity = entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null };
+        let partial_entity =
+            entity! { schema => id: "1", name: "Johnny Boy", email: Value::Null, vid: 5i64 };

         // Get the original user for comparisons
         let original_user = writable.get(&user_key).unwrap().expect("missing entity");
@@ -1291,9 +1304,12 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
         let added_entities = vec![
             (
                 "1".to_owned(),
-                entity! { schema => id: "1", name: "Johnny Boy" },
+                entity! { schema => id: "1", name: "Johnny Boy", vid: 5i64 },
+            ),
+            (
+                "2".to_owned(),
+                entity! { schema => id: "2", name: "Tessa", vid: 6i64 },
             ),
-            ("2".to_owned(), entity! { schema => id: "2", name: "Tessa" }),
         ];
         transact_and_wait(
             &store.subgraph_store(),
@@ -1301,9 +1317,13 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
             TEST_BLOCK_1_PTR.clone(),
             added_entities
                 .iter()
-                .map(|(id, data)| EntityOperation::Set {
-                    key: USER_TYPE.parse_key(id.as_str()).unwrap(),
-                    data: data.clone(),
+                .map(|(id, data)| {
+                    let mut data = data.clone();
+                    data.set_vid_if_empty();
+                    EntityOperation::Set {
+                        key: USER_TYPE.parse_key(id.as_str()).unwrap(),
+                        data,
+                    }
                 })
                 .collect(),
         )
@@ -1311,7 +1331,7 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
         .unwrap();

         // Update an entity in the store
-        let updated_entity = entity! { schema => id: "1", name: "Johnny" };
+        let updated_entity = entity! { schema => id: "1", name: "Johnny", vid: 7i64 };
         let update_op = EntityOperation::Set {
             key: USER_TYPE.parse_key("1").unwrap(),
             data: updated_entity.clone(),
@@ -1387,6 +1407,7 @@ fn throttle_subscription_delivers() {
             120.7,
             false,
             None,
+            7,
         );

         transact_entity_operations(
@@ -1432,6 +1453,7 @@ fn throttle_subscription_throttles() {
             120.7,
             false,
             None,
+            8,
         );

         transact_entity_operations(
@@ -1505,8 +1527,9 @@ fn handle_large_string_with_index() {
         name: &str,
         schema: &InputSchema,
         block: BlockNumber,
+        vid: i64,
     ) -> EntityModification {
-        let data = entity! { schema => id: id, name: name };
+        let data = entity! { schema => id: id, name: name, vid: vid };

         let key = USER_TYPE.parse_key(id).unwrap();

@@ -1539,8 +1562,8 @@ fn handle_large_string_with_index() {
                 BlockTime::for_test(&*TEST_BLOCK_3_PTR),
                 FirehoseCursor::None,
                 vec![
-                    make_insert_op(ONE, &long_text, &schema, block),
-                    make_insert_op(TWO, &other_text, &schema, block),
+                    make_insert_op(ONE, &long_text, &schema, block, 11),
+                    make_insert_op(TWO, &other_text, &schema, block, 12),
                 ],
                 &stopwatch_metrics,
                 Vec::new(),
@@ -1551,6 +1574,7 @@ fn handle_large_string_with_index() {
             )
             .await
             .expect("Failed to insert large text");
+        writable.flush().await.unwrap();

         let query = user_query()
@@ -1604,8 +1628,9 @@ fn handle_large_bytea_with_index() {
         name: &[u8],
         schema: &InputSchema,
         block: BlockNumber,
+        vid: i64,
     ) -> EntityModification {
-        let data = entity! { schema => id: id, bin_name: scalar::Bytes::from(name) };
+        let data = entity! { schema => id: id, bin_name: scalar::Bytes::from(name), vid: vid };

         let key = USER_TYPE.parse_key(id).unwrap();

@@ -1643,8 +1668,8 @@ fn handle_large_bytea_with_index() {
                 BlockTime::for_test(&*TEST_BLOCK_3_PTR),
                 FirehoseCursor::None,
                 vec![
-                    make_insert_op(ONE, &long_bytea, &schema, block),
-                    make_insert_op(TWO, &other_bytea, &schema, block),
+                    make_insert_op(ONE, &long_bytea, &schema, block, 10),
+                    make_insert_op(TWO, &other_bytea, &schema, block, 11),
                 ],
                 &stopwatch_metrics,
                 Vec::new(),
@@ -1812,8 +1837,10 @@ fn window() {
         id: &str,
         color: &str,
         age: i32,
+        vid: i64,
     ) -> EntityOperation {
-        let entity = entity! { TEST_SUBGRAPH_SCHEMA => id: id, age: age, favorite_color: color };
+        let entity =
+            entity! { TEST_SUBGRAPH_SCHEMA => id: id, age: age, favorite_color: color, vid: vid };

         EntityOperation::Set {
             key: entity_type.parse_key(id).unwrap(),
@@ -1821,25 +1848,25 @@ fn window() {
         }
     }

-    fn make_user(id: &str, color: &str, age: i32) -> EntityOperation {
-        make_color_and_age(&*USER_TYPE, id, color, age)
+    fn make_user(id: &str, color: &str, age: i32, vid: i64) -> EntityOperation {
+        make_color_and_age(&*USER_TYPE, id, color, age, vid)
     }

-    fn make_person(id: &str, color: &str, age: i32) -> EntityOperation {
-        make_color_and_age(&*PERSON_TYPE, id, color, age)
+    fn make_person(id: &str, color: &str, age: i32, vid: i64) -> EntityOperation {
+        make_color_and_age(&*PERSON_TYPE, id, color, age, vid)
     }

     let ops = vec![
-        make_user("4", "green", 34),
-        make_user("5", "green", 17),
-        make_user("6", "green", 41),
-        make_user("7", "red", 25),
-        make_user("8", "red", 45),
-        make_user("9", "yellow", 37),
-        make_user("10", "blue", 27),
-        make_user("11", "blue", 19),
-        make_person("p1", "green", 12),
-        make_person("p2", "red", 15),
+        make_user("4", "green", 34, 11),
+        make_user("5", "green", 17, 12),
+        make_user("6", "green", 41, 13),
+        make_user("7", "red", 25, 14),
+        make_user("8", "red", 45, 15),
+        make_user("9", "yellow", 37, 16),
+        make_user("10", "blue", 27, 17),
+        make_user("11", "blue", 19, 18),
+        make_person("p1", "green", 12, 19),
+        make_person("p2", "red", 15, 20),
     ];

     run_test(|store, _, deployment| async move {
@@ -2077,6 +2104,7 @@ fn reorg_tracking() {
         deployment: &DeploymentLocator,
         age: i32,
         block: &BlockPtr,
+        vid: i64,
     ) {
         let test_entity_1 = create_test_entity(
             "1",
@@ -2087,6 +2115,7 @@ fn reorg_tracking() {
             184.4,
             false,
             None,
+            vid,
         );
         transact_and_wait(store, deployment, block.clone(), vec![test_entity_1])
             .await
@@ -2137,15 +2166,15 @@ fn reorg_tracking() {
         check_state!(store, 2, 2, 2);

         // Forward to block 3
-        update_john(&subgraph_store, &deployment, 70, &TEST_BLOCK_3_PTR).await;
+        update_john(&subgraph_store, &deployment, 70, &TEST_BLOCK_3_PTR, 5).await;
         check_state!(store, 2, 2, 3);

         // Forward to block 4
-        update_john(&subgraph_store, &deployment, 71, &TEST_BLOCK_4_PTR).await;
+        update_john(&subgraph_store, &deployment, 71, &TEST_BLOCK_4_PTR, 6).await;
         check_state!(store, 2, 2, 4);

         // Forward to block 5
-        update_john(&subgraph_store, &deployment, 72, &TEST_BLOCK_5_PTR).await;
+        update_john(&subgraph_store, &deployment, 72, &TEST_BLOCK_5_PTR, 7).await;
         check_state!(store, 2, 2, 5);

         // Revert all the way back to block 2
diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs
index 96ce2d58b39..2228c148d25 100644
--- a/store/test-store/tests/postgres/writable.rs
+++ b/store/test-store/tests/postgres/writable.rs
@@ -142,7 +142,8 @@ async fn insert_count(
     let count_key_local = |counter_type: &EntityType, id: &str| counter_type.parse_key(id).unwrap();
     let data = entity! { TEST_SUBGRAPH_SCHEMA =>
         id: "1",
-        count: count as i32
+        count: count as i32,
+        vid: block as i64,
     };
     let entity_op = if block != 3 && block != 5 && block != 7 {
         EntityOperation::Set {
@@ -163,6 +164,7 @@ async fn insert_count(
         let data = entity! { TEST_SUBGRAPH_SCHEMA =>
             id: &block.to_string(),
             count :count as i32,
+            vid: block as i64,
         };
         let entity_op = EntityOperation::Set {
             key: count_key_local(&COUNTER2_TYPE, &data.get("id").unwrap().to_string()),
@@ -295,7 +297,7 @@ fn restart() {
         // Cause an error by leaving out the non-nullable `count` attribute
         let entity_ops = vec![EntityOperation::Set {
             key: count_key("1"),
-            data: entity! { schema => id: "1" },
+            data: entity! { schema => id: "1", vid: 0i64},
         }];
         transact_entity_operations(
             &subgraph_store,
@@ -319,7 +321,7 @@ fn restart() {
         // Retry our write with correct data
         let entity_ops = vec![EntityOperation::Set {
             key: count_key("1"),
-            data: entity! { schema => id: "1", count: 1, vid: 0i64},
+            data: entity! { schema => id: "1", count: 1, vid: 0i64},
         }];
         // `SubgraphStore` caches the correct writable so that this call
         // uses the restarted writable, and is equivalent to using
@@ -341,13 +343,13 @@ fn read_range_test() {
     run_test(|store, writable, sourceable, deployment| async move {
         let result_entities = vec![
-            r#"(1, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }])"#,
-            r#"(2, [EntitySourceOperation { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2") }, vid: 2 }])"#,
-            r#"(3, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3") }, vid: 3 }])"#,
-            r#"(4, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4") }, vid: 4 }])"#,
-            r#"(5, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5") }, vid: 5 }])"#,
-            r#"(6, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#,
-            r#"(7, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#,
+            r#"(1, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1"), vid: Int8(1) }, vid: 1 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1"), vid: Int8(1) }, vid: 1 }])"#,
+            r#"(2, [EntitySourceOperation { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1"), vid: Int8(2) }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2"), vid: Int8(2) }, vid: 2 }])"#,
+            r#"(3, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1"), vid: Int8(2) }, vid: 2 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3"), vid: Int8(3) }, vid: 3 }])"#,
+            r#"(4, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1"), vid: Int8(4) }, vid: 4 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4"), vid: Int8(4) }, vid: 4 }])"#,
+            r#"(5, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1"), vid: Int8(4) }, vid: 4 }, EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5"), vid: Int8(5) }, vid: 5 }])"#,
+            r#"(6, [EntitySourceOperation { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1"), vid: Int8(6) }, vid: 6 }])"#,
+            r#"(7, [EntitySourceOperation { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1"), vid: Int8(6) }, vid: 6 }])"#,
        ];
        let subgraph_store = store.subgraph_store();
        writable.deployment_synced().unwrap();