Remove configurable postgres metadata database
Always use catalog
serprex committed Jan 31, 2024
1 parent 31f2641 commit f5dcc8b
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 245 deletions.
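
The thrust of the change: connectors previously accepted an optional per-peer metadata_db Postgres config and fell back to the catalog when it was absent; this commit removes the option, so the metadata store is always backed by the catalog's connection pool. A minimal, self-contained Go sketch of the resulting construction pattern (the CATALOG_DSN variable and helper names are illustrative stand-ins, not peer-flow's actual wiring):

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
)

// MetadataStore mirrors the shape the commit converges on: one shared
// catalog pool, no per-peer metadata database configuration.
type MetadataStore struct {
	pool       *pgxpool.Pool
	schemaName string
}

// NewMetadataStore builds the store from the environment, analogous to
// what GetCatalogConnectionPoolFromEnv does in peer-flow. CATALOG_DSN is
// an assumed variable name for illustration.
func NewMetadataStore(ctx context.Context, schemaName string) (*MetadataStore, error) {
	pool, err := pgxpool.New(ctx, os.Getenv("CATALOG_DSN"))
	if err != nil {
		return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
	}
	return &MetadataStore{pool: pool, schemaName: schemaName}, nil
}

func main() {
	ctx := context.Background()
	store, err := NewMetadataStore(ctx, "peerdb_s3_metadata")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer store.pool.Close()
	fmt.Println("metadata store ready in schema", store.schemaName)
}
```

Because the pool is shared rather than owned per connector, there is also nothing connector-specific to close, which is why the Close methods in the diffs below shrink to returning nil.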
3 changes: 1 addition & 2 deletions flow/connectors/clickhouse/clickhouse.go
@@ -32,8 +32,7 @@ func NewClickhouseConnector(ctx context.Context,
 	}
 
 	metadataSchemaName := "peerdb_ch_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx,
-		clickhouseProtoConfig.GetMetadataDb(), metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
 		return nil, err
19 changes: 4 additions & 15 deletions flow/connectors/eventhub/eventhub.go
@@ -2,7 +2,6 @@ package conneventhub
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"log/slog"
 	"sync/atomic"
@@ -41,8 +40,7 @@ func NewEventHubConnector(
 
 	hubManager := NewEventHubManager(defaultAzureCreds, config)
 	metadataSchemaName := "peerdb_eventhub_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
-		metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store",
 			slog.Any("error", err))
@@ -61,22 +59,13 @@
 }
 
 func (c *EventHubConnector) Close() error {
-	var allErrors error
-
-	// close the postgres metadata store.
-	err := c.pgMetadata.Close()
-	if err != nil {
-		c.logger.Error(fmt.Sprintf("failed to close postgres metadata store: %v", err))
-		allErrors = errors.Join(allErrors, err)
-	}
-
-	err = c.hubManager.Close(context.Background())
+	err := c.hubManager.Close(context.Background())
 	if err != nil {
 		c.logger.Error("failed to close event hub manager", slog.Any("error", err))
-		allErrors = errors.Join(allErrors, err)
+		return err
 	}
 
-	return allErrors
+	return nil
 }
 
 func (c *EventHubConnector) ConnectionActive() error {
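
Dropping the owned metadata connection leaves Close with a single resource, so the errors.Join aggregation becomes unnecessary. For reference, a small self-contained sketch of the aggregation pattern the old Close used (Go 1.20+; the closer functions are stand-ins):

```go
package main

import (
	"errors"
	"fmt"
)

// closeAll aggregates errors from multiple closers, as the old
// EventHubConnector.Close did before this commit left only one resource.
func closeAll(closers ...func() error) error {
	var allErrors error
	for _, closeFn := range closers {
		if err := closeFn(); err != nil {
			allErrors = errors.Join(allErrors, err)
		}
	}
	return allErrors
}

func main() {
	err := closeAll(
		func() error { return nil },
		func() error { return errors.New("hub manager: close failed") },
	)
	fmt.Println(err) // prints the joined error
}
```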
72 changes: 18 additions & 54 deletions flow/connectors/external_metadata/store.go
@@ -6,83 +6,47 @@ import (
 	"log/slog"
 
-	"github.com/jackc/pgx/v5"
-	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/jackc/pgx/v5/pgxpool"
 
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
-	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
 const (
 	lastSyncStateTableName = "last_sync_state"
 )
 
-type Querier interface {
-	Begin(ctx context.Context) (pgx.Tx, error)
-	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
-	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
-	Ping(ctx context.Context) error
-}
-
 type PostgresMetadataStore struct {
 	ctx        context.Context
-	config     *protos.PostgresConfig
-	conn       Querier
+	pool       *pgxpool.Pool
 	schemaName string
 	logger     slog.Logger
 }
 
-func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
-	schemaName string,
-) (*PostgresMetadataStore, error) {
-	var storeConn Querier
-	var err error
-	if pgConfig == nil {
-		storeConn, err = cc.GetCatalogConnectionPoolFromEnv()
-		if err != nil {
-			return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
-		}
-
-		slog.InfoContext(ctx, "obtained catalog connection pool for metadata store")
-	} else {
-		connectionString := utils.GetPGConnectionString(pgConfig)
-		storeConn, err = pgx.Connect(ctx, connectionString)
-		if err != nil {
-			slog.ErrorContext(ctx, "failed to create connection pool", slog.Any("error", err))
-			return nil, err
-		}
-
-		slog.InfoContext(ctx, "created connection pool for metadata store")
+func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) {
+	pool, err := cc.GetCatalogConnectionPoolFromEnv()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create catalog connection pool: %w", err)
 	}
 
 	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
 	return &PostgresMetadataStore{
 		ctx:        ctx,
-		config:     pgConfig,
-		conn:       storeConn,
+		pool:       pool,
 		schemaName: schemaName,
 		logger:     *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
 	}, nil
 }
 
-func (p *PostgresMetadataStore) Close() error {
-	// only close p.conn when it isn't catalog
-	if conn, ok := p.conn.(*pgx.Conn); ok {
-		conn.Close(p.ctx)
-	}
-
-	return nil
-}
-
 func (p *PostgresMetadataStore) QualifyTable(table string) string {
 	return connpostgres.QuoteIdentifier(p.schemaName) + "." + connpostgres.QuoteIdentifier(table)
 }
 
 func (p *PostgresMetadataStore) Ping() error {
-	pingErr := p.conn.Ping(p.ctx)
+	pingErr := p.pool.Ping(p.ctx)
 	if pingErr != nil {
 		return fmt.Errorf("metadata db ping failed: %w", pingErr)
 	}
@@ -92,7 +56,7 @@ func (p *PostgresMetadataStore) Ping() error {
 
 func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
 	// check if schema exists
-	row := p.conn.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
+	row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
 
 	var exists pgtype.Int8
 	err := row.Scan(&exists)
@@ -110,14 +74,14 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
 
 func (p *PostgresMetadataStore) SetupMetadata() error {
 	// create the schema
-	_, err := p.conn.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
+	_, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
 	if err != nil && !utils.IsUniqueError(err) {
 		p.logger.Error("failed to create schema", slog.Any("error", err))
 		return err
 	}
 
 	// create the last sync state table
-	_, err = p.conn.Exec(p.ctx, `
+	_, err = p.pool.Exec(p.ctx, `
 	CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` (
 		job_name TEXT PRIMARY KEY NOT NULL,
 		last_offset BIGINT NOT NULL,
@@ -136,7 +100,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
 }
 
 func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
-	row := p.conn.QueryRow(p.ctx, `
+	row := p.pool.QueryRow(p.ctx, `
 		SELECT last_offset
 		FROM `+p.QualifyTable(lastSyncStateTableName)+`
 		WHERE job_name = $1
@@ -158,7 +122,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
 }
 
 func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
-	row := p.conn.QueryRow(p.ctx, `
+	row := p.pool.QueryRow(p.ctx, `
 		SELECT sync_batch_id
 		FROM `+p.QualifyTable(lastSyncStateTableName)+`
 		WHERE job_name = $1
@@ -181,7 +145,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 }
 
 func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
-	rows := p.conn.QueryRow(p.ctx, `
+	rows := p.pool.QueryRow(p.ctx, `
 		SELECT normalize_batch_id
 		FROM `+p.schemaName+`.`+lastSyncStateTableName+`
 		WHERE job_name = $1
@@ -206,7 +170,7 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
 // update offset for a job
 func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
 	p.logger.Info("updating last offset", slog.Int64("offset", offset))
-	_, err := p.conn.Exec(p.ctx, `
+	_, err := p.pool.Exec(p.ctx, `
 	INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
 	VALUES ($1, $2, $3)
 	ON CONFLICT (job_name)
@@ -223,7 +187,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
 
 func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
 	p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset))
-	_, err := p.conn.Exec(p.ctx, `
+	_, err := p.pool.Exec(p.ctx, `
 	INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id)
 	VALUES ($1, $2, $3)
 	ON CONFLICT (job_name)
@@ -242,7 +206,7 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error {
 
 func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
 	p.logger.Info("updating normalize batch id for job")
-	_, err := p.conn.Exec(p.ctx, `
+	_, err := p.pool.Exec(p.ctx, `
 	UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
 	SET normalize_batch_id=$2 WHERE job_name=$1
 	`, jobName, batchID)
@@ -255,7 +219,7 @@ func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
 }
 
 func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
-	_, err := p.conn.Exec(p.ctx, `
+	_, err := p.pool.Exec(p.ctx, `
 	DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
 	WHERE job_name = $1
 	`, jobName)
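
The bookkeeping methods above upsert into last_sync_state, but the diff collapses the DO UPDATE halves of those statements. A hedged sketch of the pattern over pgxpool — the conflict action shown (keeping the greater offset) is an assumption about the hidden lines, and the schema name is just one of the connector defaults:

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"
)

// updateLastOffset upserts a job's offset the way the store's
// UpdateLastOffset appears to: insert the row if the job is new,
// otherwise bump last_offset. GREATEST guards against moving the
// offset backwards (an assumption; the DO UPDATE body is hidden
// behind the collapsed diff context).
func updateLastOffset(ctx context.Context, pool *pgxpool.Pool, jobName string, offset int64) error {
	_, err := pool.Exec(ctx, `
	INSERT INTO peerdb_s3_metadata.last_sync_state (job_name, last_offset, sync_batch_id)
	VALUES ($1, $2, $3)
	ON CONFLICT (job_name)
	DO UPDATE SET last_offset = GREATEST(last_sync_state.last_offset, excluded.last_offset)
	`, jobName, offset, 0)
	return err
}

func main() {
	ctx := context.Background()
	pool, err := pgxpool.New(ctx, os.Getenv("CATALOG_DSN")) // illustrative env var
	if err != nil {
		fmt.Println(err)
		return
	}
	defer pool.Close()
	if err := updateLastOffset(ctx, pool, "my_mirror", 42); err != nil {
		fmt.Println(err)
	}
}
```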
6 changes: 2 additions & 4 deletions flow/connectors/s3/s3.go
@@ -66,8 +66,7 @@ func NewS3Connector(ctx context.Context,
 		return nil, fmt.Errorf("failed to create S3 client: %w", err)
 	}
 	metadataSchemaName := "peerdb_s3_metadata"
-	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx,
-		config.GetMetadataDb(), metadataSchemaName)
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName)
 	if err != nil {
 		slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
 		return nil, err
@@ -89,8 +88,7 @@ func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
 }
 
 func (c *S3Connector) Close() error {
-	c.logger.Debug("Closing metadata store connection")
-	return c.pgMetadata.Close()
+	return nil
 }
 
 func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
7 changes: 0 additions & 7 deletions flow/e2e/s3/s3_helper.go
@@ -62,13 +62,6 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
 			SecretAccessKey: &config.SecretAccessKey,
 			Region:          &config.Region,
 			Endpoint:        &endpoint,
-			MetadataDb: &protos.PostgresConfig{
-				Host:     "localhost",
-				Port:     7132,
-				Password: "postgres",
-				User:     "postgres",
-				Database: "postgres",
-			},
 		},
 		bucketName,
 		prefix,
53 changes: 0 additions & 53 deletions nexus/analyzer/src/lib.rs
@@ -663,8 +663,6 @@ fn parse_db_options(
             Some(config)
         }
         DbType::Eventhub => {
-            let conn_str = opts.get("metadata_db");
-            let metadata_db = parse_metadata_db_info(conn_str.copied())?;
             let subscription_id = opts
                 .get("subscription_id")
                 .map(|s| s.to_string())
@@ -697,7 +695,6 @@ fn parse_db_options(
                 .get("location")
                 .context("location not specified")?
                 .to_string(),
-            metadata_db,
             subscription_id,
             partition_count,
             message_retention_in_days,
@@ -706,8 +703,6 @@ fn parse_db_options(
             Some(config)
         }
        DbType::S3 => {
-            let s3_conn_str = opts.get("metadata_db");
-            let metadata_db = parse_metadata_db_info(s3_conn_str.copied())?;
            let s3_config = S3Config {
                url: opts
                    .get("url")
@@ -718,7 +713,6 @@ fn parse_db_options(
                region: opts.get("region").map(|s| s.to_string()),
                role_arn: opts.get("role_arn").map(|s| s.to_string()),
                endpoint: opts.get("endpoint").map(|s| s.to_string()),
-                metadata_db,
            };
            let config = Config::S3Config(s3_config);
            Some(config)
@@ -746,9 +740,6 @@ fn parse_db_options(
            Some(config)
        }
        DbType::EventhubGroup => {
-            let conn_str = opts.get("metadata_db");
-            let metadata_db = parse_metadata_db_info(conn_str.copied())?;
-
            // split comma separated list of columns and trim
            let unnest_columns = opts
                .get("unnest_columns")
@@ -782,16 +773,12 @@ fn parse_db_options(
 
            let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig {
                eventhubs,
-                metadata_db,
                unnest_columns,
            };
            let config = Config::EventhubGroupConfig(eventhub_group_config);
            Some(config)
        }
        DbType::Clickhouse => {
-            let conn_str = opts.get("metadata_db");
-            let metadata_db = parse_metadata_db_info(conn_str.copied())?;
-
            let s3_int = opts
                .get("s3_integration")
                .map(|s| s.to_string())
@@ -817,7 +804,6 @@ fn parse_db_options(
                .context("no default database specified")?
                .to_string(),
            s3_integration: s3_int,
-            metadata_db,
        };
        let config = Config::ClickhouseConfig(clickhouse_config);
        Some(config)
@@ -826,42 +812,3 @@ fn parse_db_options(
 
    Ok(config)
 }
-
-fn parse_metadata_db_info(conn_str: Option<&str>) -> anyhow::Result<Option<PostgresConfig>> {
-    let conn_str = match conn_str {
-        Some(conn_str) => conn_str,
-        None => return Ok(None),
-    };
-
-    if conn_str.is_empty() {
-        return Ok(None);
-    }
-
-    let mut metadata_db = PostgresConfig::default();
-    let param_pairs: Vec<&str> = conn_str.split_whitespace().collect();
-    match param_pairs.len() {
-        5 => Ok(true),
-        _ => Err(anyhow::Error::msg("Invalid connection string. Check formatting and if the required parameters have been specified.")),
-    }?;
-
-    for pair in param_pairs {
-        let key_value: Vec<&str> = pair.trim().split('=').collect();
-        match key_value.len() {
-            2 => Ok(true),
-            _ => Err(anyhow::Error::msg(
-                "Invalid config setting for PG. Check the formatting",
-            )),
-        }?;
-        let value = key_value[1].to_string();
-        match key_value[0] {
-            "host" => metadata_db.host = value,
-            "port" => metadata_db.port = value.parse().context("Invalid PG Port")?,
-            "database" => metadata_db.database = value,
-            "user" => metadata_db.user = value,
-            "password" => metadata_db.password = value,
-            _ => (),
-        };
-    }
-
-    Ok(Some(metadata_db))
-}
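
For readers with old CREATE PEER statements: the deleted parse_metadata_db_info helper accepted a space-separated key=value string with exactly five required parameters (host, port, database, user, password), the same shape as the MetadataDb literal removed from the S3 test helper above. A Go transliteration of the removed parser, for illustration only (PostgresConfig here is a stand-in for the generated proto type):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// PostgresConfig stands in for the generated protos.PostgresConfig.
type PostgresConfig struct {
	Host     string
	Port     uint32
	User     string
	Password string
	Database string
}

// parseMetadataDBInfo mirrors the removed Rust helper: exactly five
// space-separated key=value pairs, e.g.
// "host=localhost port=7132 database=postgres user=postgres password=postgres".
func parseMetadataDBInfo(connStr string) (*PostgresConfig, error) {
	if connStr == "" {
		return nil, nil
	}
	pairs := strings.Fields(connStr)
	if len(pairs) != 5 {
		return nil, fmt.Errorf("invalid connection string: expected 5 key=value parameters, got %d", len(pairs))
	}
	cfg := &PostgresConfig{}
	for _, pair := range pairs {
		kv := strings.Split(strings.TrimSpace(pair), "=")
		if len(kv) != 2 {
			return nil, fmt.Errorf("invalid config setting %q: expected key=value", pair)
		}
		switch kv[0] {
		case "host":
			cfg.Host = kv[1]
		case "port":
			port, err := strconv.ParseUint(kv[1], 10, 32)
			if err != nil {
				return nil, fmt.Errorf("invalid PG port: %w", err)
			}
			cfg.Port = uint32(port)
		case "database":
			cfg.Database = kv[1]
		case "user":
			cfg.User = kv[1]
		case "password":
			cfg.Password = kv[1]
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := parseMetadataDBInfo("host=localhost port=7132 database=postgres user=postgres password=postgres")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```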