Skip to content

Commit

Permalink
Central slot collecting function for UI and monitoring (#771)
Browse files Browse the repository at this point in the history
## Keeping Track Of The WAL
<img width="1067" alt="Screenshot 2023-12-07 at 7 54 39 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/fe9e94ff-c141-43ec-b063-cd334c584541">

This PR does the following:
1. Moves the API function which queries `pg_replication_slots` to a
PullConnector function for Postgres (in `client.go`).
2. Introduces a table - `peer_slot_size` in catalog, and a function in
`monitoring.go` to upsert data into this table.
3. The API function for UI to get slot changes now calls the Postgres
function for the same.
4. In `StartFlow` in `flowable.go`, we call the same `GetSlotInfo` function
and put its results into the `peer_slot_size` table in the catalog. Here,
we store the slot information **once every 10 minutes or at the end of
every sync flow - whichever comes first**.

### What is stored?
- Slot name
- Peer name 
- Redo LSN
- Restart LSN
- Confirmed Flush LSN
- Size of slot AKA Lag
- Timestamp of recording

### Lag calculation
Lag is now calculated (in MB) as:
```sql
round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) -- bytes divided by 1024 twice to get MB
```

**What are those terms?**
`pg_current_wal_lsn()` is the LSN the WAL is currently at.
`confirmed_flush_lsn` is the position up to which the client has told
Postgres it has read.

So our "lag" is the difference between the current WAL position and the
position PeerDB has confirmed reading up to.
  • Loading branch information
Amogh-Bharadwaj authored Dec 7, 2023
1 parent d9b152a commit 92e50b3
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 38 deletions.
43 changes: 43 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,46 @@ func (a *FlowableActivity) CreateNormalizedTable(
return conn.SetupNormalizedTables(config)
}

// recordSlotSizePeriodically records the state of the replication slot named
// slotName into the catalog, once every ten minutes and one final time when
// done is signalled (or closed), after which it returns.
//
// Parameters:
//   - ctx is passed to the catalog write.
//   - srcConn is queried for the slot information.
//   - slotName selects the slot whose first returned row is recorded.
//   - done tells the recorder to write a final row and stop.
//   - peerName is stored alongside each recorded row.
//
// The function is meant to run in its own goroutine. It returns only once done
// has been received from, so a caller that signals with a blocking send is
// always serviced.
func (a *FlowableActivity) recordSlotSizePeriodically(
	ctx context.Context,
	srcConn connectors.CDCPullConnector,
	slotName string,
	done <-chan struct{},
	peerName string,
) {
	const recordInterval = 10 * time.Minute

	ticker := time.NewTicker(recordInterval)
	defer ticker.Stop()

	for {
		// The snapshot is taken before waiting so that a row is already in hand
		// when done fires; by then the caller may be tearing down srcConn.
		slotInfo, err := srcConn.GetSlotInfo(slotName)
		if err != nil {
			log.Warnf("warning: failed to get slot info: %v", err)
		}

		// Always wait here, even when the lookup failed or returned no rows.
		// Skipping the wait would spin in a tight loop against the source and
		// would never observe done.
		finished := false
		select {
		case <-ticker.C:
		case <-done:
			finished = true
		}

		if len(slotInfo) > 0 {
			if err := a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]); err != nil {
				log.Warnf("warning: failed to record slot size info: %v", err)
			}
		}

		// Stop after the final write; a closed done channel is always ready, so
		// looping again would query and insert without end.
		if finished {
			return
		}
	}
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
done := make(chan struct{})
defer close(done)
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
Expand Down Expand Up @@ -205,6 +241,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
if input.FlowConnectionConfigs.ReplicationSlotName != "" {
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

go a.recordSlotSizePeriodically(ctx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name)
// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
Expand Down Expand Up @@ -329,6 +371,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
done <- struct{}{}

return res, nil
}
Expand Down
50 changes: 20 additions & 30 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,36 @@ import (
"database/sql"
"fmt"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/protobuf/proto"
)

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
// getPGPeerConfig loads the stored options of the peer named peerName from the
// catalog and decodes them into a PostgresConfig.
//
// It returns an error when the catalog lookup fails (including when no row
// matches) or when the stored options cannot be unmarshalled.
func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) {
	var pgPeerOptions sql.RawBytes
	var pgPeerConfig protos.PostgresConfig
	// NOTE(review): type=3 presumably selects Postgres peers — confirm against the peer type enum.
	err := h.pool.QueryRow(ctx,
		"SELECT options FROM peers WHERE name = $1 AND type=3", peerName).Scan(&pgPeerOptions)
	if err != nil {
		return nil, err
	}

	// Check the unmarshal error itself; testing the earlier (already nil) query
	// error here would let a corrupt options blob through as an empty config.
	if unmarshalErr := proto.Unmarshal(pgPeerOptions, &pgPeerConfig); unmarshalErr != nil {
		return nil, unmarshalErr
	}

	return &pgPeerConfig, nil
}

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName)
if err != nil {
return nil, "", err
}
connStr := utils.GetPGConnectionString(pgPeerConfig)
peerPool, err := pgxpool.New(ctx, connStr)
if err != nil {
return nil, "", err
Expand Down Expand Up @@ -193,40 +202,21 @@ func (h *FlowRequestHandler) GetSlotInfo(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSlotResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
pgConfig, err := h.getPGPeerConfig(ctx, req.PeerName)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,active,"+
"round((redo_lsn-restart_lsn) / 1024 / 1024 , 2) AS MB_Behind"+
" FROM pg_control_checkpoint(), pg_replication_slots;")

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer rows.Close()
var slotInfoRows []*protos.SlotInfo
for rows.Next() {
var redoLSN string
var slotName string
var restartLSN string
var active bool
var lagInMB float32
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &active, &lagInMB)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}

slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
RedoLSN: redoLSN,
RestartLSN: restartLSN,
SlotName: slotName,
Active: active,
LagInMb: lagInMB,
})
slotInfo, err := pgConnector.GetSlotInfo("")
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
return &protos.PeerSlotResponse{
SlotData: slotInfoRows,
SlotData: slotInfo,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type CDCPullConnector interface {

// SendWALHeartbeat allows for activity to progress restart_lsn on postgres.
SendWALHeartbeat() error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)
}

type CDCSyncConnector interface {
Expand Down
40 changes: 40 additions & 0 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,46 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str
}, nil
}

// GetSlotInfo returns size and LSN information about replication slots.
//
// If slotName is empty, a row is returned for every replication slot (this is
// what the UI uses). Otherwise only the row for that slot is returned, which
// may be an empty result when no slot has that name.
//
// The lag is computed by the query as pg_current_wal_lsn() minus the slot's
// confirmed_flush_lsn, divided by 1024 twice and rounded.
//
// NOTE(review): the LSN columns are scanned into plain strings; a slot with a
// NULL LSN would presumably fail the scan — confirm whether such slots can occur.
func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) {
	query := "SELECT slot_name, redo_lsn::Text,restart_lsn::text,confirmed_flush_lsn::text,active," +
		"round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind" +
		" FROM pg_control_checkpoint(), pg_replication_slots"

	// Pass the slot name as a bind parameter rather than splicing it into the
	// SQL text, so quotes in the name can neither break nor alter the query.
	var args []any
	if slotName != "" {
		query += " WHERE slot_name = $1"
		args = append(args, slotName)
	}

	rows, err := c.pool.Query(c.ctx, query+";", args...)
	if err != nil {
		return nil, fmt.Errorf("querying replication slot info: %w", err)
	}
	defer rows.Close()

	var slotInfoRows []*protos.SlotInfo
	for rows.Next() {
		info := &protos.SlotInfo{}
		if err := rows.Scan(
			&info.SlotName,
			&info.RedoLSN,
			&info.RestartLSN,
			&info.ConfirmedFlushLSN,
			&info.Active,
			&info.LagInMb,
		); err != nil {
			return nil, fmt.Errorf("scanning replication slot info: %w", err)
		}
		slotInfoRows = append(slotInfoRows, info)
	}
	// rows.Next also returns false when iteration aborts on an error, so the
	// error must be checked explicitly to avoid returning a truncated result.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading replication slot info: %w", err)
	}
	return slotInfoRows, nil
}

// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
signal *SlotSignal,
Expand Down
27 changes: 27 additions & 0 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,33 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
return nil
}

