Skip to content

Commit

Permalink
Merge branch 'main' into bigquery-mixed-case
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 27, 2023
2 parents 0565424 + 00ea6f0 commit 233587a
Show file tree
Hide file tree
Showing 41 changed files with 4,510 additions and 921 deletions.
21 changes: 21 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
Expand Down Expand Up @@ -702,3 +703,23 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,

return nil
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

// check if destination is snowflake, if not error out
if config.Peer.Type != protos.DBType_SNOWFLAKE {
return nil, fmt.Errorf("rename tables is only supported for snowflake")
}

sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
}

return sfConn.RenameTables(config)
}
118 changes: 118 additions & 0 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"context"
"database/sql"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/protobuf/proto"
)

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) {
var pgPeerOptions sql.RawBytes
var pgPeerConfig protos.PostgresConfig
err := h.pool.QueryRow(ctx,
"SELECT options FROM peers WHERE name = $1 AND type=3", peerName).Scan(&pgPeerOptions)
if err != nil {
return nil, err
}

unmarshalErr := proto.Unmarshal(pgPeerOptions, &pgPeerConfig)
if err != nil {
return nil, unmarshalErr
}

connStr := utils.GetPGConnectionString(&pgPeerConfig)
peerPool, err := pgxpool.New(ctx, connStr)
if err != nil {
return nil, unmarshalErr
}
return peerPool, nil
}

func (h *FlowRequestHandler) GetSlotInfo(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSlotResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,active,"+
"round((redo_lsn-restart_lsn) / 1024 / 1024 , 2) AS MB_Behind"+
" FROM pg_control_checkpoint(), pg_replication_slots;")
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer rows.Close()
var slotInfoRows []*protos.SlotInfo
for rows.Next() {
var redoLSN string
var slotName string
var restartLSN string
var active bool
var lagInMB float32
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &active, &lagInMB)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}

slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
RedoLSN: redoLSN,
RestartLSN: restartLSN,
SlotName: slotName,
Active: active,
LagInMb: lagInMB,
})
}
return &protos.PeerSlotResponse{
SlotData: slotInfoRows,
}, nil
}

func (h *FlowRequestHandler) GetStatInfo(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE usename='peerdb_user' AND state != 'idle' AND query_start IS NOT NULL;")
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
defer rows.Close()
var statInfoRows []*protos.StatInfo
for rows.Next() {
var pid int64
var waitEvent string
var waitEventType string
var queryStart string
var query string
var duration float32

err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}

statInfoRows = append(statInfoRows, &protos.StatInfo{
Pid: pid,
WaitEvent: waitEvent,
WaitEventType: waitEventType,
QueryStart: queryStart,
Query: query,
Duration: duration,
})
}
return &protos.PeerStatResponse{
StatData: statInfoRows,
}, nil
}
6 changes: 5 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
log "github.com/sirupsen/logrus"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
Expand Down Expand Up @@ -254,5 +255,8 @@ func CloseConnector(conn Connector) {
return
}

conn.Close()
err := conn.Close()
if err != nil {
log.Errorf("error closing connector: %v", err)
}
}
88 changes: 74 additions & 14 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/jackc/pgx/v5/pgtype"
Expand All @@ -32,6 +33,9 @@ type PostgresCDCSource struct {
startLSN pglogrepl.LSN
commitLock bool
customTypeMapping map[uint32]string

// for partitioned tables, maps child relid to parent relid
chIdToParRelId map[uint32]uint32
}

