Skip to content

Commit

Permalink
suggested change
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 7, 2023
1 parent c8e33c0 commit dbdb9b0
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 11 deletions.
50 changes: 40 additions & 10 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,39 @@ func (a *FlowableActivity) CreateNormalizedTable(
return conn.SetupNormalizedTables(config)
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
done <-chan struct{},
peerName string,
slotInfo []*protos.SlotInfo,
) {
timeout := 10 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
for {
if len(slotInfo) == 0 {
continue
}

select {
case <-ticker.C:
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
case <-done:
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
}
ticker.Stop()
ticker = time.NewTicker(timeout)
}

}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
done := make(chan struct{})
defer close(done)
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
Expand Down Expand Up @@ -205,6 +234,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
if input.FlowConnectionConfigs.ReplicationSlotName != "" {
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}
slotInfo, err := srcConn.GetSlotInfo(slotNameForMetrics)
if err != nil {
return nil, fmt.Errorf("failed to get slot info: %w", err)
}

go a.recordSlotSizePeriodically(ctx, done, input.FlowConnectionConfigs.Source.Name, slotInfo)
// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
Expand All @@ -222,16 +261,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
})

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
slotInfo, err := srcConn.GetSlotInfo(slotNameForMetrics)
if err != nil {
return nil, fmt.Errorf("failed to get slot info: %w", err)
}

if len(slotInfo) == 1 {
a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, input.FlowConnectionConfigs.Source.Name, slotInfo[0])
}

hasRecords := !recordBatch.WaitAndCheckEmpty()
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
Expand Down Expand Up @@ -339,6 +368,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
done <- struct{}{}

return res, nil
}
Expand Down
2 changes: 2 additions & 0 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str
}

// GetSlotInfo gets the information about the replication slot size and LSNs
// If slotName input is empty, all slot info rows are returned - this is for UI.
// Else, only the row pertaining to that slotName will be returned.
func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) {
specificSlotClause := ""
if slotName != "" {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (c *CatalogMirrorMonitor) AppendSlotSizeInfo(
_, err := c.catalogConn.Exec(ctx,
"INSERT INTO peerdb_stats.peer_slot_size"+
"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+
"VALUES($1,$2,$3,$4,$5,$6);",
"VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;",
peerName,
slotInfo.SlotName,
slotInfo.RestartLSN,
Expand Down
3 changes: 3 additions & 0 deletions nexus/catalog/migrations/V12__slot_size.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size (
id SERIAL PRIMARY KEY,
slot_name TEXT NOT NULL,
peer_name TEXT NOT NULL,
redo_lsn TEXT,
Expand All @@ -7,3 +8,5 @@ CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size (
slot_size BIGINT,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX index_slot_name ON peerdb_stats.peer_slot_size (slot_name);

0 comments on commit dbdb9b0

Please sign in to comment.