Skip to content

Commit

Permalink
fix constraints for real
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 15, 2023
1 parent 105abe2 commit ae47b67
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 32 deletions.
54 changes: 39 additions & 15 deletions nexus/catalog/migrations/V9__mirror_stats_rels.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,58 @@
ALTER TABLE peerdb_stats.cdc_batches
ADD COLUMN id SERIAL PRIMARY KEY;

-- add unique constraint on flow_name and batch_id
ALTER TABLE peerdb_stats.cdc_batches
ADD CONSTRAINT uq_cdc_batches_flow_batch UNIQUE (flow_name, batch_id);

-- add incrementing id column to cdc_batch_table, make this the primary key
ALTER TABLE peerdb_stats.cdc_batch_table
ADD COLUMN id SERIAL PRIMARY KEY;

-- For the qrep_runs table, set run_uuid as the primary key
-- add incrementing id column to qrep_runs, make this the primary key
ALTER TABLE peerdb_stats.qrep_runs
ADD COLUMN id SERIAL PRIMARY KEY;

-- add unique for flow_name to qrep_runs
ALTER TABLE peerdb_stats.qrep_runs
ADD CONSTRAINT pk_qrep_runs PRIMARY KEY (run_uuid);
ADD CONSTRAINT uq_qrep_runs_flow_name
UNIQUE (flow_name);

-- For the qrep_partitions table, set partition_uuid as the primary key
-- add incrementing id column to qrep_partitions, make this the primary key
ALTER TABLE peerdb_stats.qrep_partitions
ADD CONSTRAINT pk_qrep_partitions PRIMARY KEY (partition_uuid);
ADD COLUMN id SERIAL PRIMARY KEY;

-- For peerdb_stats.cdc_batches
CREATE INDEX idx_cdc_batches_flow_name ON peerdb_stats.cdc_batches USING HASH(flow_name);
CREATE INDEX idx_cdc_batches_batch_id ON peerdb_stats.cdc_batches(batch_id);
CREATE INDEX idx_cdc_batches_start_time ON peerdb_stats.cdc_batches(start_time);

-- For peerdb_stats.cdc_batch_table
CREATE INDEX idx_cdc_batch_table_flow_name_batch_id ON peerdb_stats.cdc_batch_table(flow_name, batch_id);

-- For peerdb_stats.qrep_runs
CREATE INDEX idx_qrep_runs_flow_name ON peerdb_stats.qrep_runs USING HASH(flow_name);
CREATE INDEX idx_qrep_runs_run_uuid ON peerdb_stats.qrep_runs USING HASH(run_uuid);
CREATE INDEX idx_qrep_runs_start_time ON peerdb_stats.qrep_runs(start_time);

-- For peerdb_stats.qrep_partitions
CREATE INDEX idx_qrep_partitions_flow_name_run_uuid ON peerdb_stats.qrep_partitions(flow_name, run_uuid);
CREATE INDEX idx_qrep_partitions_partition_uuid ON peerdb_stats.qrep_partitions USING HASH(partition_uuid);
CREATE INDEX idx_qrep_partitions_start_time ON peerdb_stats.qrep_partitions(start_time);

-- Foreign key for flow_name in cdc_batches
-- add fkey from cdc_batches to cdc_flows
ALTER TABLE peerdb_stats.cdc_batches
ADD CONSTRAINT fk_cdc_batches_flow_name
FOREIGN KEY (flow_name) REFERENCES peerdb_stats.cdc_flows(flow_name) ON DELETE CASCADE;
FOREIGN KEY (flow_name)
REFERENCES peerdb_stats.cdc_flows (flow_name)
ON DELETE CASCADE;

-- Composite foreign key for flow_name and batch_id in cdc_batch_table
-- add fkey from cdc_batch_table to cdc_flows
ALTER TABLE peerdb_stats.cdc_batch_table
ADD CONSTRAINT fk_cdc_batch_table_flow_batch
FOREIGN KEY (flow_name, batch_id) REFERENCES peerdb_stats.cdc_batches(flow_name, batch_id) ON DELETE CASCADE;
ADD CONSTRAINT fk_cdc_batch_table_flow_name
FOREIGN KEY (flow_name)
REFERENCES peerdb_stats.cdc_flows (flow_name)
ON DELETE CASCADE;

