Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changing naming convention for getters in peerdbenv #875

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ func (a *FlowableActivity) handleSlotInfo(
}

deploymentUIDPrefix := ""
if peerdbenv.GetPeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
if peerdbenv.PeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
slotLagInMBThreshold := peerdbenv.PeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
Expand All @@ -194,7 +194,7 @@ cc: <!channel>`,
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold()
maxOpenConnectionsThreshold := peerdbenv.PeerDBOpenConnectionsAlertThreshold()
res, err := srcConn.GetOpenConnectionsForUser()
if err != nil {
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
Expand Down Expand Up @@ -294,7 +294,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ func (h *FlowRequestHandler) GetVersion(
ctx context.Context,
req *protos.PeerDBVersionRequest,
) (*protos.PeerDBVersionResponse, error) {
version := peerdbenv.GetPeerDBVersionShaShort()
version := peerdbenv.PeerDBVersionShaShort()
return &protos.PeerDBVersionResponse{Version: version}, nil
}
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

eventHubFlushTimeout := peerdbenv.GetPeerDBEventhubFlushTimeoutSeconds()
eventHubFlushTimeout := peerdbenv.PeerDBEventhubFlushTimeoutSeconds()

ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/utils/catalog/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {

func genCatalogConnectionString() string {
return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: peerdbenv.GetPeerDBCatalogHost(),
Port: peerdbenv.GetPeerDBCatalogPort(),
User: peerdbenv.GetPeerDBCatalogUser(),
Password: peerdbenv.GetPeerDBCatalogPassword(),
Database: peerdbenv.GetPeerDBCatalogDatabase(),
Host: peerdbenv.PeerDBCatalogHost(),
Port: peerdbenv.PeerDBCatalogPort(),
User: peerdbenv.PeerDBCatalogUser(),
Password: peerdbenv.PeerDBCatalogPassword(),
Database: peerdbenv.PeerDBCatalogDatabase(),
})
}
2 changes: 1 addition & 1 deletion flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore {
numRecords: 0,
flowJobName: flowJobName,
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)),
numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(),
numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillThreshold(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ type CDCRecordStream struct {
}

func NewCDCRecordStream() *CDCRecordStream {
channelBuffer := peerdbenv.GetPeerDBCDCChannelBufferSize()
channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize()
return &CDCRecordStream{
records: make(chan Record, channelBuffer),
// TODO (kaushik): more than 1024 schema deltas can cause problems!
Expand Down
28 changes: 14 additions & 14 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,74 +9,74 @@ import (
// throughout the codebase.

// PEERDB_VERSION_SHA_SHORT
func GetPeerDBVersionShaShort() string {
func PeerDBVersionShaShort() string {
return getEnvString("PEERDB_VERSION_SHA_SHORT", "unknown")
}

// PEERDB_DEPLOYMENT_UID
func GetPeerDBDeploymentUID() string {
func PeerDBDeploymentUID() string {
return getEnvString("PEERDB_DEPLOYMENT_UID", "")
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func GetPeerDBCDCChannelBufferSize() int {
func PeerDBCDCChannelBufferSize() int {
return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
func GetPeerDBEventhubFlushTimeoutSeconds() time.Duration {
func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
return time.Duration(x) * time.Second
}

// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
func GetPeerDBCDCIdleTimeoutSeconds() time.Duration {
func PeerDBCDCIdleTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
return time.Duration(x) * time.Second
}

// PEERDB_CDC_DISK_SPILL_THRESHOLD
func GetPeerDBCDCDiskSpillThreshold() int {
func PeerDBCDCDiskSpillThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)
}

// PEERDB_CATALOG_HOST
func GetPeerDBCatalogHost() string {
func PeerDBCatalogHost() string {
return getEnvString("PEERDB_CATALOG_HOST", "")
}

// PEERDB_CATALOG_PORT
func GetPeerDBCatalogPort() uint32 {
func PeerDBCatalogPort() uint32 {
return getEnvUint32("PEERDB_CATALOG_PORT", 5432)
}

// PEERDB_CATALOG_USER
func GetPeerDBCatalogUser() string {
func PeerDBCatalogUser() string {
return getEnvString("PEERDB_CATALOG_USER", "")
}

// PEERDB_CATALOG_PASSWORD
func GetPeerDBCatalogPassword() string {
func PeerDBCatalogPassword() string {
return getEnvString("PEERDB_CATALOG_PASSWORD", "")
}

// PEERDB_CATALOG_DATABASE
func GetPeerDBCatalogDatabase() string {
func PeerDBCatalogDatabase() string {
return getEnvString("PEERDB_CATALOG_DATABASE", "")
}

// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely
func GetPeerDBSlotLagMBAlertThreshold() uint32 {
func PeerDBSlotLagMBAlertThreshold() uint32 {
return getEnvUint32("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000)
}

// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely
func GetPeerDBAlertingGapMinutesAsDuration() time.Duration {
func PeerDBAlertingGapMinutesAsDuration() time.Duration {
why := time.Duration(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15))
return why * time.Minute
}

// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely
func GetPeerDBOpenConnectionsAlertThreshold() uint32 {
func PeerDBOpenConnectionsAlertThreshold() uint32 {
return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}
4 changes: 2 additions & 2 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
// Only raises an alert if another alert with the same key hasn't been raised
// in the past X minutes, where X is configurable and defaults to 15 minutes
func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage string) {
if peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() == 0 {
if peerdbenv.PeerDBAlertingGapMinutesAsDuration() == 0 {
a.logger.WarnContext(ctx, "Alerting disabled via environment variable, returning")
return
}
Expand All @@ -84,7 +84,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str
return
}

if time.Since(createdTimestamp) >= peerdbenv.GetPeerDBAlertingGapMinutesAsDuration() {
if time.Since(createdTimestamp) >= peerdbenv.PeerDBAlertingGapMinutesAsDuration() {
for _, slackAlertSender := range slackAlertSenders {
err = slackAlertSender.sendAlert(context.Background(),
fmt.Sprintf(":rotating_light:Alert:rotating_light:: %s", alertKey), alertMessage)
Expand Down
2 changes: 1 addition & 1 deletion flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) {
}

func prependUIDToTaskQueueName(taskQueueName string) string {
deploymentUID := peerdbenv.GetPeerDBDeploymentUID()
deploymentUID := peerdbenv.PeerDBDeploymentUID()
if deploymentUID == "" {
return taskQueueName
}
Expand Down
Loading