diff --git a/nexus/catalog/migrations/V9__mirror_stats_rels.sql b/nexus/catalog/migrations/V9__mirror_stats_rels.sql index e4b518c640..e462258f81 100644 --- a/nexus/catalog/migrations/V9__mirror_stats_rels.sql +++ b/nexus/catalog/migrations/V9__mirror_stats_rels.sql @@ -2,34 +2,58 @@ ALTER TABLE peerdb_stats.cdc_batches ADD COLUMN id SERIAL PRIMARY KEY; --- add unique constraint on flow_name and batch_id -ALTER TABLE peerdb_stats.cdc_batches -ADD CONSTRAINT uq_cdc_batches_flow_batch UNIQUE (flow_name, batch_id); - -- add incrementing id column to cdc_batch_table, make this the primary key ALTER TABLE peerdb_stats.cdc_batch_table ADD COLUMN id SERIAL PRIMARY KEY; --- For the qrep_runs table, set run_uuid as the primary key +-- add incrementing id column to qrep_runs, make this the primary key +ALTER TABLE peerdb_stats.qrep_runs +ADD COLUMN id SERIAL PRIMARY KEY; + +-- add unique for flow_name to qrep_runs ALTER TABLE peerdb_stats.qrep_runs -ADD CONSTRAINT pk_qrep_runs PRIMARY KEY (run_uuid); +ADD CONSTRAINT uq_qrep_runs_flow_name +UNIQUE (flow_name); --- For the qrep_partitions table, set partition_uuid as the primary key +-- add incrementing id column to qrep_partitions, make this the primary key ALTER TABLE peerdb_stats.qrep_partitions -ADD CONSTRAINT pk_qrep_partitions PRIMARY KEY (partition_uuid); +ADD COLUMN id SERIAL PRIMARY KEY; + +-- For peerdb_stats.cdc_batches +CREATE INDEX idx_cdc_batches_flow_name ON peerdb_stats.cdc_batches USING HASH(flow_name); +CREATE INDEX idx_cdc_batches_batch_id ON peerdb_stats.cdc_batches(batch_id); +CREATE INDEX idx_cdc_batches_start_time ON peerdb_stats.cdc_batches(start_time); + +-- For peerdb_stats.cdc_batch_table +CREATE INDEX idx_cdc_batch_table_flow_name_batch_id ON peerdb_stats.cdc_batch_table(flow_name, batch_id); + +-- For peerdb_stats.qrep_runs +CREATE INDEX idx_qrep_runs_flow_name ON peerdb_stats.qrep_runs USING HASH(flow_name); +CREATE INDEX idx_qrep_runs_run_uuid ON peerdb_stats.qrep_runs USING HASH(run_uuid); +CREATE INDEX idx_qrep_runs_start_time ON peerdb_stats.qrep_runs(start_time); +-- For peerdb_stats.qrep_partitions +CREATE INDEX idx_qrep_partitions_flow_name_run_uuid ON peerdb_stats.qrep_partitions(flow_name, run_uuid); +CREATE INDEX idx_qrep_partitions_partition_uuid ON peerdb_stats.qrep_partitions USING HASH(partition_uuid); +CREATE INDEX idx_qrep_partitions_start_time ON peerdb_stats.qrep_partitions(start_time); --- Foreign key for flow_name in cdc_batches +-- add fkey from cdc_batches to cdc_flows ALTER TABLE peerdb_stats.cdc_batches ADD CONSTRAINT fk_cdc_batches_flow_name -FOREIGN KEY (flow_name) REFERENCES peerdb_stats.cdc_flows(flow_name) ON DELETE CASCADE; +FOREIGN KEY (flow_name) +REFERENCES peerdb_stats.cdc_flows (flow_name) +ON DELETE CASCADE; --- Composite foreign key for flow_name and batch_id in cdc_batch_table +-- add fkey from cdc_batch_table to cdc_flows ALTER TABLE peerdb_stats.cdc_batch_table -ADD CONSTRAINT fk_cdc_batch_table_flow_batch -FOREIGN KEY (flow_name, batch_id) REFERENCES peerdb_stats.cdc_batches(flow_name, batch_id) ON DELETE CASCADE; +ADD CONSTRAINT fk_cdc_batch_table_flow_name +FOREIGN KEY (flow_name) +REFERENCES peerdb_stats.cdc_flows (flow_name) +ON DELETE CASCADE; --- Foreign key for run_uuid in qrep_partitions +-- add fkey from qrep_partitions to qrep_runs ALTER TABLE peerdb_stats.qrep_partitions ADD CONSTRAINT fk_qrep_partitions_run_uuid -FOREIGN KEY (run_uuid) REFERENCES peerdb_stats.qrep_runs(run_uuid) ON DELETE CASCADE; +FOREIGN KEY (flow_name) +REFERENCES peerdb_stats.qrep_runs (flow_name) +ON DELETE CASCADE; diff --git a/ui/prisma/schema.prisma b/ui/prisma/schema.prisma index a39ca5fe3f..0e55d6eaa4 100644 --- a/ui/prisma/schema.prisma +++ b/ui/prisma/schema.prisma @@ -70,9 +70,10 @@ model cdc_batch_table { destination_table_name String num_rows BigInt metadata Json? - id Int @id @default(autoincrement()) - cdc_batches cdc_batches @relation(fields: [flow_name, batch_id], references: [flow_name, batch_id], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batch_table_flow_batch") + id Int @id @default(autoincrement()) + cdc_flows cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batch_table_flow_name") + @@index([flow_name, batch_id], map: "idx_cdc_batch_table_flow_name_batch_id") @@schema("peerdb_stats") } @@ -80,24 +81,26 @@ model cdc_batches { flow_name String batch_id BigInt rows_in_batch Int - batch_start_lsn Decimal @db.Decimal - batch_end_lsn Decimal @db.Decimal - start_time DateTime @db.Timestamp(6) - end_time DateTime? @db.Timestamp(6) + batch_start_lsn Decimal @db.Decimal + batch_end_lsn Decimal @db.Decimal + start_time DateTime @db.Timestamp(6) + end_time DateTime? @db.Timestamp(6) metadata Json? - id Int @id @default(autoincrement()) - cdc_batch_table cdc_batch_table[] - cdc_flows cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batches_flow_name") + id Int @id @default(autoincrement()) + cdc_flows cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batches_flow_name") - @@unique([flow_name, batch_id], map: "uq_cdc_batches_flow_batch") + @@index([batch_id], map: "idx_cdc_batches_batch_id") + @@index([flow_name], map: "idx_cdc_batches_flow_name", type: Hash) + @@index([start_time], map: "idx_cdc_batches_start_time") @@schema("peerdb_stats") } model cdc_flows { - flow_name String @id - latest_lsn_at_source Decimal @db.Decimal - latest_lsn_at_target Decimal @db.Decimal + flow_name String @id + latest_lsn_at_source Decimal @db.Decimal + latest_lsn_at_target Decimal @db.Decimal metadata Json? + cdc_batch_table cdc_batch_table[] cdc_batches cdc_batches[] @@schema("peerdb_stats") @@ -106,7 +109,7 @@ model cdc_flows { model qrep_partitions { flow_name String run_uuid String - partition_uuid String @id(map: "pk_qrep_partitions") + partition_uuid String partition_start String partition_end String rows_in_partition Int? @@ -115,20 +118,28 @@ model qrep_partitions { end_time DateTime? @db.Timestamp(6) restart_count Int metadata Json? - qrep_runs qrep_runs @relation(fields: [run_uuid], references: [run_uuid], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run_uuid") + id Int @id @default(autoincrement()) + qrep_runs qrep_runs @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run_uuid") @@unique([run_uuid, partition_uuid]) + @@index([flow_name, run_uuid], map: "idx_qrep_partitions_flow_name_run_uuid") + @@index([partition_uuid], map: "idx_qrep_partitions_partition_uuid", type: Hash) + @@index([start_time], map: "idx_qrep_partitions_start_time") @@schema("peerdb_stats") } model qrep_runs { - flow_name String - run_uuid String @id(map: "pk_qrep_runs") + flow_name String @unique(map: "uq_qrep_runs_flow_name") + run_uuid String start_time DateTime @db.Timestamp(6) end_time DateTime? @db.Timestamp(6) metadata Json? config_proto Bytes? + id Int @id @default(autoincrement()) qrep_partitions qrep_partitions[] + @@index([flow_name], map: "idx_qrep_runs_flow_name", type: Hash) + @@index([run_uuid], map: "idx_qrep_runs_run_uuid", type: Hash) + @@index([start_time], map: "idx_qrep_runs_start_time") @@schema("peerdb_stats") }