diff --git a/dev-peerdb.sh b/dev-peerdb.sh
index 7c32151845..fb5da35f1b 100755
--- a/dev-peerdb.sh
+++ b/dev-peerdb.sh
@@ -1,33 +1,27 @@
 #!/bin/sh
-if test -z "$USE_PODMAN"
+set -Eeu
+
+DOCKER="docker"
+EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
+
+if test -n "${USE_PODMAN:=}"
 then
-    if ! command -v docker &> /dev/null
-    then
-        if command -v podman-compose
-        then
-            echo "docker could not be found on PATH, using podman-compose"
+    # 0 is found, checking for not found so we check for podman then
+    if $(docker compose &>/dev/null) && [ $? -ne 0 ]; then
+        if $(podman compose &>/dev/null) && [ $? -eq 0 ]; then
+            echo "docker could not be found on PATH, using podman compose"
             USE_PODMAN=1
         else
-            echo "docker could not be found on PATH"
+            echo "docker compose could not be found on PATH"
            exit 1
         fi
     fi
 fi
 
-if test -z "$USE_PODMAN"
-then
-    DOCKER="docker compose"
-    EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui"
-else
-    DOCKER="podman-compose --podman-run-args=--replace"
-    EXTRA_ARGS=""
-fi
-
-# check if peerdb_network exists if not create it
-if ! $DOCKER network inspect peerdb_network &> /dev/null
-then
-    $DOCKER network create peerdb_network
+if test -n "$USE_PODMAN"; then
+    DOCKER="podman"
+    EXTRA_ARGS="--podman-run-args=--replace"
 fi
 
 export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
-exec $DOCKER -f docker-compose-dev.yml up --build $EXTRA_ARGS
+exec $DOCKER compose -f docker-compose-dev.yml up --build $EXTRA_ARGS
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index d7d93f6de9..1868c755bf 100644
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -221,5 +221,4 @@ volumes:
 
 networks:
   default:
-    external: true
     name: peerdb_network
diff --git a/docker-compose.yml b/docker-compose.yml
index 42fa2f26fc..fb144173aa 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -192,5 +192,4 @@ volumes:
 
 networks:
   default:
-    external: true
     name: peerdb_network
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index a7538a814e..41dd04b4e7 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -142,27 +142,19 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 
 	if req.ConnectionConfigs.SoftDeleteColName == "" {
 		req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED"
-	} else {
-		// make them all uppercase
-		req.ConnectionConfigs.SoftDeleteColName = strings.ToUpper(req.ConnectionConfigs.SoftDeleteColName)
 	}
 
 	if req.ConnectionConfigs.SyncedAtColName == "" {
 		req.ConnectionConfigs.SyncedAtColName = "_PEERDB_SYNCED_AT"
-	} else {
-		// make them all uppercase
-		req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName)
 	}
 
