Skip to content

Commit

Permalink
UI CDC: Show inserts, updates and deletes (#1525)
Browse files Browse the repository at this point in the history
This PR adds two visual elements to the CDC page in UI:
- Inserts, updates and deletes in total
- Inserts, updates and deletes for each table

<img width="1728" alt="Screenshot 2024-03-22 at 9 33 17 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/5803f7ac-f818-40a5-85b3-d46ef54ea43d">

Update: 
The I/U/D graph is now visible on click expand:
<img width="949" alt="Screenshot 2024-03-25 at 8 31 38 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/b8900617-e4fa-4fa0-8b69-a7c1e15ac7cb">

Fixes #1520
  • Loading branch information
Amogh-Bharadwaj authored Mar 25, 2024
1 parent e1dbd98 commit a735989
Show file tree
Hide file tree
Showing 21 changed files with 355 additions and 57 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
rawTableName string,
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
streamRes, err := utils.RecordsToRawTableStream(streamReq)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
dstTableMetadata *bigquery.TableMetadata,
syncBatchID int64,
stream *model.QRecordStream,
tableNameRowsMapping map[string]uint32,
tableNameRowsMapping map[string]*model.RecordTypeCounts,
) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx,
fmt.Sprintf("Flow job %s: Obtaining Avro schema"+
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
rawTableIdentifier string,
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
streamRes, err := utils.RecordsToRawTableStream(streamReq)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncReco
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: make(map[string]uint32),
TableNameRowsMapping: make(map[string]*model.RecordTypeCounts),
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kslog"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"
"go.temporal.io/sdk/log"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
Expand Down Expand Up @@ -191,7 +191,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
}

numRecords := int64(0)
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)

ls, err := utils.LoadScript(wgCtx, req.Script, func(ls *lua.LState) int {
top := ls.GetTop()
Expand Down Expand Up @@ -236,7 +236,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords

wg.Add(1)
c.client.Produce(wgCtx, kr, produceCb)
tableNameRowsMapping[kr.Topic] += 1
record.PopulateCountMap(tableNameRowsMapping)
}
}
numRecords += 1
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
c.logger.Info(fmt.Sprintf("pushing records to Postgres table %s via COPY", rawTableIdentifier))

numRecords := int64(0)
tableNameRowsMapping := make(map[string]uint32)

tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
streamReadFunc := func() ([]any, error) {
record, ok := <-req.Records.GetRecords()

Expand Down Expand Up @@ -434,6 +433,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
req.SyncBatchID,
"",
}

case *model.UpdateRecord:
newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
Expand All @@ -460,6 +460,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
req.SyncBatchID,
utils.KeysToString(typedRecord.UnchangedToastColumns),
}

case *model.DeleteRecord:
itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
Expand All @@ -479,12 +480,13 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
req.SyncBatchID,
"",
}

default:
return nil, fmt.Errorf("unsupported record type for Postgres flow connector: %T", typedRecord)
}

record.PopulateCountMap(tableNameRowsMapping)
numRecords += 1
tableNameRowsMapping[record.GetDestinationTableName()] += 1
return row, nil
}
}
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"

"cloud.google.com/go/pubsub"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"
"go.temporal.io/sdk/log"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
Expand Down Expand Up @@ -135,7 +135,7 @@ func lvalueToPubSubMessage(ls *lua.LState, value lua.LValue) (string, *pubsub.Me

func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
numRecords := int64(0)
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)

ls, err := utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int {
top := ls.GetTop()
Expand Down Expand Up @@ -221,7 +221,7 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord
pubresult := topicClient.Publish(ctx, msg)
wg.Add(1)
publish <- pubresult
tableNameRowsMapping[topic] += 1
record.PopulateCountMap(tableNameRowsMapping)
}
}
numRecords += 1
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *S3Connector) SetLastOffset(ctx context.Context, jobName string, offset
}

func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, req.SyncBatchID)
streamRes, err := utils.RecordsToRawTableStream(streamReq)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
rawTableIdentifier string,
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
streamRes, err := utils.RecordsToRawTableStream(streamReq)
if err != nil {
Expand Down
15 changes: 10 additions & 5 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -106,7 +107,7 @@ func UpdateEndTimeForCDCBatch(
}

func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
batchID int64, tableNameRowsMapping map[string]uint32,
batchID int64, tableNameRowsMapping map[string]*model.RecordTypeCounts,
) error {
insertBatchTablesTx, err := pool.Begin(ctx)
if err != nil {
Expand All @@ -121,11 +122,15 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa
}
}()

