Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move configuration params to a central place #818

Merged
merged 5 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -227,8 +228,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()
Expand All @@ -255,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
Expand Down Expand Up @@ -557,6 +556,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

pullCtx, pullCancel := context.WithCancel(ctx)
defer pullCancel()
srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down Expand Up @@ -631,7 +631,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
pullCancel()
slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId))
} else {
wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package main
import (
"context"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

func (h *FlowRequestHandler) GetVersion(
ctx context.Context,
req *protos.PeerDBVersionRequest,
) (*protos.PeerDBVersionResponse, error) {
version := utils.GetEnvString("PEERDB_VERSION_SHA_SHORT", "unknown")
version := peerdbenv.GetPeerDBVersionShaShort()
return &protos.PeerDBVersionResponse{Version: version}, nil
}
25 changes: 6 additions & 19 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -138,9 +139,7 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

eventHubFlushTimeout :=
time.Duration(utils.GetEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)) *
time.Second
eventHubFlushTimeout := peerdbenv.GetPeerDBEventhubFlushTimeoutSeconds()

ticker := time.NewTicker(eventHubFlushTimeout)
defer ticker.Stop()
Expand Down Expand Up @@ -232,22 +231,10 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
shutdown <- struct{}{}
}()

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) {
go func() {
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
c.logger.Error("[async] failed to process batch", slog.Any("error", err))
}
}()
} else {
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
return nil, err
}
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
return nil, err
}

lastCheckpoint, err := req.Records.GetLastCheckpoint()
Expand Down
52 changes: 12 additions & 40 deletions flow/connectors/utils/catalog/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,44 @@ package utils
import (
"context"
"fmt"
"os"
"strconv"
"sync"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/jackc/pgx/v5/pgxpool"
)

var poolMutex = &sync.Mutex{}
var pool *pgxpool.Pool

func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {
var err error

poolMutex.Lock()
defer poolMutex.Unlock()
if pool == nil {
catalogConnectionString, err := genCatalogConnectionString()
if err != nil {
return nil, fmt.Errorf("unable to generate catalog connection string: %w", err)
}

catalogConnectionString := genCatalogConnectionString()
pool, err = pgxpool.New(context.Background(), catalogConnectionString)
if err != nil {
return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
}
}

err := pool.Ping(context.Background())
err = pool.Ping(context.Background())
if err != nil {
return pool, fmt.Errorf("unable to establish connection with catalog: %w", err)
}

return pool, nil
}

func genCatalogConnectionString() (string, error) {
host, ok := os.LookupEnv("PEERDB_CATALOG_HOST")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_HOST is not set")
}
portStr, ok := os.LookupEnv("PEERDB_CATALOG_PORT")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PORT is not set")
}
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return "", fmt.Errorf("unable to parse PEERDB_CATALOG_PORT as unsigned integer")
}
user, ok := os.LookupEnv("PEERDB_CATALOG_USER")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_USER is not set")
}
password, ok := os.LookupEnv("PEERDB_CATALOG_PASSWORD")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PASSWORD is not set")
}
database, ok := os.LookupEnv("PEERDB_CATALOG_DATABASE")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_DATABASE is not set")
}

func genCatalogConnectionString() string {
return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: host,
Port: uint32(port),
User: user,
Password: password,
Database: database,
}), nil
Host: peerdbenv.GetPeerDBCatalogHost(),
Port: peerdbenv.GetPeerDBCatalogPort(),
User: peerdbenv.GetPeerDBCatalogUser(),
Password: peerdbenv.GetPeerDBCatalogPassword(),
Database: peerdbenv.GetPeerDBCatalogDatabase(),
})
}
12 changes: 3 additions & 9 deletions flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,11 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/cockroachdb/pebble"
)

const (
/** begin with in-memory store, and then switch to Pebble DB
when the number of stored records crosses 100k
**/
defaultNumRecordsSwitchThreshold = 1_00_000
)

func encVal(val any) ([]byte, error) {
buf := new(bytes.Buffer)
enc := gob.NewEncoder(buf)
Expand All @@ -49,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore {
numRecords: 0,
flowJobName: flowJobName,
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, util.RandomString(8)),
numRecordsSwitchThreshold: defaultNumRecordsSwitchThreshold,
numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(),
}
}