-- Foreign key for run_uuid in qrep_partitions
-- add fkey from qrep_partitions to qrep_runs
ALTER TABLE peerdb_stats.qrep_partitions
ADD CONSTRAINT fk_qrep_partitions_run_uuid
FOREIGN KEY (run_uuid) REFERENCES peerdb_stats.qrep_runs(run_uuid) ON DELETE CASCADE;
FOREIGN KEY (flow_name)
REFERENCES peerdb_stats.qrep_runs (flow_name)
ON DELETE CASCADE;
45 changes: 28 additions & 17 deletions ui/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -70,34 +70,37 @@ model cdc_batch_table {
destination_table_name String
num_rows BigInt
metadata Json?
id Int @id @default(autoincrement())
cdc_batches cdc_batches @relation(fields: [flow_name, batch_id], references: [flow_name, batch_id], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batch_table_flow_batch")
id Int @id @default(autoincrement())
cdc_flows cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batch_table_flow_name")
@@index([flow_name, batch_id], map: "idx_cdc_batch_table_flow_name_batch_id")
@@schema("peerdb_stats")
}

model cdc_batches {
flow_name String
batch_id BigInt
rows_in_batch Int
batch_start_lsn Decimal @db.Decimal
batch_end_lsn Decimal @db.Decimal
start_time DateTime @db.Timestamp(6)
end_time DateTime? @db.Timestamp(6)
batch_start_lsn Decimal @db.Decimal
batch_end_lsn Decimal @db.Decimal
start_time DateTime @db.Timestamp(6)
end_time DateTime? @db.Timestamp(6)
metadata Json?
id Int @id @default(autoincrement())
cdc_batch_table cdc_batch_table[]
cdc_flows cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batches_flow_name")
id Int @id @default(autoincrement())
cdc_flows cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batches_flow_name")
@@unique([flow_name, batch_id], map: "uq_cdc_batches_flow_batch")
@@index([batch_id], map: "idx_cdc_batches_batch_id")
@@index([flow_name], map: "idx_cdc_batches_flow_name", type: Hash)
@@index([start_time], map: "idx_cdc_batches_start_time")
@@schema("peerdb_stats")
}

model cdc_flows {
flow_name String @id
latest_lsn_at_source Decimal @db.Decimal
latest_lsn_at_target Decimal @db.Decimal
flow_name String @id
latest_lsn_at_source Decimal @db.Decimal
latest_lsn_at_target Decimal @db.Decimal
metadata Json?
cdc_batch_table cdc_batch_table[]
cdc_batches cdc_batches[]
@@schema("peerdb_stats")
Expand All @@ -106,7 +109,7 @@ model cdc_flows {
model qrep_partitions {
flow_name String
run_uuid String
partition_uuid String @id(map: "pk_qrep_partitions")
partition_uuid String
partition_start String
partition_end String
rows_in_partition Int?
Expand All @@ -115,20 +118,28 @@ model qrep_partitions {
end_time DateTime? @db.Timestamp(6)
restart_count Int
metadata Json?
qrep_runs qrep_runs @relation(fields: [run_uuid], references: [run_uuid], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run_uuid")
id Int @id @default(autoincrement())
qrep_runs qrep_runs @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run_uuid")
@@unique([run_uuid, partition_uuid])
@@index([flow_name, run_uuid], map: "idx_qrep_partitions_flow_name_run_uuid")
@@index([partition_uuid], map: "idx_qrep_partitions_partition_uuid", type: Hash)
@@index([start_time], map: "idx_qrep_partitions_start_time")
@@schema("peerdb_stats")
}

model qrep_runs {
flow_name String
run_uuid String @id(map: "pk_qrep_runs")
flow_name String @unique(map: "uq_qrep_runs_flow_name")
run_uuid String
start_time DateTime @db.Timestamp(6)
end_time DateTime? @db.Timestamp(6)
metadata Json?
config_proto Bytes?
id Int @id @default(autoincrement())
qrep_partitions qrep_partitions[]
@@index([flow_name], map: "idx_qrep_runs_flow_name", type: Hash)
@@index([run_uuid], map: "idx_qrep_runs_run_uuid", type: Hash)
@@index([start_time], map: "idx_qrep_runs_start_time")
@@schema("peerdb_stats")
}

0 comments on commit ae47b67

Please sign in to comment.