From e2be383367fcf3fe5fbb8c18857eb98187fae008 Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Thu, 9 May 2024 15:27:55 +0530
Subject: [PATCH] Query layer: support ssh fields, sync interval, simplify code
 path (#1704)

- Adds parsing for SSHConfig fields in the CREATE PEER command for Postgres:

```sql
CREATE PEER postgres_peer FROM POSTGRES WITH
(
  host = '',
  port = 5432,
  user = '',
  password = '',
  database = '',
  ssh_config = '{
    "host": "",
    "port": 22,
    "user": "",
    "password": "",
    "private_key": ""
  }'
);
```

- You can also now specify `sync_interval` in the CREATE MIRROR command
  (an illustrative example is appended after the diff below).
- Removes the catalog flows table insert and update operations on the query
  layer side, as we are hitting a gRPC endpoint anyway which takes care of
  that. This makes the create_catalog_entry flag in the request to the
  create/validate mirror endpoints redundant, so it is removed from
  route.proto.
- Fixes a bug in validate mirror where the `SELECT * FROM <source table>
  LIMIT 0` check was skipped when the publication name was empty.

Functionally tested via the query layer and UI.
---
 flow/cmd/handler.go                      | 12 ++---
 flow/cmd/validate_mirror.go              | 28 +++++-----
 flow/connectors/postgres/postgres.go     |  2 +-
 flow/connectors/postgres/validate.go     | 33 ++++++------
 nexus/analyzer/src/lib.rs                | 27 ++++++++--
 nexus/analyzer/src/qrep.rs               |  5 +-
 nexus/catalog/src/lib.rs                 | 66 +-----------------------
 nexus/flow-rs/src/grpc.rs                |  3 +-
 nexus/pt/src/flow_model.rs               |  1 +
 nexus/server/src/main.rs                 | 22 +-------
 protos/route.proto                       |  1 -
 ui/app/api/mirrors/cdc/route.ts          |  1 -
 ui/app/api/mirrors/cdc/validate/route.ts |  1 -
 13 files changed, 72 insertions(+), 130 deletions(-)

diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index a7538a814e..22865042d6 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -154,15 +154,13 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 		req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName)
 	}
 
-	if req.CreateCatalogEntry {
-		err := h.createCdcJobEntry(ctx, req, workflowID)
-		if err != nil {
-			slog.Error("unable to create flow job entry", slog.Any("error", err))
-			return nil, fmt.Errorf("unable to create flow job entry: %w", err)
-		}
+	err := h.createCdcJobEntry(ctx, req, workflowID)
+	if err != nil {
+		slog.Error("unable to create flow job entry", slog.Any("error", err))
+		return nil, fmt.Errorf("unable to create flow job entry: %w", err)
 	}
 
-	err := h.updateFlowConfigInCatalog(ctx, cfg)
+	err = h.updateFlowConfigInCatalog(ctx, cfg)
 	if err != nil {
 		slog.Error("unable to update flow config in catalog", slog.Any("error", err))
 		return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go
index 89e34d8ee4..b4304b0504 100644
--- a/flow/cmd/validate_mirror.go
+++ b/flow/cmd/validate_mirror.go
@@ -17,7 +17,7 @@ import (
 func (h *FlowRequestHandler) ValidateCDCMirror(
 	ctx context.Context, req *protos.CreateCDCFlowRequest,
 ) (*protos.ValidateCDCMirrorResponse, error) {
-	if req.CreateCatalogEntry && !req.ConnectionConfigs.Resync {
+	if !req.ConnectionConfigs.Resync {
 		mirrorExists, existCheckErr := h.CheckIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName)
 		if existCheckErr != nil {
 			slog.Error("/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr))
@@ -46,7 +46,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
 	sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig()
 	if sourcePeerConfig == nil {
 		slog.Error("/validatecdc source peer config is nil", slog.Any("peer", 
req.ConnectionConfigs.Source)) - return nil, errors.New("source peer config is nil") + return &protos.ValidateCDCMirrorResponse{ + Ok: false, + }, errors.New("source peer config is nil") } pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig) @@ -103,17 +105,17 @@ func (h *FlowRequestHandler) ValidateCDCMirror( } pubName := req.ConnectionConfigs.PublicationName - if pubName != "" { - err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName) - if err != nil { - displayErr := fmt.Errorf("provided source tables invalidated: %v", err) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr - } + + err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName) + if err != nil { + displayErr := fmt.Errorf("provided source tables invalidated: %v", err) + slog.Error(displayErr.Error()) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, + fmt.Sprint(displayErr), + ) + return &protos.ValidateCDCMirrorResponse{ + Ok: false, + }, displayErr } return &protos.ValidateCDCMirrorResponse{ diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 03ee0c02b1..3642289756 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -62,11 +62,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) // create a separate connection pool for non-replication queries as replication connections cannot // be used for extended query protocol, i.e. prepared statements connConfig, err := pgx.ParseConfig(connectionString) - replConfig := connConfig.Copy() if err != nil { return nil, fmt.Errorf("failed to parse connection string: %w", err) } + replConfig := connConfig.Copy() runtimeParams := connConfig.Config.RuntimeParams runtimeParams["idle_in_transaction_session_timeout"] = "0" runtimeParams["statement_timeout"] = "0" diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 8e715278d8..0b16e8d0c6 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -34,28 +34,31 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, } tableStr := strings.Join(tableArr, ",") - // Check if publication exists - err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) - if err != nil { - if err == pgx.ErrNoRows { - return fmt.Errorf("publication does not exist: %s", pubName) + + if pubName != "" { + // Check if publication exists + err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) + if err != nil { + if err == pgx.ErrNoRows { + return fmt.Errorf("publication does not exist: %s", pubName) + } + return fmt.Errorf("error while checking for publication existence: %w", err) } - return fmt.Errorf("error while checking for publication existence: %w", err) - } - // Check if tables belong to publication - var pubTableCount int - err = c.conn.QueryRow(ctx, fmt.Sprintf(` + // Check if tables belong to publication + var pubTableCount int + err = c.conn.QueryRow(ctx, fmt.Sprintf(` with source_table_components (sname, tname) as (values %s) select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables INNER JOIN source_table_components stc ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount) - if err != 
nil {
-		return err
-	}
+		if err != nil {
+			return err
+		}
 
-	if pubTableCount != len(tableNames) {
-		return errors.New("not all tables belong to publication")
+		if pubTableCount != len(tableNames) {
+			return errors.New("not all tables belong to publication")
+		}
 	}
 
 	return nil
diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs
index b0ff741612..4548d2eec9 100644
--- a/nexus/analyzer/src/lib.rs
+++ b/nexus/analyzer/src/lib.rs
@@ -11,7 +11,7 @@ use pt::{
     peerdb_peers::{
         peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, GcpServiceAccount,
         KafkaConfig, MongoConfig, Peer, PostgresConfig, PubSubConfig, S3Config, SnowflakeConfig,
-        SqlServerConfig,
+        SqlServerConfig, SshConfig,
     },
 };
 use qrep::process_options;
@@ -300,6 +300,11 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
                     _ => None,
                 };
 
+                let sync_interval: Option<u64> = match raw_options.remove("sync_interval") {
+                    Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::<u64>()?),
+                    _ => None,
+                };
+
                 let soft_delete_col_name: Option<String> = match raw_options
                     .remove("soft_delete_col_name")
                 {
@@ -347,6 +352,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
                     push_batch_size,
                     push_parallelism,
                     max_batch_size,
+                    sync_interval,
                     resync,
                     soft_delete_col_name,
                     synced_at_col_name,
@@ -646,6 +652,19 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
             Config::MongoConfig(mongo_config)
         }
         DbType::Postgres => {
+            let ssh_fields: Option<SshConfig> = match opts.get("ssh_config") {
+                Some(ssh_config) => {
+                    let ssh_config_str = ssh_config.to_string();
+                    if ssh_config_str.is_empty() {
+                        None
+                    } else {
+                        serde_json::from_str(&ssh_config_str)
+                            .context("failed to deserialize ssh_config")?
+                    }
+                }
+                None => None,
+            };
+
             let postgres_config = PostgresConfig {
                 host: opts.get("host").context("no host specified")?.to_string(),
                 port: opts
@@ -667,8 +686,9 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
                     .to_string(),
                 metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()),
                 transaction_snapshot: "".to_string(),
-                ssh_config: None,
+                ssh_config: ssh_fields,
             };
+
             Config::PostgresConfig(postgres_config)
         }
         DbType::S3 => {
@@ -744,7 +764,8 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
                 .unwrap_or_default(),
             disable_tls: opts
                 .get("disable_tls")
-                .map(|s| s.parse::<bool>().unwrap_or_default()).unwrap_or_default(),
+                .map(|s| s.parse::<bool>().unwrap_or_default())
+                .unwrap_or_default(),
             endpoint: opts.get("endpoint").map(|s| s.to_string()),
         };
         Config::ClickhouseConfig(clickhouse_config)
diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs
index 41ab1aa40a..e1348f0fc6 100644
--- a/nexus/analyzer/src/qrep.rs
+++ b/nexus/analyzer/src/qrep.rs
@@ -196,7 +196,10 @@ pub fn process_options(
 
     // If mode is upsert, we need unique key columns
     if opts.get("mode") == Some(&Value::String(String::from("upsert")))
-        && opts.get("unique_key_columns").map(|ukc| ukc == &Value::Array(Vec::new())).unwrap_or(true)
+        && opts
+            .get("unique_key_columns")
+            .map(|ukc| ukc == &Value::Array(Vec::new()))
+            .unwrap_or(true)
     {
         anyhow::bail!("For upsert mode, unique_key_columns must be specified");
     }
diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs
index e8a569f669..f498992a9f 100644
--- a/nexus/catalog/src/lib.rs
+++ b/nexus/catalog/src/lib.rs
@@ -6,7 +6,7 @@ use peer_postgres::{self, ast};
 use pgwire::error::PgWireResult;
 use postgres_connection::{connect_postgres, get_pg_connection_string};
 use pt::{
-    flow_model::{FlowJob, QRepFlowJob},
+    flow_model::QRepFlowJob,
     peerdb_peers::PostgresConfig,
     peerdb_peers::{peer::Config, DbType, Peer},
     prost::Message,
@@ -332,70 +332,6 @@ impl Catalog {
         })
     }
 
-    async fn normalize_schema_for_table_identifier(
-        &self,
-        table_identifier: &str,
-        peer_id: i32,
-    ) -> anyhow::Result<String> {
-        let peer_dbtype = self.get_peer_type_for_id(peer_id).await?;
-
-        if !table_identifier.contains('.') && peer_dbtype != DbType::Bigquery {
-            Ok(format!("public.{}", table_identifier))
-        } else {
-            Ok(String::from(table_identifier))
-        }
-    }
-
-    pub async fn create_cdc_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
-        let source_peer_id = self
-            .get_peer_id_i32(&job.source_peer)
-            .await
-            .context("unable to get source peer id")?;
-        let destination_peer_id = self
-            .get_peer_id_i32(&job.target_peer)
-            .await
-            .context("unable to get destination peer id")?;
-
-        let stmt = self
-            .pg
-            .prepare_typed(
-                "INSERT INTO flows (name, source_peer, destination_peer, description,
-                source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6)",
-                &[types::Type::TEXT, types::Type::INT4, types::Type::INT4, types::Type::TEXT,
-                types::Type::TEXT, types::Type::TEXT],
-            )
-            .await?;
-
-        for table_mapping in &job.table_mappings {
-            let _rows = self
-                .pg
-                .execute(
-                    &stmt,
-                    &[
-                        &job.name,
-                        &source_peer_id,
-                        &destination_peer_id,
-                        &job.description,
-                        &self
-                            .normalize_schema_for_table_identifier(
-                                &table_mapping.source_table_identifier,
-                                source_peer_id,
-                            )
-                            .await?,
-                        &self
-                            .normalize_schema_for_table_identifier(
-                                &table_mapping.destination_table_identifier,
-                                destination_peer_id,
-                            )
-                            .await?,
-                    ],
-                )
-                .await?;
-        }
-
-        Ok(())
-    }
-
     pub async fn get_qrep_flow_job_by_name(
         &self,
         job_name: &str,
diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs
index 70c463e44c..a07115c1ec 100644
--- a/nexus/flow-rs/src/grpc.rs
+++ b/nexus/flow-rs/src/grpc.rs
@@ -79,7 +79,6 @@ impl FlowGrpcClient {
     ) -> anyhow::Result<String> {
         let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest {
             connection_configs: Some(peer_flow_config),
-            create_catalog_entry: false,
         };
         let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
         let workflow_id = response.into_inner().workflow_id;
@@ -176,7 +175,7 @@ impl FlowGrpcClient {
             initial_snapshot_only: job.initial_snapshot_only,
             script: job.script.clone(),
             system: system as i32,
-            ..Default::default()
+            idle_timeout_seconds: job.sync_interval.unwrap_or_default(),
         };
 
         self.start_peer_flow(flow_conn_cfg).await
diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs
index 8dd40edc42..ce4d47108e 100644
--- a/nexus/pt/src/flow_model.rs
+++ b/nexus/pt/src/flow_model.rs
@@ -30,6 +30,7 @@ pub struct FlowJob {
     pub push_parallelism: Option<i64>,
     pub push_batch_size: Option<i64>,
     pub max_batch_size: Option<u32>,
+    pub sync_interval: Option<u64>,
     pub resync: bool,
     pub soft_delete_col_name: Option<String>,
     pub synced_at_col_name: Option<String>,
diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs
index 176928a71b..93d1c5af14 100644
--- a/nexus/server/src/main.rs
+++ b/nexus/server/src/main.rs
@@ -413,15 +413,6 @@ impl NexusBackend {
                     }
                 }
 
-                self.catalog
-                    .create_cdc_flow_job_entry(flow_job)
-                    .await
-                    .map_err(|err| {
-                        PgWireError::ApiError(
-                            format!("unable to create mirror job entry: {:?}", err).into(),
-                        )
-                    })?;
-
                 // get source and destination peers
                 let (src_peer, dst_peer) = join!(
                     Self::get_peer_of_mirror(self.catalog.as_ref(), &flow_job.source_peer),
@@ -432,21 +423,12 @@ impl NexusBackend {
                 // make a request to the flow service to start the 
job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; - let workflow_id = flow_handler + flow_handler .start_peer_flow_job(flow_job, src_peer, dst_peer) .await .map_err(|err| { PgWireError::ApiError( - format!("unable to submit job: {:?}", err).into(), - ) - })?; - - self.catalog - .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) - .await - .map_err(|err| { - PgWireError::ApiError( - format!("unable to save job metadata: {:?}", err).into(), + format!("unable to submit job: {:?}", err.to_string()).into(), ) })?; diff --git a/protos/route.proto b/protos/route.proto index 180576fc1b..61a3b89b75 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -10,7 +10,6 @@ package peerdb_route; message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; - bool create_catalog_entry = 2; } message CreateCDCFlowResponse { diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts index d83c8d398b..c7c7bde379 100644 --- a/ui/app/api/mirrors/cdc/route.ts +++ b/ui/app/api/mirrors/cdc/route.ts @@ -12,7 +12,6 @@ export async function POST(request: Request) { const flowServiceAddr = GetFlowHttpAddressFromEnv(); const req: CreateCDCFlowRequest = { connectionConfigs: config, - createCatalogEntry: true, }; try { const createStatus: CreateCDCFlowResponse = await fetch( diff --git a/ui/app/api/mirrors/cdc/validate/route.ts b/ui/app/api/mirrors/cdc/validate/route.ts index 83107dcbc7..8354dbef88 100644 --- a/ui/app/api/mirrors/cdc/validate/route.ts +++ b/ui/app/api/mirrors/cdc/validate/route.ts @@ -11,7 +11,6 @@ export async function POST(request: NextRequest) { const flowServiceAddr = GetFlowHttpAddressFromEnv(); const req: CreateCDCFlowRequest = { connectionConfigs: config, - createCatalogEntry: false, }; try { const validateResponse: ValidateCDCMirrorResponse = await fetch(
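
Usage note (not part of the patch): a CREATE MIRROR statement exercising the
new `sync_interval` option might look like the sketch below. The peer names,
table mapping, and option values are illustrative rather than taken from this
change; `sync_interval` is in seconds and feeds `idle_timeout_seconds` as
wired up in nexus/flow-rs/src/grpc.rs above.

```sql
-- Illustrative sketch: postgres_peer, snowflake_peer, and public.events are
-- hypothetical names. sync_interval (seconds) maps to idle_timeout_seconds
-- in the flow config via nexus/flow-rs/src/grpc.rs.
CREATE MIRROR cdc_mirror FROM postgres_peer TO snowflake_peer
WITH TABLE MAPPING (public.events:public.events)
WITH (
  do_initial_copy = false,
  sync_interval = 60
);
```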