Skip to content

Commit

Permalink
merge: #3502
Browse files Browse the repository at this point in the history
3502: Revert "feat(dal,si-events): Move node weights off the graph" r=zacharyhamm a=zacharyhamm

Reverts #3483

Had a serious negative perf impact which needs to be solved first

Co-authored-by: Zachary Hamm <[email protected]>
  • Loading branch information
si-bors-ng[bot] and zacharyhamm authored Apr 3, 2024
2 parents ee2408a + 65660b2 commit 6d75cf9
Show file tree
Hide file tree
Showing 60 changed files with 8,493 additions and 6,955 deletions.
134 changes: 68 additions & 66 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ directories = "5.0.1"
docker-api = "0.14.0"
dyn-clone = "1.0.17"
flate2 = "1.0.28"
futures = {version = "0.3.30", features = ["executor"]}
futures = "0.3.30"
futures-lite = "2.3.0"
hex = "0.4.3"
http = "0.2.12" # todo: upgrade this alongside hyper/axum/tokio-tungstenite/tower-http
Expand Down
6 changes: 3 additions & 3 deletions lib/dal-test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ rust_library(
deps = [
"//lib/buck2-resources:buck2-resources",
"//lib/council-server:council-server",
"//lib/dal:dal-integration-test",
"//lib/dal:dal",
"//lib/module-index-client:module-index-client",
"//lib/pinga-server:pinga-server-integration-test",
"//lib/rebaser-server:rebaser-server-integration-test",
"//lib/pinga-server:pinga-server",
"//lib/rebaser-server:rebaser-server",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
Expand Down
5 changes: 2 additions & 3 deletions lib/dal-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ impl Config {
config.pg.port = env::var(ENV_VAR_PG_PORT)
.unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
.parse()?;

config.pg.pool_max_size = 4;
config.pg.pool_max_size *= 32;
config.pg.certificate_path = Some(config.postgres_key_path.clone().try_into()?);

if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) {
Expand All @@ -152,7 +151,7 @@ impl Config {
config.layer_cache_pg_pool.port = env::var(ENV_VAR_PG_PORT)
.unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
.parse()?;
config.layer_cache_pg_pool.pool_max_size = 4;
config.layer_cache_pg_pool.pool_max_size *= 32;
config.layer_cache_pg_pool.certificate_path =
Some(config.postgres_key_path.clone().try_into()?);

Expand Down
83 changes: 2 additions & 81 deletions lib/dal/BUCK
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
load(
"@prelude-si//:macros.bzl",
"rust_library",
"rust_library_integration_test",
"rust_test",
)

Expand Down Expand Up @@ -76,83 +75,6 @@ rust_library(
test_unit_deps = [
"//third-party/rust:tempfile",
],
)

rust_library_integration_test(
name = "dal-integration-test",
crate = "dal",
deps = [
"//lib/si-cbor:si-cbor",
"//lib/council-server:council-server",
"//lib/nats-subscriber:nats-subscriber",
"//lib/object-tree:object-tree",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
"//lib/si-events-rs:si-events",
"//lib/si-hash:si-hash",
"//lib/si-layer-cache:si-layer-cache",
"//lib/si-pkg:si-pkg",
"//lib/si-std:si-std",
"//lib/telemetry-rs:telemetry",
"//lib/telemetry-nats-rs:telemetry-nats",
"//lib/veritech-client:veritech-client",
"//third-party/rust:async-recursion",
"//third-party/rust:async-trait",
"//third-party/rust:base64",
"//third-party/rust:blake3",
"//third-party/rust:chrono",
"//third-party/rust:ciborium",
"//third-party/rust:convert_case",
"//third-party/rust:derive_more",
"//third-party/rust:diff",
"//third-party/rust:dyn-clone",
"//third-party/rust:futures",
"//third-party/rust:hex",
"//third-party/rust:iftree",
"//third-party/rust:itertools",
"//third-party/rust:jwt-simple",
"//third-party/rust:lazy_static",
"//third-party/rust:once_cell",
"//third-party/rust:paste",
"//third-party/rust:petgraph",
"//third-party/rust:postcard",
"//third-party/rust:postgres-types",
"//third-party/rust:pretty_assertions_sorted",
"//third-party/rust:rand",
"//third-party/rust:refinery",
"//third-party/rust:regex",
"//third-party/rust:remain",
"//third-party/rust:serde",
"//third-party/rust:serde-aux",
"//third-party/rust:serde_json",
"//third-party/rust:serde_with",
"//third-party/rust:sled",
"//third-party/rust:sodiumoxide",
"//third-party/rust:strum",
"//third-party/rust:thiserror",
"//third-party/rust:tokio",
"//third-party/rust:tokio-stream",
"//third-party/rust:ulid",
"//third-party/rust:url",
],
rustc_flags = [
"--cfg=integration_test",
],
srcs = glob([
"src/**/*.rs",
"src/builtins/func/**",
"src/builtins/schema/data/**/*.json",
"src/builtins/schema/definitions/**/*.json",
"src/migrations/**/*.sql",
"src/queries/**/*.sql",
]),
env = {
"CARGO_MANIFEST_DIR": ".",
},
test_unit_deps = [
"//third-party/rust:tempfile",
],
extra_test_targets = [":test-integration"],
)

Expand All @@ -161,8 +83,7 @@ rust_test(
deps = [
"//lib/dal-test:dal-test",
"//lib/rebaser-core:rebaser-core",
"//lib/rebaser-server:rebaser-server-integration-test",
"//lib/si-events-rs:si-events",
"//lib/rebaser-server:rebaser-server",
"//lib/si-pkg:si-pkg",
"//lib/veritech-client:veritech-client",
"//third-party/rust:base64",
Expand All @@ -177,7 +98,7 @@ rust_test(
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
"//third-party/rust:ulid",
":dal-integration-test",
":dal",
],
crate_root = "tests/integration.rs",
srcs = glob([
Expand Down
1 change: 0 additions & 1 deletion lib/dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@ veritech-client = { path = "../../lib/veritech-client" }

itertools = { workspace = true }
pretty_assertions_sorted = { workspace = true }
si-events = { path = "../../lib/si-events-rs" }
tempfile = { workspace = true }
tokio-util = { workspace = true }
2 changes: 1 addition & 1 deletion lib/dal/src/action/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl ActionPrototype {
{
let node_weight = workspace_snapshot.get_node_weight(node_index).await?;
let id = node_weight.id();
if NodeWeightDiscriminants::Func == node_weight.as_ref().into() {
if NodeWeightDiscriminants::Func == node_weight.into() {
return Ok(id.into());
}
}
Expand Down
10 changes: 3 additions & 7 deletions lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl AttributePrototype {
{
let node_weight = workspace_snapshot.get_node_weight(node_index).await?;
let node_weight_id = node_weight.id();
if NodeWeightDiscriminants::Func == node_weight.as_ref().into() {
if NodeWeightDiscriminants::Func == node_weight.into() {
return Ok(node_weight_id.into());
}
}
Expand Down Expand Up @@ -356,7 +356,6 @@ impl AttributePrototype {
let (target_id, edge_weight_discrim) = match workspace_snapshot
.get_node_weight(prototype_edge_source)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => {
(prop_inner.id(), EdgeWeightKindDiscriminants::Prop)
Expand Down Expand Up @@ -391,7 +390,6 @@ impl AttributePrototype {
if let NodeWeight::AttributeValue(av_node_weight) = workspace_snapshot
.get_node_weight(attribute_value_target)
.await?
.as_ref()
{
attribute_value_ids.push(av_node_weight.id().into())
}
Expand Down Expand Up @@ -428,10 +426,8 @@ impl AttributePrototype {

Ok(match maybe_value_idxs.first().copied() {
Some(value_idx) => {
if let NodeWeight::AttributeValue(av_node_weight) = workspace_snapshot
.get_node_weight(value_idx)
.await?
.as_ref()
if let NodeWeight::AttributeValue(av_node_weight) =
workspace_snapshot.get_node_weight(value_idx).await?
{
Some(av_node_weight.id().into())
} else {
Expand Down
6 changes: 3 additions & 3 deletions lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl AttributePrototypeArgument {
)
.await?
{
match workspace_snapshot.get_node_weight(node_idx).await?.as_ref() {
match workspace_snapshot.get_node_weight(node_idx).await? {
NodeWeight::Content(inner) => {
let inner_addr_discrim: ContentAddressDiscriminants =
inner.content_address().into();
Expand Down Expand Up @@ -318,7 +318,7 @@ impl AttributePrototypeArgument {
.into_iter()
.next()
{
match workspace_snapshot.get_node_weight(target).await?.as_ref() {
match workspace_snapshot.get_node_weight(target).await? {
NodeWeight::Prop(inner) => {
return Ok(Some(ValueSource::Prop(inner.id().into())));
}
Expand Down Expand Up @@ -513,7 +513,7 @@ impl AttributePrototypeArgument {

for idx in apa_node_idxs {
let node_weight = workspace_snapshot.get_node_weight(idx).await?;
if let NodeWeight::AttributePrototypeArgument(apa_weight) = node_weight.as_ref() {
if let NodeWeight::AttributePrototypeArgument(apa_weight) = &node_weight {
if let Some(ArgumentTargets {
destination_component_id,
..
Expand Down
19 changes: 8 additions & 11 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,8 @@ impl AttributeValue {
let workspace_snapshot = ctx.workspace_snapshot()?;

let prop_node_index = workspace_snapshot.get_node_index_by_id(prop_id).await?;
if let NodeWeight::Prop(prop_inner) = workspace_snapshot
.get_node_weight(prop_node_index)
.await?
.as_ref()
if let NodeWeight::Prop(prop_inner) =
workspace_snapshot.get_node_weight(prop_node_index).await?
{
prop_inner.kind()
} else {
Expand Down Expand Up @@ -1277,7 +1275,6 @@ impl AttributeValue {
.workspace_snapshot()?
.get_node_weight(node_index)
.await?
.as_ref()
{
prop_map.insert(
prop_inner.name().to_string(),
Expand Down Expand Up @@ -1375,7 +1372,6 @@ impl AttributeValue {
match workspace_snapshot
.get_node_weight(element_prop_index)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => (prop_inner.id(), prop_inner.kind()),
_ => {
Expand Down Expand Up @@ -1620,7 +1616,6 @@ impl AttributeValue {
match workspace_snapshot
.get_node_weight(element_prop_index)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => (prop_inner.id(), prop_inner.kind()),
_ => {
Expand Down Expand Up @@ -1822,7 +1817,7 @@ impl AttributeValue {
view: Option<serde_json::Value>,
) -> AttributeValueResult<()> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let (_, av_node_weight) = {
let (av_idx, av_node_weight) = {
let av_idx = workspace_snapshot
.get_node_index_by_id(attribute_value_id)
.await?;
Expand Down Expand Up @@ -1867,6 +1862,7 @@ impl AttributeValue {
workspace_snapshot
.add_node(NodeWeight::AttributeValue(new_av_node_weight))
.await?;
workspace_snapshot.replace_references(av_idx).await?;

Ok(())
}
Expand All @@ -1881,7 +1877,7 @@ impl AttributeValue {
func_execution_pk: FuncExecutionPk,
) -> AttributeValueResult<()> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let (_av_idx, av_node_weight) = {
let (av_idx, av_node_weight) = {
let av_idx = workspace_snapshot
.get_node_index_by_id(attribute_value_id)
.await?;
Expand Down Expand Up @@ -1942,6 +1938,7 @@ impl AttributeValue {
workspace_snapshot
.add_node(NodeWeight::AttributeValue(new_av_node_weight))
.await?;
workspace_snapshot.replace_references(av_idx).await?;

Ok(())
}
Expand Down Expand Up @@ -1978,7 +1975,7 @@ impl AttributeValue {
.await?
{
let target_node_weight = workspace_snapshot.get_node_weight(target).await?;
if let NodeWeight::Prop(prop_node_weight) = target_node_weight.as_ref() {
if let NodeWeight::Prop(prop_node_weight) = &target_node_weight {
maybe_prop_id = match maybe_prop_id {
Some(already_found_prop_id) => {
return Err(AttributeValueError::MultiplePropsFound(
Expand Down Expand Up @@ -2097,7 +2094,7 @@ impl AttributeValue {
.pop()
{
let node_weight = workspace_snapshot.get_node_weight(ordering).await?;
if let NodeWeight::Ordering(ordering_weight) = node_weight.as_ref() {
if let NodeWeight::Ordering(ordering_weight) = node_weight {
Ok(ordering_weight
.order()
.clone()
Expand Down
6 changes: 4 additions & 2 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,6 @@ impl Component {
if let NodeWeight::Content(content) = workspace_snapshot
.get_node_weight(maybe_schema_variant_index)
.await?
.as_ref()
{
let content_hash_discriminants: ContentAddressDiscriminants =
content.content_address().into();
Expand Down Expand Up @@ -833,7 +832,7 @@ impl Component {
.await?
{
let target_node_weight = workspace_snapshot.get_node_weight(target).await?;
if let NodeWeight::AttributeValue(_) = target_node_weight.as_ref() {
if let NodeWeight::AttributeValue(_) = target_node_weight {
maybe_root_attribute_value_id = match maybe_root_attribute_value_id {
Some(already_found_root_attribute_value_id) => {
return Err(ComponentError::MultipleRootAttributeValuesFound(
Expand Down Expand Up @@ -1243,6 +1242,9 @@ impl Component {
ctx.workspace_snapshot()?
.add_node(NodeWeight::Component(new_component_node_weight))
.await?;
ctx.workspace_snapshot()?
.replace_references(component_idx)
.await?;
}

let updated = ComponentContentV1::from(component.clone());
Expand Down
3 changes: 1 addition & 2 deletions lib/dal/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use tokio::time::Instant;
use veritech_client::{Client as VeritechClient, CycloneEncryptionKey};

use crate::layer_db_types::ContentTypes;
use crate::workspace_snapshot::node_weight::NodeWeight;
use crate::workspace_snapshot::{
conflict::Conflict, graph::WorkspaceSnapshotGraph, update::Update, vector_clock::VectorClockId,
};
Expand All @@ -37,7 +36,7 @@ use crate::{
};
use crate::{EncryptedSecret, Workspace};

pub type DalLayerDb = LayerDb<ContentTypes, EncryptedSecret, WorkspaceSnapshotGraph, NodeWeight>;
pub type DalLayerDb = LayerDb<ContentTypes, EncryptedSecret, WorkspaceSnapshotGraph>;

/// A context type which contains handles to common core service dependencies.
///
Expand Down
Loading

0 comments on commit 6d75cf9

Please sign in to comment.