From 5d32604b4b738af2ded63e9e1779ebd906d92c3d Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 22 Dec 2023 18:25:36 +0530 Subject: [PATCH 1/7] only initialize replication pool when needed in CDCPull (#872) --- flow/cmd/peer_data.go | 4 +-- flow/connectors/core.go | 15 ++++++---- flow/connectors/postgres/postgres.go | 30 ++++++++++--------- .../connectors/postgres/postgres_repl_test.go | 2 +- .../postgres/postgres_schema_delta_test.go | 2 +- flow/e2e/postgres/qrep_flow_pg_test.go | 2 +- 6 files changed, 30 insertions(+), 25 deletions(-) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 454dceb20e..34f31219ed 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -209,7 +209,7 @@ func (h *FlowRequestHandler) GetSlotInfo( return &protos.PeerSlotResponse{SlotData: nil}, err } - pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig) + pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false) if err != nil { slog.Error("Failed to create postgres connector", slog.Any("error", err)) return &protos.PeerSlotResponse{SlotData: nil}, err @@ -236,7 +236,7 @@ func (h *FlowRequestHandler) GetStatInfo( return &protos.PeerStatResponse{StatData: nil}, err } - pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig) + pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig, false) if err != nil { slog.Error("Failed to create postgres connector", slog.Any("error", err)) return &protos.PeerStatResponse{StatData: nil}, err diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e5efec63ca..1b823b04ae 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -140,7 +140,7 @@ func GetCDCPullConnector(ctx context.Context, config *protos.Peer) (CDCPullConne inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), true) default: return nil, ErrUnsupportedFunctionality } @@ -150,7 +150,7 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_BigqueryConfig: return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: @@ -172,7 +172,7 @@ func GetCDCNormalizeConnector(ctx context.Context, inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_BigqueryConfig: return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: @@ -186,7 +186,7 @@ func GetQRepPullConnector(ctx context.Context, config *protos.Peer) (QRepPullCon inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_SqlserverConfig: return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) default: @@ -198,7 +198,7 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon inner := config.Config switch inner.(type) { case *protos.Peer_PostgresConfig: - return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig()) + return connpostgres.NewPostgresConnector(ctx, config.GetPostgresConfig(), false) case *protos.Peer_BigqueryConfig: return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig()) case *protos.Peer_SnowflakeConfig: @@ -219,7 +219,10 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { if pgConfig == nil { return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name) } - return connpostgres.NewPostgresConnector(ctx, pgConfig) + // we can't decide if a PG peer should have replication permissions on it because we don't know + // what the user wants to do with it, so defaulting to being permissive. + // can be revisited in the future or we can use some UI wizardry. + return connpostgres.NewPostgresConnector(ctx, pgConfig, false) case protos.DBType_BIGQUERY: bqConfig := peer.GetBigqueryConfig() if bqConfig == nil { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index a46005fe01..2a6406cfea 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -34,7 +34,7 @@ type PostgresConnector struct { } // NewPostgresConnector creates a new instance of PostgresConnector. -func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { +func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig, initializeReplPool bool) (*PostgresConnector, error) { connectionString := utils.GetPGConnectionString(pgConfig) // create a separate connection pool for non-replication queries as replication connections cannot @@ -62,21 +62,23 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) return nil, fmt.Errorf("failed to get custom type map: %w", err) } - // ensure that replication is set to database - replConnConfig, err := pgxpool.ParseConfig(connectionString) - if err != nil { - return nil, fmt.Errorf("failed to parse connection string: %w", err) - } + // only initialize for CDCPullConnector to reduce number of idle connections + var replPool *SSHWrappedPostgresPool + if initializeReplPool { + // ensure that replication is set to database + replConnConfig, err := pgxpool.ParseConfig(connectionString) + if err != nil { + return nil, fmt.Errorf("failed to parse connection string: %w", err) + } - replConnConfig.ConnConfig.RuntimeParams["replication"] = "database" - replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" - replConnConfig.MaxConns = 1 + replConnConfig.ConnConfig.RuntimeParams["replication"] = "database" + replConnConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" + replConnConfig.MaxConns = 1 - // TODO: replPool not initializing might be intentional, if we only want to use QRep mirrors - // and the user doesn't have the REPLICATION permission - replPool, err := NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig) - if err != nil { - return nil, fmt.Errorf("failed to create connection pool: %w", err) + replPool, err = NewSSHWrappedPostgresPool(ctx, replConnConfig, pgConfig.SshConfig) + if err != nil { + return nil, fmt.Errorf("failed to create replication connection pool: %w", err) + } } metadataSchema := "_peerdb_internal" diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index b50a1f89fc..df3a7de13f 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -28,7 +28,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { User: "postgres", Password: "postgres", Database: "postgres", - }) + }, true) require.NoError(suite.T(), err) setupTx, err := suite.connector.pool.Begin(context.Background()) diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 98a6a47b99..8a919eb214 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -32,7 +32,7 @@ func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() { User: "postgres", Password: "postgres", Database: "postgres", - }) + }, false) suite.failTestError(err) setupTx, err := suite.connector.pool.Begin(context.Background()) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 1c86c973b9..0bb886f9a3 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -55,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { User: "postgres", Password: "postgres", Database: "postgres", - }) + }, false) s.NoError(err) } From fd9aebe35dd740428b6a372cfa6d7b89a893a1c2 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 22 Dec 2023 18:27:03 +0530 Subject: [PATCH 2/7] remove SendWALHeartbeat from the CDCPullConnector interface (#873) Since this is done on the activity level now for all Postgres peers Co-authored-by: Kaushik Iska --- flow/connectors/core.go | 3 --- flow/connectors/postgres/postgres.go | 16 ---------------- 2 files changed, 19 deletions(-) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 1b823b04ae..477b7cf46b 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -43,9 +43,6 @@ type CDCPullConnector interface { // PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR PullFlowCleanup(jobName string) error - // SendWALHeartbeat allows for activity to progress restart_lsn on postgres. - SendWALHeartbeat() error - // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 2a6406cfea..1152493b01 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -882,22 +882,6 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error { return nil } -func (c *PostgresConnector) SendWALHeartbeat() error { - command := ` - BEGIN; - DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` - _, err := c.pool.Exec(c.ctx, command) - if err != nil { - return fmt.Errorf("error bumping wal position: %w", err) - } - - return nil -} - // GetLastOffset returns the last synced offset for a job. func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnectionsForUserResult, error) { row := c.pool. From 45a420573f560ef8fc3ce6eda8f2ec7fabd2394f Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 22 Dec 2023 18:27:28 +0530 Subject: [PATCH 3/7] changing naming convention for getters in peerdbenv (#875) the Go convention is not to have `Get` in the function name, `GetPeer` vs `Peer`. Can be changed in other places on a case by case basis. Co-authored-by: Kaushik Iska --- flow/activities/flowable.go | 10 +++---- flow/cmd/version.go | 2 +- flow/connectors/eventhub/eventhub.go | 2 +- flow/connectors/utils/catalog/env.go | 10 +++---- .../utils/cdc_records/cdc_records_storage.go | 2 +- flow/model/model.go | 2 +- flow/peerdbenv/config.go | 28 +++++++++---------- flow/shared/alerting/alerting.go | 4 +-- flow/shared/constants.go | 2 +- 9 files changed, 31 insertions(+), 31 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index b5cf8e8973..982364ba58 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -181,11 +181,11 @@ func (a *FlowableActivity) handleSlotInfo( } deploymentUIDPrefix := "" - if peerdbenv.GetPeerDBDeploymentUID() != "" { - deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID()) + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold() + slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold() if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! @@ -194,7 +194,7 @@ cc: `, } // Also handles alerts for PeerDB user connections exceeding a given limit here - maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold() + maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold() res, err := srcConn.GetOpenConnectionsForUser() if err != nil { slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) @@ -294,7 +294,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, TableNameMapping: tblNameMapping, LastOffset: input.LastSyncState.Checkpoint, MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize), - IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(), + IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(), TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, OverridePublicationName: input.FlowConnectionConfigs.PublicationName, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, diff --git a/flow/cmd/version.go b/flow/cmd/version.go index 94210ea9cb..3338a20e6a 100644 --- a/flow/cmd/version.go +++ b/flow/cmd/version.go @@ -11,6 +11,6 @@ func (h *FlowRequestHandler) GetVersion( ctx context.Context, req *protos.PeerDBVersionRequest, ) (*protos.PeerDBVersionResponse, error) { - version := peerdbenv.GetPeerDBVersionShaShort() + version := peerdbenv.PeerDBVersionShaShort() return &protos.PeerDBVersionResponse{Version: version}, nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 027d3027fa..05347a4263 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -129,7 +129,7 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - eventHubFlushTimeout := peerdbenv.GetPeerDBEventhubFlushTimeoutSeconds() + eventHubFlushTimeout := peerdbenv.PeerDBEventhubFlushTimeoutSeconds() ticker := time.NewTicker(eventHubFlushTimeout) defer ticker.Stop() diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index cdd85535b9..f5c8e0507d 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -39,10 +39,10 @@ func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) { func genCatalogConnectionString() string { return utils.GetPGConnectionString(&protos.PostgresConfig{ - Host: peerdbenv.GetPeerDBCatalogHost(), - Port: peerdbenv.GetPeerDBCatalogPort(), - User: peerdbenv.GetPeerDBCatalogUser(), - Password: peerdbenv.GetPeerDBCatalogPassword(), - Database: peerdbenv.GetPeerDBCatalogDatabase(), + Host: peerdbenv.PeerDBCatalogHost(), + Port: peerdbenv.PeerDBCatalogPort(), + User: peerdbenv.PeerDBCatalogUser(), + Password: peerdbenv.PeerDBCatalogPassword(), + Database: peerdbenv.PeerDBCatalogDatabase(), }) } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 3147045a76..66722b69ed 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -43,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { numRecords: 0, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), - numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(), + numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillThreshold(), } } diff --git a/flow/model/model.go b/flow/model/model.go index 487616c531..581b57178b 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -329,7 +329,7 @@ type CDCRecordStream struct { } func NewCDCRecordStream() *CDCRecordStream { - channelBuffer := peerdbenv.GetPeerDBCDCChannelBufferSize() + channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize() return &CDCRecordStream{ records: make(chan Record, channelBuffer), // TODO (kaushik): more than 1024 schema deltas can cause problems! diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 970be3455d..33b1058066 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -9,74 +9,74 @@ import ( // throughout the codebase. // PEERDB_VERSION_SHA_SHORT -func GetPeerDBVersionShaShort() string { +func PeerDBVersionShaShort() string { return getEnvString("PEERDB_VERSION_SHA_SHORT", "unknown") } // PEERDB_DEPLOYMENT_UID -func GetPeerDBDeploymentUID() string { +func PeerDBDeploymentUID() string { return getEnvString("PEERDB_DEPLOYMENT_UID", "") } // PEERDB_CDC_CHANNEL_BUFFER_SIZE -func GetPeerDBCDCChannelBufferSize() int { +func PeerDBCDCChannelBufferSize() int { return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS -func GetPeerDBEventhubFlushTimeoutSeconds() time.Duration { +func PeerDBEventhubFlushTimeoutSeconds() time.Duration { x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) return time.Duration(x) * time.Second } // PEERDB_CDC_IDLE_TIMEOUT_SECONDS -func GetPeerDBCDCIdleTimeoutSeconds() time.Duration { +func PeerDBCDCIdleTimeoutSeconds() time.Duration { x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60) return time.Duration(x) * time.Second } // PEERDB_CDC_DISK_SPILL_THRESHOLD -func GetPeerDBCDCDiskSpillThreshold() int { +func PeerDBCDCDiskSpillThreshold() int { return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000) } // PEERDB_CATALOG_HOST -func GetPeerDBCatalogHost() string { +func PeerDBCatalogHost() string { return getEnvString("PEERDB_CATALOG_HOST", "") } // PEERDB_CATALOG_PORT -func GetPeerDBCatalogPort() uint32 { +func PeerDBCatalogPort() uint32 { return getEnvUint32("PEERDB_CATALOG_PORT", 5432) } // PEERDB_CATALOG_USER -func GetPeerDBCatalogUser() string { +func PeerDBCatalogUser() string { return getEnvString("PEERDB_CATALOG_USER", "") } // PEERDB_CATALOG_PASSWORD -func GetPeerDBCatalogPassword() string { +func PeerDBCatalogPassword() string { return getEnvString("PEERDB_CATALOG_PASSWORD", "") } // PEERDB_CATALOG_DATABASE -func GetPeerDBCatalogDatabase() string { +func PeerDBCatalogDatabase() string { return getEnvString("PEERDB_CATALOG_DATABASE", "") } // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely -func GetPeerDBSlotLagMBAlertThreshold() uint32 { +func PeerDBSlotLagMBAlertThreshold() uint32 { return getEnvUint32("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) } // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely -func GetPeerDBAlertingGapMinutesAsDuration() time.Duration { +func PeerDBAlertingGapMinutesAsDuration() time.Duration { why := time.Duration(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15)) return why * time.Minute } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely -func GetPeerDBOpenConnectionsAlertThreshold() uint32 { +func PeerDBOpenConnectionsAlertThreshold() uint32 { return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 7dc3cb489f..a3a3d6c6e0 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -57,7 +57,7 @@ func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { // Only raises an alert if another alert with the same key hasn't been raised // in the past X minutes, where X is configurable and defaults to 15 minutes func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) { - if peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() == 0 { + if peerdbenv.PeerDBAlertingGapMinutesAsDuration() == 0 { a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning") return } @@ -84,7 +84,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str return } - if time.Since(createdTimestamp) >= peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() { + if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() { for _, slackAlertSender := range slackAlertSenders { err = slackAlertSender.sendAlert(context.Background(), fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 8589b55487..aa447520b5 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -51,7 +51,7 @@ func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) { } func prependUIDToTaskQueueName(taskQueueName string) string { - deploymentUID := peerdbenv.GetPeerDBDeploymentUID() + deploymentUID := peerdbenv.PeerDBDeploymentUID() if deploymentUID == "" { return taskQueueName } From 60044de384a0f7244410233b54f659d356290f2f Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 22 Dec 2023 18:28:34 +0530 Subject: [PATCH 4/7] making all Snowflake DB access use a context (#876) Closes #766 Co-authored-by: Kaushik Iska --- flow/connectors/snowflake/qrep.go | 4 ++-- flow/connectors/snowflake/qrep_avro_sync.go | 16 +++++++--------- flow/connectors/snowflake/snowflake.go | 5 +++-- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 98d20b63ff..def870c183 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -84,7 +84,7 @@ func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType LIMIT 0 `, tableName) - rows, err := c.database.Query(queryString) + rows, err := c.database.QueryContext(c.ctx, queryString) if err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) } @@ -294,7 +294,7 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string) (*model.ColumnIn WHERE UPPER(table_name) = '%s' AND UPPER(table_schema) = '%s' `, components.tableIdentifier, components.schemaIdentifier) - rows, err := c.database.Query(queryString) + rows, err := c.database.QueryContext(c.ctx, queryString) if err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) } diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 7184898ae3..30834c2554 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -112,7 +112,6 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords( s.connector.logger.Info("sync function called and schema acquired", partitionLog) err = s.addMissingColumns( - config.FlowJobName, schema, dstTableSchema, dstTableName, @@ -152,7 +151,6 @@ func (s *SnowflakeAvroSyncMethod) SyncQRepRecords( } func (s *SnowflakeAvroSyncMethod) addMissingColumns( - flowJobName string, schema *model.QRecordSchema, dstTableSchema []*sql.ColumnType, dstTableName string, @@ -197,7 +195,7 @@ func (s *SnowflakeAvroSyncMethod) addMissingColumns( s.connector.logger.Info(fmt.Sprintf("altering destination table %s with command `%s`", dstTableName, alterTableCmd), partitionLog) - if _, err := tx.Exec(alterTableCmd); err != nil { + if _, err := tx.ExecContext(s.connector.ctx, alterTableCmd); err != nil { return fmt.Errorf("failed to alter destination table: %w", err) } } @@ -290,7 +288,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage shutdown <- struct{}{} }() - if _, err := s.connector.database.Exec(putCmd); err != nil { + if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil { return fmt.Errorf("failed to put file to stage: %w", err) } @@ -395,7 +393,7 @@ func (s *SnowflakeAvroSyncMethod) insertMetadata( return fmt.Errorf("failed to create metadata insert statement: %v", err) } - if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil { + if _, err := s.connector.database.ExecContext(s.connector.ctx, insertMetadataStmt); err != nil { s.connector.logger.Error("failed to execute metadata insert statement "+insertMetadataStmt, slog.Any("error", err), partitionLog) return fmt.Errorf("failed to execute metadata insert statement: %v", err) @@ -434,7 +432,7 @@ func (s *SnowflakeAvroWriteHandler) HandleAppendMode( copyCmd := fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s", s.dstTableName, copyInfo.columnsSQL, copyInfo.transformationSQL, s.stage, strings.Join(s.copyOpts, ",")) s.connector.logger.Info("running copy command: " + copyCmd) - _, err := s.connector.database.Exec(copyCmd) + _, err := s.connector.database.ExecContext(s.connector.ctx, copyCmd) if err != nil { return fmt.Errorf("failed to run COPY INTO command: %w", err) } @@ -518,7 +516,7 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode( //nolint:gosec createTempTableCmd := fmt.Sprintf("CREATE TEMPORARY TABLE %s AS SELECT * FROM %s LIMIT 0", tempTableName, s.dstTableName) - if _, err := s.connector.database.Exec(createTempTableCmd); err != nil { + if _, err := s.connector.database.ExecContext(s.connector.ctx, createTempTableCmd); err != nil { return fmt.Errorf("failed to create temp table: %w", err) } s.connector.logger.Info("created temp table " + tempTableName) @@ -526,7 +524,7 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode( //nolint:gosec copyCmd := fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s", tempTableName, copyInfo.columnsSQL, copyInfo.transformationSQL, s.stage, strings.Join(s.copyOpts, ",")) - _, err = s.connector.database.Exec(copyCmd) + _, err = s.connector.database.ExecContext(s.connector.ctx, copyCmd) if err != nil { return fmt.Errorf("failed to run COPY INTO command: %w", err) } @@ -538,7 +536,7 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode( } startTime := time.Now() - rows, err := s.connector.database.Exec(mergeCmd) + rows, err := s.connector.database.ExecContext(s.connector.ctx, mergeCmd) if err != nil { return fmt.Errorf("failed to merge data into destination table '%s': %w", mergeCmd, err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 8cd8240f11..e61830db88 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -480,8 +480,9 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(flowJobName string, return fmt.Errorf("failed to convert column type %s to snowflake type: %w", addedColumn.ColumnType, err) } - _, err = tableSchemaModifyTx.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s", - schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), sfColtype)) + _, err = tableSchemaModifyTx.ExecContext(c.ctx, + fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s", + schemaDelta.DstTableName, strings.ToUpper(addedColumn.ColumnName), sfColtype)) if err != nil { return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName, schemaDelta.DstTableName, err) From 7f97c0010a7ff2e75ce59a063b05b8d837019467 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 22 Dec 2023 08:55:47 -0500 Subject: [PATCH 5/7] Make alerter independent of slack (#877) This makes the alerter log to catalog whether slack based alerting is setup or not. Also fixes usage of Context in the alerting class. --- flow/cmd/worker.go | 8 ++- flow/e2e/test_utils.go | 8 ++- flow/shared/alerting/alerting.go | 85 ++++++++++++++++++-------------- 3 files changed, 63 insertions(+), 38 deletions(-) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index a42ac76b47..eea0e9184f 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -132,9 +132,15 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterWorkflow(peerflow.XminFlowWorkflow) w.RegisterWorkflow(peerflow.DropFlowWorkflow) w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow) + + alerter, err := alerting.NewAlerter(conn) + if err != nil { + return fmt.Errorf("unable to create alerter: %w", err) + } + w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, - Alerter: alerting.NewAlerter(conn), + Alerter: alerter, }) err = w.Run(worker.InterruptCh()) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 13ca8044e5..8bea8cf984 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -60,9 +60,15 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *t env.RegisterWorkflow(peerflow.QRepFlowWorkflow) env.RegisterWorkflow(peerflow.XminFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) + + alerter, err := alerting.NewAlerter(conn) + if err != nil { + t.Fatalf("unable to create alerter: %v", err) + } + env.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, - Alerter: alerting.NewAlerter(conn), + Alerter: alerter, }) env.RegisterActivity(&activities.SnapshotActivity{}) } diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index a3a3d6c6e0..394623825d 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -18,8 +18,8 @@ type Alerter struct { logger *slog.Logger } -func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { - rows, err := catalogPool.Query(context.Background(), +func registerSendersFromPool(ctx context.Context, catalogPool *pgxpool.Pool) ([]*slackAlertSender, error) { + rows, err := catalogPool.Query(ctx, "SELECT service_type,service_config FROM peerdb_stats.alerting_config") if err != nil { return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err) @@ -47,11 +47,17 @@ func registerSendersFromPool(catalogPool *pgxpool.Pool) ([]*slackAlertSender, er } // doesn't take care of closing pool, needs to be done externally. -func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { +func NewAlerter(catalogPool *pgxpool.Pool) (*Alerter, error) { + logger := slog.Default() + if catalogPool == nil { + logger.Error("catalog pool is nil for Alerter") + return nil, fmt.Errorf("catalog pool is nil for Alerter") + } + return &Alerter{ catalogPool: catalogPool, - logger: slog.Default(), - } + logger: logger, + }, nil } // Only raises an alert if another alert with the same key hasn't been raised @@ -62,40 +68,47 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str return } - if a.catalogPool != nil { - slackAlertSenders, err := registerSendersFromPool(a.catalogPool) - if err != nil { - a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err)) - return - } - if len(slackAlertSenders) == 0 { - a.logger.WarnContext(ctx, "no Slack senders configured, returning") - return - } - - row := a.catalogPool.QueryRow(context.Background(), - `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 + var err error + row := a.catalogPool.QueryRow(ctx, + `SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1 ORDER BY created_timestamp DESC LIMIT 1`, - alertKey) - var createdTimestamp time.Time - err = row.Scan(&createdTimestamp) - if err != nil && err != pgx.ErrNoRows { - a.logger.Warn("failed to send alert: %v", err) + alertKey) + var createdTimestamp time.Time + err = row.Scan(&createdTimestamp) + if err != nil && err != pgx.ErrNoRows { + a.logger.Warn("failed to send alert: %v", err) + return + } + + if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() { + a.AddAlertToCatalog(ctx, alertKey, alertMessage) + a.AlertToSlack(ctx, alertKey, alertMessage) + } +} + +func (a *Alerter) AlertToSlack(ctx context.Context, alertKey string, alertMessage string) { + slackAlertSenders, err := registerSendersFromPool(ctx, a.catalogPool) + if err != nil { + a.logger.WarnContext(ctx, "failed to set Slack senders", slog.Any("error", err)) + return + } + + for _, slackAlertSender := range slackAlertSenders { + err = slackAlertSender.sendAlert(ctx, + fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) + if err != nil { + a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err)) return } + } +} - if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() { - for _, slackAlertSender := range slackAlertSenders { - err = slackAlertSender.sendAlert(context.Background(), - fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) - if err != nil { - a.logger.WarnContext(ctx, "failed to send alert", slog.Any("error", err)) - return - } - _, _ = a.catalogPool.Exec(context.Background(), - "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", - alertKey, alertMessage) - } - } +func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertMessage string) { + _, err := a.catalogPool.Exec(ctx, + "INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)", + alertKey, alertMessage) + if err != nil { + a.logger.WarnContext(ctx, "failed to insert alert", slog.Any("error", err)) + return } } From 68616141d708263b5f15194134a691672f0a3aa6 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 22 Dec 2023 09:41:02 -0500 Subject: [PATCH 6/7] log common errors to catalog for user acknowledgement (#878) --- flow/activities/flowable.go | 122 ++++++------------ flow/activities/slot.go | 89 +++++++++++++ flow/shared/alerting/alerting.go | 11 ++ .../catalog/migrations/V17__mirror_errors.sql | 10 ++ 4 files changed, 147 insertions(+), 85 deletions(-) create mode 100644 flow/activities/slot.go create mode 100644 nexus/catalog/migrations/V17__mirror_errors.sql diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 982364ba58..8fe8e4be2b 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -73,6 +73,7 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot defer connectors.CloseConnector(dstConn) if err := dstConn.SetupMetadataTables(); err != nil { + a.Alerter.LogFlowError(ctx, config.Name, err) return fmt.Errorf("failed to setup metadata tables: %w", err) } @@ -111,6 +112,7 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(config) if err != nil { + a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -165,84 +167,13 @@ func (a *FlowableActivity) CreateNormalizedTable( } defer connectors.CloseConnector(conn) - return conn.SetupNormalizedTables(config) -} - -func (a *FlowableActivity) handleSlotInfo( - ctx context.Context, - srcConn connectors.CDCPullConnector, - slotName string, - peerName string, -) error { - slotInfo, err := srcConn.GetSlotInfo(slotName) + setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config) if err != nil { - slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err)) - return err - } - - deploymentUIDPrefix := "" - if peerdbenv.PeerDBDeploymentUID() != "" { - deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) + a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + return nil, fmt.Errorf("failed to setup normalized tables: %w", err) } - slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold() - if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { - a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), - fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! -cc: `, - deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) - } - - // Also handles alerts for PeerDB user connections exceeding a given limit here - maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold() - res, err := srcConn.GetOpenConnectionsForUser() - if err != nil { - slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) - return err - } - if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { - a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), - fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ - ` has exceeded threshold size of %d connections, currently at %d connections! -cc: `, - deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) - } - - if len(slotInfo) != 0 { - return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) - } - return nil -} - -func (a *FlowableActivity) recordSlotSizePeriodically( - ctx context.Context, - srcConn connectors.CDCPullConnector, - slotName string, - peerName string, -) { - // ensures slot info is logged at least once per SyncFlow - err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) - if err != nil { - return - } - - timeout := 5 * time.Minute - ticker := time.NewTicker(timeout) - - defer ticker.Stop() - for { - select { - case <-ticker.C: - err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) - if err != nil { - return - } - case <-ctx.Done(): - return - } - ticker.Stop() - ticker = time.NewTicker(timeout) - } + return setupNormalizedTablesOutput, nil } // StartFlow implements StartFlow. @@ -256,6 +187,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, fmt.Errorf("failed to get destination connector: %w", err) } defer connectors.CloseConnector(dstConn) + slog.InfoContext(ctx, "initializing table schema...") err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping) if err != nil { @@ -268,10 +200,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - recordBatch := model.NewCDCRecordStream() - - startTime := time.Now() - errGroup, errCtx := errgroup.WithContext(ctx) srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source) if err != nil { @@ -287,9 +215,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) // start a goroutine to pull records from the source + recordBatch := model.NewCDCRecordStream() + startTime := time.Now() + flowName := input.FlowConnectionConfigs.FlowJobName errGroup.Go(func() error { return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{ - FlowJobName: input.FlowConnectionConfigs.FlowJobName, + FlowJobName: flowName, SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, LastOffset: input.LastSyncState.Checkpoint, @@ -301,7 +232,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, RelationMessageMapping: input.RelationMessageMapping, RecordStream: recordBatch, SetLastOffset: func(lastOffset int64) error { - return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset) + return dstConn.SetLastOffset(flowName, lastOffset) }, }) }) @@ -309,12 +240,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, hasRecords := !recordBatch.WaitAndCheckEmpty() slog.InfoContext(ctx, fmt.Sprintf("the current sync flow has records: %v", hasRecords)) if a.CatalogPool != nil && hasRecords { - syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName) + syncBatchID, err := dstConn.GetLastSyncBatchID(flowName) if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB { return nil, err } - err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, + err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, flowName, monitoring.CDCBatchInfo{ BatchID: syncBatchID + 1, RowsInBatch: 0, @@ -322,6 +253,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, StartTime: startTime, }) if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) return nil, err } } @@ -330,6 +262,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, // wait for the pull goroutine to finish err = errGroup.Wait() if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to pull records: %w", err) } slog.InfoContext(ctx, "no records to push") @@ -358,11 +291,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }) if err != nil { slog.Warn("failed to push records", slog.Any("error", err)) + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to push records: %w", err) } err = errGroup.Wait() if err != nil { + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to pull records: %w", err) } @@ -465,6 +400,7 @@ func (a *FlowableActivity) StartNormalize( SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName, }) if err != nil { + a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err) return nil, fmt.Errorf("failed to normalized records: %w", err) } @@ -502,7 +438,13 @@ func (a *FlowableActivity) ReplayTableSchemaDeltas( } defer connectors.CloseConnector(dest) - return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas) + err = dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas) + if err != nil { + a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err) + return fmt.Errorf("failed to replay table schema deltas: %w", err) + } + + return nil } // SetupQRepMetadataTables sets up the metadata tables for QReplication. @@ -513,7 +455,13 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config * } defer connectors.CloseConnector(conn) - return conn.SetupQRepMetadataTables(config) + err = conn.SetupQRepMetadataTables(config) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return fmt.Errorf("failed to setup metadata tables: %w", err) + } + + return nil } // GetQRepPartitions returns the partitions for a given QRepConfig. @@ -538,6 +486,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, partitions, err := srcConn.GetQRepPartitions(config, last) if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to get partitions from source: %w", err) } if len(partitions) > 0 { @@ -578,6 +527,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId)) err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID) if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } } @@ -717,6 +667,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config err = dstConn.ConsolidateQRepPartitions(config) if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } @@ -1017,6 +968,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, } else { err := errGroup.Wait() if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return 0, err } diff --git a/flow/activities/slot.go b/flow/activities/slot.go new file mode 100644 index 0000000000..cf1375b4e4 --- /dev/null +++ b/flow/activities/slot.go @@ -0,0 +1,89 @@ +package activities + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/PeerDB-io/peer-flow/connectors" + "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func (a *FlowableActivity) handleSlotInfo( + ctx context.Context, + srcConn connectors.CDCPullConnector, + slotName string, + peerName string, +) error { + slotInfo, err := srcConn.GetSlotInfo(slotName) + if err != nil { + slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err)) + return err + } + + deploymentUIDPrefix := "" + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) + } + + slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold() + if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { + a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), + fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! +cc: `, + deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) + } + + // Also handles alerts for PeerDB user connections exceeding a given limit here + maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold() + res, err := srcConn.GetOpenConnectionsForUser() + if err != nil { + slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) + return err + } + if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { + a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), + fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ + ` has exceeded threshold size of %d connections, currently at %d connections! +cc: `, + deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) + } + + if len(slotInfo) != 0 { + return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0]) + } + return nil +} + +func (a *FlowableActivity) recordSlotSizePeriodically( + ctx context.Context, + srcConn connectors.CDCPullConnector, + slotName string, + peerName string, +) { + // ensures slot info is logged at least once per SyncFlow + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } + + timeout := 5 * time.Minute + ticker := time.NewTicker(timeout) + + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } + case <-ctx.Done(): + return + } + ticker.Stop() + ticker = time.NewTicker(timeout) + } +} diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 394623825d..50608087d2 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -112,3 +112,14 @@ func (a *Alerter) AddAlertToCatalog(ctx context.Context, alertKey string, alertM return } } + +func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) { + errorWithStack := fmt.Sprintf("%+v", err) + _, err = a.catalogPool.Exec(ctx, + "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", + flowName, errorWithStack, "error") + if err != nil { + a.logger.WarnContext(ctx, "failed to insert flow error", slog.Any("error", err)) + return + } +} diff --git a/nexus/catalog/migrations/V17__mirror_errors.sql b/nexus/catalog/migrations/V17__mirror_errors.sql new file mode 100644 index 0000000000..06f2352ea2 --- /dev/null +++ b/nexus/catalog/migrations/V17__mirror_errors.sql @@ -0,0 +1,10 @@ +CREATE TABLE peerdb_stats.flow_errors ( + id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + flow_name TEXT NOT NULL, + error_message TEXT NOT NULL, + error_type TEXT NOT NULL, + error_timestamp TIMESTAMP NOT NULL DEFAULT now(), + ack BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE INDEX idx_flow_errors_flow_name ON peerdb_stats.flow_errors (flow_name); From 8dbe7c932a53760dd3925b5acd67a24312095db2 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Fri, 22 Dec 2023 21:27:19 +0530 Subject: [PATCH 7/7] Fix: Add placeholder for wal_status (#880) Co-authored-by: Kaushik Iska --- flow/connectors/utils/monitoring/monitoring.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 7815372277..7e9263cf26 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -204,7 +204,7 @@ func AppendSlotSizeInfo( _, err := pool.Exec(ctx, "INSERT INTO peerdb_stats.peer_slot_size"+ "(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size, wal_status) "+ - "VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;", + "VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING;", peerName, slotInfo.SlotName, slotInfo.RestartLSN,