Skip to content

Commit

Permalink
postgres connector: replace pgxpool.Pool with pgx.Conn
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 29, 2024
1 parent 75cdc2c commit 1fcde2c
Show file tree
Hide file tree
Showing 25 changed files with 512 additions and 512 deletions.
60 changes: 2 additions & 58 deletions flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,19 @@ package activities

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func (a *FlowableActivity) handleSlotInfo(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
return err
}

if len(slotInfo) == 0 {
slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
return nil
}

deploymentUIDPrefix := ""
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
return nil
}

func (a *FlowableActivity) recordSlotSizePeriodically(
ctx context.Context,
srcConn connectors.CDCPullConnector,
slotName string,
peerName string,
) {
// ensures slot info is logged at least once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)
if err != nil {
return
}
Expand All @@ -81,7 +25,7 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
for {
select {
case <-ticker.C:
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)
if err != nil {
return
}
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
Expand Down Expand Up @@ -45,12 +46,13 @@ type CDCPullConnector interface {
// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error

// HandleSlotInfo update monitoring info on slot size etc
// threadsafe
HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)

// GetOpenConnectionsForUser returns the number of open connections for the user configured in the peer.
GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error)

// AddTablesToPublication adds additional tables added to a mirror to the publication also
AddTablesToPublication(req *protos.AddTablesToPublicationInput) error
}
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
}

pgConn := p.replConn.PgConn()
p.logger.Info("created replication connection")

// start replication
var clientXLogPos, startLSN pglogrepl.LSN
Expand Down
Loading

0 comments on commit 1fcde2c

Please sign in to comment.