Skip to content

Commit

Permalink
Move slot size recording into global heartbeat workflow (#1328)
Browse files Browse the repository at this point in the history
Some misc improvements:
1. remove sleeping in the activity; instead use Temporal cron jobs
2. HandleSlotInfo no longer needs to be threadsafe
3. slot size is monitored even while sync flows are crashing or paused
  • Loading branch information
serprex authored Feb 20, 2024
1 parent d75eb8b commit 5cdc27d
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 169 deletions.
153 changes: 99 additions & 54 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package activities

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
Expand All @@ -11,7 +10,6 @@ import (
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -231,19 +229,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
if input.FlowConnectionConfigs.ReplicationSlotName != "" {
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
})
defer shutdown()

errGroup, errCtx := errgroup.WithContext(ctx)
go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

batchSize := input.SyncFlowOptions.BatchSize
if batchSize <= 0 {
Expand Down Expand Up @@ -721,11 +713,10 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
if err != nil {
return nil, err
}
defer optionRows.Close()
var peerName pgtype.Text
var postgresPeers []*protos.Peer
var peerOptions sql.RawBytes
for optionRows.Next() {

return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) {
var peerName string
var peerOptions []byte
err := optionRows.Scan(&peerName, &peerOptions)
if err != nil {
return nil, err
Expand All @@ -735,13 +726,12 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
if unmarshalErr != nil {
return nil, unmarshalErr
}
postgresPeers = append(postgresPeers, &protos.Peer{
Name: peerName.String,
return &protos.Peer{
Name: peerName,
Type: protos.DBType_POSTGRES,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig},
})
}
return postgresPeers, nil
}, nil
})
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
Expand All @@ -751,53 +741,108 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
return nil
}

ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
pgPeers, err := a.getPostgresPeerConfigs(ctx)
if err != nil {
logger.Warn("[sendwalheartbeat] unable to fetch peers. " +
"Skipping walheartbeat send. Error: " + err.Error())
return err
}

activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
for {
select {
case <-ctx.Done():
logger.Info("context is done, exiting wal heartbeat send loop")
command := `
BEGIN;
DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
activity.RecordHeartbeat(ctx, pgPeer.Name)
if ctx.Err() != nil {
return nil
case <-ticker.C:
pgPeers, err := a.getPostgresPeerConfigs(ctx)
}

func() {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
logger.Error(fmt.Sprintf("error creating pool for postgres peer %v with host %v: %v",
pgPeer.Name, pgConfig.Host, peerErr))
return
}
defer peerConn.Close(ctx)

_, err := peerConn.Exec(ctx, command)
if err != nil {
logger.Warn("[sendwalheartbeat] unable to fetch peers. " +
"Skipping walheartbeat send. Error: " + err.Error())
continue
logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err))
}

command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}
logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name))
}()
}

_, err := peerConn.Exec(ctx, command)
if err != nil {
logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err))
}
return nil
}

func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
rows, err := a.CatalogPool.Query(ctx, "SELECT flows.name, flows.config_proto FROM flows")
if err != nil {
return err
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
configs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.FlowConnectionConfigs, error) {
var flowName string
var configProto []byte
err := rows.Scan(&flowName, &configProto)
if err != nil {
return nil, err
}

var config protos.FlowConnectionConfigs
err = proto.Unmarshal(configProto, &config)
if err != nil {
return nil, err
}

return &config, nil
})
if err != nil {
return err
}

logger := activity.GetLogger(ctx)
for _, config := range configs {
func() {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
if err != connectors.ErrUnsupportedFunctionality {
logger.Error("Failed to create connector to handle slot info", slog.Any("error", err))
}
logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name))
return
}
defer connectors.CloseConnector(ctx, srcConn)

slotName := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName)
if config.ReplicationSlotName != "" {
slotName = config.ReplicationSlotName
}
peerName := config.Source.Name

activity.RecordHeartbeat(ctx, fmt.Sprintf("checking %s on %s", slotName, peerName))
if ctx.Err() != nil {
return
}
err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)
if err != nil {
logger.Error("Failed to handle slot info", slog.Any("error", err))
}
}()
if ctx.Err() != nil {
return nil
}
}

return nil
}

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
Expand Down
36 changes: 0 additions & 36 deletions flow/activities/slot.go

This file was deleted.

20 changes: 10 additions & 10 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
return server, nil
}

func killExistingHeartbeatFlows(
func killExistingScheduleFlows(
ctx context.Context,
tc client.Client,
namespace string,
Expand All @@ -72,12 +72,12 @@ func killExistingHeartbeatFlows(
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Query: "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'",
Query: "WorkflowType = 'GlobalScheduleManagerWorkflow' AND TaskQueue = '" + taskQueue + "'",
})
if err != nil {
return fmt.Errorf("unable to list workflows: %w", err)
}
slog.Info("Requesting cancellation of pre-existing heartbeat flows")
slog.Info("Requesting cancellation of pre-existing scheduler flows")
for _, workflow := range listRes.Executions {
slog.Info("Cancelling workflow", slog.String("workflowId", workflow.Execution.WorkflowId))
err := tc.CancelWorkflow(ctx,
Expand Down Expand Up @@ -131,24 +131,24 @@ func APIMain(ctx context.Context, args *APIServerParams) error {

flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue)
if err != nil {
return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
return fmt.Errorf("unable to kill existing scheduler flows: %w", err)
}

workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New())
workflowID := fmt.Sprintf("scheduler-%s", uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskQueue,
}

_, err = flowHandler.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.HeartbeatFlowWorkflow, // workflow function
ctx,
workflowOptions,
peerflow.GlobalScheduleManagerWorkflow,
)
if err != nil {
return fmt.Errorf("unable to start heartbeat workflow: %w", err)
return fmt.Errorf("unable to start scheduler workflow: %w", err)
}

protos.RegisterFlowServiceServer(grpcServer, flowHandler)
Expand Down
14 changes: 4 additions & 10 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,10 @@ func (h *FlowRequestHandler) cloneTableSummary(
COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced,
AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs
FROM
peerdb_stats.qrep_partitions qp
JOIN
peerdb_stats.qrep_runs qr
ON
qp.flow_name = qr.flow_name
WHERE
qp.flow_name ILIKE $1
GROUP BY
qp.flow_name, qr.config_proto;
FROM peerdb_stats.qrep_partitions qp
JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name
WHERE qp.flow_name ILIKE $1
GROUP BY qp.flow_name, qr.config_proto;
`

var flowName pgtype.Text
Expand Down
10 changes: 1 addition & 9 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,7 @@ func WorkerMain(opts *WorkerOptions) error {
}

w := worker.New(c, taskQueue, worker.Options{})
w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig)
w.RegisterWorkflow(peerflow.SyncFlowWorkflow)
w.RegisterWorkflow(peerflow.SetupFlowWorkflow)
w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
w.RegisterWorkflow(peerflow.XminFlowWorkflow)
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
peerflow.RegisterFlowWorkerWorkflows(w)

alerter, err := alerting.NewAlerter(conn)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type CDCPullConnector interface {
PullFlowCleanup(ctx context.Context, jobName string) error

// HandleSlotInfo update monitoring info on slot size etc
// threadsafe
HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
Expand Down
8 changes: 2 additions & 6 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ func (p *PostgresCDCSource) consumeStream(
}()

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := p.flowJobName
currRecords := cdcRecordsStorage.Len()
msg := fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
msg := fmt.Sprintf("pulling records, currently have %d records", currRecords)
p.logger.Info(msg)
return msg
})
Expand Down Expand Up @@ -279,10 +278,7 @@ func (p *PostgresCDCSource) consumeStream(
// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if !cdcRecordsStorage.IsEmpty() {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records",
p.flowJobName,
cdcRecordsStorage.Len()),
)
p.logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len()))

if !p.commitLock {
p.logger.Info(
Expand Down
Loading

0 comments on commit 5cdc27d

Please sign in to comment.