Move slot size recording into global heartbeat workflow #1328

Merged: 9 commits, Feb 20, 2024
153 changes: 99 additions & 54 deletions flow/activities/flowable.go
@@ -2,7 +2,6 @@ package activities

 import (
 	"context"
-	"database/sql"
 	"errors"
 	"fmt"
 	"log/slog"
@@ -11,7 +10,6 @@ import (
 	"time"

 	"github.com/jackc/pgx/v5"
-	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"go.temporal.io/sdk/activity"
 	"golang.org/x/sync/errgroup"
@@ -231,19 +229,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	}
 	defer connectors.CloseConnector(ctx, srcConn)

-	slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
-	if input.FlowConnectionConfigs.ReplicationSlotName != "" {
-		slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
-	}
-
 	shutdown := utils.HeartbeatRoutine(ctx, func() string {
 		jobName := input.FlowConnectionConfigs.FlowJobName
 		return fmt.Sprintf("transferring records for job - %s", jobName)
 	})
 	defer shutdown()

 	errGroup, errCtx := errgroup.WithContext(ctx)
-	go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

 	batchSize := input.SyncFlowOptions.BatchSize
 	if batchSize <= 0 {
@@ -721,11 +713,10 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
 	if err != nil {
 		return nil, err
 	}
-	defer optionRows.Close()
-	var peerName pgtype.Text
-	var postgresPeers []*protos.Peer
-	var peerOptions sql.RawBytes
-	for optionRows.Next() {
+
+	return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) {
+		var peerName string
+		var peerOptions []byte
 		err := optionRows.Scan(&peerName, &peerOptions)
 		if err != nil {
 			return nil, err
@@ -735,13 +726,12 @@
 		if unmarshalErr != nil {
 			return nil, unmarshalErr
 		}
-		postgresPeers = append(postgresPeers, &protos.Peer{
-			Name: peerName.String,
+		return &protos.Peer{
+			Name:   peerName,
 			Type:   protos.DBType_POSTGRES,
 			Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig},
-		})
-	}
-	return postgresPeers, nil
+		}, nil
+	})
 }

 func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
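
The getPostgresPeerConfigs rewrite above swaps a manual `optionRows.Next()` loop, with its deferred `Close`, accumulator slice, and `pgtype.Text`/`sql.RawBytes` scratch variables, for `pgx.CollectRows`, which drives iteration, closes the rows, and stops at the first error. A minimal self-contained sketch of the same pattern (the connection string env var and `peers` table are hypothetical):

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5"
)

type peer struct {
	name    string
	options []byte
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, os.Getenv("DATABASE_URL")) // assumed env var
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT name, options FROM peers") // hypothetical table
	if err != nil {
		panic(err)
	}
	// CollectRows calls rows.Next/Scan for us, closes the rows when done,
	// and returns the first scan or iteration error it encounters.
	peers, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (peer, error) {
		var p peer
		err := row.Scan(&p.name, &p.options)
		return p, err
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("loaded %d peers\n", len(peers))
}
```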
@@ -751,53 +741,108 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
 		return nil
 	}

-	ticker := time.NewTicker(10 * time.Minute)
-	defer ticker.Stop()
-
-	activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
-	for {
-		select {
-		case <-ctx.Done():
-			logger.Info("context is done, exiting wal heartbeat send loop")
-			return nil
-		case <-ticker.C:
-			pgPeers, err := a.getPostgresPeerConfigs(ctx)
-			if err != nil {
-				logger.Warn("[sendwalheartbeat] unable to fetch peers. " +
-					"Skipping walheartbeat send. Error: " + err.Error())
-				continue
-			}
-
-			command := `
-			BEGIN;
-			DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
-			CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
-			DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
-			END;
-			`
-			// run above command for each Postgres peer
-			for _, pgPeer := range pgPeers {
-				pgConfig := pgPeer.GetPostgresConfig()
-				peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
-				if peerErr != nil {
-					return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
-						pgPeer.Name, pgConfig.Host, peerErr)
-				}
-
-				_, err := peerConn.Exec(ctx, command)
-				if err != nil {
-					logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err))
-				}
-
-				closeErr := peerConn.Close(ctx)
-				if closeErr != nil {
-					return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
-						pgPeer.Name, pgConfig.Host, closeErr)
-				}
-				logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name))
-			}
-		}
-	}
-}
+	pgPeers, err := a.getPostgresPeerConfigs(ctx)
+	if err != nil {
+		logger.Warn("[sendwalheartbeat] unable to fetch peers. " +
+			"Skipping walheartbeat send. Error: " + err.Error())
+		return err
+	}
+
+	command := `
+	BEGIN;
+	DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
+	CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
+	DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
+	END;
+	`
+	// run above command for each Postgres peer
+	for _, pgPeer := range pgPeers {
+		activity.RecordHeartbeat(ctx, pgPeer.Name)
+		if ctx.Err() != nil {
+			return nil
+		}
+
+		func() {
+			pgConfig := pgPeer.GetPostgresConfig()
+			peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
+			if peerErr != nil {
+				logger.Error(fmt.Sprintf("error creating pool for postgres peer %v with host %v: %v",
+					pgPeer.Name, pgConfig.Host, peerErr))
+				return
+			}
+			defer peerConn.Close(ctx)
+
+			_, err := peerConn.Exec(ctx, command)
+			if err != nil {
+				logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err))
+			}
+			logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name))
+		}()
+	}
+
+	return nil
+}
+
+func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
+	rows, err := a.CatalogPool.Query(ctx, "SELECT flows.name, flows.config_proto FROM flows")
+	if err != nil {
+		return err
+	}
+
+	configs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.FlowConnectionConfigs, error) {
+		var flowName string
+		var configProto []byte
+		err := rows.Scan(&flowName, &configProto)
+		if err != nil {
+			return nil, err
+		}
+
+		var config protos.FlowConnectionConfigs
+		err = proto.Unmarshal(configProto, &config)
+		if err != nil {
+			return nil, err
+		}
+
+		return &config, nil
+	})
+	if err != nil {
+		return err
+	}
+
+	logger := activity.GetLogger(ctx)
+	for _, config := range configs {
+		func() {
+			srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
+			if err != nil {
+				if err != connectors.ErrUnsupportedFunctionality {
+					logger.Error("Failed to create connector to handle slot info", slog.Any("error", err))
+				}
+				return
+			}
+			defer connectors.CloseConnector(ctx, srcConn)
+
+			slotName := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName)
+			if config.ReplicationSlotName != "" {
+				slotName = config.ReplicationSlotName
+			}
+			peerName := config.Source.Name
+
+			activity.RecordHeartbeat(ctx, fmt.Sprintf("checking %s on %s", slotName, peerName))
+			if ctx.Err() != nil {
+				return
+			}
+			err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)
+			if err != nil {
+				logger.Error("Failed to handle slot info", slog.Any("error", err))
+			}
+		}()
+		if ctx.Err() != nil {
+			return nil
+		}
+	}

+	return nil
+}

 func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
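
A note on the heartbeat command itself: creating and immediately dropping an aggregate in one transaction leaves no schema object behind, yet still writes catalog changes to the WAL, which is what keeps replication slots on otherwise-idle sources advancing. A sketch of how one could verify that effect (`walAdvances` is a hypothetical helper, not part of this PR):

```go
package main

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// walAdvances runs the same ephemeral-aggregate transaction used by
// SendWALHeartbeat and reports whether the server's WAL position moved.
func walAdvances(ctx context.Context, conn *pgx.Conn) (bool, error) {
	var before, after string
	if err := conn.QueryRow(ctx, "SELECT pg_current_wal_lsn()::text").Scan(&before); err != nil {
		return false, err
	}
	_, err := conn.Exec(ctx, `
	BEGIN;
	DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
	CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
	DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
	END;`)
	if err != nil {
		return false, err
	}
	if err := conn.QueryRow(ctx, "SELECT pg_current_wal_lsn()::text").Scan(&after); err != nil {
		return false, err
	}
	return after != before, nil
}
```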
36 changes: 0 additions & 36 deletions flow/activities/slot.go

This file was deleted.
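(Presumably this held the per-flow recordSlotSizePeriodically helper that StartFlow no longer launches; the new RecordSlotSizes activity above covers all flows in one pass.)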

20 changes: 10 additions & 10 deletions flow/cmd/api.go
@@ -63,7 +63,7 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
 	return server, nil
 }

-func killExistingHeartbeatFlows(
+func killExistingScheduleFlows(
 	ctx context.Context,
 	tc client.Client,
 	namespace string,
@@ -72,12 +72,12 @@ func killExistingHeartbeatFlows(
 	listRes, err := tc.ListWorkflow(ctx,
 		&workflowservice.ListWorkflowExecutionsRequest{
 			Namespace: namespace,
-			Query:     "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'",
+			Query:     "WorkflowType = 'GlobalScheduleManagerWorkflow' AND TaskQueue = '" + taskQueue + "'",
 		})
 	if err != nil {
 		return fmt.Errorf("unable to list workflows: %w", err)
 	}
-	slog.Info("Requesting cancellation of pre-existing heartbeat flows")
+	slog.Info("Requesting cancellation of pre-existing scheduler flows")
 	for _, workflow := range listRes.Executions {
 		slog.Info("Cancelling workflow", slog.String("workflowId", workflow.Execution.WorkflowId))
 		err := tc.CancelWorkflow(ctx,
Expand Down Expand Up @@ -131,24 +131,24 @@ func APIMain(ctx context.Context, args *APIServerParams) error {

flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue)
if err != nil {
return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
return fmt.Errorf("unable to kill existing scheduler flows: %w", err)
}

workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New())
workflowID := fmt.Sprintf("scheduler-%s", uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskQueue,
}

_, err = flowHandler.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.HeartbeatFlowWorkflow, // workflow function
ctx,
workflowOptions,
peerflow.GlobalScheduleManagerWorkflow,
)
if err != nil {
return fmt.Errorf("unable to start heartbeat workflow: %w", err)
return fmt.Errorf("unable to start scheduler workflow: %w", err)
}

protos.RegisterFlowServiceServer(grpcServer, flowHandler)
Expand Down
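
This diff starts GlobalScheduleManagerWorkflow but never shows its body. Given that SendWALHeartbeat lost its internal ticker and RecordSlotSizes does a single pass per call, a plausible shape is a timer loop over both activities; the interval, iteration cap, and activity options below are assumptions, not the PR's actual code:

```go
package peerflow

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// Hypothetical sketch of the scheduler workflow started by APIMain; the real
// implementation lives elsewhere in this PR and may differ.
func GlobalScheduleManagerWorkflow(ctx workflow.Context) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 15 * time.Minute,
		HeartbeatTimeout:    time.Minute,
	})
	for i := 0; i < 24; i++ {
		// Activities are referenced by registered name here; a failed pass is
		// logged rather than failing the whole scheduler.
		if err := workflow.ExecuteActivity(ctx, "RecordSlotSizes").Get(ctx, nil); err != nil {
			workflow.GetLogger(ctx).Error("RecordSlotSizes failed", "error", err)
		}
		if err := workflow.ExecuteActivity(ctx, "SendWALHeartbeat").Get(ctx, nil); err != nil {
			workflow.GetLogger(ctx).Error("SendWALHeartbeat failed", "error", err)
		}
		if err := workflow.Sleep(ctx, 10*time.Minute); err != nil {
			return err // workflow cancelled
		}
	}
	// Continue-as-new keeps the event history bounded for a long-lived loop.
	return workflow.NewContinueAsNewError(ctx, GlobalScheduleManagerWorkflow)
}
```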
14 changes: 4 additions & 10 deletions flow/cmd/mirror_status.go
@@ -128,16 +128,10 @@ func (h *FlowRequestHandler) cloneTableSummary(
 			COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted,
 			SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced,
 			AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs
-		FROM
-			peerdb_stats.qrep_partitions qp
-		JOIN
-			peerdb_stats.qrep_runs qr
-		ON
-			qp.flow_name = qr.flow_name
-		WHERE
-			qp.flow_name ILIKE $1
-		GROUP BY
-			qp.flow_name, qr.config_proto;
+		FROM peerdb_stats.qrep_partitions qp
+		JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name
+		WHERE qp.flow_name ILIKE $1
+		GROUP BY qp.flow_name, qr.config_proto;
 	`

 	var flowName pgtype.Text
10 changes: 1 addition & 9 deletions flow/cmd/worker.go
@@ -128,15 +128,7 @@ func WorkerMain(opts *WorkerOptions) error {
 	}

 	w := worker.New(c, taskQueue, worker.Options{})
-	w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig)
-	w.RegisterWorkflow(peerflow.SyncFlowWorkflow)
-	w.RegisterWorkflow(peerflow.SetupFlowWorkflow)
-	w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow)
-	w.RegisterWorkflow(peerflow.QRepFlowWorkflow)
-	w.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
-	w.RegisterWorkflow(peerflow.XminFlowWorkflow)
-	w.RegisterWorkflow(peerflow.DropFlowWorkflow)
-	w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
+	peerflow.RegisterFlowWorkerWorkflows(w)

 	alerter, err := alerting.NewAlerter(conn)
 	if err != nil {
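
The nine individual registrations collapse into one call. The helper's body isn't shown in this diff, but a plausible implementation, assuming it centralizes the same list (with the heartbeat workflow replaced by the scheduler), is:

```go
package peerflow

import "go.temporal.io/sdk/worker"

// Hypothetical sketch: one registration helper so new workflows are added in
// the peerflow package instead of in every worker entrypoint.
func RegisterFlowWorkerWorkflows(w worker.Worker) {
	w.RegisterWorkflow(CDCFlowWorkflowWithConfig)
	w.RegisterWorkflow(SyncFlowWorkflow)
	w.RegisterWorkflow(SetupFlowWorkflow)
	w.RegisterWorkflow(NormalizeFlowWorkflow)
	w.RegisterWorkflow(QRepFlowWorkflow)
	w.RegisterWorkflow(QRepPartitionWorkflow)
	w.RegisterWorkflow(XminFlowWorkflow)
	w.RegisterWorkflow(DropFlowWorkflow)
	w.RegisterWorkflow(GlobalScheduleManagerWorkflow)
}
```

Keeping the list next to the workflow definitions means a new workflow can't be forgotten in the worker entrypoint.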
1 change: 0 additions & 1 deletion flow/connectors/core.go
@@ -48,7 +48,6 @@ type CDCPullConnector interface {
 	PullFlowCleanup(ctx context.Context, jobName string) error

 	// HandleSlotInfo update monitoring info on slot size etc
-	// threadsafe
 	HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error

 	// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
8 changes: 2 additions & 6 deletions flow/connectors/postgres/cdc.go
@@ -215,9 +215,8 @@ func (p *PostgresCDCSource) consumeStream(
 	}()

 	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		jobName := p.flowJobName
 		currRecords := cdcRecordsStorage.Len()
-		msg := fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
+		msg := fmt.Sprintf("pulling records, currently have %d records", currRecords)
 		p.logger.Info(msg)
 		return msg
 	})
@@ -279,10 +278,7 @@
 		// if we are past the next standby deadline (?)
 		if time.Now().After(nextStandbyMessageDeadline) {
 			if !cdcRecordsStorage.IsEmpty() {
-				p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records",
-					p.flowJobName,
-					cdcRecordsStorage.Len()),
-				)
+				p.logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len()))

 				if !p.commitLock {
 					p.logger.Info(
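
Both call sites above lean on utils.HeartbeatRoutine, which this diff doesn't show. From the call shape (a message callback in, a shutdown func out), a sketch along these lines would fit; the 15-second tick is an assumption:

```go
package utils

import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
)

// Sketch of a heartbeat helper matching the call sites above: it records an
// activity heartbeat with a fresh message on every tick until the returned
// shutdown function (or the parent context) cancels it.
func HeartbeatRoutine(ctx context.Context, message func() string) func() {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		ticker := time.NewTicker(15 * time.Second) // assumed interval
		defer ticker.Stop()
		for {
			activity.RecordHeartbeat(ctx, message())
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
	return cancel
}
```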