Skip to content

Commit

Permalink
Merge pull request #3021 from redpanda-data/pgcdc-cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj authored Nov 20, 2024
2 parents cbea8d8 + b835093 commit 82bb8af
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 40 deletions.
18 changes: 11 additions & 7 deletions internal/impl/postgresql/input_pg_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/postgresql/pglogicalstream"
"github.com/redpanda-data/connect/v4/internal/periodic"
)

const (
Expand Down Expand Up @@ -304,6 +305,16 @@ func (p *pgStreamInput) Connect(ctx context.Context) error {
}

func (p *pgStreamInput) processStream(pgStream *pglogicalstream.Stream, batcher *service.Batcher) {
monitorLoop := periodic.New(p.streamConfig.WalMonitorInterval, func() {
// Periodically collect stats
report := pgStream.GetProgress()
for name, progress := range report.TableProgress {
p.snapshotMetrics.SetFloat64(progress, name)
}
p.replicationLag.Set(report.WalLagInBytes)
})
monitorLoop.Start()
defer monitorLoop.Stop()
ctx, _ := p.stopSig.SoftStopCtx(context.Background())
defer func() {
ctx, _ := p.stopSig.HardStopCtx(context.Background())
Expand Down Expand Up @@ -358,13 +369,6 @@ func (p *pgStreamInput) processStream(pgStream *pglogicalstream.Stream, batcher
if message.Lsn != nil {
batchMsg.MetaSet("lsn", *message.Lsn)
}
if message.Changes[0].TableSnapshotProgress != nil {
p.snapshotMetrics.SetFloat64(*message.Changes[0].TableSnapshotProgress, message.Changes[0].Table)
}
if message.WALLagBytes != nil {
p.replicationLag.Set(*message.WALLagBytes)
}

if batcher.Add(batchMsg) {
nextTimedBatchChan = nil
flushedBatch, err := batcher.Flush(ctx)
Expand Down
4 changes: 0 additions & 4 deletions internal/impl/postgresql/pglogicalstream/logical_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,6 @@ func (s *Stream) processSnapshot() error {
s.monitor.UpdateSnapshotProgressForTable(tableWithoutSchema, rowsCount+offset)
}

tableProgress := s.monitor.GetSnapshotProgressForTable(tableWithoutSchema)
snapshotChangePacket.Changes[0].TableSnapshotProgress = &tableProgress
snapshotChangePacket.Mode = StreamModeSnapshot

waitingFromBenthos := time.Now()
select {
case s.messages <- snapshotChangePacket:
Expand Down
7 changes: 0 additions & 7 deletions internal/impl/postgresql/pglogicalstream/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ func NewMonitor(
return m, nil
}

// GetSnapshotProgressForTable returns the snapshot ingestion progress for a given table
func (m *Monitor) GetSnapshotProgressForTable(table string) float64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.snapshotProgress[table]
}

// UpdateSnapshotProgressForTable updates the snapshot ingestion progress for a given table
func (m *Monitor) UpdateSnapshotProgressForTable(table string, position int) {
m.lock.Lock()
Expand Down
14 changes: 6 additions & 8 deletions internal/impl/postgresql/pglogicalstream/pluginhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,9 @@ func (p *PgOutputUnbufferedPluginHandler) Handle(ctx context.Context, clientXLog
if message != nil {
lsn := clientXLogPos.String()
msg := StreamMessage{
Lsn: &lsn,
Changes: []StreamMessageChanges{*message},
Mode: StreamModeStreaming,
WALLagBytes: &p.monitor.Report().WalLagInBytes,
Lsn: &lsn,
Changes: []StreamMessageChanges{*message},
Mode: StreamModeStreaming,
}
select {
case p.messages <- msg:
Expand Down Expand Up @@ -150,10 +149,9 @@ func (p *PgOutputBufferedPluginHandler) Handle(ctx context.Context, clientXLogPo
// send all collected changes
lsn := clientXLogPos.String()
msg := StreamMessage{
Lsn: &lsn,
Changes: p.pgoutputChanges,
Mode: StreamModeStreaming,
WALLagBytes: &p.monitor.Report().WalLagInBytes,
Lsn: &lsn,
Changes: p.pgoutputChanges,
Mode: StreamModeStreaming,
}
select {
case p.messages <- msg:
Expand Down
20 changes: 6 additions & 14 deletions internal/impl/postgresql/pglogicalstream/stream_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,13 @@ package pglogicalstream
// StreamMessageChanges represents the changes in a single message
// Single message can have multiple changes
type StreamMessageChanges struct {
Operation string `json:"operation"`
Schema string `json:"schema"`
Table string `json:"table"`
TableSnapshotProgress *float64 `json:"table_snapshot_progress,omitempty"`
Operation string `json:"operation"`
Schema string `json:"schema"`
Table string `json:"table"`
// For deleted messages - there will be old changes if replica identity set to full or empty changes
Data map[string]any `json:"data"`
}

// StreamMessageMetrics represents the metrics of a stream. Passed to each message
type StreamMessageMetrics struct {
WALLagBytes *int64 `json:"wal_lag_bytes"`
IsStreaming bool `json:"is_streaming"`
}

// StreamMode represents the mode of the stream at the time of the message
type StreamMode string

Expand All @@ -37,8 +30,7 @@ const (

// StreamMessage represents a single message after it has been decoded by the plugin
type StreamMessage struct {
Lsn *string `json:"lsn"`
Changes []StreamMessageChanges `json:"changes"`
Mode StreamMode `json:"mode"`
WALLagBytes *int64 `json:"wal_lag_bytes,omitempty"`
Lsn *string `json:"lsn"`
Changes []StreamMessageChanges `json:"changes"`
Mode StreamMode `json:"mode"`
}

0 comments on commit 82bb8af

Please sign in to comment.