-	if req.CreateCatalogEntry {
-		err := h.createCdcJobEntry(ctx, req, workflowID)
-		if err != nil {
-			slog.Error("unable to create flow job entry", slog.Any("error", err))
-			return nil, fmt.Errorf("unable to create flow job entry: %w", err)
-		}
+	err := h.createCdcJobEntry(ctx, req, workflowID)
+	if err != nil {
+		slog.Error("unable to create flow job entry", slog.Any("error", err))
+		return nil, fmt.Errorf("unable to create flow job entry: %w", err)
 	}
 
-	err := h.updateFlowConfigInCatalog(ctx, cfg)
+	err = h.updateFlowConfigInCatalog(ctx, cfg)
 	if err != nil {
 		slog.Error("unable to update flow config in catalog", slog.Any("error", err))
 		return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
@@ -258,10 +250,8 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 
 	if req.QrepConfig.SyncedAtColName == "" {
 		cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
-	} else {
-		// make them all uppercase
-		cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
 	}
+
 	_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
 	if err != nil {
 		slog.Error("unable to start QRepFlow workflow",
diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go
index 89e34d8ee4..b4304b0504 100644
--- a/flow/cmd/validate_mirror.go
+++ b/flow/cmd/validate_mirror.go
@@ -17,7 +17,7 @@ import (
 func (h *FlowRequestHandler) ValidateCDCMirror(
 	ctx context.Context, req *protos.CreateCDCFlowRequest,
 ) (*protos.ValidateCDCMirrorResponse, error) {
-	if req.CreateCatalogEntry && !req.ConnectionConfigs.Resync {
+	if !req.ConnectionConfigs.Resync {
 		mirrorExists, existCheckErr := h.CheckIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName)
 		if existCheckErr != nil {
 			slog.Error("/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr))
@@ -46,7 +46,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
 	sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig()
 	if sourcePeerConfig == nil {
 		slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source))
-		return nil, errors.New("source peer config is nil")
+		return &protos.ValidateCDCMirrorResponse{
+			Ok: false,
+		}, errors.New("source peer config is nil")
 	}
 
 	pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
@@ -103,17 +105,17 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
 	}
 
 	pubName := req.ConnectionConfigs.PublicationName
-	if pubName != "" {
-		err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName)
-		if err != nil {
-			displayErr := fmt.Errorf("provided source tables invalidated: %v", err)
-			h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
-				fmt.Sprint(displayErr),
-			)
-			return &protos.ValidateCDCMirrorResponse{
-				Ok: false,
-			}, displayErr
-		}
+
+	err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName)
+	if err != nil {
+		displayErr := fmt.Errorf("provided source tables invalidated: %v", err)
+		slog.Error(displayErr.Error())
+		h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName,
+			fmt.Sprint(displayErr),
+		)
+		return &protos.ValidateCDCMirrorResponse{
+			Ok: false,
+		}, displayErr
 	}
 
 	return &protos.ValidateCDCMirrorResponse{
diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go
index 8d5f092790..f9f6c1819f 100644
--- a/flow/connectors/connelasticsearch/elasticsearch.go
+++ b/flow/connectors/connelasticsearch/elasticsearch.go
@@ -121,11 +121,8 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 	req *model.SyncRecordsRequest[model.RecordItems],
 ) (*model.SyncResponse, error) {
 	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
-	// atomics for counts will be unnecessary in other destinations, using a mutex instead
-	var recordCountsUpdateMutex sync.Mutex
-	// we're taking a mutex anyway, avoid atomic
 	var lastSeenLSN atomic.Int64
-	var numRecords atomic.Int64
+	var numRecords int64
 
 	// no I don't like this either
 	esBulkIndexerCache := make(map[string]esutil.BulkIndexer)
@@ -133,13 +130,14 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 	// true if we saw errors while closing
 	cacheCloser := func() bool {
 		closeHasErrors := false
-		if bulkIndexersHaveShutdown {
+		if !bulkIndexersHaveShutdown {
 			for _, esBulkIndexer := range maps.Values(esBulkIndexerCache) {
 				err := esBulkIndexer.Close(context.Background())
 				if err != nil {
 					esc.logger.Error("[es] failed to close bulk indexer", slog.Any("error", err))
 					closeHasErrors = true
 				}
+				numRecords += int64(esBulkIndexer.Stats().NumFlushed)
 			}
 			bulkIndexersHaveShutdown = true
 		}
@@ -237,9 +235,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 			OnSuccess: func(_ context.Context, _ esutil.BulkIndexerItem, _ esutil.BulkIndexerResponseItem) {
 				shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
-				numRecords.Add(1)
-				recordCountsUpdateMutex.Lock()
-				defer recordCountsUpdateMutex.Unlock()
 				record.PopulateCountMap(tableNameRowsMapping)
 			},
 			// OnFailure is called for each failed operation, log and let parent handle
@@ -284,7 +279,6 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 		esc.logger.Error("[es] failed to close bulk indexer(s)")
 		return nil, errors.New("[es] failed to close bulk indexer(s)")
 	}
-	bulkIndexersHaveShutdown = true
 	if len(bulkIndexErrors) > 0 {
 		for _, err := range bulkIndexErrors {
 			esc.logger.Error("[es] failed to index record", slog.Any("err", err))
@@ -299,7 +293,7 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
 	return &model.SyncResponse{
 		CurrentSyncBatchID:     req.SyncBatchID,
 		LastSyncedCheckpointID: lastCheckpoint,
-		NumRecordsSynced:       numRecords.Load(),
+		NumRecordsSynced:       numRecords,
 		TableNameRowsMapping:   tableNameRowsMapping,
 		TableSchemaDeltas:      req.Records.SchemaDeltas,
 	}, nil
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 03ee0c02b1..3642289756 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -62,11 +62,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
 	// create a separate connection pool for non-replication queries as replication connections cannot
 	// be used for extended query protocol, i.e. prepared statements
 	connConfig, err := pgx.ParseConfig(connectionString)
-	replConfig := connConfig.Copy()
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse connection string: %w", err)
 	}
 
+	replConfig := connConfig.Copy()
 	runtimeParams := connConfig.Config.RuntimeParams
 	runtimeParams["idle_in_transaction_session_timeout"] = "0"
 	runtimeParams["statement_timeout"] = "0"
diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go
index 8e715278d8..0b16e8d0c6 100644
--- a/flow/connectors/postgres/validate.go
+++ b/flow/connectors/postgres/validate.go
@@ -34,28 +34,31 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context,
 	}
 	tableStr := strings.Join(tableArr, ",")
-	// Check if publication exists
-	err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
-	if err != nil {
-		if err == pgx.ErrNoRows {
-			return fmt.Errorf("publication does not exist: %s", pubName)
+
+	if pubName != "" {
+		// Check if publication exists
+		err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
+		if err != nil {
+			if err == pgx.ErrNoRows {
+				return fmt.Errorf("publication does not exist: %s", pubName)
+			}
+			return fmt.Errorf("error while checking for publication existence: %w", err)
 		}
-		return fmt.Errorf("error while checking for publication existence: %w", err)
-	}
 
-	// Check if tables belong to publication
-	var pubTableCount int
-	err = c.conn.QueryRow(ctx, fmt.Sprintf(`
+		// Check if tables belong to publication
+		var pubTableCount int
+		err = c.conn.QueryRow(ctx, fmt.Sprintf(`
 		with source_table_components (sname, tname) as (values %s)
 		select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
 		INNER JOIN source_table_components stc
 		ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount)
-	if err != nil {
-		return err
-	}
+		if err != nil {
+			return err
+		}
 
-	if pubTableCount != len(tableNames) {
-		return errors.New("not all tables belong to publication")
+		if pubTableCount != len(tableNames) {
+			return errors.New("not all tables belong to publication")
+		}
 	}
 
 	return nil
diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go
index 7ecf248c0a..16f65655c5 100644
--- a/flow/connectors/utils/monitoring/monitoring.go
+++ b/flow/connectors/utils/monitoring/monitoring.go
@@ -123,14 +123,16 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa
 	}()
 
 	for destinationTableName, rowCounts := range tableNameRowsMapping {
-		numRows := rowCounts.InsertCount + rowCounts.UpdateCount + rowCounts.DeleteCount
+		inserts := rowCounts.InsertCount.Load()
+		updates := rowCounts.UpdateCount.Load()
+		deletes := rowCounts.DeleteCount.Load()
 		_, err = insertBatchTablesTx.Exec(ctx,
 			`INSERT INTO peerdb_stats.cdc_batch_table
 			(flow_name,batch_id,destination_table_name,num_rows,
 			insert_count,update_count,delete_count)
 			VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`,
-			flowJobName, batchID, destinationTableName, numRows,
-			rowCounts.InsertCount, rowCounts.UpdateCount, rowCounts.DeleteCount)
+			flowJobName, batchID, destinationTableName,
+			inserts+updates+deletes, inserts, updates, deletes)
 		if err != nil {
 			return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err)
 		}
diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go
index 483d0f58ee..00688cc2ce 100644
--- a/flow/connectors/utils/stream.go
+++ b/flow/connectors/utils/stream.go
@@ -130,11 +130,7 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor
 func InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts {
 	tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps))
 	for _, mapping := range tableMaps {
-		tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{
-			InsertCount: 0,
-			UpdateCount: 0,
-			DeleteCount: 0,
-		}
+		tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{}
 	}
 
 	return tableNameRowsMapping
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index da4f036b0a..303fa0c8ea 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -1110,8 +1110,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
 		Source:            e2e.GeneratePostgresPeer(),
 		CdcStagingPath:    connectionGen.CdcStagingPath,
 		SoftDelete:        true,
-		SoftDeleteColName: "_PEERDB_IS_DELETED",
-		SyncedAtColName:   "_PEERDB_SYNCED_AT",
+		SoftDeleteColName: "_custom_deleted",
+		SyncedAtColName:   "_custom_synced",
 		MaxBatchSize:      100,
 	}
 
@@ -1141,7 +1141,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
 	e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_softdel_iud", "id,c1,c2,t")
 	e2e.EnvWaitFor(s.t, env, 3*time.Minute, "checking soft delete", func() bool {
 		newerSyncedAtQuery := fmt.Sprintf(
-			"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
+			"SELECT COUNT(*) FROM `%s.%s` WHERE _custom_deleted",
 			s.bqHelper.Config.DatasetId, dstTableName)
 		numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
 		e2e.EnvNoError(s.t, env, err)
diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go
index 0a0c26d2e4..576074db3b 100644
--- a/flow/model/qrecord_stream.go
+++ b/flow/model/qrecord_stream.go
@@ -1,13 +1,15 @@
 package model
 
 import (
+	"sync/atomic"
+
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 )
 
 type RecordTypeCounts struct {
-	InsertCount int
-	UpdateCount int
-	DeleteCount int
+	InsertCount atomic.Int32
+	UpdateCount atomic.Int32
+	DeleteCount atomic.Int32
 }
 
 type QRecordStream struct {
diff --git a/flow/model/record.go b/flow/model/record.go
index 30f3493cf4..9b728ff705 100644
--- a/flow/model/record.go
+++ b/flow/model/record.go
@@ -58,7 +58,7 @@ func (r *InsertRecord[T]) GetItems() T {
 func (r *InsertRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
 	recordCount, ok := mapOfCounts[r.DestinationTableName]
 	if ok {
-		recordCount.InsertCount++
+		recordCount.InsertCount.Add(1)
 	}
 }
 
@@ -91,7 +91,7 @@ func (r *UpdateRecord[T]) GetItems() T {
 func (r *UpdateRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
 	recordCount, ok := mapOfCounts[r.DestinationTableName]
 	if ok {
-		recordCount.UpdateCount++
+		recordCount.UpdateCount.Add(1)
 	}
 }
 
@@ -122,7 +122,7 @@ func (r *DeleteRecord[T]) GetItems() T {
 func (r *DeleteRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) {
 	recordCount, ok := mapOfCounts[r.DestinationTableName]
 	if ok {
-		recordCount.DeleteCount++
+		recordCount.DeleteCount.Add(1)
 	}
 }
 
diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs
index b0ff741612..4548d2eec9 100644
--- a/nexus/analyzer/src/lib.rs
+++ b/nexus/analyzer/src/lib.rs
@@ -11,7 +11,7 @@ use pt::{
     peerdb_peers::{
         peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, GcpServiceAccount,
         KafkaConfig, MongoConfig, Peer, PostgresConfig, PubSubConfig, S3Config, SnowflakeConfig,
-        SqlServerConfig,
+        SqlServerConfig, SshConfig,
     },
 };
 use qrep::process_options;
@@ -300,6 +300,11 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
             _ => None,
         };
 
+        let sync_interval: Option<u64> = match raw_options.remove("sync_interval") {
+            Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::<u64>()?),
+            _ => None,
+        };
+
         let soft_delete_col_name: Option<String> = match raw_options
             .remove("soft_delete_col_name")
         {
@@ -347,6 +352,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
                 push_batch_size,
                 push_parallelism,
                 max_batch_size,
+                sync_interval,
                 resync,
                 soft_delete_col_name,
                 synced_at_col_name,
@@ -646,6 +652,19 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
             Config::MongoConfig(mongo_config)
         }
         DbType::Postgres => {
+            let ssh_fields: Option<SshConfig> = match opts.get("ssh_config") {
+                Some(ssh_config) => {
+                    let ssh_config_str = ssh_config.to_string();
+                    if ssh_config_str.is_empty() {
+                        None
+                    } else {
+                        serde_json::from_str(&ssh_config_str)
+                            .context("failed to deserialize ssh_config")?
+                    }
+                }
+                None => None,
+            };
+
             let postgres_config = PostgresConfig {
                 host: opts.get("host").context("no host specified")?.to_string(),
                 port: opts
@@ -667,8 +686,9 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
                     .to_string(),
                 metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()),
                 transaction_snapshot: "".to_string(),
-                ssh_config: None,
+                ssh_config: ssh_fields,
             };
+
             Config::PostgresConfig(postgres_config)
         }
         DbType::S3 => {
@@ -744,7 +764,8 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
                     .unwrap_or_default(),
                 disable_tls: opts
                     .get("disable_tls")
-                    .map(|s| s.parse::<bool>().unwrap_or_default()).unwrap_or_default(),
+                    .map(|s| s.parse::<bool>().unwrap_or_default())
+                    .unwrap_or_default(),
                 endpoint: opts.get("endpoint").map(|s| s.to_string()),
             };
             Config::ClickhouseConfig(clickhouse_config)
diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs
index 41ab1aa40a..e1348f0fc6 100644
--- a/nexus/analyzer/src/qrep.rs
+++ b/nexus/analyzer/src/qrep.rs
@@ -196,7 +196,10 @@ pub fn process_options(
 
     // If mode is upsert, we need unique key columns
     if opts.get("mode") == Some(&Value::String(String::from("upsert")))
-        && opts.get("unique_key_columns").map(|ukc| ukc == &Value::Array(Vec::new())).unwrap_or(true)
+        && opts
+            .get("unique_key_columns")
+            .map(|ukc| ukc == &Value::Array(Vec::new()))
+            .unwrap_or(true)
     {
         anyhow::bail!("For upsert mode, unique_key_columns must be specified");
     }
diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs
index e8a569f669..f498992a9f 100644
--- a/nexus/catalog/src/lib.rs
+++ b/nexus/catalog/src/lib.rs
@@ -6,7 +6,7 @@ use peer_postgres::{self, ast};
 use pgwire::error::PgWireResult;
 use postgres_connection::{connect_postgres, get_pg_connection_string};
 use pt::{
-    flow_model::{FlowJob, QRepFlowJob},
+    flow_model::QRepFlowJob,
     peerdb_peers::PostgresConfig,
     peerdb_peers::{peer::Config, DbType, Peer},
     prost::Message,
@@ -332,70 +332,6 @@ impl Catalog {
         })
     }
 
-    async fn normalize_schema_for_table_identifier(
-        &self,
-        table_identifier: &str,
-        peer_id: i32,
-    ) -> anyhow::Result<String> {
-        let peer_dbtype = self.get_peer_type_for_id(peer_id).await?;
-
-        if !table_identifier.contains('.') && peer_dbtype != DbType::Bigquery {
-            Ok(format!("public.{}", table_identifier))
-        } else {
-            Ok(String::from(table_identifier))
-        }
-    }
-
-    pub async fn create_cdc_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
-        let source_peer_id = self
-            .get_peer_id_i32(&job.source_peer)
-            .await
-            .context("unable to get source peer id")?;
-        let destination_peer_id = self
-            .get_peer_id_i32(&job.target_peer)
-            .await
-            .context("unable to get destination peer id")?;
-
-        let stmt = self
-            .pg
-            .prepare_typed(
-                "INSERT INTO flows (name, source_peer, destination_peer, description,
-                source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6)",
-                &[types::Type::TEXT, types::Type::INT4, types::Type::INT4, types::Type::TEXT,
-                    types::Type::TEXT, types::Type::TEXT],
-            )
-            .await?;
-
-        for table_mapping in &job.table_mappings {
-            let _rows = self
-                .pg
-                .execute(
-                    &stmt,
-                    &[
-                        &job.name,
-                        &source_peer_id,
-                        &destination_peer_id,
-                        &job.description,
-                        &self
-                            .normalize_schema_for_table_identifier(
-                                &table_mapping.source_table_identifier,
-                                source_peer_id,
-                            )
-                            .await?,
-                        &self
-                            .normalize_schema_for_table_identifier(
-                                &table_mapping.destination_table_identifier,
-                                destination_peer_id,
-                            )
-                            .await?,
-                    ],
-                )
-                .await?;
-        }
-
-        Ok(())
-    }
-
     pub async fn get_qrep_flow_job_by_name(
         &self,
         job_name: &str,
diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs
index 70c463e44c..a07115c1ec 100644
--- a/nexus/flow-rs/src/grpc.rs
+++ b/nexus/flow-rs/src/grpc.rs
@@ -79,7 +79,6 @@ impl FlowGrpcClient {
     ) -> anyhow::Result<String> {
         let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest {
             connection_configs: Some(peer_flow_config),
-            create_catalog_entry: false,
         };
         let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
         let workflow_id = response.into_inner().workflow_id;
@@ -176,7 +175,7 @@ impl FlowGrpcClient {
             initial_snapshot_only: job.initial_snapshot_only,
             script: job.script.clone(),
             system: system as i32,
-            ..Default::default()
+            idle_timeout_seconds: job.sync_interval.unwrap_or_default(),
         };
 
         self.start_peer_flow(flow_conn_cfg).await
diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs
index 8dd40edc42..ce4d47108e 100644
--- a/nexus/pt/src/flow_model.rs
+++ b/nexus/pt/src/flow_model.rs
@@ -30,6 +30,7 @@ pub struct FlowJob {
     pub push_parallelism: Option<i64>,
     pub push_batch_size: Option<i64>,
     pub max_batch_size: Option<u32>,
+    pub sync_interval: Option<u64>,
     pub resync: bool,
     pub soft_delete_col_name: Option<String>,
     pub synced_at_col_name: Option<String>,
diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs
index 176928a71b..93d1c5af14 100644
--- a/nexus/server/src/main.rs
+++ b/nexus/server/src/main.rs
@@ -413,15 +413,6 @@ impl NexusBackend {
                 }
             }
 
-            self.catalog
-                .create_cdc_flow_job_entry(flow_job)
-                .await
-                .map_err(|err| {
-                    PgWireError::ApiError(
-                        format!("unable to create mirror job entry: {:?}", err).into(),
-                    )
-                })?;
-
             // get source and destination peers
             let (src_peer, dst_peer) = join!(
                 Self::get_peer_of_mirror(self.catalog.as_ref(), &flow_job.source_peer),
@@ -432,21 +423,12 @@ impl NexusBackend {
             // make a request to the flow service to start the job.
             let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
-            let workflow_id = flow_handler
+            flow_handler
                 .start_peer_flow_job(flow_job, src_peer, dst_peer)
                 .await
                 .map_err(|err| {
                     PgWireError::ApiError(
-                        format!("unable to submit job: {:?}", err).into(),
-                    )
-                })?;
-
-            self.catalog
-                .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id)
-                .await
-                .map_err(|err| {
-                    PgWireError::ApiError(
-                        format!("unable to save job metadata: {:?}", err).into(),
+                        format!("unable to submit job: {:?}", err.to_string()).into(),
                     )
                 })?;
 
diff --git a/protos/route.proto b/protos/route.proto
index 180576fc1b..61a3b89b75 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -10,7 +10,6 @@ package peerdb_route;
 
 message CreateCDCFlowRequest {
   peerdb_flow.FlowConnectionConfigs connection_configs = 1;
-  bool create_catalog_entry = 2;
 }
 
 message CreateCDCFlowResponse {
diff --git a/run-peerdb.sh b/run-peerdb.sh
index 002bf93679..91bce75a02 100755
--- a/run-peerdb.sh
+++ b/run-peerdb.sh
@@ -1,17 +1,24 @@
 #!/bin/sh
 set -Eeu
 
-if ! command -v docker &> /dev/null
+DOCKER="docker"
+
+if test -n "${USE_PODMAN:=}"
 then
-    echo "docker could not be found on PATH"
-    exit 1
+    if ! (command -v docker &> /dev/null); then
+        if (command -v podman &> /dev/null); then
+            echo "docker could not be found on PATH, using podman"
+            USE_PODMAN=1
+        else
+            echo "docker could not be found on PATH"
+            exit 1
+        fi
+    fi
 fi
 
-# check if peerdb_network exists if not create it
-if ! docker network inspect peerdb_network &> /dev/null
-then
-    docker network create peerdb_network
+if test -n "$USE_PODMAN"; then
+    DOCKER="podman"
 fi
 
-docker compose pull
-docker compose -f docker-compose.yml up --no-attach catalog --no-attach temporal --no-attach temporal-ui --no-attach temporal-admin-tools
+$DOCKER compose pull
+exec $DOCKER compose -f docker-compose.yml up --no-attach catalog --no-attach temporal --no-attach temporal-ui --no-attach temporal-admin-tools
diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts
index d83c8d398b..c7c7bde379 100644
--- a/ui/app/api/mirrors/cdc/route.ts
+++ b/ui/app/api/mirrors/cdc/route.ts
@@ -12,7 +12,6 @@ export async function POST(request: Request) {
   const flowServiceAddr = GetFlowHttpAddressFromEnv();
   const req: CreateCDCFlowRequest = {
     connectionConfigs: config,
-    createCatalogEntry: true,
   };
   try {
     const createStatus: CreateCDCFlowResponse = await fetch(
diff --git a/ui/app/api/mirrors/cdc/validate/route.ts b/ui/app/api/mirrors/cdc/validate/route.ts
index 83107dcbc7..8354dbef88 100644
--- a/ui/app/api/mirrors/cdc/validate/route.ts
+++ b/ui/app/api/mirrors/cdc/validate/route.ts
@@ -11,7 +11,6 @@ export async function POST(request: NextRequest) {
   const flowServiceAddr = GetFlowHttpAddressFromEnv();
   const req: CreateCDCFlowRequest = {
     connectionConfigs: config,
-    createCatalogEntry: false,
   };
   try {
     const validateResponse: ValidateCDCMirrorResponse = await fetch(
diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx
index d56b6d4ece..495a406cc5 100644
--- a/ui/app/mirrors/create/cdc/cdc.tsx
+++ b/ui/app/mirrors/create/cdc/cdc.tsx
@@ -92,7 +92,6 @@ export default function CDCConfigForm({
         mirrorConfig.destination?.type !== DBType.POSTGRES) &&
         label.includes('type system')) ||
       (mirrorConfig.destination?.type !== DBType.BIGQUERY &&
-        mirrorConfig.destination?.type !== DBType.SNOWFLAKE &&
         label.includes('column name'))
     ) {
       return false;
diff --git a/ui/app/peers/create/[peerType]/helpers/ch.ts b/ui/app/peers/create/[peerType]/helpers/ch.ts
index d45029a4e3..09626d73c0 100644
--- a/ui/app/peers/create/[peerType]/helpers/ch.ts
+++ b/ui/app/peers/create/[peerType]/helpers/ch.ts
@@ -52,6 +52,7 @@ export const clickhouseSetting: PeerSetting[] = [
     stateHandler: (value, setter) =>
      setter((curr) => ({ ...curr, s3Path: value as string })),
     tips: `This is an S3 bucket/object URL field. This bucket will be used as our intermediate stage for CDC`,
+    placeholder: 's3://',
   },
   {
     label: 'Access Key ID',
diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx
index 6527a20247..bedbf41531 100644
--- a/ui/components/PeerComponent.tsx
+++ b/ui/components/PeerComponent.tsx
@@ -37,7 +37,7 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => {
       return '/svgs/ms.svg';
     case DBType.KAFKA:
     case 'KAFKA':
-      return '/svgs/kafka.svg';
+      return '/svgs/redpanda.svg';
     case DBType.PUBSUB:
     case 'PUBSUB':
       return '/svgs/pubsub.svg';
diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx
index f2ea31eb0c..405dd9af77 100644
--- a/ui/components/PeerForms/ClickhouseConfig.tsx
+++ b/ui/components/PeerForms/ClickhouseConfig.tsx
@@ -8,6 +8,7 @@ import { RowWithSwitch, RowWithTextField } from '@/lib/Layout';
 import { Switch } from '@/lib/Switch';
 import { TextField } from '@/lib/TextField';
 import { Tooltip } from '@/lib/Tooltip';
+import Link from 'next/link';
 import { useState } from 'react';
 import { InfoPopover } from '../InfoPopover';
 interface ConfigProps {
@@ -122,6 +123,14 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) {
             If you want this stage to belong to you, you can configure a bucket below.
+            Setting up an S3 bucket