Skip to content

Commit

Permalink
flow ci: setup catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 11, 2023
1 parent 2f728e2 commit 87fadec
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 64 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ jobs:
name: "gcs_creds.json"
json: ${{ secrets.GCS_CREDS }}

- name: create hstore extension and increase logical replication limits
- name: create hstore extension, increase logical replication limits, and setup catalog database
run: >
docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
-c "ALTER SYSTEM SET wal_level=logical;"
-c "ALTER SYSTEM SET max_replication_slots=192;"
-c "ALTER SYSTEM SET max_wal_senders=256;"
-c "ALTER SYSTEM SET max_connections=2048;" &&
(cat ../nexus/catalog/migrations/V{?,??}__* | docker exec -i pg_cdc psql -h localhost -p 5432 -U postgres) &&
docker restart pg_cdc
working-directory: ./flow
env:
Expand Down
4 changes: 1 addition & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,9 +724,7 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
}

func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
catalogPool := a.CatalogPool

optionRows, err := catalogPool.Query(ctx, `
optionRows, err := a.CatalogPool.Query(ctx, `
SELECT DISTINCT p.name, p.options
FROM peers p
JOIN flows f ON p.id = f.source_peer
Expand Down
60 changes: 0 additions & 60 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ type CDCBatchInfo struct {
}

func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_flows(flow_name,latest_lsn_at_source,latest_lsn_at_target) VALUES($1,0,0)
ON CONFLICT DO NOTHING`, flowJobName)
Expand All @@ -38,10 +34,6 @@ func InitializeCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName stri

func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
latestLSNAtSource pglogrepl.LSN) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_source=$1 WHERE flow_name=$2",
uint64(latestLSNAtSource), flowJobName)
Expand All @@ -53,10 +45,6 @@ func UpdateLatestLSNAtSourceForCDCFlow(ctx context.Context, pool *pgxpool.Pool,

func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
latestLSNAtTarget pglogrepl.LSN) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_flows SET latest_lsn_at_target=$1 WHERE flow_name=$2",
uint64(latestLSNAtTarget), flowJobName)
Expand All @@ -68,10 +56,6 @@ func UpdateLatestLSNAtTargetForCDCFlow(ctx context.Context, pool *pgxpool.Pool,

func AddCDCBatchForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
batchInfo CDCBatchInfo) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
`INSERT INTO peerdb_stats.cdc_batches(flow_name,batch_id,rows_in_batch,batch_start_lsn,batch_end_lsn,
start_time) VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING`,
Expand All @@ -92,10 +76,6 @@ func UpdateNumRowsAndEndLSNForCDCBatch(
numRows uint32,
batchEndLSN pglogrepl.LSN,
) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_batches SET rows_in_batch=$1,batch_end_lsn=$2 WHERE flow_name=$3 AND batch_id=$4",
numRows, uint64(batchEndLSN), flowJobName, batchID)
Expand All @@ -111,10 +91,6 @@ func UpdateEndTimeForCDCBatch(
flowJobName string,
batchID int64,
) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.cdc_batches SET end_time=$1 WHERE flow_name=$2 AND batch_id=$3",
time.Now(), flowJobName, batchID)
Expand All @@ -126,10 +102,6 @@ func UpdateEndTimeForCDCBatch(

func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
batchID int64, tableNameRowsMapping map[string]uint32) error {
if pool == nil {
return nil
}

insertBatchTablesTx, err := pool.Begin(ctx)
if err != nil {
return fmt.Errorf("error while beginning transaction for inserting statistics into cdc_batch_table: %w", err)
Expand Down Expand Up @@ -165,10 +137,6 @@ func InitializeQRepRun(
runUUID string,
partitions []*protos.QRepPartition,
) error {
if pool == nil {
return nil
}

flowJobName := config.GetFlowJobName()
_, err := pool.Exec(ctx,
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid) VALUES($1,$2) ON CONFLICT DO NOTHING",
Expand Down Expand Up @@ -199,10 +167,6 @@ func InitializeQRepRun(
}

func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET start_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
Expand All @@ -214,10 +178,6 @@ func UpdateStartTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID
}

func UpdateEndTimeForQRepRun(ctx context.Context, pool *pgxpool.Pool, runUUID string) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET end_time=$1 WHERE run_uuid=$2",
time.Now(), runUUID)
Expand All @@ -234,10 +194,6 @@ func AppendSlotSizeInfo(
peerName string,
slotInfo *protos.SlotInfo,
) error {
if pool == nil || slotInfo == nil {
return nil
}

_, err := pool.Exec(ctx,
"INSERT INTO peerdb_stats.peer_slot_size"+
"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size, wal_status) "+
Expand All @@ -259,10 +215,6 @@ func AppendSlotSizeInfo(

func addPartitionToQRepRun(ctx context.Context, pool *pgxpool.Pool, flowJobName string,
runUUID string, partition *protos.QRepPartition) error {
if pool == nil {
return nil
}

if partition.Range == nil && partition.FullTablePartition {
log.Infof("partition %s is a full table partition. Metrics logging is skipped.", partition.PartitionId)
return nil
Expand Down Expand Up @@ -320,10 +272,6 @@ func UpdateStartTimeForPartition(
partition *protos.QRepPartition,
startTime time.Time,
) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET start_time=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, startTime, runUUID, partition.PartitionId)
if err != nil {
Expand All @@ -334,10 +282,6 @@ func UpdateStartTimeForPartition(

func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string,
partition *protos.QRepPartition, rowsInPartition int64) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET pull_end_time=$1,rows_in_partition=$2
WHERE run_uuid=$3 AND partition_uuid=$4`, time.Now(), rowsInPartition, runUUID, partition.PartitionId)
if err != nil {
Expand All @@ -348,10 +292,6 @@ func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool *pgxpool.Poo

func UpdateEndTimeForPartition(ctx context.Context, pool *pgxpool.Pool, runUUID string,
partition *protos.QRepPartition) error {
if pool == nil {
return nil
}

_, err := pool.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1
WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId)
if err != nil {
Expand Down

0 comments on commit 87fadec

Please sign in to comment.