Skip to content

Commit

Permalink
congen: cannot execute another query until rows closed
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 29, 2024
1 parent 1fcde2c commit db04baf
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 23 deletions.
18 changes: 9 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,19 +913,19 @@ func (c *PostgresConnector) HandleSlotInfo(
// must create new connection because HandleSlotInfo is threadsafe
conn, err := c.ssh.NewPostgresConnFromPostgresConfig(ctx, c.config)
if err != nil {
slog.WarnContext(c.ctx, "warning: failed to connect to get slot info", slog.Any("error", err))
slog.WarnContext(ctx, "warning: failed to connect to get slot info", slog.Any("error", err))
return err
}
defer conn.Close(ctx)

slotInfo, err := c.GetSlotInfo(slotName)
if err != nil {
slog.WarnContext(c.ctx, "warning: failed to get slot info", slog.Any("error", err))
slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
return err
}

if len(slotInfo) == 0 {
slog.WarnContext(c.ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
return nil
}

Expand All @@ -934,31 +934,31 @@ func (c *PostgresConnector) HandleSlotInfo(
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(c.ctx)
slotLagInMBThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
alerter.AlertIf(c.ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(c.ctx)
maxOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
res, err := getOpenConnectionsForUser(ctx, conn, c.config.User)
if err != nil {
slog.WarnContext(c.ctx, "warning: failed to get current open connections", slog.Any("error", err))
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
alerter.AlertIf(c.ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}

if len(slotInfo) != 0 {
return monitoring.AppendSlotSizeInfo(c.ctx, catalogPool, peerName, slotInfo[0])
return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0])
}
return nil
}
Expand Down
19 changes: 7 additions & 12 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand Down Expand Up @@ -58,19 +57,15 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error {
if err != nil {
return fmt.Errorf("failed to list publications: %w", err)
}
defer rows.Close()

// drop all publications with the given suffix
for rows.Next() {
var pubName pgtype.Text
err = rows.Scan(&pubName)
if err != nil {
return fmt.Errorf("failed to scan publication name: %w", err)
}
publications, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return fmt.Errorf("failed to read publications: %w", err)
}

_, err = conn.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubName.String))
for _, pubName := range publications {
_, err = conn.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubName))
if err != nil {
return fmt.Errorf("failed to drop publication %s: %w", pubName.String, err)
return fmt.Errorf("failed to drop publication %s: %w", pubName, err)
}
}

Expand Down
3 changes: 1 addition & 2 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG {
require.Fail(t, "failed to setup postgres", err)
}

var connector *connpostgres.PostgresConnector
connector, err = connpostgres.NewPostgresConnector(context.Background(),
connector, err := connpostgres.NewPostgresConnector(context.Background(),
&protos.PostgresConfig{
Host: "localhost",
Port: 7132,
Expand Down

0 comments on commit db04baf

Please sign in to comment.