Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid having multiple catalog connection pools #793

Merged
merged 7 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
matrix:
runner: [ubicloud-standard-8-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
timeout-minutes: 40
timeout-minutes: 30
services:
pg_cdc:
image: imresamu/postgis:15-3.4-alpine
Expand Down Expand Up @@ -80,13 +80,14 @@ jobs:
name: "gcs_creds.json"
json: ${{ secrets.GCS_CREDS }}

- name: create hstore extension and increase logical replication limits
- name: create hstore extension, increase logical replication limits, and setup catalog database
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
(cat ../nexus/catalog/migrations/V{?,??}__* | docker exec -i pg_cdc psql -h localhost -p 5432 -U postgres) &&
docker restart pg_cdc
working-directory: ./flow
env:
Expand Down
80 changes: 42 additions & 38 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
Expand All @@ -40,7 +40,7 @@ type SlotSnapshotSignal struct {
}

type FlowableActivity struct {
CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor
CatalogPool *pgxpool.Pool
}

// CheckConnection implements CheckConnection.
Expand Down Expand Up @@ -114,7 +114,7 @@ func (a *FlowableActivity) CreateRawTable(
ctx context.Context,
config *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -125,7 +125,7 @@ func (a *FlowableActivity) CreateRawTable(
if err != nil {
return nil, err
}
err = a.CatalogMirrorMonitor.InitializeCDCFlow(ctx, config.FlowJobName)
err = monitoring.InitializeCDCFlow(ctx, a.CatalogPool, config.FlowJobName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (a *FlowableActivity) handleSlotInfo(
}

if len(slotInfo) != 0 {
return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
}
return nil
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand Down Expand Up @@ -276,13 +276,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("the current sync flow has records: %v", hasRecords)

if a.CatalogMirrorMonitor.IsActive() && hasRecords {
if a.CatalogPool != nil && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}

err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: 0,
Expand Down Expand Up @@ -347,8 +347,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = a.CatalogMirrorMonitor.UpdateNumRowsAndEndLSNForCDCBatch(
err = monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID,
uint32(numRecords),
Expand All @@ -358,13 +359,17 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, err
}

err = a.CatalogMirrorMonitor.
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, pglogrepl.LSN(lastCheckpoint))
err = monitoring.UpdateLatestLSNAtTargetForCDCFlow(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(lastCheckpoint),
)
if err != nil {
return nil, err
}
if res.TableNameRowsMapping != nil {
err = a.CatalogMirrorMonitor.AddCDCBatchTablesForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.AddCDCBatchTablesForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID, res.TableNameRowsMapping)
if err != nil {
return nil, err
Expand Down Expand Up @@ -401,7 +406,7 @@ func (a *FlowableActivity) StartNormalize(
return nil, fmt.Errorf("failed to get last sync batch ID: %v", err)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
lastSyncBatchID)
return nil, err
} else if err != nil {
Expand Down Expand Up @@ -434,8 +439,12 @@ func (a *FlowableActivity) StartNormalize(

// normalize flow did not run due to no records, no need to update end time.
if res.Done {
err = a.CatalogMirrorMonitor.UpdateEndTimeForCDCBatch(ctx, input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID)
err = monitoring.UpdateEndTimeForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -500,8 +509,9 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
return nil, fmt.Errorf("failed to get partitions from source: %w", err)
}
if len(partitions) > 0 {
err = a.CatalogMirrorMonitor.InitializeQRepRun(
err = monitoring.InitializeQRepRun(
ctx,
a.CatalogPool,
config,
runUUID,
partitions,
Expand All @@ -522,7 +532,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForQRepRun(ctx, runUUID)
err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID)
if err != nil {
return fmt.Errorf("failed to update start time for qrep run: %w", err)
}
Expand All @@ -549,7 +559,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now())
err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now())
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}
Expand Down Expand Up @@ -587,7 +597,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}).Errorf("failed to pull records: %v", err)
goroutineErr = err
} else {
err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
log.Errorf("%v", err)
goroutineErr = err
Expand All @@ -607,7 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
"flowName": config.FlowJobName,
}).Infof("pulled %d records\n", len(recordBatch.Records))

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(ctx, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
return err
}
Expand Down Expand Up @@ -641,7 +651,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return goroutineErr
}

