Skip to content

Commit

Permalink
Move configuration params to a central place (#818)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Dec 13, 2023
1 parent 9cb34e5 commit 90c1b0c
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 129 deletions.
7 changes: 3 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -227,8 +228,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()
Expand All @@ -255,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
Expand Down Expand Up @@ -557,6 +556,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

pullCtx, pullCancel := context.WithCancel(ctx)
defer pullCancel()
srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down Expand Up @@ -631,7 +631,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
pullCancel()
slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId))
} else {
wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package main
import (
"context"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func (h *FlowRequestHandler) GetVersion(
ctx context.Context,
req *protos.PeerDBVersionRequest,
) (*protos.PeerDBVersionResponse, error) {
version := utils.GetEnvString("PEERDB_VERSION_SHA_SHORT", "unknown")
version := peerdbenv.GetPeerDBVersionShaShort()
return &protos.PeerDBVersionResponse{Version: version}, nil
}
25 changes: 6 additions & 19 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -138,9 +139,7 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

eventHubFlushTimeout :=
time.Duration(utils.GetEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)) *
time.Second
eventHubFlushTimeout := peerdbenv.GetPeerDBEventhubFlushTimeoutSeconds()

ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()
Expand Down Expand Up @@ -232,22 +231,10 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
shutdown <- struct{}{}
}()

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) {
go func() {
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
c.logger.Error("[async] failed to process batch", slog.Any("error", err))
}
}()
} else {
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
return nil, err
}
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
Expand Down
52 changes: 12 additions & 40 deletions flow/connectors/utils/catalog/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,44 @@ package utils
import (
"context"
"fmt"
"os"
"strconv"
"sync"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/jackc/pgx/v5/pgxpool"
)

var poolMutex = &sync.Mutex{}
var pool *pgxpool.Pool

func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {
var err error

poolMutex.Lock()
defer poolMutex.Unlock()
if pool == nil {
catalogConnectionString, err := genCatalogConnectionString()
if err != nil {
return nil, fmt.Errorf("unable to generate catalog connection string: %w", err)
}

catalogConnectionString := genCatalogConnectionString()
pool, err = pgxpool.New(context.Background(), catalogConnectionString)
if err != nil {
return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
}
}

err := pool.Ping(context.Background())
err = pool.Ping(context.Background())
if err != nil {
return pool, fmt.Errorf("unable to establish connection with catalog: %w", err)
}

return pool, nil
}

func genCatalogConnectionString() (string, error) {
host, ok := os.LookupEnv("PEERDB_CATALOG_HOST")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_HOST is not set")
}
portStr, ok := os.LookupEnv("PEERDB_CATALOG_PORT")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PORT is not set")
}
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return "", fmt.Errorf("unable to parse PEERDB_CATALOG_PORT as unsigned integer")
}
user, ok := os.LookupEnv("PEERDB_CATALOG_USER")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_USER is not set")
}
password, ok := os.LookupEnv("PEERDB_CATALOG_PASSWORD")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PASSWORD is not set")
}
database, ok := os.LookupEnv("PEERDB_CATALOG_DATABASE")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_DATABASE is not set")
}

func genCatalogConnectionString() string {
return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: host,
Port: uint32(port),
User: user,
Password: password,
Database: database,
}), nil
Host: peerdbenv.GetPeerDBCatalogHost(),
Port: peerdbenv.GetPeerDBCatalogPort(),
User: peerdbenv.GetPeerDBCatalogUser(),
Password: peerdbenv.GetPeerDBCatalogPassword(),
Database: peerdbenv.GetPeerDBCatalogDatabase(),
})
}
12 changes: 3 additions & 9 deletions flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,11 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/cockroachdb/pebble"
)

const (
/** begin with in-memory store, and then switch to Pebble DB
when the number of stored records crosses 100k
**/
defaultNumRecordsSwitchThreshold = 1_00_000
)