for destinationTableName, numRows := range tableNameRowsMapping {
for destinationTableName, rowCounts := range tableNameRowsMapping {
numRows := rowCounts.InsertCount + rowCounts.UpdateCount + rowCounts.DeleteCount
_, err = insertBatchTablesTx.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_batch_table(flow_name,batch_id,destination_table_name,num_rows)
VALUES($1,$2,$3,$4) ON CONFLICT DO NOTHING`,
flowJobName, batchID, destinationTableName, numRows)
`INSERT INTO peerdb_stats.cdc_batch_table
(flow_name,batch_id,destination_table_name,num_rows,
insert_count,update_count,delete_count)
VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`,
flowJobName, batchID, destinationTableName, numRows,
rowCounts.InsertCount, rowCounts.UpdateCount, rowCounts.DeleteCount)
if err != nil {
return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err)
}
Expand Down
26 changes: 20 additions & 6 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/google/uuid"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)
Expand Down Expand Up @@ -62,19 +63,19 @@ func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsT

go func() {
for record := range req.GetRecords() {
qRecordOrError := recordToQRecordOrError(req.TableMapping, req.BatchID, record)
record.PopulateCountMap(req.TableMapping)
qRecordOrError := recordToQRecordOrError(req.BatchID, record)
recordStream.Records <- qRecordOrError
}

close(recordStream.Records)
}()

return &model.RecordsToStreamResponse{
Stream: recordStream,
}, nil
}

func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) model.QRecordOrError {
func recordToQRecordOrError(batchID int64, record model.Record) model.QRecordOrError {
var entries [8]qvalue.QValue
switch typedRecord := record.(type) {
case *model.InsertRecord:
Expand Down Expand Up @@ -102,7 +103,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Kind: qvalue.QValueKindString,
Value: "",
}
tableMapping[typedRecord.DestinationTableName] += 1

case *model.UpdateRecord:
newItemsJSON, err := typedRecord.NewItems.ToJSON()
if err != nil {
Expand Down Expand Up @@ -133,7 +134,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Kind: qvalue.QValueKindString,
Value: KeysToString(typedRecord.UnchangedToastColumns),
}
tableMapping[typedRecord.DestinationTableName] += 1

case *model.DeleteRecord:
itemsJSON, err := typedRecord.Items.ToJSON()
if err != nil {
Expand All @@ -158,7 +159,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Kind: qvalue.QValueKindString,
Value: KeysToString(typedRecord.UnchangedToastColumns),
}
tableMapping[typedRecord.DestinationTableName] += 1

default:
return model.QRecordOrError{
Err: fmt.Errorf("unknown record type: %T", typedRecord),
Expand Down Expand Up @@ -186,3 +187,16 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Record: entries[:],
}
}

func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts {
tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps))
for _, mapping := range tableMaps {
tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{
InsertCount: 0,
UpdateCount: 0,
DeleteCount: 0,
}
}

return tableNameRowsMapping
}
30 changes: 27 additions & 3 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Record interface {
GetSourceTableName() string
// get columns and values for the record
GetItems() *RecordItems
PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
}

type ToJSONOptions struct {
Expand Down Expand Up @@ -114,6 +115,13 @@ func (r *InsertRecord) GetItems() *RecordItems {
return r.Items
}

func (r *InsertRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
recordCount, ok := mapOfCounts[r.DestinationTableName]
if ok {
recordCount.InsertCount++
}
}

type UpdateRecord struct {
BaseRecord
// Name of the source table
Expand All @@ -140,6 +148,13 @@ func (r *UpdateRecord) GetItems() *RecordItems {
return r.NewItems
}

func (r *UpdateRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
recordCount, ok := mapOfCounts[r.DestinationTableName]
if ok {
recordCount.UpdateCount++
}
}

type DeleteRecord struct {
BaseRecord
// Name of the source table
Expand All @@ -164,6 +179,13 @@ func (r *DeleteRecord) GetItems() *RecordItems {
return r.Items
}

func (r *DeleteRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
recordCount, ok := mapOfCounts[r.DestinationTableName]
if ok {
recordCount.DeleteCount++
}
}

type TableWithPkey struct {
TableName string
// SHA256 hash of the primary key columns
Expand Down Expand Up @@ -196,11 +218,10 @@ type SyncResponse struct {
// LastSyncedCheckpointID is the last ID that was synced.
LastSyncedCheckpointID int64
// NumRecordsSynced is the number of records that were synced.
NumRecordsSynced int64
// CurrentSyncBatchID is the ID of the currently synced batch.
NumRecordsSynced int64
CurrentSyncBatchID int64
// TableNameRowsMapping tells how many records need to be synced to each destination table.
TableNameRowsMapping map[string]uint32
TableNameRowsMapping map[string]*RecordTypeCounts
// to be carried to parent workflow
TableSchemaDeltas []*protos.TableSchemaDelta
}
Expand Down Expand Up @@ -236,4 +257,7 @@ func (r *RelationRecord) GetItems() *RecordItems {
return nil
}

func (r *RelationRecord) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
}

type RelationMessageMapping map[uint32]*pglogrepl.RelationMessage
10 changes: 8 additions & 2 deletions flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

type RecordTypeCounts struct {
InsertCount int
UpdateCount int
DeleteCount int
}

type QRecordOrError struct {
Record []qvalue.QValue
Err error
Expand All @@ -25,13 +31,13 @@ type QRecordStream struct {

type RecordsToStreamRequest struct {
records <-chan Record
TableMapping map[string]uint32
TableMapping map[string]*RecordTypeCounts
BatchID int64
}

func NewRecordsToStreamRequest(
records <-chan Record,
tableMapping map[string]uint32,
tableMapping map[string]*RecordTypeCounts,
batchID int64,
) *RecordsToStreamRequest {
return &RecordsToStreamRequest{
Expand Down
4 changes: 4 additions & 0 deletions nexus/catalog/migrations/V25__recordtypecounts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE peerdb_stats.cdc_batch_table
ADD COLUMN insert_count INTEGER,
ADD COLUMN update_count INTEGER,
ADD COLUMN delete_count INTEGER;
8 changes: 8 additions & 0 deletions ui/app/dto/MirrorsDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,11 @@ export type SyncStatusRow = {
endTime: Date | null;
numRows: number;
};

export type MirrorRowsData = {
destinationTableName: string;
insertCount: number;
updateCount: number;
deleteCount: number;
totalCount: number;
};
2 changes: 1 addition & 1 deletion ui/app/mirrors/[mirrorId]/cdcGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) {
/>
</div>
<div style={{ height: '3rem' }}>
<Label variant='body'>Sync history</Label>
<Label variant='headline'>Sync history</Label>
</div>
<BarChart
className='mt-3'
Expand Down
Loading

0 comments on commit a735989

Please sign in to comment.