Skip to content

Commit

Permalink
changing naming convention for getters in peerdbenv
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 22, 2023
1 parent 08861aa commit eaf541b
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 31 deletions.
10 changes: 5 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ func (a *FlowableActivity) handleSlotInfo(
}

deploymentUIDPrefix := ""
if peerdbenv.GetPeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
Expand All @@ -194,7 +194,7 @@ cc: <!channel>`,
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold()
maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold()
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
Expand Down Expand Up @@ -294,7 +294,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ func (h *FlowRequestHandler) GetVersion(
ctx context.Context,
req *protos.PeerDBVersionRequest,
) (*protos.PeerDBVersionResponse, error) {
version := peerdbenv.GetPeerDBVersionShaShort()
version := peerdbenv.PeerDBVersionShaShort()
return &protos.PeerDBVersionResponse{Version: version}, nil
}
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

eventHubFlushTimeout := peerdbenv.GetPeerDBEventhubFlushTimeoutSeconds()
eventHubFlushTimeout := peerdbenv.PeerDBEventhubFlushTimeoutSeconds()

ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/utils/catalog/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {

func genCatalogConnectionString() string {
return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: peerdbenv.GetPeerDBCatalogHost(),
Port: peerdbenv.GetPeerDBCatalogPort(),
User: peerdbenv.GetPeerDBCatalogUser(),
Password: peerdbenv.GetPeerDBCatalogPassword(),
Database: peerdbenv.GetPeerDBCatalogDatabase(),
Host: peerdbenv.PeerDBCatalogHost(),
Port: peerdbenv.PeerDBCatalogPort(),
User: peerdbenv.PeerDBCatalogUser(),
Password: peerdbenv.PeerDBCatalogPassword(),
Database: peerdbenv.PeerDBCatalogDatabase(),
})
}
2 changes: 1 addition & 1 deletion flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore {
numRecords: 0,
flowJobName: flowJobName,
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)),
numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(),
numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillThreshold(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ type CDCRecordStream struct {
}

func NewCDCRecordStream() *CDCRecordStream {
channelBuffer := peerdbenv.GetPeerDBCDCChannelBufferSize()
channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize()
return &CDCRecordStream{
records: make(chan Record, channelBuffer),
// TODO (kaushik): more than 1024 schema deltas can cause problems!
Expand Down
28 changes: 14 additions & 14 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,74 +9,74 @@ import (
// throughout the codebase.

// PEERDB_VERSION_SHA_SHORT
func GetPeerDBVersionShaShort() string {
func PeerDBVersionShaShort() string {
return getEnvString("PEERDB_VERSION_SHA_SHORT", "unknown")
}

// PEERDB_DEPLOYMENT_UID
func GetPeerDBDeploymentUID() string {
func PeerDBDeploymentUID() string {
return getEnvString("PEERDB_DEPLOYMENT_UID", "")
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func GetPeerDBCDCChannelBufferSize() int {
func PeerDBCDCChannelBufferSize() int {
return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
func GetPeerDBEventhubFlushTimeoutSeconds() time.Duration {
func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
return time.Duration(x) * time.Second
}

// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
func GetPeerDBCDCIdleTimeoutSeconds() time.Duration {
func PeerDBCDCIdleTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
return time.Duration(x) * time.Second
}

// PEERDB_CDC_DISK_SPILL_THRESHOLD
func GetPeerDBCDCDiskSpillThreshold() int {
func PeerDBCDCDiskSpillThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)
}

// PEERDB_CATALOG_HOST
func GetPeerDBCatalogHost() string {
func PeerDBCatalogHost() string {
return getEnvString("PEERDB_CATALOG_HOST", "")
}

// PEERDB_CATALOG_PORT
func GetPeerDBCatalogPort() uint32 {
func PeerDBCatalogPort() uint32 {
return getEnvUint32("PEERDB_CATALOG_PORT", 5432)
}

// PEERDB_CATALOG_USER
func GetPeerDBCatalogUser() string {
func PeerDBCatalogUser() string {
return getEnvString("PEERDB_CATALOG_USER", "")
}

// PEERDB_CATALOG_PASSWORD
func GetPeerDBCatalogPassword() string {
func PeerDBCatalogPassword() string {
return getEnvString("PEERDB_CATALOG_PASSWORD", "")
}

// PEERDB_CATALOG_DATABASE
func GetPeerDBCatalogDatabase() string {
func PeerDBCatalogDatabase() string {
return getEnvString("PEERDB_CATALOG_DATABASE", "")
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func GetPeerDBSlotLagMBAlertThreshold() uint32 {
func PeerDBSlotLagMBAlertThreshold() uint32 {
return getEnvUint32("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}

// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely
func GetPeerDBAlertingGapMinutesAsDuration() time.Duration {
func PeerDBAlertingGapMinutesAsDuration() time.Duration {
why := time.Duration(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15))
return why * time.Minute
}

// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely
func GetPeerDBOpenConnectionsAlertThreshold() uint32 {
func PeerDBOpenConnectionsAlertThreshold() uint32 {
return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}
4 changes: 2 additions & 2 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) {
if peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() == 0 {
if peerdbenv.PeerDBAlertingGapMinutesAsDuration() == 0 {
a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning")
return
}
Expand All @@ -84,7 +84,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str
return
}

if time.Since(createdTimestamp) >= peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() {
if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() {
for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(context.Background(),
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
Expand Down
2 changes: 1 addition & 1 deletion flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) {
}

func prependUIDToTaskQueueName(taskQueueName string) string {
deploymentUID := peerdbenv.GetPeerDBDeploymentUID()
deploymentUID := peerdbenv.PeerDBDeploymentUID()
if deploymentUID == "" {
return taskQueueName
}
Expand Down

0 comments on commit eaf541b

Please sign in to comment.