func encVal(val any) ([]byte, error) {
buf := new(bytes.Buffer)
enc := gob.NewEncoder(buf)
Expand All @@ -49,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore {
numRecords: 0,
flowJobName: flowJobName,
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, util.RandomString(8)),
numRecordsSwitchThreshold: defaultNumRecordsSwitchThreshold,
numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(),
}
}

Expand Down Expand Up @@ -80,7 +74,7 @@ func (c *cdcRecordsStore) initPebbleDB() error {

func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
_, ok := c.inMemoryRecords[key]
if ok || len(c.inMemoryRecords) < defaultNumRecordsSwitchThreshold {
if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
c.inMemoryRecords[key] = rec
} else {
if c.pebbleDB == nil {
Expand Down
4 changes: 3 additions & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type NameAndExclude struct {
Expand Down Expand Up @@ -326,8 +327,9 @@ type CDCRecordStream struct {
}

func NewCDCRecordStream() *CDCRecordStream {
channelBuffer := peerdbenv.GetPeerDBCDCChannelBufferSize()
return &CDCRecordStream{
records: make(chan Record, 1<<18),
records: make(chan Record, channelBuffer),
// TODO (kaushik): more than 1024 schema deltas can cause problems!
SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
emptySignal: make(chan bool, 1),
Expand Down
66 changes: 66 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package peerdbenv

import (
"time"
)

// This file contains functions to get the values of various peerdb environment
// variables. This will help catalog the environment variables that are used
// throughout the codebase.

// GetPeerDBVersionShaShort returns the value of the PEERDB_VERSION_SHA_SHORT
// environment variable, or "unknown" when the variable is not set.
func GetPeerDBVersionShaShort() string {
	const (
		envKey   = "PEERDB_VERSION_SHA_SHORT"
		fallback = "unknown"
	)
	return getEnvString(envKey, fallback)
}

// GetPeerDBDeploymentUID returns the value of the PEERDB_DEPLOYMENT_UID
// environment variable, or the empty string when the variable is not set.
func GetPeerDBDeploymentUID() string {
	const envKey = "PEERDB_DEPLOYMENT_UID"
	uid := getEnvString(envKey, "")
	return uid
}

// GetPeerDBCDCChannelBufferSize returns the buffer size configured through the
// PEERDB_CDC_CHANNEL_BUFFER_SIZE environment variable.
//
// The default of 1<<18 is used when the variable is unset, is not a valid
// integer, or is negative. The negative case is rejected here because the
// result is handed to make(chan ..., n) when the CDC record stream is created,
// and make panics at run time for a negative size. Zero is passed through and
// yields an unbuffered channel.
func GetPeerDBCDCChannelBufferSize() int {
	const defaultSize = 1 << 18

	size := getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", defaultSize)
	if size < 0 {
		return defaultSize
	}
	return size
}

// GetPeerDBEventhubFlushTimeoutSeconds returns the Event Hub flush interval
// configured, in whole seconds, through the
// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS environment variable.
//
// The default of 10 seconds is used when the variable is unset, is not a valid
// integer, or is zero or negative. Non-positive values are rejected here
// because the Event Hub connector feeds the result to time.NewTicker, which
// panics unless the duration is greater than zero.
func GetPeerDBEventhubFlushTimeoutSeconds() time.Duration {
	const defaultSeconds = 10

	seconds := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", defaultSeconds)
	if seconds <= 0 {
		seconds = defaultSeconds
	}
	return time.Duration(seconds) * time.Second
}

// GetPeerDBCDCIdleTimeoutSeconds returns the CDC idle timeout as a
// time.Duration. The PEERDB_CDC_IDLE_TIMEOUT_SECONDS environment variable is
// read as a whole number of seconds; 60 is used when it is unset or is not a
// valid integer.
func GetPeerDBCDCIdleTimeoutSeconds() time.Duration {
	seconds := time.Duration(getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60))
	return seconds * time.Second
}

// GetPeerDBCDCDiskSpillThreshold returns the integer value of the
// PEERDB_CDC_DISK_SPILL_THRESHOLD environment variable, or 1_000_000 when the
// variable is unset or is not a valid integer.
//
// NOTE(review): the hard-coded constant this replaces in
// cdc_records_storage.go was 1_00_000 (100k); confirm the 10x larger default
// is intended.
func GetPeerDBCDCDiskSpillThreshold() int {
	const (
		envKey           = "PEERDB_CDC_DISK_SPILL_THRESHOLD"
		defaultThreshold = 1_000_000
	)
	return getEnvInt(envKey, defaultThreshold)
}

// GetPeerDBCatalogHost returns the value of the PEERDB_CATALOG_HOST
// environment variable, or the empty string when the variable is not set.
func GetPeerDBCatalogHost() string {
	host := getEnvString("PEERDB_CATALOG_HOST", "")
	return host
}

// GetPeerDBCatalogPort returns the PEERDB_CATALOG_PORT environment variable
// parsed as a uint32, or 5432 when the variable is unset or cannot be parsed
// as an unsigned 32-bit integer.
func GetPeerDBCatalogPort() uint32 {
	const (
		envKey             = "PEERDB_CATALOG_PORT"
		defaultPort uint32 = 5432
	)
	return getEnvUint32(envKey, defaultPort)
}

// GetPeerDBCatalogUser returns the value of the PEERDB_CATALOG_USER
// environment variable, or the empty string when the variable is not set.
func GetPeerDBCatalogUser() string {
	const envKey = "PEERDB_CATALOG_USER"
	return getEnvString(envKey, "")
}

// GetPeerDBCatalogPassword returns the value of the PEERDB_CATALOG_PASSWORD
// environment variable, or the empty string when the variable is not set.
func GetPeerDBCatalogPassword() string {
	password := getEnvString("PEERDB_CATALOG_PASSWORD", "")
	return password
}

// GetPeerDBCatalogDatabase returns the value of the PEERDB_CATALOG_DATABASE
// environment variable, or the empty string when the variable is not set.
func GetPeerDBCatalogDatabase() string {
	const (
		envKey   = "PEERDB_CATALOG_DATABASE"
		fallback = ""
	)
	return getEnvString(envKey, fallback)
}
32 changes: 16 additions & 16 deletions flow/connectors/utils/env.go → flow/peerdbenv/env.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package peerdbenv

import (
"os"
Expand All @@ -7,49 +7,49 @@ import (

// GetEnv returns the value of the environment variable with the given name
// and a boolean indicating whether the environment variable exists.
func GetEnv(name string) (string, bool) {
func getEnv(name string) (string, bool) {
val, exists := os.LookupEnv(name)
return val, exists
}

// GetEnvBool returns the value of the environment variable with the given name
// GetEnvInt returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set or is not a valid
// boolean value.
func GetEnvBool(name string, defaultValue bool) bool {
val, ok := GetEnv(name)
// integer value.
func getEnvInt(name string, defaultValue int) int {
val, ok := getEnv(name)
if !ok {
return defaultValue
}

b, err := strconv.ParseBool(val)
i, err := strconv.Atoi(val)
if err != nil {
return defaultValue
}

return b
return i
}

// GetEnvInt returns the value of the environment variable with the given name
// getEnvUint32 returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set or is not a valid
// integer value.
func GetEnvInt(name string, defaultValue int) int {
val, ok := GetEnv(name)
// uint32 value.
func getEnvUint32(name string, defaultValue uint32) uint32 {
val, ok := getEnv(name)
if !ok {
return defaultValue
}

i, err := strconv.Atoi(val)
i, err := strconv.ParseUint(val, 10, 32)
if err != nil {
return defaultValue
}

return i
return uint32(i)
}

// GetEnvString returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set.
func GetEnvString(name string, defaultValue string) string {
val, ok := GetEnv(name)
func getEnvString(name string, defaultValue string) string {
val, ok := getEnv(name)
if !ok {
return defaultValue
}
Expand Down
Loading

0 comments on commit 90c1b0c

Please sign in to comment.