From a735989a398ca0a4d4e56f4d28e181cab6b2dfed Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Tue, 26 Mar 2024 02:00:09 +0530
Subject: [PATCH] UI CDC: Show inserts, updates and deletes (#1525)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR adds two visual elements to the CDC page in the UI:
- Inserts, updates and deletes in total
- Inserts, updates and deletes for each table

[screenshot 2024-03-22: the new totals on the CDC mirror page]

Update: the inserts/updates/deletes graph is now visible on expanding the row count:

[screenshot 2024-03-25: expanded view with the I/U/D graph]

Fixes #1520
---
 flow/connectors/bigquery/bigquery.go          |  2 +-
 flow/connectors/bigquery/qrep_avro_sync.go    |  2 +-
 flow/connectors/clickhouse/cdc.go             |  2 +-
 flow/connectors/eventhub/eventhub.go          |  2 +-
 flow/connectors/kafka/kafka.go                |  6 +-
 flow/connectors/postgres/postgres.go          |  8 +-
 flow/connectors/pubsub/pubsub.go              |  6 +-
 flow/connectors/s3/s3.go                      |  2 +-
 flow/connectors/snowflake/snowflake.go        |  2 +-
 .../connectors/utils/monitoring/monitoring.go | 15 ++--
 flow/connectors/utils/stream.go               | 26 ++++--
 flow/model/model.go                           | 30 ++++++-
 flow/model/qrecord_stream.go                  | 10 ++-
 .../migrations/V25__recordtypecounts.sql      |  4 +
 ui/app/dto/MirrorsDTO.ts                      |  8 ++
 ui/app/mirrors/[mirrorId]/cdcGraph.tsx        |  2 +-
 ui/app/mirrors/[mirrorId]/rowsDisplay.tsx     | 56 +++++++++++++
 ui/app/mirrors/[mirrorId]/syncStatus.tsx      | 71 ++++++++++++----
 ui/app/mirrors/[mirrorId]/syncStatusTable.tsx |  3 +-
 ui/app/mirrors/[mirrorId]/tableStats.tsx      | 75 +++++++++++++++++
 ui/prisma/schema.prisma                       | 80 +++++++++++++++++--
 21 files changed, 355 insertions(+), 57 deletions(-)
 create mode 100644 nexus/catalog/migrations/V25__recordtypecounts.sql
 create mode 100644 ui/app/mirrors/[mirrorId]/rowsDisplay.tsx
 create mode 100644 ui/app/mirrors/[mirrorId]/tableStats.tsx

diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 7dc1b3b77e..bad43d61cb 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -370,7 +370,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	rawTableName string,
 	syncBatchID int64,
 ) (*model.SyncResponse, error) {
-	tableNameRowsMapping := make(map[string]uint32)
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
 	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
 	streamRes, err := utils.RecordsToRawTableStream(streamReq)
 	if err != nil {
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index a4b4023d1b..2586436ab6 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -45,7 +45,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 	dstTableMetadata *bigquery.TableMetadata,
 	syncBatchID int64,
 	stream *model.QRecordStream,
-	tableNameRowsMapping map[string]uint32,
+	tableNameRowsMapping map[string]*model.RecordTypeCounts,
 ) (*model.SyncResponse, error) {
 	activity.RecordHeartbeat(ctx, fmt.Sprintf("Flow job %s: Obtaining Avro schema"+
diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 12d1402d87..aabd51c11f 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -80,7 +80,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 	rawTableIdentifier string,
 	syncBatchID int64,
 ) (*model.SyncResponse, error) {
-	tableNameRowsMapping := make(map[string]uint32)
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
 	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
 	streamRes, err := utils.RecordsToRawTableStream(streamReq)
 	if err != nil {
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 7d75298777..a1ab93f391 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -215,7 +215,7 @@ func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncReco
 		CurrentSyncBatchID:     req.SyncBatchID,
 		LastSyncedCheckpointID: lastCheckpoint,
 		NumRecordsSynced:       int64(numRecords),
-		TableNameRowsMapping:   make(map[string]uint32),
+		TableNameRowsMapping:   make(map[string]*model.RecordTypeCounts),
 		TableSchemaDeltas:      req.Records.SchemaDeltas,
 	}, nil
 }
diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go
index cefe73ca4b..6d878f7bfd 100644
--- a/flow/connectors/kafka/kafka.go
+++ b/flow/connectors/kafka/kafka.go
@@ -12,7 +12,7 @@ import (
 	"github.com/twmb/franz-go/pkg/sasl/plain"
 	"github.com/twmb/franz-go/pkg/sasl/scram"
 	"github.com/twmb/franz-go/plugin/kslog"
-	"github.com/yuin/gopher-lua"
+	lua "github.com/yuin/gopher-lua"
 	"go.temporal.io/sdk/log"
 
 	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
@@ -191,7 +191,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
 	}
 
 	numRecords := int64(0)
-	tableNameRowsMapping := make(map[string]uint32)
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
 
 	ls, err := utils.LoadScript(wgCtx, req.Script, func(ls *lua.LState) int {
 		top := ls.GetTop()
@@ -236,7 +236,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
 				wg.Add(1)
 				c.client.Produce(wgCtx, kr, produceCb)
-				tableNameRowsMapping[kr.Topic] += 1
+				record.PopulateCountMap(tableNameRowsMapping)
 			}
 		}
 		numRecords += 1
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 82cbd9dd6d..c89567a4e6 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -405,8 +405,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
 	c.logger.Info(fmt.Sprintf("pushing records to Postgres table %s via COPY", rawTableIdentifier))
 
 	numRecords := int64(0)
-	tableNameRowsMapping := make(map[string]uint32)
-
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
 	streamReadFunc := func() ([]any, error) {
 		record, ok := <-req.Records.GetRecords()
@@ -434,6 +433,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
 					req.SyncBatchID,
 					"",
 				}
+
 			case *model.UpdateRecord:
 				newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
 					UnnestColumns: map[string]struct{}{},
@@ -460,6 +460,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
 					req.SyncBatchID,
 					utils.KeysToString(typedRecord.UnchangedToastColumns),
 				}
+
 			case *model.DeleteRecord:
 				itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
 					UnnestColumns: map[string]struct{}{},
@@ -479,12 +480,13 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
 					req.SyncBatchID,
 					"",
 				}
+
 			default:
 				return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
 			}
 
+			record.PopulateCountMap(tableNameRowsMapping)
 			numRecords += 1
-			tableNameRowsMapping[record.GetDestinationTableName()] += 1
 			return row, nil
 		}
 	}
diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go
index 6b51bb246a..111c9815d9 100644
--- a/flow/connectors/pubsub/pubsub.go
+++ b/flow/connectors/pubsub/pubsub.go
@@ -7,7 +7,7 @@ import (
 	"sync"
 
 	"cloud.google.com/go/pubsub"
-	"github.com/yuin/gopher-lua"
+	lua "github.com/yuin/gopher-lua"
 	"go.temporal.io/sdk/log"
 
 	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
@@ -135,7 +135,7 @@ func lvalueToPubSubMessage(ls *lua.LState, value lua.LValue) (string, *pubsub.Me
 func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
 	numRecords := int64(0)
-	tableNameRowsMapping := make(map[string]uint32)
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
 
 	ls, err := utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int {
 		top := ls.GetTop()
@@ -221,7 +221,7 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord
 				pubresult := topicClient.Publish(ctx, msg)
 				wg.Add(1)
 				publish <- pubresult
-				tableNameRowsMapping[topic] += 1
+				record.PopulateCountMap(tableNameRowsMapping)
 			}
 		}
 		numRecords += 1
diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go
index ddb6061b0d..7e95e00b43 100644
--- a/flow/connectors/s3/s3.go
+++ b/flow/connectors/s3/s3.go
@@ -153,7 +153,7 @@ func (c *S3Connector) SetLastOffset(ctx context.Context, jobName string, offset
 }
 
 func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
-	tableNameRowsMapping := make(map[string]uint32)
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
 	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, req.SyncBatchID)
 	streamRes, err := utils.RecordsToRawTableStream(streamReq)
 	if err != nil {
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index f67fe59df0..1d9ed6fe58 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -447,7 +447,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
 	rawTableIdentifier string,
 	syncBatchID int64,
 ) (*model.SyncResponse, error) {
-	tableNameRowsMapping := make(map[string]uint32)
+	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
 	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
 	streamRes, err := utils.RecordsToRawTableStream(streamReq)
 	if err != nil {
diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go
index 97fe08616c..472cc1efed 100644
--- a/flow/connectors/utils/monitoring/monitoring.go
+++ b/flow/connectors/utils/monitoring/monitoring.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/logger"
+	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
@@ -106,7 +107,7 @@ func UpdateEndTimeForCDCBatch(
 }
 
 func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
-	batchID int64, tableNameRowsMapping map[string]uint32,
+	batchID int64, tableNameRowsMapping map[string]*model.RecordTypeCounts,
 ) error {
 	insertBatchTablesTx, err := pool.Begin(ctx)
 	if err != nil {
@@ -121,11 +122,15 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa
 		}
 	}()
 
-	for destinationTableName, numRows := range tableNameRowsMapping {
+	for destinationTableName, rowCounts := range tableNameRowsMapping {
+		numRows := rowCounts.InsertCount + rowCounts.UpdateCount + rowCounts.DeleteCount
 		_, err = insertBatchTablesTx.Exec(ctx,
-			`INSERT INTO peerdb_stats.cdc_batch_table(flow_name,batch_id,destination_table_name,num_rows)
-			VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`,
-			flowJobName, batchID, destinationTableName, numRows)
+			`INSERT INTO peerdb_stats.cdc_batch_table
+			(flow_name,batch_id,destination_table_name,num_rows,
+			insert_count,update_count,delete_count)
+			VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`,
+			flowJobName, batchID, destinationTableName, numRows,
+			rowCounts.InsertCount, rowCounts.UpdateCount, rowCounts.DeleteCount)
 		if err != nil {
 			return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err)
 		}
diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go
index 898acd03b5..c2d146c062 100644
--- a/flow/connectors/utils/stream.go
+++ b/flow/connectors/utils/stream.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/google/uuid"
 
+	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 )
@@ -62,19 +63,19 @@ func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsT
 
 	go func() {
 		for record := range req.GetRecords() {
-			qRecordOrError := recordToQRecordOrError(req.TableMapping, req.BatchID, record)
+			record.PopulateCountMap(req.TableMapping)
+			qRecordOrError := recordToQRecordOrError(req.BatchID, record)
 			recordStream.Records <- qRecordOrError
 		}
 		close(recordStream.Records)
 	}()
-
 	return &model.RecordsToStreamResponse{
 		Stream: recordStream,
 	}, nil
 }
 
-func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) model.QRecordOrError {
+func recordToQRecordOrError(batchID int64, record model.Record) model.QRecordOrError {
 	var entries [8]qvalue.QValue
 	switch typedRecord := record.(type) {
 	case *model.InsertRecord:
@@ -102,7 +103,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
 			Kind:  qvalue.QValueKindString,
 			Value: "",
 		}
-		tableMapping[typedRecord.DestinationTableName] += 1
+
 	case *model.UpdateRecord:
 		newItemsJSON, err := typedRecord.NewItems.ToJSON()
 		if err != nil {
@@ -133,7 +134,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
 			Kind:  qvalue.QValueKindString,
 			Value: KeysToString(typedRecord.UnchangedToastColumns),
 		}
-		tableMapping[typedRecord.DestinationTableName] += 1
+
 	case *model.DeleteRecord:
 		itemsJSON, err := typedRecord.Items.ToJSON()
 		if err != nil {
@@ -158,7 +159,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
 			Kind:  qvalue.QValueKindString,
 			Value: KeysToString(typedRecord.UnchangedToastColumns),
 		}
-		tableMapping[typedRecord.DestinationTableName] += 1
+
 	default:
 		return model.QRecordOrError{
 			Err: fmt.Errorf("unknown record type: %T", typedRecord),
@@ -186,3 +187,16 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
 		Record: entries[:],
 	}
 }
+
+func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts {
+	tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps))
+	for _, mapping := range tableMaps {
+		tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{
+			InsertCount: 0,
+			UpdateCount: 0,
+			DeleteCount: 0,
+		}
+	}
+
+	return tableNameRowsMapping
+}
diff --git a/flow/model/model.go b/flow/model/model.go
index 1561e66544..10fe95d5a7 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -54,6 +54,7 @@ type Record interface {
 	GetSourceTableName() string
 	// get columns and values for the record
 	GetItems() *RecordItems
+	PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
 }
 
 type ToJSONOptions struct {
@@ -114,6 +115,13 @@ func (r *InsertRecord) GetItems() *RecordItems {
 	return r.Items
 }
 
+func (r *InsertRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
+	recordCount, ok := mapOfCounts[r.DestinationTableName]
+	if ok {
+		recordCount.InsertCount++
+	}
+}
+
 type UpdateRecord struct {
 	BaseRecord
 	// Name of the source table
@@ -140,6 +148,13 @@ func (r *UpdateRecord) GetItems() *RecordItems {
 	return r.NewItems
 }
 
+func (r *UpdateRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
+	recordCount, ok := mapOfCounts[r.DestinationTableName]
+	if ok {
+		recordCount.UpdateCount++
+	}
+}
+
 type DeleteRecord struct {
 	BaseRecord
 	// Name of the source table
@@ -164,6 +179,13 @@ func (r *DeleteRecord) GetItems() *RecordItems {
 	return r.Items
 }
 
+func (r *DeleteRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
+	recordCount, ok := mapOfCounts[r.DestinationTableName]
+	if ok {
+		recordCount.DeleteCount++
+	}
+}
+
 type TableWithPkey struct {
 	TableName string
 	// SHA256 hash of the primary key columns
@@ -196,11 +218,10 @@ type SyncResponse struct {
 	// LastSyncedCheckpointID is the last ID that was synced.
 	LastSyncedCheckpointID int64
 	// NumRecordsSynced is the number of records that were synced.
-	NumRecordsSynced int64
-	// CurrentSyncBatchID is the ID of the currently synced batch.
+	NumRecordsSynced   int64
 	CurrentSyncBatchID int64
 	// TableNameRowsMapping tells how many records need to be synced to each destination table.
-	TableNameRowsMapping map[string]uint32
+	TableNameRowsMapping map[string]*RecordTypeCounts
 	// to be carried to parent workflow
 	TableSchemaDeltas []*protos.TableSchemaDelta
 }
@@ -236,4 +257,7 @@ func (r *RelationRecord) GetItems() *RecordItems {
 	return nil
 }
 
+func (r *RelationRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
+}
+
 type RelationMessageMapping map[uint32]*pglogrepl.RelationMessage
diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go
index e3fa4ee74e..97546ce247 100644
--- a/flow/model/qrecord_stream.go
+++ b/flow/model/qrecord_stream.go
@@ -6,6 +6,12 @@ import (
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 )
 
+type RecordTypeCounts struct {
+	InsertCount int
+	UpdateCount int
+	DeleteCount int
+}
+
 type QRecordOrError struct {
 	Record []qvalue.QValue
 	Err    error
@@ -25,13 +31,13 @@ type QRecordStream struct {
 
 type RecordsToStreamRequest struct {
 	records      <-chan Record
-	TableMapping map[string]uint32
+	TableMapping map[string]*RecordTypeCounts
 	BatchID      int64
 }
 
 func NewRecordsToStreamRequest(
 	records <-chan Record,
-	tableMapping map[string]uint32,
+	tableMapping map[string]*RecordTypeCounts,
 	batchID int64,
 ) *RecordsToStreamRequest {
 	return &RecordsToStreamRequest{
diff --git a/nexus/catalog/migrations/V25__recordtypecounts.sql b/nexus/catalog/migrations/V25__recordtypecounts.sql
new file mode 100644
index 0000000000..e7f7f18880
--- /dev/null
+++ b/nexus/catalog/migrations/V25__recordtypecounts.sql
@@ -0,0 +1,4 @@
+ALTER TABLE peerdb_stats.cdc_batch_table
+ADD COLUMN insert_count INTEGER,
+ADD COLUMN update_count INTEGER,
+ADD COLUMN delete_count INTEGER;
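Taken together, the backend changes above follow one pattern: InitialiseTableRowsMap (stream.go) seeds a zeroed RecordTypeCounts per destination table, and each record's PopulateCountMap bumps exactly one counter. A self-contained Go sketch of that pattern — illustrative only, not the actual peer-flow code:

    package main

    import "fmt"

    // RecordTypeCounts mirrors the struct added in flow/model/qrecord_stream.go.
    type RecordTypeCounts struct {
    	InsertCount int
    	UpdateCount int
    	DeleteCount int
    }

    // Record is a trimmed-down stand-in for model.Record.
    type Record interface {
    	PopulateCountMap(counts map[string]*RecordTypeCounts)
    }

    type InsertRecord struct{ DestinationTableName string }

    func (r *InsertRecord) PopulateCountMap(counts map[string]*RecordTypeCounts) {
    	// Tables outside the configured mapping are silently skipped,
    	// matching the `ok` check in the patch.
    	if c, ok := counts[r.DestinationTableName]; ok {
    		c.InsertCount++
    	}
    }

    type DeleteRecord struct{ DestinationTableName string }

    func (r *DeleteRecord) PopulateCountMap(counts map[string]*RecordTypeCounts) {
    	if c, ok := counts[r.DestinationTableName]; ok {
    		c.DeleteCount++
    	}
    }

    func main() {
    	// As InitialiseTableRowsMap does, pre-seed one zeroed entry per table.
    	counts := map[string]*RecordTypeCounts{"public.users": {}}

    	records := []Record{
    		&InsertRecord{DestinationTableName: "public.users"},
    		&InsertRecord{DestinationTableName: "public.users"},
    		&DeleteRecord{DestinationTableName: "public.users"},
    	}
    	for _, r := range records {
    		r.PopulateCountMap(counts)
    	}
    	fmt.Printf("%+v\n", counts["public.users"])
    	// Output: &{InsertCount:2 UpdateCount:0 DeleteCount:1}
    }

Making PopulateCountMap part of the Record interface (with a no-op on RelationRecord) is what lets every connector count by record type without a type switch at each call site.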
diff --git a/ui/app/dto/MirrorsDTO.ts b/ui/app/dto/MirrorsDTO.ts
index 9b3724902a..0fbb8b9009 100644
--- a/ui/app/dto/MirrorsDTO.ts
+++ b/ui/app/dto/MirrorsDTO.ts
@@ -35,3 +35,11 @@ export type SyncStatusRow = {
   endTime: Date | null;
   numRows: number;
 };
+
+export type MirrorRowsData = {
+  destinationTableName: string;
+  insertCount: number;
+  updateCount: number;
+  deleteCount: number;
+  totalCount: number;
+};
diff --git a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx
index c4b5a7c9cc..41c69b9161 100644
--- a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx
+++ b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx
@@ -38,7 +38,7 @@ function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) {
         />
[one-line JSX change; neither the removed nor the added markup was recoverable]
diff --git a/ui/app/mirrors/[mirrorId]/rowsDisplay.tsx b/ui/app/mirrors/[mirrorId]/rowsDisplay.tsx
new file mode 100644
--- /dev/null
+++ b/ui/app/mirrors/[mirrorId]/rowsDisplay.tsx
@@ -0,0 +1,56 @@
+'use client';
+import { useState } from 'react';
+// (remaining imports — the label and graph components — were not recoverable)
+
+export const RowDataFormatter = (number: number) =>
+  `${Intl.NumberFormat('us').format(number).toString()}`;
+
+const RowsDisplay = ({
+  totalRowsData,
+}: {
+  totalRowsData: {
+    total: Number;
+    inserts: number;
+    updates: number;
+    deletes: number;
+  };
+}) => {
+  const [show, setShow] = useState(false);
+  const rowsHero = [
+    { name: 'Inserts', value: totalRowsData.inserts },
+    { name: 'Updates', value: totalRowsData.updates, color: 'yellow' },
+    { name: 'Deletes', value: totalRowsData.deletes, color: 'rose' },
+  ];
+  return (
+    <div>
+      {/* heading markup not recoverable */}
+      Rows Synced
+      {/* hero-number markup not recoverable */}
+      {RowDataFormatter(totalRowsData.total.valueOf())}
+      {/* expand/collapse toggle for `show` — markup not recoverable */}
+      {show && (
+        <>{/* graph of rowsHero (inserts/updates/deletes) — markup not recoverable */}</>
+      )}
+    </div>
+  );
+};
+
+export default RowsDisplay;
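RowDataFormatter delegates digit grouping to Intl.NumberFormat. Purely as a side note — the patch keeps all formatting in the UI — the closest Go analogue, should the flow side ever need the same presentation, is golang.org/x/text/message:

    package main

    import (
    	"fmt"

    	"golang.org/x/text/language"
    	"golang.org/x/text/message"
    )

    func main() {
    	// Locale-aware digit grouping, like Intl.NumberFormat in RowDataFormatter.
    	p := message.NewPrinter(language.English)
    	fmt.Println(p.Sprintf("%d", 1234567)) // 1,234,567
    }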
diff --git a/ui/app/mirrors/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/[mirrorId]/syncStatus.tsx
index ba50ff67b8..98bdba35c3 100644
--- a/ui/app/mirrors/[mirrorId]/syncStatus.tsx
+++ b/ui/app/mirrors/[mirrorId]/syncStatus.tsx
@@ -1,17 +1,46 @@
-import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
-import { Label } from '@/lib/Label';
+import { MirrorRowsData, SyncStatusRow } from '@/app/dto/MirrorsDTO';
+import prisma from '@/app/utils/prisma';
 import CdcGraph from './cdcGraph';
+import RowsDisplay from './rowsDisplay';
 import { SyncStatusTable } from './syncStatusTable';
+import TableStats from './tableStats';
 
-function numberWithCommas(x: Number): string {
-  return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ',');
-}
 type SyncStatusProps = {
   flowJobName: string | undefined;
   rowsSynced: Number;
   rows: SyncStatusRow[];
 };
 
+const GetTablesStats = async (flowName: string) => {
+  let tableSyncs = await prisma.cdc_batch_table.groupBy({
+    _sum: {
+      insert_count: true,
+      update_count: true,
+      delete_count: true,
+    },
+    by: ['destination_table_name'],
+    where: {
+      flow_name: flowName,
+    },
+  });
+
+  const rowsData: MirrorRowsData[] = [];
+  tableSyncs.forEach((tableSync) => {
+    const inserts = tableSync._sum.insert_count ?? 0;
+    const updates = tableSync._sum.update_count ?? 0;
+    const deletes = tableSync._sum.delete_count ?? 0;
+    const total = inserts + updates + deletes;
+    rowsData.push({
+      destinationTableName: tableSync.destination_table_name,
+      insertCount: inserts,
+      updateCount: updates,
+      deleteCount: deletes,
+      totalCount: total,
+    });
+  });
+  return rowsData;
+};
+
 export default async function SyncStatus({
   flowJobName,
   rowsSynced,
@@ -21,23 +50,33 @@ export default async function SyncStatus({
   rows,
 }: SyncStatusProps) {
   if (!flowJobName) {
    return <div>Flow job name not provided!</div>;
   }
+  const tableSyncs = await GetTablesStats(flowJobName);
+  const inserts = tableSyncs.reduce(
+    (acc, tableData) => acc + tableData.insertCount,
+    0
+  );
+  const updates = tableSyncs.reduce(
+    (acc, tableData) => acc + tableData.updateCount,
+    0
+  );
+  const deletes = tableSyncs.reduce(
+    (acc, tableData) => acc + tableData.deleteCount,
+    0
+  );
+  const totalRowsData = {
+    total: inserts + updates + deletes,
+    inserts,
+    updates,
+    deletes,
+  };
   return (
-    [previous JSX: a numberWithCommas(rowsSynced) label, the CdcGraph and the
-    SyncStatusTable — exact markup not recoverable]
+    <div>
+      <RowsDisplay totalRowsData={totalRowsData} />
+      <CdcGraph syncs={rows} />
+      <SyncStatusTable rows={rows} />
+      <TableStats tableSyncs={tableSyncs} />
+    </div>
   );
 }
diff --git a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx
index 7d9a453b1c..a5fcdff3ed 100644
--- a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx
+++ b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx
@@ -12,6 +12,7 @@ import { Table, TableCell, TableRow } from '@/lib/Table';
 import moment from 'moment';
 import { useMemo, useState } from 'react';
 import ReactSelect from 'react-select';
+import { RowDataFormatter } from './rowsDisplay';
 
 type SyncStatusTableProps = {
   rows: SyncStatusRow[];
@@ -201,7 +202,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => {
             endTime={row.endTime}
           />
         </TableCell>
-        <TableCell>{row.numRows}</TableCell>
+        <TableCell>{RowDataFormatter(row.numRows)}</TableCell>
       </TableRow>
     ))}
diff --git a/ui/app/mirrors/[mirrorId]/tableStats.tsx b/ui/app/mirrors/[mirrorId]/tableStats.tsx
new file mode 100644
index 0000000000..21b4529c4c
--- /dev/null
+++ b/ui/app/mirrors/[mirrorId]/tableStats.tsx
@@ -0,0 +1,75 @@
+'use client';
+import { MirrorRowsData } from '@/app/dto/MirrorsDTO';
+import { Label } from '@/lib/Label';
+import { SearchField } from '@/lib/SearchField/SearchField';
+import { Table, TableCell, TableRow } from '@/lib/Table';
+import { useMemo, useState } from 'react';
+import { RowDataFormatter } from './rowsDisplay';
+
+const TableStats = ({ tableSyncs }: { tableSyncs: MirrorRowsData[] }) => {
+  const [searchQuery, setSearchQuery] = useState('');
+  const tableDataToShow = useMemo(() => {
+    return tableSyncs.filter((tableSync) =>
+      tableSync.destinationTableName
+        .toLowerCase()
+        .includes(searchQuery.toLowerCase())
+    );
+  }, [tableSyncs, searchQuery]);
+
+  return (
+    <div>
+      {/* surrounding markup reconstructed from surviving fragments */}
+      <Table
+        toolbar={{
+          left: (
+            <SearchField
+              value={searchQuery}
+              onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
+                setSearchQuery(e.target.value)
+              }
+            />
+          ),
+        }}
+        header={
+          <TableRow>
+            {['Table Name', 'Total', 'Inserts', 'Updates', 'Deletes'].map(
+              (heading, index) => (
+                <TableCell as='th' key={index}>
+                  <Label>{heading}</Label>
+                </TableCell>
+              )
+            )}
+          </TableRow>
+        }
+      >
+        {tableDataToShow.map((tableSync) => {
+          return (
+            <TableRow key={tableSync.destinationTableName}>
+              <TableCell>
+                <Label>{tableSync.destinationTableName}</Label>
+              </TableCell>
+              {[
+                tableSync.totalCount,
+                tableSync.insertCount,
+                tableSync.updateCount,
+                tableSync.deleteCount,
+              ].map((rowMetric, id) => {
+                return (
+                  <TableCell key={id}>
+                    <Label>{RowDataFormatter(rowMetric)}</Label>
+                  </TableCell>
+                );
+              })}
+            </TableRow>
+          );
+        })}
+      </Table>
+    </div>
+  );
+};
+
+export default TableStats;
diff --git a/ui/prisma/schema.prisma b/ui/prisma/schema.prisma
index 3337588e2b..a690445e36 100644
--- a/ui/prisma/schema.prisma
+++ b/ui/prisma/schema.prisma
@@ -71,6 +71,9 @@ model cdc_batch_table {
   num_rows               BigInt
   metadata               Json?
   id                     Int       @id @default(autoincrement())
+  insert_count           Int?
+  update_count           Int?
+  delete_count           Int?
   cdc_flows              cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batch_table_flow_name")
 
   @@index([flow_name, batch_id], map: "idx_cdc_batch_table_flow_name_batch_id")
@@ -87,6 +90,9 @@ model cdc_batches {
   end_time        DateTime? @db.Timestamp(6)
   metadata        Json?
   id              Int       @id @default(autoincrement())
+  insert_count    Int       @default(0)
+  update_count    Int       @default(0)
+  delete_count    Int       @default(0)
   cdc_flows       cdc_flows @relation(fields: [flow_name], references: [flow_name], onDelete: Cascade, onUpdate: NoAction, map: "fk_cdc_batches_flow_name")
 
   @@index([batch_id], map: "idx_cdc_batches_batch_id")
@@ -130,14 +136,16 @@ model qrep_partitions {
 }
 
 model qrep_runs {
-  flow_name       String
-  run_uuid        String
-  start_time      DateTime?         @db.Timestamp(6)
-  end_time        DateTime?         @db.Timestamp(6)
-  metadata        Json?
-  config_proto    Bytes?
-  id              Int               @id @default(autoincrement())
-  qrep_partitions qrep_partitions[]
+  flow_name            String
+  run_uuid             String
+  start_time           DateTime?         @db.Timestamp(6)
+  end_time             DateTime?         @db.Timestamp(6)
+  metadata             Json?
+  config_proto         Bytes?
+  id                   Int               @id @default(autoincrement())
+  fetch_complete       Boolean?          @default(false)
+  consolidate_complete Boolean?          @default(false)
+  qrep_partitions      qrep_partitions[]
 
   @@unique([flow_name, run_uuid], map: "uq_qrep_runs_flow_run")
   @@index([flow_name], map: "idx_qrep_runs_flow_name", type: Hash)
@@ -158,6 +166,7 @@ model peer_slot_size {
   wal_status String?
 
   @@index([slot_name], map: "index_slot_name")
+  @@index([updated_at], map: "index_slot_updated_at")
   @@schema("peerdb_stats")
 }
 
@@ -177,6 +186,7 @@ model alerts_v1 {
   alert_level       String    @default("critical")
   alert_message     String
   created_timestamp DateTime? @default(now()) @db.Timestamp(6)
+  alert_config_id   BigInt?
 
   @@schema("peerdb_stats")
 }
@@ -203,3 +213,57 @@ model schema_deltas_audit_log {
 
   @@schema("peerdb_stats")
 }
+
+model alerting_settings {
+  id           Int    @id(map: "alerting_settings_pkey1") @default(autoincrement())
+  config_name  String
+  config_value String
+
+  @@schema("public")
+}
+
+model dynamic_settings {
+  id                  Int      @id(map: "alerting_settings_pkey") @default(autoincrement())
+  config_name         String   @unique(map: "idx_alerting_settings_config_name")
+  config_value        String
+  setting_description String?
+  needs_restart       Boolean?
+
+  @@schema("public")
+}
+
+model metadata_last_sync_state {
+  job_name           String   @id
+  last_offset        BigInt
+  updated_at         DateTime @default(now()) @db.Timestamptz(6)
+  sync_batch_id      BigInt
+  normalize_batch_id BigInt?
+
+  @@schema("public")
+}
+
+model metadata_qrep_partitions {
+  job_name         String
+  partition_id     String
+  sync_partition   Json     @db.Json
+  sync_start_time  DateTime @db.Timestamptz(6)
+  sync_finish_time DateTime @default(now()) @db.Timestamptz(6)
+
+  @@id([job_name, partition_id])
+  @@schema("public")
+}
+
+model scripts {
+  id     Int         @id @default(autoincrement())
+  lang   script_lang
+  name   String      @unique
+  source Bytes
+
+  @@schema("public")
+}
+
+enum script_lang {
+  lua
+
+  @@schema("public")
+}
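One operational note: migration V25 adds the three count columns as nullable, so batches recorded before the upgrade carry NULLs — the UI papers over that with `?? 0`, and any direct catalog query should do the same with COALESCE. A sketch using pgx (the connection string and flow name are placeholders):

    package main

    import (
    	"context"
    	"fmt"
    	"log"

    	"github.com/jackc/pgx/v5/pgxpool"
    )

    func main() {
    	ctx := context.Background()
    	pool, err := pgxpool.New(ctx, "postgres://user:pass@localhost:5432/peerdb")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer pool.Close()

    	// COALESCE guards rows written before V25, where the new columns are NULL.
    	rows, err := pool.Query(ctx, `
    		SELECT destination_table_name,
    		       COALESCE(SUM(insert_count), 0) AS inserts,
    		       COALESCE(SUM(update_count), 0) AS updates,
    		       COALESCE(SUM(delete_count), 0) AS deletes
    		  FROM peerdb_stats.cdc_batch_table
    		 WHERE flow_name = $1
    		 GROUP BY destination_table_name`, "my_mirror")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer rows.Close()

    	for rows.Next() {
    		var table string
    		var inserts, updates, deletes int64
    		if err := rows.Scan(&table, &inserts, &updates, &deletes); err != nil {
    			log.Fatal(err)
    		}
    		fmt.Printf("%s: I=%d U=%d D=%d\n", table, inserts, updates, deletes)
    	}
    	if err := rows.Err(); err != nil {
    		log.Fatal(err)
    	}
    }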