From e4a0bbfb18077937c3350e6bf288d3f43ef9cd1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 19 Feb 2024 18:33:03 +0000 Subject: [PATCH] Move slot size recording into global heartbeat workflow Some misc improvements: 1. sleep in workflow, not activity 2. HandleSlotSize no longer needs to be threadsafe --- flow/activities/flowable.go | 145 ++++++++++++++++++--------- flow/activities/slot.go | 36 ------- flow/cmd/mirror_status.go | 14 +-- flow/connectors/core.go | 1 - flow/connectors/postgres/postgres.go | 12 +-- flow/workflows/heartbeat_flow.go | 67 +++++++++++-- 6 files changed, 162 insertions(+), 113 deletions(-) delete mode 100644 flow/activities/slot.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 4fc6c9d7c4..9636279e71 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -231,11 +231,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } defer connectors.CloseConnector(ctx, srcConn) - slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName) - if input.FlowConnectionConfigs.ReplicationSlotName != "" { - slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName - } - shutdown := utils.HeartbeatRoutine(ctx, func() string { jobName := input.FlowConnectionConfigs.FlowJobName return fmt.Sprintf("transferring records for job - %s", jobName) @@ -243,7 +238,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, defer shutdown() errGroup, errCtx := errgroup.WithContext(ctx) - go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) batchSize := input.SyncFlowOptions.BatchSize if batchSize <= 0 { @@ -721,11 +715,10 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto if err != nil { return nil, err } - defer optionRows.Close() + var peerName pgtype.Text - var postgresPeers []*protos.Peer var peerOptions sql.RawBytes - for optionRows.Next() { + return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) { err := optionRows.Scan(&peerName, &peerOptions) if err != nil { return nil, err @@ -735,13 +728,12 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto if unmarshalErr != nil { return nil, unmarshalErr } - postgresPeers = append(postgresPeers, &protos.Peer{ + return &protos.Peer{ Name: peerName.String, Type: protos.DBType_POSTGRES, Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig}, - }) - } - return postgresPeers, nil + }, nil + }) } func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { @@ -751,53 +743,108 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { return nil } - ticker := time.NewTicker(10 * time.Minute) - defer ticker.Stop() + pgPeers, err := a.getPostgresPeerConfigs(ctx) + if err != nil { + logger.Warn("[sendwalheartbeat] unable to fetch peers. " + + "Skipping walheartbeat send. Error: " + err.Error()) + return err + } - activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes") - for { - select { - case <-ctx.Done(): - logger.Info("context is done, exiting wal heartbeat send loop") + command := ` + BEGIN; + DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); + CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); + DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); + END; + ` + // run above command for each Postgres peer + for _, pgPeer := range pgPeers { + activity.RecordHeartbeat(ctx, pgPeer.Name) + if ctx.Err() != nil { return nil - case <-ticker.C: - pgPeers, err := a.getPostgresPeerConfigs(ctx) + } + + func() { + pgConfig := pgPeer.GetPostgresConfig() + peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) + if peerErr != nil { + logger.Error(fmt.Sprintf("error creating pool for postgres peer %v with host %v: %v", + pgPeer.Name, pgConfig.Host, peerErr)) + return + } + defer peerConn.Close(ctx) + + _, err := peerConn.Exec(ctx, command) if err != nil { - logger.Warn("[sendwalheartbeat] unable to fetch peers. " + - "Skipping walheartbeat send. Error: " + err.Error()) - continue + logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err)) } - command := ` - BEGIN; - DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` - // run above command for each Postgres peer - for _, pgPeer := range pgPeers { - pgConfig := pgPeer.GetPostgresConfig() - peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) - if peerErr != nil { - return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, peerErr) - } + logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name)) + }() + } - _, err := peerConn.Exec(ctx, command) - if err != nil { - logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err)) - } + return nil +} + +func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { + rows, err := a.CatalogPool.Query(ctx, "SELECT flows.name, flows.config_proto FROM flows") + if err != nil { + return err + } - closeErr := peerConn.Close(ctx) - if closeErr != nil { - return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, closeErr) + configs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.FlowConnectionConfigs, error) { + var flowName string + var configProto sql.RawBytes + err := rows.Scan(&flowName, configProto) + if err != nil { + return nil, err + } + + var config protos.FlowConnectionConfigs + err = proto.Unmarshal(configProto, &config) + if err != nil { + return nil, err + } + + return &config, nil + }) + if err != nil { + return err + } + + logger := activity.GetLogger(ctx) + for _, config := range configs { + func() { + srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source) + if err != nil { + if err != connectors.ErrUnsupportedFunctionality { + logger.Error("Failed to create connector to handle slot info", slog.Any("error", err)) } - logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name)) + return + } + defer connectors.CloseConnector(ctx, srcConn) + + slotName := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName) + if config.ReplicationSlotName != "" { + slotName = config.ReplicationSlotName + } + peerName := config.Source.Name + + activity.RecordHeartbeat(ctx, fmt.Sprintf("checking %s on %s", slotName, peerName)) + if ctx.Err() != nil { + return } + err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName) + if err != nil { + logger.Error("Failed to handle slot info", slog.Any("error", err)) + } + }() + if ctx.Err() != nil { + return nil } } + + return nil } func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, diff --git a/flow/activities/slot.go b/flow/activities/slot.go deleted file mode 100644 index 8f6f3325b4..0000000000 --- a/flow/activities/slot.go +++ /dev/null @@ -1,36 +0,0 @@ -package activities - -import ( - "context" - "time" - - "github.com/PeerDB-io/peer-flow/connectors" -) - -func (a *FlowableActivity) recordSlotSizePeriodically( - ctx context.Context, - srcConn connectors.CDCPullConnector, - slotName string, - peerName string, -) { - // ensures slot info is logged at least once per SyncFlow - err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName) - if err != nil { - return - } - - ticker := time.NewTicker(5 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName) - if err != nil { - return - } - case <-ctx.Done(): - return - } - } -} diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index e6c272a334..85c350c164 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -128,16 +128,10 @@ func (h *FlowRequestHandler) cloneTableSummary( COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted, SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced, AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs - FROM - peerdb_stats.qrep_partitions qp - JOIN - peerdb_stats.qrep_runs qr - ON - qp.flow_name = qr.flow_name - WHERE - qp.flow_name ILIKE $1 - GROUP BY - qp.flow_name, qr.config_proto; + FROM peerdb_stats.qrep_partitions qp + JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name + WHERE qp.flow_name ILIKE $1 + GROUP BY qp.flow_name, qr.config_proto; ` var flowName pgtype.Text diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 1e9401e3b0..889250d804 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -48,7 +48,6 @@ type CDCPullConnector interface { PullFlowCleanup(ctx context.Context, jobName string) error // HandleSlotInfo update monitoring info on slot size etc - // threadsafe HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1079431c17..c553748ba2 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -897,15 +897,7 @@ func (c *PostgresConnector) HandleSlotInfo( ) error { logger := logger.LoggerFromCtx(ctx) - // must create new connection because HandleSlotInfo is threadsafe - conn, err := c.ssh.NewPostgresConnFromPostgresConfig(ctx, c.config) - if err != nil { - logger.Warn("warning: failed to connect to get slot info", "error", err) - return err - } - defer conn.Close(ctx) - - slotInfo, err := getSlotInfo(ctx, conn, slotName, c.config.Database) + slotInfo, err := getSlotInfo(ctx, c.conn, slotName, c.config.Database) if err != nil { logger.Warn("warning: failed to get slot info", "error", err) return err @@ -919,7 +911,7 @@ func (c *PostgresConnector) HandleSlotInfo( alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0]) // Also handles alerts for PeerDB user connections exceeding a given limit here - res, err := getOpenConnectionsForUser(ctx, conn, c.config.User) + res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) if err != nil { logger.Warn("warning: failed to get current open connections", "error", err) return err diff --git a/flow/workflows/heartbeat_flow.go b/flow/workflows/heartbeat_flow.go index 1c99900b71..baec11eefa 100644 --- a/flow/workflows/heartbeat_flow.go +++ b/flow/workflows/heartbeat_flow.go @@ -6,16 +6,69 @@ import ( "go.temporal.io/sdk/workflow" ) -// HeartbeatFlowWorkflow is the workflow that sets up heartbeat sending. +// HeartbeatFlowWorkflow sends WAL heartbeats & monitors slot size func HeartbeatFlowWorkflow(ctx workflow.Context) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 7 * 24 * time.Hour, + StartToCloseTimeout: time.Hour, }) - heartbeatFuture := workflow.ExecuteActivity(ctx, flowable.SendWALHeartbeat) - if err := heartbeatFuture.Get(ctx, nil); err != nil { - return err - } + // Use channels to time activities, + // using two primes for frequency to scatter timing + // After sending, next send should be delayed if activity takes longer than frequency + doSlotSize := workflow.NewChannel(ctx) + doHeartbeat := workflow.NewChannel(ctx) + doneSlotSize := workflow.NewChannel(ctx) + doneHeartbeat := workflow.NewChannel(ctx) + workflow.Go(ctx, func(ctx workflow.Context) { + for { + doSlotSize.Send(ctx, nil) + if workflow.Sleep(ctx, 5*time.Minute) != nil { + return + } + doneSlotSize.Receive(ctx, nil) + } + }) + workflow.Go(ctx, func(ctx workflow.Context) { + for { + doHeartbeat.Send(ctx, nil) + if workflow.Sleep(ctx, 11*time.Minute) != nil { + return + } + doneHeartbeat.Receive(ctx, nil) + } + }) + + var canceled bool + activities := 0 + selector := workflow.NewSelector(ctx) + selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { + canceled = true + }) + selector.AddReceive(doSlotSize, func(c workflow.ReceiveChannel, _ bool) { + if c.ReceiveAsync(nil) { + slotSizeFuture := workflow.ExecuteActivity(ctx, flowable.RecordSlotSizes) + selector.AddFuture(slotSizeFuture, func(f workflow.Future) { + doneSlotSize.Send(ctx, nil) + }) + } + }) + selector.AddReceive(doHeartbeat, func(c workflow.ReceiveChannel, _ bool) { + if c.ReceiveAsync(nil) { + heartbeatFuture := workflow.ExecuteActivity(ctx, flowable.SendWALHeartbeat) + selector.AddFuture(heartbeatFuture, func(f workflow.Future) { + doneHeartbeat.Send(ctx, nil) + }) + } + }) + for { + selector.Select(ctx) + if canceled { + return nil + } - return nil + activities += 1 + if activities > 99 { + return workflow.NewContinueAsNewError(ctx, HeartbeatFlowWorkflow) + } + } }