diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 4fc6c9d7c4..65f78e3b35 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -2,7 +2,6 @@ package activities import ( "context" - "database/sql" "errors" "fmt" "log/slog" @@ -11,7 +10,6 @@ import ( "time" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" "golang.org/x/sync/errgroup" @@ -231,11 +229,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } defer connectors.CloseConnector(ctx, srcConn) - slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName) - if input.FlowConnectionConfigs.ReplicationSlotName != "" { - slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName - } - shutdown := utils.HeartbeatRoutine(ctx, func() string { jobName := input.FlowConnectionConfigs.FlowJobName return fmt.Sprintf("transferring records for job - %s", jobName) @@ -243,7 +236,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, defer shutdown() errGroup, errCtx := errgroup.WithContext(ctx) - go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) batchSize := input.SyncFlowOptions.BatchSize if batchSize <= 0 { @@ -721,11 +713,10 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto if err != nil { return nil, err } - defer optionRows.Close() - var peerName pgtype.Text - var postgresPeers []*protos.Peer - var peerOptions sql.RawBytes - for optionRows.Next() { + + return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) { + var peerName string + var peerOptions []byte err := optionRows.Scan(&peerName, &peerOptions) if err != nil { return nil, err @@ -735,13 +726,12 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto if unmarshalErr != nil { return nil, unmarshalErr } - postgresPeers = append(postgresPeers, &protos.Peer{ - Name: peerName.String, + return &protos.Peer{ + Name: peerName, Type: protos.DBType_POSTGRES, Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig}, - }) - } - return postgresPeers, nil + }, nil + }) } func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { @@ -751,53 +741,108 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { return nil } - ticker := time.NewTicker(10 * time.Minute) - defer ticker.Stop() + pgPeers, err := a.getPostgresPeerConfigs(ctx) + if err != nil { + logger.Warn("[sendwalheartbeat] unable to fetch peers. " + + "Skipping walheartbeat send. Error: " + err.Error()) + return err + } - activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes") - for { - select { - case <-ctx.Done(): - logger.Info("context is done, exiting wal heartbeat send loop") + command := ` + BEGIN; + DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); + CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); + DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); + END; + ` + // run above command for each Postgres peer + for _, pgPeer := range pgPeers { + activity.RecordHeartbeat(ctx, pgPeer.Name) + if ctx.Err() != nil { return nil - case <-ticker.C: - pgPeers, err := a.getPostgresPeerConfigs(ctx) + } + + func() { + pgConfig := pgPeer.GetPostgresConfig() + peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) + if peerErr != nil { + logger.Error(fmt.Sprintf("error creating pool for postgres peer %v with host %v: %v", + pgPeer.Name, pgConfig.Host, peerErr)) + return + } + defer peerConn.Close(ctx) + + _, err := peerConn.Exec(ctx, command) if err != nil { - logger.Warn("[sendwalheartbeat] unable to fetch peers. " + - "Skipping walheartbeat send. Error: " + err.Error()) - continue + logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err)) } - command := ` - BEGIN; - DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` - // run above command for each Postgres peer - for _, pgPeer := range pgPeers { - pgConfig := pgPeer.GetPostgresConfig() - peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig)) - if peerErr != nil { - return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, peerErr) - } + logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name)) + }() + } - _, err := peerConn.Exec(ctx, command) - if err != nil { - logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, err)) - } + return nil +} + +func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { + rows, err := a.CatalogPool.Query(ctx, "SELECT flows.name, flows.config_proto FROM flows") + if err != nil { + return err + } - closeErr := peerConn.Close(ctx) - if closeErr != nil { - return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w", - pgPeer.Name, pgConfig.Host, closeErr) + configs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.FlowConnectionConfigs, error) { + var flowName string + var configProto []byte + err := rows.Scan(&flowName, &configProto) + if err != nil { + return nil, err + } + + var config protos.FlowConnectionConfigs + err = proto.Unmarshal(configProto, &config) + if err != nil { + return nil, err + } + + return &config, nil + }) + if err != nil { + return err + } + + logger := activity.GetLogger(ctx) + for _, config := range configs { + func() { + srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source) + if err != nil { + if err != connectors.ErrUnsupportedFunctionality { + logger.Error("Failed to create connector to handle slot info", slog.Any("error", err)) } - logger.Info(fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name)) + return + } + defer connectors.CloseConnector(ctx, srcConn) + + slotName := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName) + if config.ReplicationSlotName != "" { + slotName = config.ReplicationSlotName + } + peerName := config.Source.Name + + activity.RecordHeartbeat(ctx, fmt.Sprintf("checking %s on %s", slotName, peerName)) + if ctx.Err() != nil { + return } + err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName) + if err != nil { + logger.Error("Failed to handle slot info", slog.Any("error", err)) + } + }() + if ctx.Err() != nil { + return nil } } + + return nil } func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, diff --git a/flow/activities/slot.go b/flow/activities/slot.go deleted file mode 100644 index 8f6f3325b4..0000000000 --- a/flow/activities/slot.go +++ /dev/null @@ -1,36 +0,0 @@ -package activities - -import ( - "context" - "time" - - "github.com/PeerDB-io/peer-flow/connectors" -) - -func (a *FlowableActivity) recordSlotSizePeriodically( - ctx context.Context, - srcConn connectors.CDCPullConnector, - slotName string, - peerName string, -) { - // ensures slot info is logged at least once per SyncFlow - err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName) - if err != nil { - return - } - - ticker := time.NewTicker(5 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName) - if err != nil { - return - } - case <-ctx.Done(): - return - } - } -} diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 2634aa3bf8..656fad33d0 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -63,7 +63,7 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) { return server, nil } -func killExistingHeartbeatFlows( +func killExistingScheduleFlows( ctx context.Context, tc client.Client, namespace string, @@ -72,12 +72,12 @@ func killExistingHeartbeatFlows( listRes, err := tc.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ Namespace: namespace, - Query: "WorkflowType = 'HeartbeatFlowWorkflow' AND TaskQueue = '" + taskQueue + "'", + Query: "WorkflowType = 'GlobalScheduleManagerWorkflow' AND TaskQueue = '" + taskQueue + "'", }) if err != nil { return fmt.Errorf("unable to list workflows: %w", err) } - slog.Info("Requesting cancellation of pre-existing heartbeat flows") + slog.Info("Requesting cancellation of pre-existing scheduler flows") for _, workflow := range listRes.Executions { slog.Info("Cancelling workflow", slog.String("workflowId", workflow.Execution.WorkflowId)) err := tc.CancelWorkflow(ctx, @@ -131,24 +131,24 @@ func APIMain(ctx context.Context, args *APIServerParams) error { flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue) - err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue) + err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue) if err != nil { - return fmt.Errorf("unable to kill existing heartbeat flows: %w", err) + return fmt.Errorf("unable to kill existing scheduler flows: %w", err) } - workflowID := fmt.Sprintf("heartbeatflow-%s", uuid.New()) + workflowID := fmt.Sprintf("scheduler-%s", uuid.New()) workflowOptions := client.StartWorkflowOptions{ ID: workflowID, TaskQueue: taskQueue, } _, err = flowHandler.temporalClient.ExecuteWorkflow( - ctx, // context - workflowOptions, // workflow start options - peerflow.HeartbeatFlowWorkflow, // workflow function + ctx, + workflowOptions, + peerflow.GlobalScheduleManagerWorkflow, ) if err != nil { - return fmt.Errorf("unable to start heartbeat workflow: %w", err) + return fmt.Errorf("unable to start scheduler workflow: %w", err) } protos.RegisterFlowServiceServer(grpcServer, flowHandler) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index e6c272a334..85c350c164 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -128,16 +128,10 @@ func (h *FlowRequestHandler) cloneTableSummary( COUNT(CASE WHEN qp.end_time IS NOT NULL THEN 1 END) AS NumPartitionsCompleted, SUM(qp.rows_in_partition) FILTER (WHERE qp.end_time IS NOT NULL) AS NumRowsSynced, AVG(EXTRACT(EPOCH FROM (qp.end_time - qp.start_time)) * 1000) FILTER (WHERE qp.end_time IS NOT NULL) AS AvgTimePerPartitionMs - FROM - peerdb_stats.qrep_partitions qp - JOIN - peerdb_stats.qrep_runs qr - ON - qp.flow_name = qr.flow_name - WHERE - qp.flow_name ILIKE $1 - GROUP BY - qp.flow_name, qr.config_proto; + FROM peerdb_stats.qrep_partitions qp + JOIN peerdb_stats.qrep_runs qr ON qp.flow_name = qr.flow_name + WHERE qp.flow_name ILIKE $1 + GROUP BY qp.flow_name, qr.config_proto; ` var flowName pgtype.Text diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index c63511470d..753ef14f14 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -128,15 +128,7 @@ func WorkerMain(opts *WorkerOptions) error { } w := worker.New(c, taskQueue, worker.Options{}) - w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig) - w.RegisterWorkflow(peerflow.SyncFlowWorkflow) - w.RegisterWorkflow(peerflow.SetupFlowWorkflow) - w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow) - w.RegisterWorkflow(peerflow.QRepFlowWorkflow) - w.RegisterWorkflow(peerflow.QRepPartitionWorkflow) - w.RegisterWorkflow(peerflow.XminFlowWorkflow) - w.RegisterWorkflow(peerflow.DropFlowWorkflow) - w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) + peerflow.RegisterFlowWorkerWorkflows(w) alerter, err := alerting.NewAlerter(conn) if err != nil { diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 1e9401e3b0..889250d804 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -48,7 +48,6 @@ type CDCPullConnector interface { PullFlowCleanup(ctx context.Context, jobName string) error // HandleSlotInfo update monitoring info on slot size etc - // threadsafe HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 3cd4f828a0..774c57a0ca 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -215,9 +215,8 @@ func (p *PostgresCDCSource) consumeStream( }() shutdown := utils.HeartbeatRoutine(ctx, func() string { - jobName := p.flowJobName currRecords := cdcRecordsStorage.Len() - msg := fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords) + msg := fmt.Sprintf("pulling records, currently have %d records", currRecords) p.logger.Info(msg) return msg }) @@ -279,10 +278,7 @@ func (p *PostgresCDCSource) consumeStream( // if we are past the next standby deadline (?) if time.Now().After(nextStandbyMessageDeadline) { if !cdcRecordsStorage.IsEmpty() { - p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, have %d records", - p.flowJobName, - cdcRecordsStorage.Len()), - ) + p.logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len())) if !p.commitLock { p.logger.Info( diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1079431c17..052b148fef 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -897,15 +897,7 @@ func (c *PostgresConnector) HandleSlotInfo( ) error { logger := logger.LoggerFromCtx(ctx) - // must create new connection because HandleSlotInfo is threadsafe - conn, err := c.ssh.NewPostgresConnFromPostgresConfig(ctx, c.config) - if err != nil { - logger.Warn("warning: failed to connect to get slot info", "error", err) - return err - } - defer conn.Close(ctx) - - slotInfo, err := getSlotInfo(ctx, conn, slotName, c.config.Database) + slotInfo, err := getSlotInfo(ctx, c.conn, slotName, c.config.Database) if err != nil { logger.Warn("warning: failed to get slot info", "error", err) return err @@ -916,20 +908,18 @@ func (c *PostgresConnector) HandleSlotInfo( return nil } + logger.Info(fmt.Sprintf("Checking %s lag for %s", slotName, peerName), slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0]) // Also handles alerts for PeerDB user connections exceeding a given limit here - res, err := getOpenConnectionsForUser(ctx, conn, c.config.User) + res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) if err != nil { logger.Warn("warning: failed to get current open connections", "error", err) return err } alerter.AlertIfOpenConnections(ctx, peerName, res) - if len(slotInfo) != 0 { - return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0]) - } - return nil + return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0]) } // GetLastOffset returns the last synced offset for a job. diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index efcee75636..602ace6b8f 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -57,14 +57,7 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv // set a 5 minute timeout for the workflow to execute a few runs. env.SetTestTimeout(5 * time.Minute) - env.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig) - env.RegisterWorkflow(peerflow.SyncFlowWorkflow) - env.RegisterWorkflow(peerflow.SetupFlowWorkflow) - env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) - env.RegisterWorkflow(peerflow.NormalizeFlowWorkflow) - env.RegisterWorkflow(peerflow.QRepFlowWorkflow) - env.RegisterWorkflow(peerflow.XminFlowWorkflow) - env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) + peerflow.RegisterFlowWorkerWorkflows(env) alerter, err := alerting.NewAlerter(conn) if err != nil { @@ -555,6 +548,7 @@ func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnv testSuite.SetLogger(&tLogger) env := testSuite.NewTestWorkflowEnvironment() RegisterWorkflowsAndActivities(t, env) + env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) return env } diff --git a/flow/workflows/heartbeat_flow.go b/flow/workflows/heartbeat_flow.go deleted file mode 100644 index 1c99900b71..0000000000 --- a/flow/workflows/heartbeat_flow.go +++ /dev/null @@ -1,21 +0,0 @@ -package peerflow - -import ( - "time" - - "go.temporal.io/sdk/workflow" -) - -// HeartbeatFlowWorkflow is the workflow that sets up heartbeat sending. -func HeartbeatFlowWorkflow(ctx workflow.Context) error { - ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 7 * 24 * time.Hour, - }) - - heartbeatFuture := workflow.ExecuteActivity(ctx, flowable.SendWALHeartbeat) - if err := heartbeatFuture.Get(ctx, nil); err != nil { - return err - } - - return nil -} diff --git a/flow/workflows/register.go b/flow/workflows/register.go new file mode 100644 index 0000000000..2f7e4411cf --- /dev/null +++ b/flow/workflows/register.go @@ -0,0 +1,20 @@ +package peerflow + +import ( + "go.temporal.io/sdk/worker" +) + +func RegisterFlowWorkerWorkflows(w worker.WorkflowRegistry) { + w.RegisterWorkflow(CDCFlowWorkflowWithConfig) + w.RegisterWorkflow(DropFlowWorkflow) + w.RegisterWorkflow(NormalizeFlowWorkflow) + w.RegisterWorkflow(SetupFlowWorkflow) + w.RegisterWorkflow(SyncFlowWorkflow) + w.RegisterWorkflow(QRepFlowWorkflow) + w.RegisterWorkflow(QRepPartitionWorkflow) + w.RegisterWorkflow(XminFlowWorkflow) + + w.RegisterWorkflow(GlobalScheduleManagerWorkflow) + w.RegisterWorkflow(HeartbeatFlowWorkflow) + w.RegisterWorkflow(RecordSlotSizeWorkflow) +} diff --git a/flow/workflows/scheduled_flows.go b/flow/workflows/scheduled_flows.go new file mode 100644 index 0000000000..8fc08ce36b --- /dev/null +++ b/flow/workflows/scheduled_flows.go @@ -0,0 +1,58 @@ +package peerflow + +import ( + "fmt" + "time" + + "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/workflow" +) + +// RecordSlotSizeWorkflow monitors replication slot size +func RecordSlotSizeWorkflow(ctx workflow.Context) error { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Hour, + }) + slotSizeFuture := workflow.ExecuteActivity(ctx, flowable.RecordSlotSizes) + return slotSizeFuture.Get(ctx, nil) +} + +// HeartbeatFlowWorkflow sends WAL heartbeats +func HeartbeatFlowWorkflow(ctx workflow.Context) error { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Hour, + }) + heartbeatFuture := workflow.ExecuteActivity(ctx, flowable.SendWALHeartbeat) + return heartbeatFuture.Get(ctx, nil) +} + +func withCronOptions(ctx workflow.Context, workflowID string, cron string) workflow.Context { + return workflow.WithChildOptions(ctx, + workflow.ChildWorkflowOptions{ + WorkflowID: workflowID, + ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, + WaitForCancellation: true, + CronSchedule: cron, + }, + ) +} + +func GlobalScheduleManagerWorkflow(ctx workflow.Context) error { + info := workflow.GetInfo(ctx) + + heartbeatCtx := withCronOptions(ctx, + fmt.Sprintf("wal-heartbeat-%s", info.OriginalRunID), + "*/12 * * * *") + workflow.ExecuteChildWorkflow( + heartbeatCtx, + HeartbeatFlowWorkflow, + ) + + slotSizeCtx := withCronOptions(ctx, + fmt.Sprintf("record-slot-size-%s", info.OriginalRunID), + "*/5 * * * *") + workflow.ExecuteChildWorkflow(slotSizeCtx, RecordSlotSizeWorkflow) + + ctx.Done().Receive(ctx, nil) + return ctx.Err() +}