err := a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return err
}
Expand All @@ -651,7 +661,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}).Infof("pushed %d records\n", rowsSynced)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return err
}
Expand All @@ -663,7 +673,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
runUUID string) error {
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
} else if err != nil {
return err
}
Expand All @@ -681,7 +691,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
return err
}

return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
}

func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
Expand Down Expand Up @@ -719,14 +729,8 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
return nil
}

func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
if catalogErr != nil {
return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
}
defer catalogPool.Close()

optionRows, err := catalogPool.Query(ctx, `
func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
optionRows, err := a.CatalogPool.Query(ctx, `
SELECT DISTINCT p.name, p.options
FROM peers p
JOIN flows f ON p.id = f.source_peer
Expand Down Expand Up @@ -768,7 +772,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
log.Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
pgPeers, err := getPostgresPeerConfigs(ctx)
pgPeers, err := a.getPostgresPeerConfigs(ctx)
if err != nil {
log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." +
"Skipping walheartbeat send. error encountered: " + err.Error())
Expand Down Expand Up @@ -950,17 +954,17 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}},
}
}
updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}

err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime)
err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords))
err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
log.Errorf("%v", err)
return err
Expand Down Expand Up @@ -992,7 +996,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return 0, err
}

err = a.CatalogMirrorMonitor.UpdateRowsSyncedForPartition(ctx, rowsSynced, runUUID, partition)
err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}
Expand All @@ -1002,7 +1006,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}).Infof("pushed %d records\n", rowsSynced)
}

err = a.CatalogMirrorMonitor.UpdateEndTimeForPartition(ctx, runUUID, partition)
err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return 0, err
}
Expand Down
1 change: 0 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func APIMain(args *APIServerParams) error {
}

flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)
defer flowHandler.Close()

err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace, taskQueue)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,6 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
return nil
}

// Close closes the connection pool
func (h *FlowRequestHandler) Close() {
if h.pool != nil {
h.pool.Close()
}
}

func (h *FlowRequestHandler) CreateCDCFlow(
ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) {
cfg := req.ConnectionConfigs
Expand Down
5 changes: 1 addition & 4 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"

Expand Down Expand Up @@ -108,8 +107,6 @@ func WorkerMain(opts *WorkerOptions) error {
if err != nil {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
}
catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(conn)
defer catalogMirrorMonitor.Close()

c, err := client.Dial(clientOptions)
if err != nil {
Expand All @@ -134,7 +131,7 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
CatalogMirrorMonitor: catalogMirrorMonitor,
CatalogPool: conn,
})

err = w.Run(worker.InterruptCh())
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (c *BigQueryConnector) Close() error {
if c == nil || c.client == nil {
return nil
}
c.catalogPool.Close()
return c.client.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConf
}

func (p *PostgresMetadataStore) Close() error {
if p.pool != nil {
if p.config != nil && p.pool != nil {
p.pool.Close()
}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error {
return err
}

cdcMirrorMonitor, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*monitoring.CatalogMirrorMonitor)
catalogPool, ok := c.ctx.Value(shared.CDCMirrorMonitorKey).(*pgxpool.Pool)
if ok {
latestLSN, err := c.getCurrentLSN()
if err != nil {
return fmt.Errorf("failed to get current LSN: %w", err)
}
err = cdcMirrorMonitor.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, req.FlowJobName, latestLSN)
err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(c.ctx, catalogPool, req.FlowJobName, latestLSN)
if err != nil {
return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err)
}
Expand Down
Loading
Loading