diff --git a/store/postgres/src/block_range.rs b/store/postgres/src/block_range.rs index 7b06d2223bb..13ede1096a6 100644 --- a/store/postgres/src/block_range.rs +++ b/store/postgres/src/block_range.rs @@ -135,7 +135,7 @@ impl<'a> QueryFragment for BlockRangeUpperBoundClause<'a> { /// Helper for generating SQL fragments for selecting entities in a specific block range #[derive(Debug, Clone, Copy)] pub enum EntityBlockRange { - Mutable((BlockRange, bool)), // TODO: check if this is a proper type here (maybe Range?) + Mutable((BlockRange, bool)), Immutable(BlockRange), } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index abe8e706ac9..9776aef9d01 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -578,6 +578,7 @@ impl Ord for EntityDataExt { if ord != Ordering::Equal { ord } else { + // TODO: check if this and the next cmp for string match postgress C localle let ord = self.entity.cmp(&other.entity); if ord != Ordering::Equal { ord @@ -2074,7 +2075,7 @@ impl<'a> QueryFragment for FindRangeQuery<'a> { out.push_sql(" from "); out.push_sql(table.qualified_name.as_str()); out.push_sql(" e\n where"); - // TODO: do we need to care about it? + // TODO: add casuality region to the query // if self.table.has_causality_region { // out.push_sql("causality_region = "); // out.push_bind_param::(&self.key.causality_region)?; diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index 1eae8938730..ef619c13323 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -1,10 +1,10 @@ -use graph::blockchain::block_stream::FirehoseCursor; +use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor}; use graph::data::subgraph::schema::DeploymentCreate; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::schema::{EntityKey, EntityType, InputSchema}; use lazy_static::lazy_static; -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::marker::PhantomData; use std::ops::Range; use test_store::*; @@ -123,49 +123,40 @@ async fn insert_count( deployment: &DeploymentLocator, block: u8, count: u8, - counter_type: &EntityType, - id: &str, + immutable: bool, ) { - let count_key_local = |id: &str| counter_type.parse_key(id).unwrap(); + let count_key_local = |counter_type: &EntityType, id: &str| counter_type.parse_key(id).unwrap(); let data = entity! { TEST_SUBGRAPH_SCHEMA => - id: id, - count :count as i32, + id: "1", + count: count as i32 }; - let entity_op = EntityOperation::Set { - key: count_key_local(&data.get("id").unwrap().to_string()), - data, + let entity_op = if (block != 3 && block != 5 && block != 7) || !immutable { + EntityOperation::Set { + key: count_key_local(&COUNTER_TYPE, &data.get("id").unwrap().to_string()), + data, + } + } else { + EntityOperation::Remove { + key: count_key_local(&COUNTER_TYPE, &data.get("id").unwrap().to_string()), + } }; - transact_entity_operations(store, deployment, block_pointer(block), vec![entity_op]) + let mut ops = vec![entity_op]; + if immutable && block < 6 { + let data = entity! { TEST_SUBGRAPH_SCHEMA => + id: &block.to_string(), + count :count as i32, + }; + let entity_op = EntityOperation::Set { + key: count_key_local(&COUNTER2_TYPE, &data.get("id").unwrap().to_string()), + data, + }; + ops.push(entity_op); + } + transact_entity_operations(store, deployment, block_pointer(block), ops) .await .unwrap(); } -async fn insert_count_mutable( - store: &Arc, - deployment: &DeploymentLocator, - block: u8, - count: u8, -) { - insert_count(store, deployment, block, count, &COUNTER_TYPE, "1").await; -} - -async fn insert_count_immutable( - store: &Arc, - deployment: &DeploymentLocator, - block: u8, - count: u8, -) { - insert_count( - store, - deployment, - block, - count, - &COUNTER2_TYPE, - &(block / 2).to_string(), - ) - .await; -} - async fn pause_writer(deployment: &DeploymentLocator) { flush(deployment).await.unwrap(); writable::allow_steps(deployment, 0).await; @@ -191,13 +182,13 @@ where } for count in 1..4 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } // Test reading back with pending writes to the same entity pause_writer(&deployment).await; for count in 4..7 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } assert_eq!(6, read_count()); @@ -206,7 +197,7 @@ where // Test reading back with pending writes and a pending revert for count in 7..10 { - insert_count_mutable(&subgraph_store, &deployment, count, count).await; + insert_count(&subgraph_store, &deployment, count, count, false).await; } writable .revert_block_operations(block_pointer(2), FirehoseCursor::None) @@ -331,19 +322,46 @@ fn restart() { #[test] fn read_range_test() { run_test(|store, writable, deployment| async move { + let result_entities = vec![ + r#"(1, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }])"#, + r#"(2, [EntityWithType { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2") }, vid: 2 }])"#, + r#"(3, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3") }, vid: 3 }])"#, + r#"(4, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4") }, vid: 4 }])"#, + r#"(5, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5") }, vid: 5 }])"#, + r#"(6, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#, + r#"(7, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"#, + ]; let subgraph_store = store.subgraph_store(); writable.deployment_synced().unwrap(); - for count in 1..=7 { - insert_count_mutable(&subgraph_store, &deployment, 2 * count, 4 * count).await; - insert_count_immutable(&subgraph_store, &deployment, 2 * count + 1, 4 * count).await; + for count in 1..=5 { + insert_count(&subgraph_store, &deployment, count, 2 * count, true).await; } writable.flush().await.unwrap(); - - let br: Range = 4..8; writable.deployment_synced().unwrap(); + + let br: Range = 0..18; let entity_types = vec![COUNTER_TYPE.clone(), COUNTER2_TYPE.clone()]; - let e = writable.get_range(entity_types, br).unwrap(); - assert_eq!(e.len(), 5) // TODO: fix it - it should be 4 as the range is open + let e: BTreeMap> = writable + .get_range(entity_types.clone(), br.clone()) + .unwrap(); + assert_eq!(e.len(), 5); + for en in &e { + let index = *en.0 - 1; + let a = result_entities[index as usize]; + assert_eq!(a, format!("{:?}", en)); + } + for count in 6..=7 { + insert_count(&subgraph_store, &deployment, count, 2 * count, true).await; + } + writable.flush().await.unwrap(); + writable.deployment_synced().unwrap(); + let e: BTreeMap> = writable.get_range(entity_types, br).unwrap(); + assert_eq!(e.len(), 7); + for en in &e { + let index = *en.0 - 1; + let a = result_entities[index as usize]; + assert_eq!(a, format!("{:?}", en)); + } }) }