Move slot size recording into global heartbeat workflow
Some misc improvements:
1. sleep in workflow, not activity
2. HandleSlotInfo no longer needs to be threadsafe
serprex committed Feb 19, 2024
1 parent 457de35 commit d9c4871
Showing 6 changed files with 166 additions and 115 deletions.
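The first point in the commit message ("sleep in workflow, not activity") is the heart of the change: the periodic loop moves out of a long-running activity and into the workflow itself, which sleeps between short activity invocations. A minimal sketch of that pattern, assuming the Temporal Go SDK; RecordSlotSizes here is a local placeholder, not the activity defined later in this diff:

// Minimal sketch of "sleep in workflow, not activity" (assumptions: Temporal Go
// SDK; RecordSlotSizes is a placeholder so the sketch is self-contained).
package sketch

import (
	"context"
	"time"

	"go.temporal.io/sdk/workflow"
)

// Placeholder activity standing in for the real one defined in this commit.
func RecordSlotSizes(ctx context.Context) error { return nil }

func PeriodicSketchWorkflow(ctx workflow.Context) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Hour,
	})
	for i := 0; i < 100; i++ {
		// Each tick runs one short-lived activity instead of the activity
		// looping on a time.Ticker for days.
		if err := workflow.ExecuteActivity(ctx, RecordSlotSizes).Get(ctx, nil); err != nil {
			workflow.GetLogger(ctx).Warn("RecordSlotSizes failed", "error", err)
		}
		// The wait lives in the workflow; Sleep returns an error on cancellation.
		if err := workflow.Sleep(ctx, 5*time.Minute); err != nil {
			return nil
		}
	}
	// Restart to keep workflow history bounded.
	return workflow.NewContinueAsNewError(ctx, PeriodicSketchWorkflow)
}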
151 changes: 100 additions & 51 deletions flow/activities/flowable.go
@@ -231,19 +231,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
if input.FlowConnectionConfigs.ReplicationSlotName != "" {
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
})
defer shutdown()

errGroup, errCtx := errgroup.WithContext(ctx)
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

batchSize := input.SyncFlowOptions.BatchSize
if batchSize <= 0 {
@@ -721,11 +715,10 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
if err != nil {
return nil, err
}
defer optionRows.Close()

var peerName pgtype.Text
var postgresPeers []*protos.Peer
var peerOptions sql.RawBytes
for optionRows.Next() {
return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) {
err := optionRows.Scan(&peerName, &peerOptions)
if err != nil {
return nil, err
@@ -735,13 +728,12 @@
if unmarshalErr != nil {
return nil, unmarshalErr
}
postgresPeers = append(postgresPeers, &protos.Peer{
return &protos.Peer{
Name: peerName.String,
Type: protos.DBType_POSTGRES,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig},
})
}
return postgresPeers, nil
}, nil
})
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
@@ -751,53 +743,110 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
return nil
}

ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
pgPeers, err := a.getPostgresPeerConfigs(ctx)
if err != nil {
logger.Warn("[sendwalheartbeat] unable to fetch peers. " +
"Skipping walheartbeat send. Error: " + err.Error())
return err
}

activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
for {
select {
case <-ctx.Done():
logger.Info("context is done, exiting wal heartbeat send loop")
command := `
BEGIN;
DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
activity.RecordHeartbeat(ctx, pgPeer.Name)
if ctx.Err() != nil {
return nil
case <-ticker.C:
pgPeers, err := a.getPostgresPeerConfigs(ctx)
if err != nil {
logger.Warn("[sendwalheartbeat] unable to fetch peers. " +
"Skipping walheartbeat send. Error: " + err.Error())
continue
}
}

command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}
defer peerConn.Close(ctx)

Check failure on line 773 in flow/activities/flowable.go — GitHub Actions / lint: deferInLoop: Possible resource leak, 'defer' is called in the 'for' loop (gocritic)

_, err := peerConn.Exec(ctx, command)
if err != nil {
logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err))
}
_, err := peerConn.Exec(ctx, command)
if err != nil {
logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err))
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name))
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
return nil
}

func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
rows, err := a.CatalogPool.Query(ctx, "SELECT flows.name, flows.config_proto FROM flows")
if err != nil {
return err
}

configs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.FlowConnectionConfigs, error) {
var flowName string
var configProto sql.RawBytes
err := rows.Scan(&flowName, &configProto)
if err != nil {
return nil, err
}

var config protos.FlowConnectionConfigs
err = proto.Unmarshal(configProto, &config)
if err != nil {
return nil, err
}

return &config, nil
})
if err != nil {
return err
}

logger := activity.GetLogger(ctx)
for _, config := range configs {
func() {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
if err != connectors.ErrUnsupportedFunctionality {
logger.Error("Failed to create connector to handle slot info", slog.Any("error", err))
}
logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name))
return
}
defer connectors.CloseConnector(ctx, srcConn)

slotName := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName)
if config.ReplicationSlotName != "" {
slotName = config.ReplicationSlotName
}
peerName := config.Source.Name

activity.RecordHeartbeat(ctx, fmt.Sprintf("checking %s on %s", slotName, peerName))
if ctx.Err() != nil {
return
}
err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)
if err != nil {
logger.Error("Failed to handle slot info", slog.Any("error", err))
}
}()
if ctx.Err() != nil {
return nil
}
}

return nil
}

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
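The lint annotation above flags a defer inside the per-peer loop of SendWALHeartbeat. RecordSlotSizes sidesteps the same leak by wrapping each loop body in an immediately invoked function, so the deferred close runs at the end of every iteration. A standalone sketch of that pattern (hypothetical helper, pgx v5 assumed; not code from this commit):

// Sketch of the defer-per-iteration pattern: the function literal makes each
// defer run at the end of that iteration instead of piling up until the loop ends.
package sketch

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

func execOnEachPeer(ctx context.Context, connStrings []string, command string) {
	for _, connString := range connStrings {
		func() {
			conn, err := pgx.Connect(ctx, connString)
			if err != nil {
				log.Printf("connect failed: %v", err)
				return
			}
			// Closed when this iteration's closure returns, not when the loop ends.
			defer conn.Close(ctx)

			if _, err := conn.Exec(ctx, command); err != nil {
				log.Printf("exec failed: %v", err)
			}
		}()
	}
}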
36 changes: 0 additions & 36 deletions flow/activities/slot.go

This file was deleted.

14 changes: 4 additions & 10 deletions flow/cmd/mirror_status.go
@@ -128,16 +128,10 @@ func (h *FlowRequestHandler) cloneTableSummary(
COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs
FROM
peerdb_stats.qrep_partitions qp
JOIN
peerdb_stats.qrep_runs qr
ON
qp.flow_name = qr.flow_name
WHERE
qp.flow_name ILIKE $1
GROUP BY
qp.flow_name, qr.config_proto;
FROM peerdb_stats.qrep_partitions qp
JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name
WHERE qp.flow_name ILIKE $1
GROUP BY qp.flow_name, qr.config_proto;
`

var flowName pgtype.Text
1 change: 0 additions & 1 deletion flow/connectors/core.go
@@ -48,7 +48,6 @@ type CDCPullConnector interface {
PullFlowCleanup(ctx context.Context, jobName string) error

// HandleSlotInfo update monitoring info on slot size etc
// threadsafe
HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
12 changes: 2 additions & 10 deletions flow/connectors/postgres/postgres.go
@@ -897,15 +897,7 @@ func (c *PostgresConnector) HandleSlotInfo(
) error {
logger := logger.LoggerFromCtx(ctx)

// must create new connection because HandleSlotInfo is threadsafe
conn, err := c.ssh.NewPostgresConnFromPostgresConfig(ctx, c.config)
if err != nil {
logger.Warn("warning: failed to connect to get slot info", "error", err)
return err
}
defer conn.Close(ctx)

slotInfo, err := getSlotInfo(ctx, conn, slotName, c.config.Database)
slotInfo, err := getSlotInfo(ctx, c.conn, slotName, c.config.Database)
if err != nil {
logger.Warn("warning: failed to get slot info", "error", err)
return err
@@ -919,7 +911,7 @@
alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0])

// Also handles alerts for PeerDB user connections exceeding a given limit here
res, err := getOpenConnectionsForUser(ctx, conn, c.config.User)
res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User)
if err != nil {
logger.Warn("warning: failed to get current open connections", "error", err)
return err
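With the extra connection removed, HandleSlotInfo now runs its queries on the connector's existing connection (c.conn). As an illustration of the kind of query involved, here is a sketch of reading replication-slot lag from pg_replication_slots; this is an assumption about the query shape, not the repository's getSlotInfo:

// Sketch only; assumes pgx v5 and a logical replication slot.
package sketch

import (
	"context"

	"github.com/jackc/pgx/v5"
)

func slotLagBytes(ctx context.Context, conn *pgx.Conn, slotName string) (int64, error) {
	var lag int64
	// confirmed_flush_lsn is populated for logical slots; the ::bigint cast
	// keeps the scan destination simple.
	err := conn.QueryRow(ctx,
		`SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::bigint
		   FROM pg_replication_slots WHERE slot_name = $1`,
		slotName).Scan(&lag)
	return lag, err
}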
67 changes: 60 additions & 7 deletions flow/workflows/heartbeat_flow.go
@@ -6,16 +6,69 @@ import (
"go.temporal.io/sdk/workflow"
)

// HeartbeatFlowWorkflow is the workflow that sets up heartbeat sending.
// HeartbeatFlowWorkflow sends WAL heartbeats & monitors slot size
func HeartbeatFlowWorkflow(ctx workflow.Context) error {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
StartToCloseTimeout: time.Hour,
})

heartbeatFuture := workflow.ExecuteActivity(ctx, flowable.SendWALHeartbeat)
if err := heartbeatFuture.Get(ctx, nil); err != nil {
return err
}
// Use channels to time the activities; the two periods are primes so their
// firings scatter rather than align.
// If an activity run takes longer than its period, the next send is delayed
// until the previous run finishes.
doSlotSize := workflow.NewChannel(ctx)
doHeartbeat := workflow.NewChannel(ctx)
doneSlotSize := workflow.NewChannel(ctx)
doneHeartbeat := workflow.NewChannel(ctx)
workflow.Go(ctx, func(ctx workflow.Context) {
for {
doSlotSize.Send(ctx, nil)
if workflow.Sleep(ctx, 5*time.Minute) != nil {
return
}
doneSlotSize.Receive(ctx, nil)
}
})
workflow.Go(ctx, func(ctx workflow.Context) {
for {
doHeartbeat.Send(ctx, nil)
if workflow.Sleep(ctx, 11*time.Minute) != nil {
return
}
doneHeartbeat.Receive(ctx, nil)
}
})

var canceled bool
activities := 0
selector := workflow.NewSelector(ctx)
selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
canceled = true
})
selector.AddReceive(doSlotSize, func(c workflow.ReceiveChannel, _ bool) {
if c.ReceiveAsync(nil) {
slotSizeFuture := workflow.ExecuteActivity(ctx, flowable.RecordSlotSizes)
selector.AddFuture(slotSizeFuture, func(f workflow.Future) {
doneSlotSize.Send(ctx, nil)
})
}
})
selector.AddReceive(doHeartbeat, func(c workflow.ReceiveChannel, _ bool) {
if c.ReceiveAsync(nil) {
heartbeatFuture := workflow.ExecuteActivity(ctx, flowable.SendWALHeartbeat)
selector.AddFuture(heartbeatFuture, func(f workflow.Future) {
doneHeartbeat.Send(ctx, nil)
})
}
})
for {
selector.Select(ctx)
if canceled {
return nil
}

return nil
activities += 1
if activities > 99 {
return workflow.NewContinueAsNewError(ctx, HeartbeatFlowWorkflow)
}
}
}
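For context on how a workflow like this gets scheduled, here is a usage sketch of registering a heartbeat workflow and its activities with a Temporal worker. The task queue name and the placeholder functions are illustrative; PeerDB's actual worker setup is not part of this commit:

// Usage sketch (assumptions: Temporal Go SDK, reachable server, illustrative
// task queue; the placeholders stand in for the real workflow and activities).
package main

import (
	"context"
	"log"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

func HeartbeatFlowWorkflow(ctx workflow.Context) error { return nil }
func SendWALHeartbeat(ctx context.Context) error       { return nil }
func RecordSlotSizes(ctx context.Context) error        { return nil }

func main() {
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client:", err)
	}
	defer c.Close()

	w := worker.New(c, "heartbeat-task-queue", worker.Options{})
	w.RegisterWorkflow(HeartbeatFlowWorkflow)
	w.RegisterActivity(SendWALHeartbeat)
	w.RegisterActivity(RecordSlotSizes)

	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatalln("worker exited:", err)
	}
}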