Expand Down Expand Up @@ -80,7 +74,7 @@ func (c *cdcRecordsStore) initPebbleDB() error {

func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
_, ok := c.inMemoryRecords[key]
if ok || len(c.inMemoryRecords) < defaultNumRecordsSwitchThreshold {
if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
c.inMemoryRecords[key] = rec
} else {
if c.pebbleDB == nil {
Expand Down
4 changes: 3 additions & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

type NameAndExclude struct {
Expand Down Expand Up @@ -326,8 +327,9 @@ type CDCRecordStream struct {
}

func NewCDCRecordStream() *CDCRecordStream {
channelBuffer := peerdbenv.GetPeerDBCDCChannelBufferSize()
return &CDCRecordStream{
records: make(chan Record, 1<<18),
records: make(chan Record, channelBuffer),
// TODO (kaushik): more than 1024 schema deltas can cause problems!
SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
emptySignal: make(chan bool, 1),
Expand Down
66 changes: 66 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package peerdbenv

import (
"time"
)

// This file contains functions to get the values of various peerdb environment
// variables. This will help catalog the environment variables that are used
// throughout the codebase.

// PEERDB_VERSION_SHA_SHORT
func GetPeerDBVersionShaShort() string {
return getEnvString("PEERDB_VERSION_SHA_SHORT", "unknown")
}

// PEERDB_DEPLOYMENT_UID
func GetPeerDBDeploymentUID() string {
return getEnvString("PEERDB_DEPLOYMENT_UID", "")
}

// PEERDB_CDC_CHANNEL_BUFFER_SIZE
func GetPeerDBCDCChannelBufferSize() int {
return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18)
}

// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS
func GetPeerDBEventhubFlushTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)
return time.Duration(x) * time.Second
}

// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
func GetPeerDBCDCIdleTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
return time.Duration(x) * time.Second
}

// PEERDB_CDC_DISK_SPILL_THRESHOLD
func GetPeerDBCDCDiskSpillThreshold() int {
return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)
}

// PEERDB_CATALOG_HOST
func GetPeerDBCatalogHost() string {
return getEnvString("PEERDB_CATALOG_HOST", "")
}

// PEERDB_CATALOG_PORT
func GetPeerDBCatalogPort() uint32 {
return getEnvUint32("PEERDB_CATALOG_PORT", 5432)
}

// PEERDB_CATALOG_USER
func GetPeerDBCatalogUser() string {
return getEnvString("PEERDB_CATALOG_USER", "")
}

// PEERDB_CATALOG_PASSWORD
func GetPeerDBCatalogPassword() string {
return getEnvString("PEERDB_CATALOG_PASSWORD", "")
}

// PEERDB_CATALOG_DATABASE
func GetPeerDBCatalogDatabase() string {
return getEnvString("PEERDB_CATALOG_DATABASE", "")
}
32 changes: 16 additions & 16 deletions flow/connectors/utils/env.go → flow/peerdbenv/env.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package peerdbenv

import (
"os"
Expand All @@ -7,49 +7,49 @@ import (

// GetEnv returns the value of the environment variable with the given name
// and a boolean indicating whether the environment variable exists.
func GetEnv(name string) (string, bool) {
func getEnv(name string) (string, bool) {
val, exists := os.LookupEnv(name)
return val, exists
}

// GetEnvBool returns the value of the environment variable with the given name
// GetEnvInt returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set or is not a valid
// boolean value.
func GetEnvBool(name string, defaultValue bool) bool {
val, ok := GetEnv(name)
// integer value.
func getEnvInt(name string, defaultValue int) int {
val, ok := getEnv(name)
if !ok {
return defaultValue
}

b, err := strconv.ParseBool(val)
i, err := strconv.Atoi(val)
if err != nil {
return defaultValue
}

return b
return i
}

// GetEnvInt returns the value of the environment variable with the given name
// getEnvUint32 returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set or is not a valid
// integer value.
func GetEnvInt(name string, defaultValue int) int {
val, ok := GetEnv(name)
// uint32 value.
func getEnvUint32(name string, defaultValue uint32) uint32 {
val, ok := getEnv(name)
if !ok {
return defaultValue
}

i, err := strconv.Atoi(val)
i, err := strconv.ParseUint(val, 10, 32)
if err != nil {
return defaultValue
}

return i
return uint32(i)
}

// GetEnvString returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set.
func GetEnvString(name string, defaultValue string) string {
val, ok := GetEnv(name)
func getEnvString(name string, defaultValue string) string {
val, ok := getEnv(name)
if !ok {
return defaultValue
}
Expand Down
Loading
Loading