From 90c1b0c05a77fad7483a9d7fc543b83a388865de Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 13 Dec 2023 15:13:04 -0500 Subject: [PATCH] Move configuration params to a central place (#818) --- flow/activities/flowable.go | 7 +- flow/cmd/version.go | 4 +- flow/connectors/eventhub/eventhub.go | 25 ++----- flow/connectors/utils/catalog/env.go | 52 ++++----------- .../utils/cdc_records/cdc_records_storage.go | 12 +--- flow/model/model.go | 4 +- flow/peerdbenv/config.go | 66 +++++++++++++++++++ flow/{connectors/utils => peerdbenv}/env.go | 32 ++++----- flow/shared/constants.go | 4 +- flow/shared/signals.go | 26 ++++++++ flow/utils/signals.go | 27 -------- flow/workflows/cdc_flow.go | 5 +- flow/workflows/qrep_flow.go | 5 +- flow/workflows/xmin_flow.go | 5 +- 14 files changed, 145 insertions(+), 129 deletions(-) create mode 100644 flow/peerdbenv/config.go rename flow/{connectors/utils => peerdbenv}/env.go (65%) create mode 100644 flow/shared/signals.go delete mode 100644 flow/utils/signals.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index bbcea455c8..d29981f703 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -17,6 +17,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" @@ -227,8 +228,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60) - recordBatch := model.NewCDCRecordStream() startTime := time.Now() @@ -255,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, TableNameMapping: tblNameMapping, LastSyncState: input.LastSyncState, MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize), - IdleTimeout: time.Duration(idleTimeout) * time.Second, + IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(), TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping, OverridePublicationName: input.FlowConnectionConfigs.PublicationName, OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName, @@ -557,6 +556,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } pullCtx, pullCancel := context.WithCancel(ctx) + defer pullCancel() srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer) if err != nil { return fmt.Errorf("failed to get qrep source connector: %w", err) @@ -631,7 +631,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { - pullCancel() slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId)) } else { wg.Wait() diff --git a/flow/cmd/version.go b/flow/cmd/version.go index 58074fa75c..94210ea9cb 100644 --- a/flow/cmd/version.go +++ b/flow/cmd/version.go @@ -3,14 +3,14 @@ package main import ( "context" - "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) func (h *FlowRequestHandler) GetVersion( ctx context.Context, req *protos.PeerDBVersionRequest, ) (*protos.PeerDBVersionResponse, error) { - version := utils.GetEnvString("PEERDB_VERSION_SHA_SHORT", "unknown") + version := peerdbenv.GetPeerDBVersionShaShort() return &protos.PeerDBVersionResponse{Version: version}, nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index ee9d9c8012..11679e1dab 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -138,9 +139,7 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) - eventHubFlushTimeout := - time.Duration(utils.GetEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10)) * - time.Second + eventHubFlushTimeout := peerdbenv.GetPeerDBEventhubFlushTimeoutSeconds() ticker := time.NewTicker(eventHubFlushTimeout) defer ticker.Stop() @@ -232,22 +231,10 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S shutdown <- struct{}{} }() - // if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true - // we kick off processBatch in a goroutine and return immediately. - // otherwise, we block until processBatch is done. - if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) { - go func() { - numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) - if err != nil { - c.logger.Error("[async] failed to process batch", slog.Any("error", err)) - } - }() - } else { - numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) - if err != nil { - c.logger.Error("failed to process batch", slog.Any("error", err)) - return nil, err - } + numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) + if err != nil { + c.logger.Error("failed to process batch", slog.Any("error", err)) + return nil, err } lastCheckpoint, err := req.Records.GetLastCheckpoint() diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index 0563f9201b..ec99905092 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -3,12 +3,11 @@ package utils import ( "context" "fmt" - "os" - "strconv" "sync" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/jackc/pgx/v5/pgxpool" ) @@ -16,21 +15,19 @@ var poolMutex = &sync.Mutex{} var pool *pgxpool.Pool func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) { + var err error + poolMutex.Lock() defer poolMutex.Unlock() if pool == nil { - catalogConnectionString, err := genCatalogConnectionString() - if err != nil { - return nil, fmt.Errorf("unable to generate catalog connection string: %w", err) - } - + catalogConnectionString := genCatalogConnectionString() pool, err = pgxpool.New(context.Background(), catalogConnectionString) if err != nil { return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) } } - err := pool.Ping(context.Background()) + err = pool.Ping(context.Background()) if err != nil { return pool, fmt.Errorf("unable to establish connection with catalog: %w", err) } @@ -38,37 +35,12 @@ func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) { return pool, nil } -func genCatalogConnectionString() (string, error) { - host, ok := os.LookupEnv("PEERDB_CATALOG_HOST") - if !ok { - return "", fmt.Errorf("PEERDB_CATALOG_HOST is not set") - } - portStr, ok := os.LookupEnv("PEERDB_CATALOG_PORT") - if !ok { - return "", fmt.Errorf("PEERDB_CATALOG_PORT is not set") - } - port, err := strconv.ParseUint(portStr, 10, 32) - if err != nil { - return "", fmt.Errorf("unable to parse PEERDB_CATALOG_PORT as unsigned integer") - } - user, ok := os.LookupEnv("PEERDB_CATALOG_USER") - if !ok { - return "", fmt.Errorf("PEERDB_CATALOG_USER is not set") - } - password, ok := os.LookupEnv("PEERDB_CATALOG_PASSWORD") - if !ok { - return "", fmt.Errorf("PEERDB_CATALOG_PASSWORD is not set") - } - database, ok := os.LookupEnv("PEERDB_CATALOG_DATABASE") - if !ok { - return "", fmt.Errorf("PEERDB_CATALOG_DATABASE is not set") - } - +func genCatalogConnectionString() string { return utils.GetPGConnectionString(&protos.PostgresConfig{ - Host: host, - Port: uint32(port), - User: user, - Password: password, - Database: database, - }), nil + Host: peerdbenv.GetPeerDBCatalogHost(), + Port: peerdbenv.GetPeerDBCatalogPort(), + User: peerdbenv.GetPeerDBCatalogUser(), + Password: peerdbenv.GetPeerDBCatalogPassword(), + Database: peerdbenv.GetPeerDBCatalogDatabase(), + }) } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 7b5dd05ef4..5c6fba4d3e 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -12,17 +12,11 @@ import ( "time" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" util "github.com/PeerDB-io/peer-flow/utils" "github.com/cockroachdb/pebble" ) -const ( - /** begin with in-memory store, and then switch to Pebble DB - when the number of stored records crosses 100k - **/ - defaultNumRecordsSwitchThreshold = 1_00_000 -) - func encVal(val any) ([]byte, error) { buf := new(bytes.Buffer) enc := gob.NewEncoder(buf) @@ -49,7 +43,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { numRecords: 0, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, util.RandomString(8)), - numRecordsSwitchThreshold: defaultNumRecordsSwitchThreshold, + numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(), } } @@ -80,7 +74,7 @@ func (c *cdcRecordsStore) initPebbleDB() error { func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error { _, ok := c.inMemoryRecords[key] - if ok || len(c.inMemoryRecords) < defaultNumRecordsSwitchThreshold { + if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold { c.inMemoryRecords[key] = rec } else { if c.pebbleDB == nil { diff --git a/flow/model/model.go b/flow/model/model.go index 511430f7ce..7f97fb3ae4 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -11,6 +11,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) type NameAndExclude struct { @@ -326,8 +327,9 @@ type CDCRecordStream struct { } func NewCDCRecordStream() *CDCRecordStream { + channelBuffer := peerdbenv.GetPeerDBCDCChannelBufferSize() return &CDCRecordStream{ - records: make(chan Record, 1<<18), + records: make(chan Record, channelBuffer), // TODO (kaushik): more than 1024 schema deltas can cause problems! SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10), emptySignal: make(chan bool, 1), diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go new file mode 100644 index 0000000000..cf0e51c21a --- /dev/null +++ b/flow/peerdbenv/config.go @@ -0,0 +1,66 @@ +package peerdbenv + +import ( + "time" +) + +// This file contains functions to get the values of various peerdb environment +// variables. This will help catalog the environment variables that are used +// throughout the codebase. + +// PEERDB_VERSION_SHA_SHORT +func GetPeerDBVersionShaShort() string { + return getEnvString("PEERDB_VERSION_SHA_SHORT", "unknown") +} + +// PEERDB_DEPLOYMENT_UID +func GetPeerDBDeploymentUID() string { + return getEnvString("PEERDB_DEPLOYMENT_UID", "") +} + +// PEERDB_CDC_CHANNEL_BUFFER_SIZE +func GetPeerDBCDCChannelBufferSize() int { + return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) +} + +// PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS +func GetPeerDBEventhubFlushTimeoutSeconds() time.Duration { + x := getEnvInt("PEERDB_EVENTHUB_FLUSH_TIMEOUT_SECONDS", 10) + return time.Duration(x) * time.Second +} + +// PEERDB_CDC_IDLE_TIMEOUT_SECONDS +func GetPeerDBCDCIdleTimeoutSeconds() time.Duration { + x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60) + return time.Duration(x) * time.Second +} + +// PEERDB_CDC_DISK_SPILL_THRESHOLD +func GetPeerDBCDCDiskSpillThreshold() int { + return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000) +} + +// PEERDB_CATALOG_HOST +func GetPeerDBCatalogHost() string { + return getEnvString("PEERDB_CATALOG_HOST", "") +} + +// PEERDB_CATALOG_PORT +func GetPeerDBCatalogPort() uint32 { + return getEnvUint32("PEERDB_CATALOG_PORT", 5432) +} + +// PEERDB_CATALOG_USER +func GetPeerDBCatalogUser() string { + return getEnvString("PEERDB_CATALOG_USER", "") +} + +// PEERDB_CATALOG_PASSWORD +func GetPeerDBCatalogPassword() string { + return getEnvString("PEERDB_CATALOG_PASSWORD", "") +} + +// PEERDB_CATALOG_DATABASE +func GetPeerDBCatalogDatabase() string { + return getEnvString("PEERDB_CATALOG_DATABASE", "") +} diff --git a/flow/connectors/utils/env.go b/flow/peerdbenv/env.go similarity index 65% rename from flow/connectors/utils/env.go rename to flow/peerdbenv/env.go index 6d3065dae4..11e363d1eb 100644 --- a/flow/connectors/utils/env.go +++ b/flow/peerdbenv/env.go @@ -1,4 +1,4 @@ -package utils +package peerdbenv import ( "os" @@ -7,49 +7,49 @@ import ( // GetEnv returns the value of the environment variable with the given name // and a boolean indicating whether the environment variable exists. -func GetEnv(name string) (string, bool) { +func getEnv(name string) (string, bool) { val, exists := os.LookupEnv(name) return val, exists } -// GetEnvBool returns the value of the environment variable with the given name +// GetEnvInt returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set or is not a valid -// boolean value. -func GetEnvBool(name string, defaultValue bool) bool { - val, ok := GetEnv(name) +// integer value. +func getEnvInt(name string, defaultValue int) int { + val, ok := getEnv(name) if !ok { return defaultValue } - b, err := strconv.ParseBool(val) + i, err := strconv.Atoi(val) if err != nil { return defaultValue } - return b + return i } -// GetEnvInt returns the value of the environment variable with the given name +// getEnvUint32 returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set or is not a valid -// integer value. -func GetEnvInt(name string, defaultValue int) int { - val, ok := GetEnv(name) +// uint32 value. +func getEnvUint32(name string, defaultValue uint32) uint32 { + val, ok := getEnv(name) if !ok { return defaultValue } - i, err := strconv.Atoi(val) + i, err := strconv.ParseUint(val, 10, 32) if err != nil { return defaultValue } - return i + return uint32(i) } // GetEnvString returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set. -func GetEnvString(name string, defaultValue string) string { - val, ok := GetEnv(name) +func getEnvString(name string, defaultValue string) string { + val, ok := getEnv(name) if !ok { return defaultValue } diff --git a/flow/shared/constants.go b/flow/shared/constants.go index f9c21e7eb5..e49de60189 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -3,7 +3,7 @@ package shared import ( "fmt" - "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/peerdbenv" ) const ( @@ -50,7 +50,7 @@ func GetPeerFlowTaskQueueName(taskQueueID TaskQueueID) (string, error) { } func prependUIDToTaskQueueName(taskQueueName string) string { - deploymentUID := utils.GetEnvString("PEERDB_DEPLOYMENT_UID", "") + deploymentUID := peerdbenv.GetPeerDBDeploymentUID() if deploymentUID == "" { return taskQueueName } diff --git a/flow/shared/signals.go b/flow/shared/signals.go new file mode 100644 index 0000000000..91e6303e90 --- /dev/null +++ b/flow/shared/signals.go @@ -0,0 +1,26 @@ +package shared + +import ( + "go.temporal.io/sdk/log" +) + +func FlowSignalHandler(activeSignal CDCFlowSignal, + v CDCFlowSignal, logger log.Logger) CDCFlowSignal { + if v == ShutdownSignal { + logger.Info("received shutdown signal") + return v + } else if v == PauseSignal { + logger.Info("received pause signal") + if activeSignal == NoopSignal { + logger.Info("workflow was running, pausing it") + return v + } + } else if v == NoopSignal { + logger.Info("received resume signal") + if activeSignal == PauseSignal { + logger.Info("workflow was paused, resuming it") + return v + } + } + return activeSignal +} diff --git a/flow/utils/signals.go b/flow/utils/signals.go deleted file mode 100644 index 94eddd6289..0000000000 --- a/flow/utils/signals.go +++ /dev/null @@ -1,27 +0,0 @@ -package util - -import ( - "github.com/PeerDB-io/peer-flow/shared" - "go.temporal.io/sdk/log" -) - -func FlowSignalHandler(activeSignal shared.CDCFlowSignal, - v shared.CDCFlowSignal, logger log.Logger) shared.CDCFlowSignal { - if v == shared.ShutdownSignal { - logger.Info("received shutdown signal") - return v - } else if v == shared.PauseSignal { - logger.Info("received pause signal") - if activeSignal == shared.NoopSignal { - logger.Info("workflow was running, pausing it") - return v - } - } else if v == shared.NoopSignal { - logger.Info("received resume signal") - if activeSignal == shared.PauseSignal { - logger.Info("workflow was paused, resuming it") - return v - } - } - return activeSignal -} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index c9c912c831..4e0f760223 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -8,7 +8,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/google/uuid" "github.com/hashicorp/go-multierror" "go.temporal.io/api/enums/v1" @@ -144,7 +143,7 @@ func (w *CDCFlowWorkflowExecution) receiveAndHandleSignalAsync(ctx workflow.Cont var signalVal shared.CDCFlowSignal ok := signalChan.ReceiveAsync(&signalVal) if ok { - state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) + state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) } } @@ -318,7 +317,7 @@ func CDCFlowWorkflowWithConfig( // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { - state.ActiveSignal = util.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) + state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) } } } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 548feb2398..5f267079e1 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -8,7 +8,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" @@ -365,7 +364,7 @@ func (q *QRepFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) { var signalVal shared.CDCFlowSignal ok := signalChan.ReceiveAsync(&signalVal) if ok { - q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) } } @@ -483,7 +482,7 @@ func QRepFlowWorkflow( // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { - q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) } } } diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 2a6a1fac9c..0f4857b339 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -8,7 +8,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/google/uuid" "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" @@ -149,7 +148,7 @@ func (q *XminFlowExecution) receiveAndHandleSignalAsync(ctx workflow.Context) { var signalVal shared.CDCFlowSignal ok := signalChan.ReceiveAsync(&signalVal) if ok { - q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) } } @@ -267,7 +266,7 @@ func XminFlowWorkflow( // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { - q.activeSignal = util.FlowSignalHandler(q.activeSignal, signalVal, q.logger) + q.activeSignal = shared.FlowSignalHandler(q.activeSignal, signalVal, q.logger) } } }