Skip to content

Commit

Permalink
make slot flow monitoring create connection to avoid concurrent errors
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 26, 2024
1 parent ccaab5f commit f62fe2f
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 156 deletions.
60 changes: 2 additions & 58 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,19 @@ package activities

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// handleSlotInfo fetches replication slot information for slotName from
// srcConn, raises alerts when configured thresholds are exceeded, and records
// the slot size in the catalog.
//
// Steps, in order:
//  1. Fetch slot info; a fetch error is logged and returned. If no rows come
//     back a warning is logged and nil is returned without doing anything else.
//  2. Alert if the first slot's lag (in MB) is at or above the slot lag
//     threshold. A threshold of 0 or less disables the alert.
//  3. Fetch the open connection count for the peer's user; an error here is
//     logged and returned, and the slot size is NOT recorded in that case.
//  4. Alert if the open connection count is at or above its threshold. A
//     threshold of 0 or less disables the alert.
//  5. Append the first slot's info to the catalog via monitoring.AppendSlotSizeInfo
//     and return its error.
func (a *FlowableActivity) handleSlotInfo(
	ctx context.Context,
	srcConn connectors.CDCPullConnector,
	slotName string,
	peerName string,
) error {
	slotInfo, err := srcConn.GetSlotInfo(slotName)
	if err != nil {
		slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
		return err
	}

	// Everything below indexes slotInfo[0], so bail out early when empty.
	if len(slotInfo) == 0 {
		slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
		return nil
	}

	// Alert messages are prefixed with the deployment UID when one is set.
	deploymentUIDPrefix := ""
	if deploymentUID := peerdbenv.PeerDBDeploymentUID(); deploymentUID != "" {
		deploymentUIDPrefix = fmt.Sprintf("[%s] ", deploymentUID)
	}

	slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
	if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
		a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
			fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
				deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
	}

	// Also handles alerts for PeerDB user connections exceeding a given limit here
	maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
	res, err := srcConn.GetOpenConnectionsForUser()
	if err != nil {
		slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
		return err
	}
	if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
		a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
			fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
				` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
				deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
	}

	// slotInfo is known to be non-empty here (checked above), so no second
	// length check is needed before recording the slot size.
	return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) {
// ensures slot info is logged at least once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)
if err != nil {
return
}
Expand All @@ -81,7 +25,7 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
for {
select {
case <-ticker.C:
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)
if err != nil {
return
}
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
Expand Down Expand Up @@ -45,12 +46,13 @@ type CDCPullConnector interface {
// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error

// HandleSlotInfo updates monitoring info on slot size etc.
// threadsafe
HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)

// GetOpenConnectionsForUser returns the number of open connections for the user configured in the peer.
GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error)

// AddTablesToPublication adds additional tables added to a mirror to the publication also
AddTablesToPublication(req *protos.AddTablesToPublicationInput) error
}
Expand Down
68 changes: 37 additions & 31 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"fmt"
"log/slog"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand All @@ -19,32 +20,39 @@ const (
lastSyncStateTableName = "last_sync_state"
)

// Querier is the set of database operations PostgresMetadataStore needs from
// its backing connection. Declaring it as an interface lets the store keep
// either the catalog connection pool or a dedicated *pgx.Conn in one field.
type Querier interface {
	// Begin starts a transaction.
	Begin(ctx context.Context) (pgx.Tx, error)
	// Exec runs a statement that returns no rows.
	Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
	// QueryRow runs a query expected to return at most one row.
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
	// Ping checks that the backing connection is reachable.
	Ping(ctx context.Context) error
}

type PostgresMetadataStore struct {
ctx context.Context
config *protos.PostgresConfig
pool *pgxpool.Pool
conn Querier
schemaName string
logger slog.Logger
}

func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string,
) (*PostgresMetadataStore, error) {
var storePool *pgxpool.Pool
var poolErr error
var storeConn Querier
var err error
if pgConfig == nil {
storePool, poolErr = cc.GetCatalogConnectionPoolFromEnv()
if poolErr != nil {
return nil, fmt.Errorf("failed to create catalog connection pool: %v", poolErr)
storeConn, err = cc.GetCatalogConnectionPoolFromEnv()
if err != nil {
return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
}

slog.InfoContext(ctx, "obtained catalog connection pool for metadata store")
} else {
connectionString := utils.GetPGConnectionString(pgConfig)
storePool, poolErr = pgxpool.New(ctx, connectionString)
if poolErr != nil {
slog.ErrorContext(ctx, "failed to create connection pool", slog.Any("error", poolErr))
return nil, poolErr
storeConn, err = pgx.Connect(ctx, connectionString)
if err != nil {
slog.ErrorContext(ctx, "failed to create connection pool", slog.Any("error", err))
return nil, err
}

slog.InfoContext(ctx, "created connection pool for metadata store")
Expand All @@ -54,15 +62,16 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf
return &PostgresMetadataStore{
ctx: ctx,
config: pgConfig,
pool: storePool,
conn: storeConn,
schemaName: schemaName,
logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
}, nil
}

func (p *PostgresMetadataStore) Close() error {
if p.config != nil && p.pool != nil {
p.pool.Close()
// only close p.conn when it isn't catalog
if conn, ok := p.conn.(*pgx.Conn); ok {
conn.Close(p.ctx)
}

return nil
Expand All @@ -73,10 +82,7 @@ func (p *PostgresMetadataStore) QualifyTable(table string) string {
}

func (p *PostgresMetadataStore) Ping() error {
if p.pool == nil {
return fmt.Errorf("metadata db ping failed as pool does not exist")
}
pingErr := p.pool.Ping(p.ctx)
pingErr := p.conn.Ping(p.ctx)
if pingErr != nil {
return fmt.Errorf("metadata db ping failed: %w", pingErr)
}
Expand All @@ -86,10 +92,10 @@ func (p *PostgresMetadataStore) Ping() error {

func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
row := p.conn.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)

var exists pgtype.Int8
err := rows.Scan(&exists)
err := row.Scan(&exists)
if err != nil {
p.logger.Error("failed to check if schema exists", slog.Any("error", err))
return false
Expand All @@ -104,14 +110,14 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {

func (p *PostgresMetadataStore) SetupMetadata() error {
// create the schema
_, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
_, err := p.conn.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create schema", slog.Any("error", err))
return err
}

// create the last sync state table
_, err = p.pool.Exec(p.ctx, `
_, err = p.conn.Exec(p.ctx, `
CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
job_name TEXT PRIMARY KEY NOT NULL,
last_offset BIGINT NOT NULL,
Expand All @@ -129,15 +135,15 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
}

func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
row := p.conn.QueryRow(p.ctx, `
SELECT last_offset
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
var offset pgtype.Int8
err := rows.Scan(&offset)
err := row.Scan(&offset)
if err != nil {
if err.Error() == "no rows in result set" {
if err == pgx.ErrNoRows {
return 0, nil
}

Expand All @@ -151,17 +157,17 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
}

func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
rows := p.pool.QueryRow(p.ctx, `
row := p.conn.QueryRow(p.ctx, `
SELECT sync_batch_id
FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)

var syncBatchID pgtype.Int8
err := rows.Scan(&syncBatchID)
err := row.Scan(&syncBatchID)
if err != nil {
// if the job doesn't exist, return 0
if err.Error() == "no rows in result set" {
if err == pgx.ErrNoRows {
return 0, nil
}

Expand All @@ -176,7 +182,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
// update offset for a job
func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
// start a transaction
tx, err := p.pool.Begin(p.ctx)
tx, err := p.conn.Begin(p.ctx)
if err != nil {
p.logger.Error("failed to start transaction", slog.Any("error", err))
return err
Expand Down Expand Up @@ -210,7 +216,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
// update offset for a job
func (p *PostgresMetadataStore) IncrementID(jobName string) error {
p.logger.Info("incrementing sync batch id for job")
_, err := p.pool.Exec(p.ctx, `
_, err := p.conn.Exec(p.ctx, `
UPDATE `+p.QualifyTable(lastSyncStateTableName)+`
SET sync_batch_id=sync_batch_id+1 WHERE job_name=$1
`, jobName)
Expand All @@ -223,7 +229,7 @@ func (p *PostgresMetadataStore) IncrementID(jobName string) error {
}

func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
_, err := p.pool.Exec(p.ctx, `
_, err := p.conn.Exec(p.ctx, `
DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
WHERE job_name = $1
`, jobName)
Expand Down
6 changes: 1 addition & 5 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,7 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
return fmt.Errorf("error getting replication options: %w", err)
}

// create replication connection
replicationConn := p.replConn

pgConn := replicationConn.PgConn()
p.logger.Info("created replication connection")
pgConn := p.replConn.PgConn()

// start replication
var clientXLogPos, startLSN pglogrepl.LSN
Expand Down
29 changes: 19 additions & 10 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connpostgres

import (
"context"
"errors"
"fmt"
"regexp"
Expand Down Expand Up @@ -251,26 +252,23 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str
}, nil
}

// GetSlotInfo gets the information about the replication slot size and LSNs
// If slotName input is empty, all slot info rows are returned - this is for UI.
// Else, only the row pertaining to that slotName will be returned.
func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) {
func getSlotInfo(ctx context.Context, conn *pgx.Conn, slotName string, database string) ([]*protos.SlotInfo, error) {
var whereClause string
if slotName != "" {
whereClause = fmt.Sprintf("WHERE slot_name=%s", QuoteLiteral(slotName))
} else {
whereClause = fmt.Sprintf("WHERE database=%s", QuoteLiteral(c.config.Database))
whereClause = fmt.Sprintf("WHERE database=%s", QuoteLiteral(database))
}

hasWALStatus, _, err := c.MajorVersionCheck(POSTGRES_13)
hasWALStatus, _, err := majorVersionCheck(ctx, conn, POSTGRES_13)
if err != nil {
return nil, err
}
walStatusSelector := "wal_status"
if !hasWALStatus {
walStatusSelector = "'unknown'"
}
rows, err := c.conn.Query(c.ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
confirmed_flush_lsn::text,active,
round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END
- confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind
Expand Down Expand Up @@ -306,6 +304,13 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er
return slotInfoRows, nil
}

// GetSlotInfo reports replication slot size and LSN details, querying through
// the connector's own context and connection.
// With an empty slotName every slot row for the configured database is
// returned (this is what the UI uses); otherwise only the row for the named
// slot is returned.
func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) {
	database := c.config.Database
	return getSlotInfo(c.ctx, c.conn.Conn, slotName, database)
}

// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
signal SlotSignal,
Expand Down Expand Up @@ -350,7 +355,7 @@ func (c *PostgresConnector) createSlotAndPublication(

// create slot only after we succeeded in creating publication.
if !s.SlotExists {
conn, err := c.GetReplConn(c.ctx)
conn, err := c.CreateReplConn(c.ctx)
if err != nil {
return fmt.Errorf("[slot] error acquiring pool: %w", err)
}
Expand Down Expand Up @@ -491,16 +496,20 @@ func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) {
return result.Bool, nil
}

func (c *PostgresConnector) MajorVersionCheck(majorVersion PGVersion) (bool, int64, error) {
func majorVersionCheck(ctx context.Context, conn *pgx.Conn, majorVersion PGVersion) (bool, int64, error) {
var version pgtype.Int8
err := c.conn.QueryRow(c.ctx, "SELECT current_setting('server_version_num')::INTEGER").Scan(&version)
err := conn.QueryRow(ctx, "SELECT current_setting('server_version_num')::INTEGER").Scan(&version)
if err != nil {
return false, 0, fmt.Errorf("failed to get server version: %w", err)
}

return version.Int64 >= int64(majorVersion), version.Int64, nil
}

// MajorVersionCheck reports whether the server reached through the connector's
// connection is at least majorVersion, and also returns the server version
// number that was read. It delegates to majorVersionCheck with the connector's
// stored context and connection.
func (c *PostgresConnector) MajorVersionCheck(majorVersion PGVersion) (bool, int64, error) {
	conn := c.conn.Conn
	return majorVersionCheck(c.ctx, conn, majorVersion)
}

func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64,
syncRecordsTx pgx.Tx,
) error {
Expand Down
Loading

0 comments on commit f62fe2f

Please sign in to comment.