type PostgresCDCConfig struct {
Expand All @@ -46,6 +50,11 @@ type PostgresCDCConfig struct {

// Create a new PostgresCDCSource
func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) {
childToParentRelIdMap, err := getChildToParentRelIdMap(cdcConfig.AppContext, cdcConfig.Connection)
if err != nil {
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
}

return &PostgresCDCSource{
ctx: cdcConfig.AppContext,
replPool: cdcConfig.Connection,
Expand All @@ -55,11 +64,44 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
chIdToParRelId: childToParentRelIdMap,
commitLock: false,
customTypeMapping: customTypeMap,
}, nil
}

func getChildToParentRelIdMap(ctx context.Context, pool *pgxpool.Pool) (map[uint32]uint32, error) {
query := `
SELECT
parent.oid AS parentrelid,
child.oid AS childrelid
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relkind='p';
`

rows, err := pool.Query(ctx, query, pgx.QueryExecModeSimpleProtocol)
if err != nil {
return nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
}

defer rows.Close()

childToParentRelIdMap := make(map[uint32]uint32)
for rows.Next() {
var parentRelId uint32
var childRelId uint32
err := rows.Scan(&parentRelId, &childRelId)
if err != nil {
return nil, fmt.Errorf("error scanning child to parent relid map: %w", err)
}
childToParentRelIdMap[childRelId] = parentRelId
}

return childToParentRelIdMap, nil
}

// PullRecords pulls records from the cdc stream
func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) (
*model.RecordsWithTableSchemaDelta, error) {
Expand Down Expand Up @@ -166,6 +208,8 @@ func (p *PostgresCDCSource) consumeStream(
shutdown <- true
}()

firstProcessed := false

for {
if time.Now().After(nextStandbyMessageDeadline) ||
(len(records.Records) >= int(req.MaxBatchSize)) {
Expand Down Expand Up @@ -213,8 +257,6 @@ func (p *PostgresCDCSource) consumeStream(
continue
}

firstProcessed := false

switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
Expand Down Expand Up @@ -347,6 +389,9 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
batch.LastCheckPointID = int64(xld.WALStart)
p.commitLock = false
case *pglogrepl.RelationMessage:
// treat all relation messages as correponding to parent if partitioned.
msg.RelationID = p.getParentRelIdIfPartitioned(msg.RelationID)

// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
// the state of the relation message somewhere
Expand All @@ -372,17 +417,19 @@ func (p *PostgresCDCSource) processInsertMessage(
lsn pglogrepl.LSN,
msg *pglogrepl.InsertMessage,
) (model.Record, error) {
tableName, exists := p.SrcTableIDNameMapping[msg.RelationID]
relId := p.getParentRelIdIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relId]
if !exists {
return nil, nil
}

// log lsn and relation id for debugging
log.Debugf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, msg.RelationID, tableName)
log.Debugf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relId, tableName)

rel, ok := p.relationMessageMapping[msg.RelationID]
rel, ok := p.relationMessageMapping[relId]
if !ok {
return nil, fmt.Errorf("unknown relation id: %d", msg.RelationID)
return nil, fmt.Errorf("unknown relation id: %d", relId)
}

// create empty map of string to interface{}
Expand All @@ -404,17 +451,19 @@ func (p *PostgresCDCSource) processUpdateMessage(
lsn pglogrepl.LSN,
msg *pglogrepl.UpdateMessage,
) (model.Record, error) {
tableName, exists := p.SrcTableIDNameMapping[msg.RelationID]
relID := p.getParentRelIdIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
if !exists {
return nil, nil
}

// log lsn and relation id for debugging
log.Debugf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, msg.RelationID, tableName)
log.Debugf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relID, tableName)

rel, ok := p.relationMessageMapping[msg.RelationID]
rel, ok := p.relationMessageMapping[relID]
if !ok {
return nil, fmt.Errorf("unknown relation id: %d", msg.RelationID)
return nil, fmt.Errorf("unknown relation id: %d", relID)
}

// create empty map of string to interface{}
Expand Down Expand Up @@ -443,17 +492,19 @@ func (p *PostgresCDCSource) processDeleteMessage(
lsn pglogrepl.LSN,
msg *pglogrepl.DeleteMessage,
) (model.Record, error) {
tableName, exists := p.SrcTableIDNameMapping[msg.RelationID]
relID := p.getParentRelIdIfPartitioned(msg.RelationID)

tableName, exists := p.SrcTableIDNameMapping[relID]
if !exists {
return nil, nil
}

// log lsn and relation id for debugging
log.Debugf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, msg.RelationID, tableName)
log.Debugf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relID, tableName)

rel, ok := p.relationMessageMapping[msg.RelationID]
rel, ok := p.relationMessageMapping[relID]
if !ok {
return nil, fmt.Errorf("unknown relation id: %d", msg.RelationID)
return nil, fmt.Errorf("unknown relation id: %d", relID)
}

// create empty map of string to interface{}
Expand Down Expand Up @@ -667,3 +718,12 @@ func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest,
hasher.Write(pkeyColsMerged)
return fmt.Sprintf("%x", hasher.Sum(nil)), nil
}

func (p *PostgresCDCSource) getParentRelIdIfPartitioned(relId uint32) uint32 {
parentRelId, ok := p.chIdToParRelId[relId]
if ok {
return parentRelId
}

return relId
}
Loading

0 comments on commit 233587a

Please sign in to comment.