From e2be383367fcf3fe5fbb8c18857eb98187fae008 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 9 May 2024 15:27:55 +0530 Subject: [PATCH 1/6] Query layer: support ssh fields, sync interval, simplify code path (#1704) - Adds parsing for SSHConfig fields in Create Peer command for Postgres ```sql CREATE PEER postgres_peer FROM POSTGRES WITH ( host = '', port = 5432, user = '', password = '', database = '', ssh_config = '{ "host": "", "port": 22, "user": "", "password": "", "private_key": "" }' ); ``` - Also you can now specify `sync_interval` in the create mirror command - Remove catalog flows table entry and update operations in query layer side, as we are anyways hitting a grpc endpoint which takes care of that. This makes the create_catalog_entry flag in the request to create/validate mirror endpoints redundant, so that is removed from route.proto - Fixes a bug in validate mirror where select * from sourcetables limit 0 was not checked for empty publication Functionally tested via query layer and UI --- flow/cmd/handler.go | 12 ++--- flow/cmd/validate_mirror.go | 28 +++++----- flow/connectors/postgres/postgres.go | 2 +- flow/connectors/postgres/validate.go | 33 ++++++------ nexus/analyzer/src/lib.rs | 27 ++++++++-- nexus/analyzer/src/qrep.rs | 5 +- nexus/catalog/src/lib.rs | 66 +----------------------- nexus/flow-rs/src/grpc.rs | 3 +- nexus/pt/src/flow_model.rs | 1 + nexus/server/src/main.rs | 22 +------- protos/route.proto | 1 - ui/app/api/mirrors/cdc/route.ts | 1 - ui/app/api/mirrors/cdc/validate/route.ts | 1 - 13 files changed, 72 insertions(+), 130 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index a7538a814e..22865042d6 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -154,15 +154,13 @@ func (h *FlowRequestHandler) CreateCDCFlow( req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName) } - if req.CreateCatalogEntry { - err := h.createCdcJobEntry(ctx, req, workflowID) - if err != nil { - slog.Error("unable to create flow job entry", slog.Any("error", err)) - return nil, fmt.Errorf("unable to create flow job entry: %w", err) - } + err := h.createCdcJobEntry(ctx, req, workflowID) + if err != nil { + slog.Error("unable to create flow job entry", slog.Any("error", err)) + return nil, fmt.Errorf("unable to create flow job entry: %w", err) } - err := h.updateFlowConfigInCatalog(ctx, cfg) + err = h.updateFlowConfigInCatalog(ctx, cfg) if err != nil { slog.Error("unable to update flow config in catalog", slog.Any("error", err)) return nil, fmt.Errorf("unable to update flow config in catalog: %w", err) diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 89e34d8ee4..b4304b0504 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -17,7 +17,7 @@ import ( func (h *FlowRequestHandler) ValidateCDCMirror( ctx context.Context, req *protos.CreateCDCFlowRequest, ) (*protos.ValidateCDCMirrorResponse, error) { - if req.CreateCatalogEntry && !req.ConnectionConfigs.Resync { + if !req.ConnectionConfigs.Resync { mirrorExists, existCheckErr := h.CheckIfMirrorNameExists(ctx, req.ConnectionConfigs.FlowJobName) if existCheckErr != nil { slog.Error("/validatecdc failed to check if mirror name exists", slog.Any("error", existCheckErr)) @@ -46,7 +46,9 @@ func (h *FlowRequestHandler) ValidateCDCMirror( sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig() if sourcePeerConfig == nil { slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source)) - return nil, errors.New("source peer config is nil") + return &protos.ValidateCDCMirrorResponse{ + Ok: false, + }, errors.New("source peer config is nil") } pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig) @@ -103,17 +105,17 @@ func (h *FlowRequestHandler) ValidateCDCMirror( } pubName := req.ConnectionConfigs.PublicationName - if pubName != "" { - err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName) - if err != nil { - displayErr := fmt.Errorf("provided source tables invalidated: %v", err) - h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, - fmt.Sprint(displayErr), - ) - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, displayErr - } + + err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName) + if err != nil { + displayErr := fmt.Errorf("provided source tables invalidated: %v", err) + slog.Error(displayErr.Error()) + h.alerter.LogNonFlowWarning(ctx, telemetry.CreateMirror, req.ConnectionConfigs.FlowJobName, + fmt.Sprint(displayErr), + ) + return &protos.ValidateCDCMirrorResponse{ + Ok: false, + }, displayErr } return &protos.ValidateCDCMirrorResponse{ diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 03ee0c02b1..3642289756 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -62,11 +62,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) // create a separate connection pool for non-replication queries as replication connections cannot // be used for extended query protocol, i.e. prepared statements connConfig, err := pgx.ParseConfig(connectionString) - replConfig := connConfig.Copy() if err != nil { return nil, fmt.Errorf("failed to parse connection string: %w", err) } + replConfig := connConfig.Copy() runtimeParams := connConfig.Config.RuntimeParams runtimeParams["idle_in_transaction_session_timeout"] = "0" runtimeParams["statement_timeout"] = "0" diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index 8e715278d8..0b16e8d0c6 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -34,28 +34,31 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, } tableStr := strings.Join(tableArr, ",") - // Check if publication exists - err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) - if err != nil { - if err == pgx.ErrNoRows { - return fmt.Errorf("publication does not exist: %s", pubName) + + if pubName != "" { + // Check if publication exists + err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) + if err != nil { + if err == pgx.ErrNoRows { + return fmt.Errorf("publication does not exist: %s", pubName) + } + return fmt.Errorf("error while checking for publication existence: %w", err) } - return fmt.Errorf("error while checking for publication existence: %w", err) - } - // Check if tables belong to publication - var pubTableCount int - err = c.conn.QueryRow(ctx, fmt.Sprintf(` + // Check if tables belong to publication + var pubTableCount int + err = c.conn.QueryRow(ctx, fmt.Sprintf(` with source_table_components (sname, tname) as (values %s) select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables INNER JOIN source_table_components stc ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount) - if err != nil { - return err - } + if err != nil { + return err + } - if pubTableCount != len(tableNames) { - return errors.New("not all tables belong to publication") + if pubTableCount != len(tableNames) { + return errors.New("not all tables belong to publication") + } } return nil diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index b0ff741612..4548d2eec9 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -11,7 +11,7 @@ use pt::{ peerdb_peers::{ peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, GcpServiceAccount, KafkaConfig, MongoConfig, Peer, PostgresConfig, PubSubConfig, S3Config, SnowflakeConfig, - SqlServerConfig, + SqlServerConfig, SshConfig, }, }; use qrep::process_options; @@ -300,6 +300,11 @@ impl StatementAnalyzer for PeerDDLAnalyzer { _ => None, }; + let sync_interval: Option = match raw_options.remove("sync_interval") { + Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::()?), + _ => None, + }; + let soft_delete_col_name: Option = match raw_options .remove("soft_delete_col_name") { @@ -347,6 +352,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer { push_batch_size, push_parallelism, max_batch_size, + sync_interval, resync, soft_delete_col_name, synced_at_col_name, @@ -646,6 +652,19 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu Config::MongoConfig(mongo_config) } DbType::Postgres => { + let ssh_fields: Option = match opts.get("ssh_config") { + Some(ssh_config) => { + let ssh_config_str = ssh_config.to_string(); + if ssh_config_str.is_empty() { + None + } else { + serde_json::from_str(&ssh_config_str) + .context("failed to deserialize ssh_config")? + } + } + None => None, + }; + let postgres_config = PostgresConfig { host: opts.get("host").context("no host specified")?.to_string(), port: opts @@ -667,8 +686,9 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu .to_string(), metadata_schema: opts.get("metadata_schema").map(|s| s.to_string()), transaction_snapshot: "".to_string(), - ssh_config: None, + ssh_config: ssh_fields, }; + Config::PostgresConfig(postgres_config) } DbType::S3 => { @@ -744,7 +764,8 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu .unwrap_or_default(), disable_tls: opts .get("disable_tls") - .map(|s| s.parse::().unwrap_or_default()).unwrap_or_default(), + .map(|s| s.parse::().unwrap_or_default()) + .unwrap_or_default(), endpoint: opts.get("endpoint").map(|s| s.to_string()), }; Config::ClickhouseConfig(clickhouse_config) diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index 41ab1aa40a..e1348f0fc6 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -196,7 +196,10 @@ pub fn process_options( // If mode is upsert, we need unique key columns if opts.get("mode") == Some(&Value::String(String::from("upsert"))) - && opts.get("unique_key_columns").map(|ukc| ukc == &Value::Array(Vec::new())).unwrap_or(true) + && opts + .get("unique_key_columns") + .map(|ukc| ukc == &Value::Array(Vec::new())) + .unwrap_or(true) { anyhow::bail!("For upsert mode, unique_key_columns must be specified"); } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index e8a569f669..f498992a9f 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -6,7 +6,7 @@ use peer_postgres::{self, ast}; use pgwire::error::PgWireResult; use postgres_connection::{connect_postgres, get_pg_connection_string}; use pt::{ - flow_model::{FlowJob, QRepFlowJob}, + flow_model::QRepFlowJob, peerdb_peers::PostgresConfig, peerdb_peers::{peer::Config, DbType, Peer}, prost::Message, @@ -332,70 +332,6 @@ impl Catalog { }) } - async fn normalize_schema_for_table_identifier( - &self, - table_identifier: &str, - peer_id: i32, - ) -> anyhow::Result { - let peer_dbtype = self.get_peer_type_for_id(peer_id).await?; - - if !table_identifier.contains('.') && peer_dbtype != DbType::Bigquery { - Ok(format!("public.{}", table_identifier)) - } else { - Ok(String::from(table_identifier)) - } - } - - pub async fn create_cdc_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> { - let source_peer_id = self - .get_peer_id_i32(&job.source_peer) - .await - .context("unable to get source peer id")?; - let destination_peer_id = self - .get_peer_id_i32(&job.target_peer) - .await - .context("unable to get destination peer id")?; - - let stmt = self - .pg - .prepare_typed( - "INSERT INTO flows (name, source_peer, destination_peer, description, - source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6)", - &[types::Type::TEXT, types::Type::INT4, types::Type::INT4, types::Type::TEXT, - types::Type::TEXT, types::Type::TEXT], - ) - .await?; - - for table_mapping in &job.table_mappings { - let _rows = self - .pg - .execute( - &stmt, - &[ - &job.name, - &source_peer_id, - &destination_peer_id, - &job.description, - &self - .normalize_schema_for_table_identifier( - &table_mapping.source_table_identifier, - source_peer_id, - ) - .await?, - &self - .normalize_schema_for_table_identifier( - &table_mapping.destination_table_identifier, - destination_peer_id, - ) - .await?, - ], - ) - .await?; - } - - Ok(()) - } - pub async fn get_qrep_flow_job_by_name( &self, job_name: &str, diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 70c463e44c..a07115c1ec 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -79,7 +79,6 @@ impl FlowGrpcClient { ) -> anyhow::Result { let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest { connection_configs: Some(peer_flow_config), - create_catalog_entry: false, }; let response = self.client.create_cdc_flow(create_peer_flow_req).await?; let workflow_id = response.into_inner().workflow_id; @@ -176,7 +175,7 @@ impl FlowGrpcClient { initial_snapshot_only: job.initial_snapshot_only, script: job.script.clone(), system: system as i32, - ..Default::default() + idle_timeout_seconds: job.sync_interval.unwrap_or_default(), }; self.start_peer_flow(flow_conn_cfg).await diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs index 8dd40edc42..ce4d47108e 100644 --- a/nexus/pt/src/flow_model.rs +++ b/nexus/pt/src/flow_model.rs @@ -30,6 +30,7 @@ pub struct FlowJob { pub push_parallelism: Option, pub push_batch_size: Option, pub max_batch_size: Option, + pub sync_interval: Option, pub resync: bool, pub soft_delete_col_name: Option, pub synced_at_col_name: Option, diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 176928a71b..93d1c5af14 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -413,15 +413,6 @@ impl NexusBackend { } } - self.catalog - .create_cdc_flow_job_entry(flow_job) - .await - .map_err(|err| { - PgWireError::ApiError( - format!("unable to create mirror job entry: {:?}", err).into(), - ) - })?; - // get source and destination peers let (src_peer, dst_peer) = join!( Self::get_peer_of_mirror(self.catalog.as_ref(), &flow_job.source_peer), @@ -432,21 +423,12 @@ impl NexusBackend { // make a request to the flow service to start the job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; - let workflow_id = flow_handler + flow_handler .start_peer_flow_job(flow_job, src_peer, dst_peer) .await .map_err(|err| { PgWireError::ApiError( - format!("unable to submit job: {:?}", err).into(), - ) - })?; - - self.catalog - .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) - .await - .map_err(|err| { - PgWireError::ApiError( - format!("unable to save job metadata: {:?}", err).into(), + format!("unable to submit job: {:?}", err.to_string()).into(), ) })?; diff --git a/protos/route.proto b/protos/route.proto index 180576fc1b..61a3b89b75 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -10,7 +10,6 @@ package peerdb_route; message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; - bool create_catalog_entry = 2; } message CreateCDCFlowResponse { diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts index d83c8d398b..c7c7bde379 100644 --- a/ui/app/api/mirrors/cdc/route.ts +++ b/ui/app/api/mirrors/cdc/route.ts @@ -12,7 +12,6 @@ export async function POST(request: Request) { const flowServiceAddr = GetFlowHttpAddressFromEnv(); const req: CreateCDCFlowRequest = { connectionConfigs: config, - createCatalogEntry: true, }; try { const createStatus: CreateCDCFlowResponse = await fetch( diff --git a/ui/app/api/mirrors/cdc/validate/route.ts b/ui/app/api/mirrors/cdc/validate/route.ts index 83107dcbc7..8354dbef88 100644 --- a/ui/app/api/mirrors/cdc/validate/route.ts +++ b/ui/app/api/mirrors/cdc/validate/route.ts @@ -11,7 +11,6 @@ export async function POST(request: NextRequest) { const flowServiceAddr = GetFlowHttpAddressFromEnv(); const req: CreateCDCFlowRequest = { connectionConfigs: config, - createCatalogEntry: false, }; try { const validateResponse: ValidateCDCMirrorResponse = await fetch( From e551e731c61031cee83235a0e42f286d993e65e5 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 9 May 2024 18:04:58 +0530 Subject: [PATCH 2/6] Remove forced upper case for peerdb columns (#1706) Removes upper casing of user-provided peerdb column names in handler.go for create cdc and qrep endpoints Restrict custom names for these columns to just bigquery in UI Functionally tested with soft delete Added a custom name for these columns in a BQ test --- flow/cmd/handler.go | 10 +--------- flow/e2e/bigquery/peer_flow_bq_test.go | 6 +++--- ui/app/mirrors/create/cdc/cdc.tsx | 1 - 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 22865042d6..41dd04b4e7 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -142,16 +142,10 @@ func (h *FlowRequestHandler) CreateCDCFlow( if req.ConnectionConfigs.SoftDeleteColName == "" { req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED" - } else { - // make them all uppercase - req.ConnectionConfigs.SoftDeleteColName = strings.ToUpper(req.ConnectionConfigs.SoftDeleteColName) } if req.ConnectionConfigs.SyncedAtColName == "" { req.ConnectionConfigs.SyncedAtColName = "_PEERDB_SYNCED_AT" - } else { - // make them all uppercase - req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName) } err := h.createCdcJobEntry(ctx, req, workflowID) @@ -256,10 +250,8 @@ func (h *FlowRequestHandler) CreateQRepFlow( if req.QrepConfig.SyncedAtColName == "" { cfg.SyncedAtColName = "_PEERDB_SYNCED_AT" - } else { - // make them all uppercase - cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName) } + _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state) if err != nil { slog.Error("unable to start QRepFlow workflow", diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index da4f036b0a..303fa0c8ea 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -1110,8 +1110,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, - SoftDeleteColName: "_PEERDB_IS_DELETED", - SyncedAtColName: "_PEERDB_SYNCED_AT", + SoftDeleteColName: "_custom_deleted", + SyncedAtColName: "_custom_synced", MaxBatchSize: 100, } @@ -1141,7 +1141,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_softdel_iud", "id,c1,c2,t") e2e.EnvWaitFor(s.t, env, 3*time.Minute, "checking soft delete", func() bool { newerSyncedAtQuery := fmt.Sprintf( - "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", + "SELECT COUNT(*) FROM `%s.%s` WHERE _custom_deleted", s.bqHelper.Config.DatasetId, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) e2e.EnvNoError(s.t, env, err) diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index 2ef76be5be..353d988d19 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -92,7 +92,6 @@ export default function CDCConfigForm({ mirrorConfig.destination?.type !== DBType.POSTGRES) && label.includes('type system')) || (mirrorConfig.destination?.type !== DBType.BIGQUERY && - mirrorConfig.destination?.type !== DBType.SNOWFLAKE && label.includes('column name')) ) { return false; From 9d74565b2338fa5142471cfe366b07607348428a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 9 May 2024 12:42:29 +0000 Subject: [PATCH 3/6] UI: enable initial load options for non-EH queues (#1683) --- ui/app/mirrors/create/cdc/cdc.tsx | 4 ++-- ui/app/mirrors/create/handlers.ts | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index 353d988d19..495a406cc5 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -83,10 +83,10 @@ export default function CDCConfigForm({ !isQueue) || (label.includes('staging path') && defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO') || - (isQueue && + (isQueue && label.includes('soft delete')) || + (mirrorConfig.destination?.type === DBType.EVENTHUBS && (label.includes('initial copy') || label.includes('initial load') || - label.includes('soft delete') || label.includes('snapshot'))) || ((mirrorConfig.source?.type !== DBType.POSTGRES || mirrorConfig.destination?.type !== DBType.POSTGRES) && diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index d924dc2588..1c45210c4d 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -25,13 +25,11 @@ import { } from './schema'; export const IsQueuePeer = (peerType?: DBType): boolean => { - if (!peerType) { - return false; - } return ( - peerType === DBType.KAFKA || - peerType === DBType.PUBSUB || - peerType === DBType.EVENTHUBS + !!peerType && + (peerType === DBType.KAFKA || + peerType === DBType.PUBSUB || + peerType === DBType.EVENTHUBS) ); }; From c6d7b282790dd460fba5ab93d1f50a57b37c17c9 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 9 May 2024 23:46:43 +0530 Subject: [PATCH 4/6] Clickhouse insert-select: account for lower version (#1707) This PR tweaks the insert into select queries of clickhouse qrep to account for lower clickhouse versions where session token as an argument is not supported for the s3() table function. https://github.com/ClickHouse/ClickHouse/issues/61230 functionally tested --- flow/connectors/clickhouse/qrep_avro_sync.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index a0a9fb9a71..b015ef47e4 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -47,10 +47,15 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a if err != nil { return err } + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s'%s, 'Avro')", s.config.DestinationTableIdentifier, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) @@ -137,10 +142,15 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( selector = append(selector, "`"+colName+"`") } selectorStr := strings.Join(selector, ",") + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')", config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) if err != nil { From b80d43462126bf2aea23fa9f0966937d2227200e Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Fri, 10 May 2024 00:15:23 +0530 Subject: [PATCH 5/6] Snowflake merge statement: filter out empty peerdb data (#1708) This prevents NormaliseFlow from erroring when we truncate rows > 16MB for PG to Snowflake CDC mirrors --- flow/connectors/snowflake/snowflake.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 27bb69d07f..a0fcc1754d 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -44,6 +44,7 @@ const ( SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE, _PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID = %d AND + _PEERDB_DATA != '' AND _PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS (SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, _PEERDB_UNCHANGED_TOAST_COLUMNS,%s From ffd494f11d0076cb8a9af8a14534680afdb92319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 10 May 2024 13:38:15 +0000 Subject: [PATCH 6/6] QRep scripting (#1625) Scripts for qrep may define a function `transformRow(row)` which can reassign fields' values (without changing types) --- flow/activities/flowable_core.go | 23 +++- flow/connectors/eventhub/eventhub.go | 13 +- flow/connectors/kafka/kafka.go | 13 +- flow/connectors/pubsub/pubsub.go | 13 +- flow/connectors/utils/lua.go | 13 ++ flow/e2e/postgres/qrep_flow_pg_test.go | 52 ++++++++ flow/pua/peerdb.go | 174 +++++++++++++++++++++++++ flow/pua/stream_adapter.go | 33 +++++ flow/shared/lua.go | 11 +- ui/app/mirrors/create/helpers/qrep.ts | 8 +- 10 files changed, 318 insertions(+), 35 deletions(-) create mode 100644 flow/pua/stream_adapter.go diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 319da34c49..ad3dca378a 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -23,6 +24,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/pua" "github.com/PeerDB-io/peer-flow/shared" ) @@ -343,10 +345,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) defer shutdown() - var rowsSynced int bufferSize := shared.FetchAndChannelSize - errGroup, errCtx := errgroup.WithContext(ctx) stream := model.NewQRecordStream(bufferSize) + outstream := stream + if config.Script != "" { + ls, err := utils.LoadScript(ctx, config.Script, utils.LuaPrintFn(func(s string) { + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s) + })) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } + lfn := ls.Env.RawGetString("transformRow") + if fn, ok := lfn.(*lua.LFunction); ok { + outstream = pua.AttachToStream(ls, fn, stream) + } + } + + var rowsSynced int + errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { tmp, err := srcConn.PullQRepRecords(errCtx, config, partition, stream) if err != nil { @@ -363,7 +380,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) errGroup.Go(func() error { - rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream) + rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, outstream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 4f95d1007e..1182f4d413 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "strings" "sync/atomic" "time" @@ -196,15 +195,9 @@ func (c *EventHubConnector) processBatch( var fn *lua.LFunction if req.Script != "" { var err error - ls, err = utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err = utils.LoadScript(ctx, req.Script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, req.FlowJobName, s) + })) if err != nil { return 0, err } diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index cfe4652598..c58da5e50f 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "fmt" "log/slog" - "strings" "sync/atomic" "time" @@ -178,15 +177,9 @@ func (c *KafkaConnector) createPool( } return utils.LuaPool(func() (*lua.LState, error) { - ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, flowJobName, s) + })) if err != nil { return nil, err } diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 54031f016d..0a8709b3b2 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "strings" "sync" "sync/atomic" "time" @@ -130,15 +129,9 @@ func (c *PubSubConnector) createPool( queueErr func(error), ) (*utils.LPool[[]PubSubMessage], error) { return utils.LuaPool(func() (*lua.LState, error) { - ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, flowJobName, s) + })) if err != nil { return nil, err } diff --git a/flow/connectors/utils/lua.go b/flow/connectors/utils/lua.go index 47676721b3..f1d82f373f 100644 --- a/flow/connectors/utils/lua.go +++ b/flow/connectors/utils/lua.go @@ -3,6 +3,7 @@ package utils import ( "context" "fmt" + "strings" "github.com/yuin/gopher-lua" @@ -35,6 +36,18 @@ func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error) { } } +func LuaPrintFn(fn func(string)) lua.LGFunction { + return func(ls *lua.LState) int { + top := ls.GetTop() + ss := make([]string, top) + for i := range top { + ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() + } + fn(strings.Join(ss, "\t")) + return 0 + } +} + func LoadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error) { ls := lua.NewState(lua.Options{SkipOpenLibs: true}) ls.SetContext(ctx) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 63a226ae13..4f2f944a97 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -406,3 +406,55 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +func (s PeerFlowE2ETestSuitePG) TestTransform() { + numRows := 10 + + srcTable := "test_transform" + s.setupSourceTable(srcTable, numRows) + + dstTable := "test_transformdst" + + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + + query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcSchemaQualified) + + postgresPeer := e2e.GeneratePostgresPeer() + + _, err := s.Conn().Exec(context.Background(), `insert into public.scripts (name, lang, source) values + ('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`) + require.NoError(s.t, err) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_transform", + srcSchemaQualified, + dstSchemaQualified, + query, + postgresPeer, + "", + true, + "_PEERDB_SYNCED_AT", + "", + ) + require.NoError(s.t, err) + qrepConfig.WriteMode = &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE, + } + qrepConfig.InitialCopyOnly = false + qrepConfig.Script = "pgtransform" + + tc := e2e.NewTemporalClient(s.t) + env := e2e.RunQRepFlowWorkflow(tc, qrepConfig) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "waiting for first sync to complete", func() bool { + err = s.compareCounts(dstSchemaQualified, int64(numRows)) + return err == nil + }) + require.NoError(s.t, env.Error()) + + var exists bool + err = s.Conn().QueryRow(context.Background(), + fmt.Sprintf("select exists(select * from %s where myreal <> 1729)", dstSchemaQualified)).Scan(&exists) + require.NoError(s.t, err) + require.False(s.t, exists) +} diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 7330cb9d09..c44aeb497a 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -3,6 +3,7 @@ package pua import ( "bytes" "fmt" + "math" "math/big" "time" @@ -52,6 +53,7 @@ func RegisterTypes(ls *lua.LState) { mt = LuaRow.NewMetatable(ls) mt.RawSetString("__index", ls.NewFunction(LuaRowIndex)) + mt.RawSetString("__newindex", ls.NewFunction(LuaRowNewIndex)) mt.RawSetString("__len", ls.NewFunction(LuaRowLen)) mt = shared.LuaUuid.NewMetatable(ls) @@ -157,6 +159,178 @@ func LuaRowIndex(ls *lua.LState) int { return 1 } +func LVAsTime(ls *lua.LState, lv lua.LValue) time.Time { + switch v := lv.(type) { + case lua.LNumber: + ipart, fpart := math.Modf(float64(v)) + return time.Unix(int64(ipart), int64(fpart*1e9)) + case *lua.LUserData: + if tm, ok := v.Value.(time.Time); ok { + return tm + } + } + ls.RaiseError("Cannot convert %T to time.Time", lv) + return time.Time{} +} + +func LuaRowNewIndex(ls *lua.LState) int { + _, row := LuaRow.Check(ls, 1) + key := ls.CheckString(2) + val := ls.Get(3) + qv := row.GetColumnValue(key) + kind := qv.Kind() + if val == lua.LNil { + row.AddColumn(key, qvalue.QValueNull(kind)) + } + var newqv qvalue.QValue + switch kind { + case qvalue.QValueKindInvalid: + newqv = qvalue.QValueInvalid{Val: lua.LVAsString(val)} + case qvalue.QValueKindFloat32: + newqv = qvalue.QValueFloat32{Val: float32(lua.LVAsNumber(val))} + case qvalue.QValueKindFloat64: + newqv = qvalue.QValueFloat64{Val: float64(lua.LVAsNumber(val))} + case qvalue.QValueKindInt16: + newqv = qvalue.QValueInt16{Val: int16(lua.LVAsNumber(val))} + case qvalue.QValueKindInt32: + newqv = qvalue.QValueInt32{Val: int32(lua.LVAsNumber(val))} + case qvalue.QValueKindInt64: + switch v := val.(type) { + case lua.LNumber: + newqv = qvalue.QValueInt64{Val: int64(v)} + case *lua.LUserData: + switch i64 := v.Value.(type) { + case int64: + newqv = qvalue.QValueInt64{Val: i64} + case uint64: + newqv = qvalue.QValueInt64{Val: int64(i64)} + } + } + if newqv == nil { + ls.RaiseError("invalid int64") + } + case qvalue.QValueKindBoolean: + newqv = qvalue.QValueBoolean{Val: lua.LVAsBool(val)} + case qvalue.QValueKindQChar: + switch v := val.(type) { + case lua.LNumber: + newqv = qvalue.QValueQChar{Val: uint8(v)} + case lua.LString: + if len(v) > 0 { + newqv = qvalue.QValueQChar{Val: v[0]} + } + default: + ls.RaiseError("invalid \"char\"") + } + case qvalue.QValueKindString: + newqv = qvalue.QValueString{Val: lua.LVAsString(val)} + case qvalue.QValueKindTimestamp: + newqv = qvalue.QValueTimestamp{Val: LVAsTime(ls, val)} + case qvalue.QValueKindTimestampTZ: + newqv = qvalue.QValueTimestampTZ{Val: LVAsTime(ls, val)} + case qvalue.QValueKindDate: + newqv = qvalue.QValueDate{Val: LVAsTime(ls, val)} + case qvalue.QValueKindTime: + newqv = qvalue.QValueTime{Val: LVAsTime(ls, val)} + case qvalue.QValueKindTimeTZ: + newqv = qvalue.QValueTimeTZ{Val: LVAsTime(ls, val)} + case qvalue.QValueKindNumeric: + newqv = qvalue.QValueNumeric{Val: LVAsDecimal(ls, val)} + case qvalue.QValueKindBytes: + newqv = qvalue.QValueBytes{Val: []byte(lua.LVAsString(val))} + case qvalue.QValueKindUUID: + if ud, ok := val.(*lua.LUserData); ok { + if id, ok := ud.Value.(uuid.UUID); ok { + newqv = qvalue.QValueUUID{Val: [16]byte(id)} + } + } + case qvalue.QValueKindJSON: + newqv = qvalue.QValueJSON{Val: lua.LVAsString(val)} + case qvalue.QValueKindBit: + newqv = qvalue.QValueBit{Val: []byte(lua.LVAsString(val))} + case qvalue.QValueKindArrayFloat32: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat32{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float32 { + return float32(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayFloat64: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayInt16: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayInt32: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayInt64: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayString: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayString{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) string { + return lua.LVAsString(v) + }), + } + } + case qvalue.QValueKindArrayDate: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayDate{ + Val: shared.LTableToSlice(ls, tbl, LVAsTime), + } + } + case qvalue.QValueKindArrayTimestamp: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayDate{ + Val: shared.LTableToSlice(ls, tbl, LVAsTime), + } + } + case qvalue.QValueKindArrayTimestampTZ: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayDate{ + Val: shared.LTableToSlice(ls, tbl, LVAsTime), + } + } + case qvalue.QValueKindArrayBoolean: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayBoolean{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) bool { + return lua.LVAsBool(v) + }), + } + } + default: + ls.RaiseError(fmt.Sprintf("no support for reassigning %s", kind)) + return 0 + } + + row.AddColumn(key, newqv) + return 1 +} + func LuaRowLen(ls *lua.LState) int { row := LuaRow.StartMethod(ls) ls.Push(lua.LNumber(len(row.ColToVal))) diff --git a/flow/pua/stream_adapter.go b/flow/pua/stream_adapter.go new file mode 100644 index 0000000000..0367134368 --- /dev/null +++ b/flow/pua/stream_adapter.go @@ -0,0 +1,33 @@ +package pua + +import ( + "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/peer-flow/model" +) + +func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStream) *model.QRecordStream { + output := model.NewQRecordStream(0) + go func() { + schema := stream.Schema() + output.SetSchema(schema) + for record := range stream.Records { + row := model.NewRecordItems(len(record)) + for i, qv := range record { + row.AddColumn(schema.Fields[i].Name, qv) + } + ls.Push(lfn) + ls.Push(LuaRow.New(ls, row)) + if err := ls.PCall(1, 0, nil); err != nil { + output.Close(err) + return + } + for i, field := range schema.Fields { + record[i] = row.GetColumnValue(field.Name) + } + output.Records <- record + } + output.Close(stream.Err()) + }() + return output +} diff --git a/flow/shared/lua.go b/flow/shared/lua.go index 2b95c3464c..26aeb4fe2a 100644 --- a/flow/shared/lua.go +++ b/flow/shared/lua.go @@ -18,7 +18,7 @@ var ( LuaDecimal = glua64.UserDataType[decimal.Decimal]{Name: "peerdb_decimal"} ) -func SliceToLTable[T any](ls *lua.LState, s []T, f func(x T) lua.LValue) *lua.LTable { +func SliceToLTable[T any](ls *lua.LState, s []T, f func(T) lua.LValue) *lua.LTable { tbl := ls.CreateTable(len(s), 0) tbl.Metatable = ls.GetTypeMetatable("Array") for idx, val := range s { @@ -26,3 +26,12 @@ func SliceToLTable[T any](ls *lua.LState, s []T, f func(x T) lua.LValue) *lua.LT } return tbl } + +func LTableToSlice[T any](ls *lua.LState, tbl *lua.LTable, f func(*lua.LState, lua.LValue) T) []T { + tlen := tbl.Len() + slice := make([]T, 0, tlen) + for i := range tlen { + slice = append(slice, f(ls, tbl.RawGetInt(i))) + } + return slice +} diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index 392439ee77..aa2b80d462 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -123,8 +123,14 @@ export const qrepSettings: MirrorSetting[] = [ ...curr, waitBetweenBatchesSeconds: parseInt(value as string, 10) || 30, })), - tips: 'Time to wait (in seconds) between getting partitions to process. The default is 30 seconds', + tips: 'Time to wait (in seconds) between getting partitions to process. The default is 30 seconds.', default: 30, type: 'number', }, + { + label: 'Script', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => ({ ...curr, script: value as string })), + tips: 'Script to use for row transformations. The default is no scripting.', + }, ];