Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into spiritus-mundi
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 14, 2024
2 parents d217029 + 758727a commit 58c93e4
Show file tree
Hide file tree
Showing 38 changed files with 671 additions and 640 deletions.
3 changes: 0 additions & 3 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type BigQueryServiceAccount struct {
ClientX509CertURL string `json:"client_x509_cert_url"`
}

// BigQueryConnector is a Connector implementation for BigQuery.
type BigQueryConnector struct {
bqConfig *protos.BigqueryConfig
client *bigquery.Client
Expand All @@ -59,7 +58,6 @@ type BigQueryConnector struct {
logger log.Logger
}

// Create BigQueryServiceAccount from BigqueryConfig
func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServiceAccount, error) {
var serviceAccount BigQueryServiceAccount
serviceAccount.Type = bqConfig.AuthType
Expand Down Expand Up @@ -178,7 +176,6 @@ func TableCheck(ctx context.Context, client *bigquery.Client, dataset string, pr
return nil
}

// NewBigQueryConnector creates a new BigQueryConnector from a PeerConnectionConfig.
func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*BigQueryConnector, error) {
logger := logger.LoggerFromCtx(ctx)

Expand Down
1 change: 1 addition & 0 deletions flow/connectors/clickhouse/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{
"CHAR": qvalue.QValueKindString,
"TEXT": qvalue.QValueKindString,
"String": qvalue.QValueKindString,
"FixedString(1)": qvalue.QValueKindQChar,
"Bool": qvalue.QValueKindBoolean,
"DateTime": qvalue.QValueKindTimestamp,
"TIMESTAMP": qvalue.QValueKindTimestamp,
Expand Down
30 changes: 17 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,20 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type PostgresCDCSource struct {
replConn *pgx.Conn
*PostgresConnector
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
slot string
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
commitLock bool
customTypeMapping map[uint32]string

// for partitioned tables, maps child relid to parent relid
childToParentRelIDMapping map[uint32]uint32
logger slog.Logger

// for storing schema delta audit logs to catalog
catalogPool *pgxpool.Pool
Expand All @@ -65,9 +62,9 @@ type startReplicationOpts struct {
}

// Create a new PostgresCDCSource
func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, customTypeMap map[uint32]string) (*PostgresCDCSource, error) {
func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
return &PostgresCDCSource{
replConn: cdcConfig.Connection,
PostgresConnector: c,
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
Expand All @@ -76,8 +73,6 @@ func NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig, cus
typeMap: pgtype.NewMap(),
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
commitLock: false,
customTypeMapping: customTypeMap,
logger: *slog.With(slog.String(string(shared.FlowNameKey), cdcConfig.FlowJobName)),
catalogPool: cdcConfig.CatalogPool,
flowJobName: cdcConfig.FlowJobName,
}, nil
Expand Down Expand Up @@ -642,22 +637,31 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
parsedData, err = dt.Codec.DecodeValue(p.typeMap, dataType, formatCode, data)
}
if err != nil {
if dt.Name == "time" || dt.Name == "timetz" ||
dt.Name == "timestamp" || dt.Name == "timestamptz" {
// indicates year is more than 4 digits or something similar,
// which you can insert into postgres,
// but not representable by time.Time
p.logger.Warn(fmt.Sprintf("Invalidated and hence nulled %s data: %s",
dt.Name, string(data)))
return qvalue.QValue{}, nil
}
return qvalue.QValue{}, err
}
retVal, err := parseFieldFromPostgresOID(dataType, parsedData)
retVal, err := p.parseFieldFromPostgresOID(dataType, parsedData)
if err != nil {
return qvalue.QValue{}, err
}
return retVal, nil
} else if dataType == uint32(oid.T_timetz) { // ugly TIMETZ workaround for CDC decoding.
retVal, err := parseFieldFromPostgresOID(dataType, string(data))
retVal, err := p.parseFieldFromPostgresOID(dataType, string(data))
if err != nil {
return qvalue.QValue{}, err
}
return retVal, nil
}

typeName, ok := p.customTypeMapping[dataType]
typeName, ok := p.customTypesMapping[dataType]
if ok {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
Expand Down Expand Up @@ -748,9 +752,9 @@ func (p *PostgresCDCSource) processRelationMessage(
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
if prevRelMap[column.Name] == nil {
qKind := postgresOIDToQValueKind(column.DataType)
qKind := p.postgresOIDToQValueKind(column.DataType)
if qKind == qvalue.QValueKindInvalid {
typeName, ok := p.customTypeMapping[column.DataType]
typeName, ok := p.customTypesMapping[column.DataType]
if ok {
qKind = customTypeToQKind(typeName)
}
Expand Down
17 changes: 13 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {

func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []string, pubName string) error {
if c.conn == nil {
return fmt.Errorf("check tables: conn is nil")
return errors.New("check tables: conn is nil")
}

// Check that we can select from all tables
Expand All @@ -649,11 +649,20 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []
}
}

// Check if tables belong to publication
tableStr := strings.Join(tableArr, ",")
if pubName != "" {
// Check if publication exists
err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
if err != nil {
if err == pgx.ErrNoRows {
return fmt.Errorf("publication does not exist: %s", pubName)
}
return fmt.Errorf("error while checking for publication existence: %w", err)
}

// Check if tables belong to publication
var pubTableCount int
err := c.conn.QueryRow(ctx, fmt.Sprintf(`
err = c.conn.QueryRow(ctx, fmt.Sprintf(`
with source_table_components (sname, tname) as (values %s)
select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
INNER JOIN source_table_components stc
Expand All @@ -663,7 +672,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []
}

if pubTableCount != len(tableNames) {
return fmt.Errorf("not all tables belong to publication")
return errors.New("not all tables belong to publication")
}
}

Expand Down
19 changes: 11 additions & 8 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

// PostgresConnector is a Connector implementation for Postgres.
type PostgresConnector struct {
connStr string
config *protos.PostgresConfig
Expand All @@ -38,6 +37,7 @@ type PostgresConnector struct {
replLock sync.Mutex
customTypesMapping map[uint32]string
metadataSchema string
hushWarnOID map[uint32]struct{}
logger log.Logger
}

Expand All @@ -47,7 +47,6 @@ type ReplState struct {
Offset int64
}

// NewPostgresConnector creates a new instance of PostgresConnector.
func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

Expand Down Expand Up @@ -97,6 +96,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
replLock: sync.Mutex{},
customTypesMapping: customTypeMap,
metadataSchema: metadataSchema,
hushWarnOID: make(map[uint32]struct{}),
logger: logger.LoggerFromCtx(ctx),
}, nil
}
Expand Down Expand Up @@ -229,6 +229,10 @@ func (c *PostgresConnector) Close(ctx context.Context) error {
return nil
}

// Conn returns the connector's conn field, the *pgx.Conn held by this
// PostgresConnector. The pointer is returned as-is with no nil check here,
// so callers should be prepared for a nil result.
func (c *PostgresConnector) Conn() *pgx.Conn {
return c.conn
}

// ConnectionActive returns nil if the connection is active.
func (c *PostgresConnector) ConnectionActive(ctx context.Context) error {
if c.conn == nil {
Expand Down Expand Up @@ -344,8 +348,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
return err
}

cdc, err := NewPostgresCDCSource(ctx, &PostgresCDCConfig{
Connection: c.replConn,
cdc, err := c.NewPostgresCDCSource(&PostgresCDCConfig{
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
Publication: publicationName,
Expand All @@ -354,7 +357,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
RelationMessageMapping: req.RelationMessageMapping,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
}, c.customTypesMapping)
})
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
}
Expand Down Expand Up @@ -735,7 +738,7 @@ func (c *PostgresConnector) getTableSchemaForTable(
columnNames := make([]string, 0, len(fields))
columns := make([]*protos.FieldDescription, 0, len(fields))
for _, fieldDescription := range fields {
genericColType := postgresOIDToQValueKind(fieldDescription.DataTypeOID)
genericColType := c.postgresOIDToQValueKind(fieldDescription.DataTypeOID)
if genericColType == qvalue.QValueKindInvalid {
typeName, ok := c.customTypesMapping[fieldDescription.DataTypeOID]
if ok {
Expand Down Expand Up @@ -1104,7 +1107,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
// just check if we have all the tables already in the publication for custom publications
if req.PublicationName != "" {
rows, err := c.conn.Query(ctx,
"SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
"SELECT schemaname || '.' || tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}
Expand All @@ -1113,7 +1116,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}
notPresentTables := utils.ArrayMinus(tableNames, additionalSrcTables)
notPresentTables := utils.ArrayMinus(additionalSrcTables, tableNames)
if len(notPresentTables) > 0 {
return fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", "))
Expand Down
33 changes: 7 additions & 26 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,8 @@ func (c *PostgresConnector) PullQRepRecords(
partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := NewQRepQueryExecutorSnapshot(ctx,
c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}
query := config.Query
return executor.ExecuteAndProcessQuery(ctx, query)
}
Expand Down Expand Up @@ -368,12 +364,8 @@ func (c *PostgresConnector) PullQRepRecords(
return nil, err
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return nil, err
}

records, err := executor.ExecuteAndProcessQuery(ctx, query, rangeStart, rangeEnd)
if err != nil {
Expand All @@ -392,15 +384,11 @@ func (c *PostgresConnector) PullQRepRecordStream(
partitionIdLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId)
if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

query := config.Query
_, err = executor.ExecuteAndProcessQueryStream(ctx, stream, query)
_, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query)
return 0, err
}
c.logger.Info("Obtained ranges for partition for PullQRepStream", partitionIdLog)
Expand Down Expand Up @@ -438,12 +426,8 @@ func (c *PostgresConnector) PullQRepRecordStream(
return 0, err
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, err
}

numRecords, err := executor.ExecuteAndProcessQueryStream(ctx, stream, query, rangeStart, rangeEnd)
if err != nil {
Expand Down Expand Up @@ -539,13 +523,10 @@ func (c *PostgresConnector) PullXminRecordStream(
query += " WHERE age(xmin) > 0 AND age(xmin) <= age($1::xid)"
}

executor, err := NewQRepQueryExecutorSnapshot(
ctx, c.conn, c.config.TransactionSnapshot,
executor := c.NewQRepQueryExecutorSnapshot(c.config.TransactionSnapshot,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, currentSnapshotXmin, err
}

var err error
var numRecords int
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
Expand Down
18 changes: 11 additions & 7 deletions flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import (
"context"
"testing"

"github.com/jackc/pgx/v5"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

func BenchmarkQRepQueryExecutor(b *testing.B) {
connectionString := "postgres://postgres:postgres@localhost:7132/postgres"
query := "SELECT * FROM bench.large_table"

ctx := context.Background()

// Create a separate connection for non-replication queries
conn, err := pgx.Connect(ctx, connectionString)
connector, err := NewPostgresConnector(ctx,
&protos.PostgresConfig{
Host: "localhost",
Port: 7132,
User: "postgres",
Password: "postgres",
Database: "postgres",
})
if err != nil {
b.Fatalf("failed to create connection: %v", err)
}
defer conn.Close(context.Background())
defer connector.Close(ctx)

// Create a new QRepQueryExecutor instance
qe := NewQRepQueryExecutor(conn, context.Background(), "test flow", "test part")
qe := connector.NewQRepQueryExecutor("test flow", "test part")

// Run the benchmark
b.ResetTimer()
Expand Down
Loading

0 comments on commit 58c93e4

Please sign in to comment.