Skip to content

Commit

Permalink
Merge branch 'main' into soft-delete-tests-bq
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 21, 2023
2 parents 7dc93c7 + 08861aa commit 7de4309
Show file tree
Hide file tree
Showing 19 changed files with 602 additions and 75 deletions.
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ services:
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ services:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
40 changes: 38 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
Expand All @@ -42,6 +43,7 @@ type SlotSnapshotSignal struct {

// FlowableActivity is the receiver type for the flow activities (for example
// handleSlotInfo and recordSlotSizePeriodically) and holds the dependencies
// those methods share.
type FlowableActivity struct {
// CatalogPool is the pgx connection pool handed to monitoring helpers such
// as monitoring.AppendSlotSizeInfo.
CatalogPool *pgxpool.Pool
// Alerter is used (via AlertIf) to raise alerts when slot lag or the number
// of open connections reaches its configured threshold.
Alerter *alerting.Alerter
}

// CheckConnection implements CheckConnection.
Expand Down Expand Up @@ -174,10 +176,38 @@ func (a *FlowableActivity) handleSlotInfo(
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
slog.Warn("warning: failed to get slot info", slog.Any("error", err))
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
return err
}

deploymentUIDPrefix := ""
if peerdbenv.GetPeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold()
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
Expand All @@ -190,7 +220,13 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
slotName string,
peerName string,
) {
timeout := 10 * time.Minute
// ensures slot info is logged at least once per SyncFlow
err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
if err != nil {
return
}

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)

defer ticker.Stop()
Expand Down
2 changes: 2 additions & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"

"github.com/grafana/pyroscope-go"
Expand Down Expand Up @@ -133,6 +134,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
})

err = w.Run(worker.InterruptCh())
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type CDCPullConnector interface {

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)

// GetOpenConnectionsForUser returns the number of open connections for the user configured in the peer.
GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error)
}

type CDCSyncConnector interface {
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ const (
)
%s src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`

dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=$1"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE mirror_job_name=$1"
getNumConnectionsForUser = "SELECT COUNT(*) FROM pg_stat_activity WHERE usename=$1 AND client_addr IS NOT NULL"
)

type ReplicaIdentityType rune
Expand Down
18 changes: 18 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,3 +895,21 @@ func (c *PostgresConnector) SendWALHeartbeat() error {

return nil
}

// GetOpenConnectionsForUser returns the number of open connections for the
// user configured in the peer, together with that user's name.
//
// The count comes from the getNumConnectionsForUser query, which counts rows
// in pg_stat_activity whose usename matches the configured user and whose
// client_addr is not NULL. An error is returned if the result row cannot be
// scanned.
func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) {
row := c.pool.
QueryRow(c.ctx, getNumConnectionsForUser, c.config.User)

// COUNT() returns BIGINT
var result pgtype.Int8
err := row.Scan(&result)
if err != nil {
return nil, fmt.Errorf("error while reading result row: %w", err)
}

return &protos.GetOpenConnectionsForUserResult{
UserName: c.config.User,
CurrentOpenConnections: result.Int64,
}, nil
}
6 changes: 5 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -59,7 +60,10 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t
env.RegisterWorkflow(peerflow.QRepFlowWorkflow)
env.RegisterWorkflow(peerflow.XminFlowWorkflow)
env.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
env.RegisterActivity(&activities.FlowableActivity{CatalogPool: conn})
env.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(conn),
})
env.RegisterActivity(&activities.SnapshotActivity{})
}

Expand Down
Loading

0 comments on commit 7de4309

Please sign in to comment.