From f5dcc8b27272d0b7c7db618a990cfce94499dccf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 31 Jan 2024 20:57:26 +0000 Subject: [PATCH] Remove configurable postgres metadata database Always use catalog --- flow/connectors/clickhouse/clickhouse.go | 3 +- flow/connectors/eventhub/eventhub.go | 19 +--- flow/connectors/external_metadata/store.go | 72 ++++----------- flow/connectors/s3/s3.go | 6 +- flow/e2e/s3/s3_helper.go | 7 -- nexus/analyzer/src/lib.rs | 53 ----------- protos/peers.proto | 4 - ui/app/peers/[peerName]/omitKeys.ts | 2 +- ui/app/peers/create/[peerType]/helpers/ch.ts | 1 - ui/app/peers/create/[peerType]/helpers/s3.ts | 12 --- ui/app/peers/create/[peerType]/schema.ts | 1 - ui/components/PeerForms/S3Form.tsx | 94 +------------------- 12 files changed, 29 insertions(+), 245 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 376612e839..a66e396ded 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -32,8 +32,7 @@ func NewClickhouseConnector(ctx context.Context, } metadataSchemaName := "peerdb_ch_metadata" - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, - clickhouseProtoConfig.GetMetadataDb(), metadataSchemaName) + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName) if err != nil { slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 12390eca3e..3707dcb2a9 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -2,7 +2,6 @@ package conneventhub import ( "context" - "errors" "fmt" "log/slog" "sync/atomic" @@ -41,8 +40,7 @@ func NewEventHubConnector( hubManager := NewEventHubManager(defaultAzureCreds, config) metadataSchemaName := "peerdb_eventhub_metadata" - pgMetadata, err := 
metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), - metadataSchemaName) + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName) if err != nil { slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) @@ -61,22 +59,13 @@ func NewEventHubConnector( } func (c *EventHubConnector) Close() error { - var allErrors error - - // close the postgres metadata store. - err := c.pgMetadata.Close() - if err != nil { - c.logger.Error(fmt.Sprintf("failed to close postgres metadata store: %v", err)) - allErrors = errors.Join(allErrors, err) - } - - err = c.hubManager.Close(context.Background()) + err := c.hubManager.Close(context.Background()) if err != nil { c.logger.Error("failed to close event hub manager", slog.Any("error", err)) - allErrors = errors.Join(allErrors, err) + return err } - return allErrors + return nil } func (c *EventHubConnector) ConnectionActive() error { diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index ab5224b2ee..767a7ed732 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -6,13 +6,12 @@ import ( "log/slog" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" - "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" ) @@ -20,69 +19,34 @@ const ( lastSyncStateTableName = "last_sync_state" ) -type Querier interface { - Begin(ctx context.Context) (pgx.Tx, error) - Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) - QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row - Ping(ctx context.Context) error -} - type 
PostgresMetadataStore struct { ctx context.Context - config *protos.PostgresConfig - conn Querier + pool *pgxpool.Pool schemaName string logger slog.Logger } -func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, - schemaName string, -) (*PostgresMetadataStore, error) { - var storeConn Querier - var err error - if pgConfig == nil { - storeConn, err = cc.GetCatalogConnectionPoolFromEnv() - if err != nil { - return nil, fmt.Errorf("failed to create catalog connection pool: %w", err) - } - - slog.InfoContext(ctx, "obtained catalog connection pool for metadata store") - } else { - connectionString := utils.GetPGConnectionString(pgConfig) - storeConn, err = pgx.Connect(ctx, connectionString) - if err != nil { - slog.ErrorContext(ctx, "failed to create connection pool", slog.Any("error", err)) - return nil, err - } - - slog.InfoContext(ctx, "created connection pool for metadata store") +func NewPostgresMetadataStore(ctx context.Context, schemaName string) (*PostgresMetadataStore, error) { + pool, err := cc.GetCatalogConnectionPoolFromEnv() + if err != nil { + return nil, fmt.Errorf("failed to create catalog connection pool: %w", err) } flowName, _ := ctx.Value(shared.FlowNameKey).(string) return &PostgresMetadataStore{ ctx: ctx, - config: pgConfig, - conn: storeConn, + pool: pool, schemaName: schemaName, logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), }, nil } -func (p *PostgresMetadataStore) Close() error { - // only close p.conn when it isn't catalog - if conn, ok := p.conn.(*pgx.Conn); ok { - conn.Close(p.ctx) - } - - return nil -} - func (p *PostgresMetadataStore) QualifyTable(table string) string { return connpostgres.QuoteIdentifier(p.schemaName) + "." 
+ connpostgres.QuoteIdentifier(table) } func (p *PostgresMetadataStore) Ping() error { - pingErr := p.conn.Ping(p.ctx) + pingErr := p.pool.Ping(p.ctx) if pingErr != nil { return fmt.Errorf("metadata db ping failed: %w", pingErr) } @@ -92,7 +56,7 @@ func (p *PostgresMetadataStore) Ping() error { func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { // check if schema exists - row := p.conn.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) + row := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) var exists pgtype.Int8 err := row.Scan(&exists) @@ -110,14 +74,14 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { func (p *PostgresMetadataStore) SetupMetadata() error { // create the schema - _, err := p.conn.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) + _, err := p.pool.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName) if err != nil && !utils.IsUniqueError(err) { p.logger.Error("failed to create schema", slog.Any("error", err)) return err } // create the last sync state table - _, err = p.conn.Exec(p.ctx, ` + _, err = p.pool.Exec(p.ctx, ` CREATE TABLE IF NOT EXISTS `+p.QualifyTable(lastSyncStateTableName)+` ( job_name TEXT PRIMARY KEY NOT NULL, last_offset BIGINT NOT NULL, @@ -136,7 +100,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error { } func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { - row := p.conn.QueryRow(p.ctx, ` + row := p.pool.QueryRow(p.ctx, ` SELECT last_offset FROM `+p.QualifyTable(lastSyncStateTableName)+` WHERE job_name = $1 @@ -158,7 +122,7 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) { } func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { - row := p.conn.QueryRow(p.ctx, ` + row := p.pool.QueryRow(p.ctx, ` SELECT sync_batch_id FROM `+p.QualifyTable(lastSyncStateTableName)+` WHERE job_name = $1 @@ -181,7 +145,7 
@@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) { } func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) { - rows := p.conn.QueryRow(p.ctx, ` + rows := p.pool.QueryRow(p.ctx, ` SELECT normalize_batch_id FROM `+p.schemaName+`.`+lastSyncStateTableName+` WHERE job_name = $1 @@ -206,7 +170,7 @@ func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, // update offset for a job func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error { p.logger.Info("updating last offset", slog.Int64("offset", offset)) - _, err := p.conn.Exec(p.ctx, ` + _, err := p.pool.Exec(p.ctx, ` INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) @@ -223,7 +187,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, offset int64) error { p.logger.Info("finishing batch", slog.Int64("SyncBatchID", syncBatchID), slog.Int64("offset", offset)) - _, err := p.conn.Exec(p.ctx, ` + _, err := p.pool.Exec(p.ctx, ` INSERT INTO `+p.QualifyTable(lastSyncStateTableName)+` (job_name, last_offset, sync_batch_id) VALUES ($1, $2, $3) ON CONFLICT (job_name) @@ -242,7 +206,7 @@ func (p *PostgresMetadataStore) FinishBatch(jobName string, syncBatchID int64, o func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error { p.logger.Info("updating normalize batch id for job") - _, err := p.conn.Exec(p.ctx, ` + _, err := p.pool.Exec(p.ctx, ` UPDATE `+p.schemaName+`.`+lastSyncStateTableName+` SET normalize_batch_id=$2 WHERE job_name=$1 `, jobName, batchID) @@ -255,7 +219,7 @@ func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID i } func (p *PostgresMetadataStore) DropMetadata(jobName string) error { - _, err := p.conn.Exec(p.ctx, ` + _, err := p.pool.Exec(p.ctx, ` 
DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+` WHERE job_name = $1 `, jobName) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index df031c9b3b..a99266e3a3 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -66,8 +66,7 @@ func NewS3Connector(ctx context.Context, return nil, fmt.Errorf("failed to create S3 client: %w", err) } metadataSchemaName := "peerdb_s3_metadata" - pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, - config.GetMetadataDb(), metadataSchemaName) + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, metadataSchemaName) if err != nil { slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err)) return nil, err @@ -89,8 +88,7 @@ func (c *S3Connector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.C } func (c *S3Connector) Close() error { - c.logger.Debug("Closing metadata store connection") - return c.pgMetadata.Close() + return nil } func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error { diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 2faa57aedc..34a9d4b008 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -62,13 +62,6 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { SecretAccessKey: &config.SecretAccessKey, Region: &config.Region, Endpoint: &endpoint, - MetadataDb: &protos.PostgresConfig{ - Host: "localhost", - Port: 7132, - Password: "postgres", - User: "postgres", - Database: "postgres", - }, }, bucketName, prefix, diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index d2856cc381..64c28d1693 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -663,8 +663,6 @@ fn parse_db_options( Some(config) } DbType::Eventhub => { - let conn_str = opts.get("metadata_db"); - let metadata_db = parse_metadata_db_info(conn_str.copied())?; let subscription_id = opts 
.get("subscription_id") .map(|s| s.to_string()) @@ -697,7 +695,6 @@ fn parse_db_options( .get("location") .context("location not specified")? .to_string(), - metadata_db, subscription_id, partition_count, message_retention_in_days, @@ -706,8 +703,6 @@ fn parse_db_options( Some(config) } DbType::S3 => { - let s3_conn_str = opts.get("metadata_db"); - let metadata_db = parse_metadata_db_info(s3_conn_str.copied())?; let s3_config = S3Config { url: opts .get("url") @@ -718,7 +713,6 @@ fn parse_db_options( region: opts.get("region").map(|s| s.to_string()), role_arn: opts.get("role_arn").map(|s| s.to_string()), endpoint: opts.get("endpoint").map(|s| s.to_string()), - metadata_db, }; let config = Config::S3Config(s3_config); Some(config) @@ -746,9 +740,6 @@ fn parse_db_options( Some(config) } DbType::EventhubGroup => { - let conn_str = opts.get("metadata_db"); - let metadata_db = parse_metadata_db_info(conn_str.copied())?; - // split comma separated list of columns and trim let unnest_columns = opts .get("unnest_columns") @@ -782,16 +773,12 @@ fn parse_db_options( let eventhub_group_config = pt::peerdb_peers::EventHubGroupConfig { eventhubs, - metadata_db, unnest_columns, }; let config = Config::EventhubGroupConfig(eventhub_group_config); Some(config) } DbType::Clickhouse => { - let conn_str = opts.get("metadata_db"); - let metadata_db = parse_metadata_db_info(conn_str.copied())?; - let s3_int = opts .get("s3_integration") .map(|s| s.to_string()) @@ -817,7 +804,6 @@ fn parse_db_options( .context("no default database specified")? 
.to_string(), s3_integration: s3_int, - metadata_db, }; let config = Config::ClickhouseConfig(clickhouse_config); Some(config) @@ -826,42 +812,3 @@ fn parse_db_options( Ok(config) } - -fn parse_metadata_db_info(conn_str: Option<&str>) -> anyhow::Result<Option<PostgresConfig>> { - let conn_str = match conn_str { - Some(conn_str) => conn_str, - None => return Ok(None), - }; - - if conn_str.is_empty() { - return Ok(None); - } - - let mut metadata_db = PostgresConfig::default(); - let param_pairs: Vec<&str> = conn_str.split_whitespace().collect(); - match param_pairs.len() { - 5 => Ok(true), - _ => Err(anyhow::Error::msg("Invalid connection string. Check formatting and if the required parameters have been specified.")), - }?; - - for pair in param_pairs { - let key_value: Vec<&str> = pair.trim().split('=').collect(); - match key_value.len() { - 2 => Ok(true), - _ => Err(anyhow::Error::msg( - "Invalid config setting for PG. Check the formatting", - )), - }?; - let value = key_value[1].to_string(); - match key_value[0] { - "host" => metadata_db.host = value, - "port" => metadata_db.port = value.parse().context("Invalid PG Port")?, - "database" => metadata_db.database = value, - "user" => metadata_db.user = value, - "password" => metadata_db.password = value, - _ => (), - }; - } - - Ok(Some(metadata_db)) -} diff --git a/protos/peers.proto b/protos/peers.proto index 351b11b8ce..aac2627a19 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -64,7 +64,6 @@ message EventHubConfig { string namespace = 1; string resource_group = 2; string location = 3; - PostgresConfig metadata_db = 4; // if this is empty PeerDB uses `AZURE_SUBSCRIPTION_ID` environment variable.
string subscription_id = 5; // defaults to 3 @@ -76,7 +75,6 @@ message EventHubConfig { message EventHubGroupConfig { // event hub peer name to event hub config map eventhubs = 1; - PostgresConfig metadata_db = 2; repeated string unnest_columns = 3; } @@ -87,7 +85,6 @@ message S3Config { optional string role_arn = 4; optional string region = 5; optional string endpoint = 6; - PostgresConfig metadata_db = 7; } message ClickhouseConfig{ @@ -97,7 +94,6 @@ message ClickhouseConfig{ string password = 4; string database = 5; string s3_integration = 6; // staging to store avro files - PostgresConfig metadata_db = 7; } message SqlServerConfig { diff --git a/ui/app/peers/[peerName]/omitKeys.ts b/ui/app/peers/[peerName]/omitKeys.ts index e87f1501ca..3f23eab3b0 100644 --- a/ui/app/peers/[peerName]/omitKeys.ts +++ b/ui/app/peers/[peerName]/omitKeys.ts @@ -1,7 +1,7 @@ // sensitive keys const omitKeys = [ 'privateKey', // snowflake and bigquery - 'password', // postgres, metadatadb for non-dwh peers, snowflake + 'password', // postgres, snowflake 'secretAccessKey', // s3/gcs 'subscriptionId', // eventhub 'privateKeyId', // bigquery diff --git a/ui/app/peers/create/[peerType]/helpers/ch.ts b/ui/app/peers/create/[peerType]/helpers/ch.ts index ff8615267a..57eee3c39a 100644 --- a/ui/app/peers/create/[peerType]/helpers/ch.ts +++ b/ui/app/peers/create/[peerType]/helpers/ch.ts @@ -57,5 +57,4 @@ export const blankClickhouseSetting: ClickhouseConfig = { password: '', database: '', s3Integration: '', - metadataDb: undefined, }; diff --git a/ui/app/peers/create/[peerType]/helpers/s3.ts b/ui/app/peers/create/[peerType]/helpers/s3.ts index 1cd0f38d61..2fbf96ca44 100644 --- a/ui/app/peers/create/[peerType]/helpers/s3.ts +++ b/ui/app/peers/create/[peerType]/helpers/s3.ts @@ -45,15 +45,6 @@ export const s3Setting: PeerSetting[] = [ }, ]; -export const blankMetadata = { - host: '', - port: 5432, - user: 'postgres', - password: '', - database: 'postgres', - transactionSnapshot: '', -}; - export 
const blankS3Setting: S3Config = { url: 's3:///', accessKeyId: undefined, @@ -61,7 +52,4 @@ export const blankS3Setting: S3Config = { roleArn: undefined, region: undefined, endpoint: '', - // For Storage peers created in UI - // we use catalog as the metadata DB - metadataDb: blankMetadata, }; diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 0bf959b904..fed2119b12 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -305,5 +305,4 @@ export const s3Schema = z.object({ invalid_type_error: 'Endpoint must be a string', }) .optional(), - metadataDb: pgSchema.optional(), }); diff --git a/ui/components/PeerForms/S3Form.tsx b/ui/components/PeerForms/S3Form.tsx index 8bd2bdc426..61cda2424b 100644 --- a/ui/components/PeerForms/S3Form.tsx +++ b/ui/components/PeerForms/S3Form.tsx @@ -1,15 +1,9 @@ 'use client'; -import { PeerConfig, PeerSetter } from '@/app/dto/PeersDTO'; -import { postgresSetting } from '@/app/peers/create/[peerType]/helpers/pg'; -import { - blankS3Setting, - s3Setting, -} from '@/app/peers/create/[peerType]/helpers/s3'; -import { PostgresConfig } from '@/grpc_generated/peers'; +import { PeerSetter } from '@/app/dto/PeersDTO'; +import { s3Setting } from '@/app/peers/create/[peerType]/helpers/s3'; import { Label } from '@/lib/Label'; import { RowWithRadiobutton, RowWithTextField } from '@/lib/Layout'; import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; -import { Switch } from '@/lib/Switch'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; import { useEffect, useState } from 'react'; @@ -19,10 +13,6 @@ interface S3Props { setter: PeerSetter; } const S3Form = ({ setter }: S3Props) => { - const [showMetadata, setShowMetadata] = useState(false); - const [metadataDB, setMetadataDB] = useState( - blankS3Setting.metadataDb! 
- ); const [storageType, setStorageType] = useState<'S3' | 'GCS'>('S3'); const displayCondition = (label: string) => { return !( @@ -36,7 +26,6 @@ const S3Form = ({ setter }: S3Props) => { setter((prev) => { return { ...prev, - metadataDb: showMetadata ? (metadataDB as PostgresConfig) : undefined, endpoint, }; }); @@ -49,7 +38,7 @@ const S3Form = ({ setter }: S3Props) => { }; }); } - }, [metadataDB, storageType, setter, showMetadata]); + }, [storageType, setter]); return (
@@ -132,83 +121,6 @@ const S3Form = ({ setter }: S3Props) => { /> ); })} - - - -
- - setShowMetadata(state)} /> -
- {showMetadata && - postgresSetting.map( - (pgSetting, index) => - pgSetting.label !== 'Transaction Snapshot' && ( - - {pgSetting.label}{' '} - - - - - } - action={ -
- ) => - pgSetting.stateHandler(e.target.value, setMetadataDB) - } - defaultValue={ - (metadataDB as PostgresConfig)[ - pgSetting.label.toLowerCase() as - | 'host' - | 'port' - | 'user' - | 'password' - | 'database' - ] || '' - } - /> - {pgSetting.tips && ( - - )} -
- } - /> - ) - )}
); };