// AppendSlotSizeInfo inserts one row describing slotInfo for peerName into
// peerdb_stats.peer_slot_size.
//
// It is a no-op returning nil when the monitor, its catalog connection, or
// slotInfo is nil. A failed insert is returned wrapped.
func (c *CatalogMirrorMonitor) AppendSlotSizeInfo(
	ctx context.Context,
	peerName string,
	slotInfo *protos.SlotInfo,
) error {
	// Monitoring is optional: without a monitor or connection there is nowhere to write.
	if c == nil || c.catalogConn == nil {
		return nil
	}
	// No snapshot, nothing to store.
	if slotInfo == nil {
		return nil
	}

	const insertSlotSizeSQL = "INSERT INTO peerdb_stats.peer_slot_size" +
		"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) " +
		"VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;"

	if _, execErr := c.catalogConn.Exec(ctx, insertSlotSizeSQL,
		peerName,
		slotInfo.SlotName,
		slotInfo.RestartLSN,
		slotInfo.RedoLSN,
		slotInfo.ConfirmedFlushLSN,
		slotInfo.LagInMb,
	); execErr != nil {
		return fmt.Errorf("error while upserting row for slot_size: %w", execErr)
	}

	return nil
}

func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJobName string,
runUUID string, partition *protos.QRepPartition) error {
if c == nil || c.catalogConn == nil {
Expand Down
25 changes: 18 additions & 7 deletions flow/generated/protos/route.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions nexus/catalog/migrations/V12__slot_size.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- One row per recorded snapshot of a replication slot on a peer.
CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size (
    id SERIAL PRIMARY KEY,
    slot_name TEXT NOT NULL,
    peer_name TEXT NOT NULL,
    -- LSNs are stored in their text form.
    redo_lsn TEXT,
    restart_lsn TEXT,
    confirmed_flush_lsn TEXT,
    -- Slot lag; the Go writer passes its lag-in-MB value here.
    slot_size BIGINT,
    -- Defaults to the insert time.
    updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- IF NOT EXISTS matches the table statement, so re-running this script does not fail on the index.
CREATE INDEX IF NOT EXISTS index_slot_name ON peerdb_stats.peer_slot_size (slot_name);
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ pub struct SlotInfo {
pub active: bool,
#[prost(float, tag="5")]
pub lag_in_mb: f32,
#[prost(string, tag="6")]
pub confirmed_flush_l_sn: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_route.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3210,6 +3210,9 @@ impl serde::Serialize for SlotInfo {
if self.lag_in_mb != 0. {
len += 1;
}
if !self.confirmed_flush_l_sn.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.SlotInfo", len)?;
if !self.slot_name.is_empty() {
struct_ser.serialize_field("slotName", &self.slot_name)?;
Expand All @@ -3226,6 +3229,9 @@ impl serde::Serialize for SlotInfo {
if self.lag_in_mb != 0. {
struct_ser.serialize_field("lagInMb", &self.lag_in_mb)?;
}
if !self.confirmed_flush_l_sn.is_empty() {
struct_ser.serialize_field("confirmedFlushLSN", &self.confirmed_flush_l_sn)?;
}
struct_ser.end()
}
}
Expand All @@ -3245,6 +3251,8 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
"active",
"lag_in_mb",
"lagInMb",
"confirmed_flush_lSN",
"confirmedFlushLSN",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -3254,6 +3262,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
RestartLSn,
Active,
LagInMb,
ConfirmedFlushLSn,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -3281,6 +3290,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
"restartLSN" | "restart_lSN" => Ok(GeneratedField::RestartLSn),
"active" => Ok(GeneratedField::Active),
"lagInMb" | "lag_in_mb" => Ok(GeneratedField::LagInMb),
"confirmedFlushLSN" | "confirmed_flush_lSN" => Ok(GeneratedField::ConfirmedFlushLSn),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -3305,6 +3315,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
let mut restart_l_sn__ = None;
let mut active__ = None;
let mut lag_in_mb__ = None;
let mut confirmed_flush_l_sn__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::SlotName => {
Expand Down Expand Up @@ -3339,6 +3350,12 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::ConfirmedFlushLSn => {
if confirmed_flush_l_sn__.is_some() {
return Err(serde::de::Error::duplicate_field("confirmedFlushLSN"));
}
confirmed_flush_l_sn__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -3350,6 +3367,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
restart_l_sn: restart_l_sn__.unwrap_or_default(),
active: active__.unwrap_or_default(),
lag_in_mb: lag_in_mb__.unwrap_or_default(),
confirmed_flush_l_sn: confirmed_flush_l_sn__.unwrap_or_default(),
})
}
}
Expand Down
Loading

0 comments on commit 92e50b3

Please sign in to comment.