diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index b5cf8e8973..982364ba58 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -181,11 +181,11 @@ func (a *FlowableActivity) handleSlotInfo( } deploymentUIDPrefix := "" - if peerdbenv.GetPeerDBDeploymentUID() != "" { - deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID()) + if peerdbenv.PeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold() + slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold() if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! @@ -194,7 +194,7 @@ cc: `, } // Also handles alerts for PeerDB user connections exceeding a given limit here - maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold() + maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold() res, err := srcConn.GetOpenConnectionsForUser() if err != nil { slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) @@ -294,7 +294,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, TableNameMapping: tblNameMapping, LastOffset: input.LastSyncState.Checkpoint, MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize), - IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(), + IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(), TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, OverridePublicationName: input.FlowConnectionConfigs.PublicationName, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, diff --git a/flow/cmd/version.go b/flow/cmd/version.go index 94210ea9cb..3338a20e6a 100644 --- a/flow/cmd/version.go +++ b/flow/cmd/version.go @@ -11,6 +11,6 @@ func (h *FlowRequestHandler) GetVersion( ctx context.Context, req *protos.PeerDBVersionRequest, ) (*protos.PeerDBVersionResponse, error) { - version := peerdbenv.GetPeerDBVersionShaShort() + version := peerdbenv.PeerDBVersionShaShort() return &protos.PeerDBVersionResponse{Version: version}, nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 027d3027fa..05347a4263 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -129,7 +129,7 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - eventHubFlushTimeout := peerdbenv.GetPeerDBEventhubFlushTimeoutSeconds() + eventHubFlushTimeout := peerdbenv.PeerDBEventhubFlushTimeoutSeconds() ticker := time.NewTicker(eventHubFlushTimeout) defer ticker.Stop() diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index cdd85535b9..f5c8e0507d 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -39,10 +39,10 @@ func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) { func genCatalogConnectionString() string { return utils.GetPGConnectionString(&protos.PostgresConfig{ - Host: peerdbenv.GetPeerDBCatalogHost(), - Port: peerdbenv.GetPeerDBCatalogPort(), - User: peerdbenv.GetPeerDBCatalogUser(), - Password: peerdbenv.GetPeerDBCatalogPassword(), - Database: peerdbenv.GetPeerDBCatalogDatabase(), + Host: peerdbenv.PeerDBCatalogHost(), + Port: peerdbenv.PeerDBCatalogPort(), + User: peerdbenv.PeerDBCatalogUser(), + Password: peerdbenv.PeerDBCatalogPassword(), + Database: peerdbenv.PeerDBCatalogDatabase(), }) } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 3147045a76..66722b69ed 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -43,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { numRecords: 0, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), - numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(), + numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillThreshold(), } } diff --git a/flow/model/model.go b/flow/model/model.go index 487616c531..581b57178b 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -329,7 +329,7 @@ type CDCRecordStream struct { } func NewCDCRecordStream() *CDCRecordStream { - channelBuffer := peerdbenv.GetPeerDBCDCChannelBufferSize() + channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize() return &CDCRecordStream{ records: make(chan Record, channelBuffer), // TODO (kaushik): more than 1024 schema deltas can cause problems! diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 970be3455d..33b1058066 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -9,74 +9,74 @@ import ( // throughout the codebase. // PEERDB_VERSION_SHA_SHORT -func GetPeerDBVersionShaShort() string { +func PeerDBVersionShaShort() string { return getEnvString("PEERDB_VERSION_SHA_SHORT", "unknown") } // PEERDB_DEPLOYMENT_UID -func GetPeerDBDeploymentUID() string { +func PeerDBDeploymentUID() string { return getEnvString("PEERDB_DEPLOYMENT_UID", "") } // PEERDB_CDC_CHANNEL_BUFFER_SIZE -func GetPeerDBCDCChannelBufferSize() int { +func PeerDBCDCChannelBufferSize() int { return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) } // PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS -func GetPeerDBEventhubFlushTimeoutSeconds() time.Duration { +func PeerDBEventhubFlushTimeoutSeconds() time.Duration { x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) return time.Duration(x) * time.Second } // PEERDB_CDC_IDLE_TIMEOUT_SECONDS -func GetPeerDBCDCIdleTimeoutSeconds() time.Duration { +func PeerDBCDCIdleTimeoutSeconds() time.Duration { x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60) return time.Duration(x) * time.Second } // PEERDB_CDC_DISK_SPILL_THRESHOLD -func GetPeerDBCDCDiskSpillThreshold() int { +func PeerDBCDCDiskSpillThreshold() int { return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000) } // PEERDB_CATALOG_HOST -func GetPeerDBCatalogHost() string { +func PeerDBCatalogHost() string { return getEnvString("PEERDB_CATALOG_HOST", "") } // PEERDB_CATALOG_PORT -func GetPeerDBCatalogPort() uint32 { +func PeerDBCatalogPort() uint32 { return getEnvUint32("PEERDB_CATALOG_PORT", 5432) } // PEERDB_CATALOG_USER -func GetPeerDBCatalogUser() string { +func PeerDBCatalogUser() string { return getEnvString("PEERDB_CATALOG_USER", "") } // PEERDB_CATALOG_PASSWORD -func GetPeerDBCatalogPassword() string { +func PeerDBCatalogPassword() string { return getEnvString("PEERDB_CATALOG_PASSWORD", "") } // PEERDB_CATALOG_DATABASE -func GetPeerDBCatalogDatabase() string { +func PeerDBCatalogDatabase() string { return getEnvString("PEERDB_CATALOG_DATABASE", "") } // PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely -func GetPeerDBSlotLagMBAlertThreshold() uint32 { +func PeerDBSlotLagMBAlertThreshold() uint32 { return getEnvUint32("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) } // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely -func GetPeerDBAlertingGapMinutesAsDuration() time.Duration { +func PeerDBAlertingGapMinutesAsDuration() time.Duration { why := time.Duration(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15)) return why * time.Minute } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely -func GetPeerDBOpenConnectionsAlertThreshold() uint32 { +func PeerDBOpenConnectionsAlertThreshold() uint32 { return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 7dc3cb489f..a3a3d6c6e0 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -57,7 +57,7 @@ func NewAlerter(catalogPool *pgxpool.Pool) *Alerter { // Only raises an alert if another alert with the same key hasn't been raised // in the past X minutes, where X is configurable and defaults to 15 minutes func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) { - if peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() == 0 { + if peerdbenv.PeerDBAlertingGapMinutesAsDuration() == 0 { a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning") return } @@ -84,7 +84,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str return } - if time.Since(createdTimestamp) >= peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() { + if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() { for _, slackAlertSender := range slackAlertSenders { err = slackAlertSender.sendAlert(context.Background(), fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage) diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 8589b55487..aa447520b5 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -51,7 +51,7 @@ func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) { } func prependUIDToTaskQueueName(taskQueueName string) string { - deploymentUID := peerdbenv.GetPeerDBDeploymentUID() + deploymentUID := peerdbenv.PeerDBDeploymentUID() if deploymentUID == "" { return taskQueueName }