From 735abae303201511fc092ec014d4d9bd35474754 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 11 Mar 2024 22:09:57 +0530 Subject: [PATCH] support refresh overwrite, clean up UI --- flow/cmd/handler.go | 9 +++++-- flow/connectors/postgres/qrep.go | 12 +++------ flow/connectors/postgres/qrep_sql_sync.go | 11 +------- flow/connectors/utils/postgres.go | 13 +++++++++ flow/workflows/qrep_flow.go | 18 +++++++++++++ ui/app/api/mirrors/qrep/route.ts | 9 ++----- ui/app/mirrors/create/handlers.ts | 32 ++++++++++++----------- ui/app/mirrors/create/helpers/common.ts | 2 +- ui/app/mirrors/create/schema.ts | 30 +++++++++++++++++++-- 9 files changed, 90 insertions(+), 46 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index dc4899bad1..9655a0bfbb 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -14,6 +14,7 @@ import ( "go.temporal.io/sdk/client" "google.golang.org/protobuf/proto" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" @@ -211,7 +212,6 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog( func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { - slog.Info("QRep endpoint request", slog.Any("req", req)) cfg := req.QrepConfig workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ @@ -265,8 +265,13 @@ func (h *FlowRequestHandler) CreateQRepFlow( sourceTables := make([]string, 0, len(req.TableMapping)) destinationTables := make([]string, 0, len(req.TableMapping)) for _, mapping := range req.TableMapping { + destinationSchemaTable, err := utils.ParseSchemaTable(mapping.DestinationTableIdentifier) + if err != nil { + return nil, fmt.Errorf("unable to parse destination table identifier: %w", err) + } + sourceTables = append(sourceTables, mapping.SourceTableIdentifier) - destinationTables = append(destinationTables, mapping.DestinationTableIdentifier) + destinationTables = append(destinationTables, utils.PostgresSchemaTableNormalize(destinationSchemaTable)) } cfg.WatermarkTable = strings.Join(sourceTables, ";") cfg.DestinationTableIdentifier = strings.Join(destinationTables, ";") diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 0dee72d667..918195456f 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -24,6 +24,7 @@ import ( ) const qRepMetadataTableName = "_peerdb_query_replication_metadata" +const QRepOverwriteTempTablePrefix = "_peerdb_overwrite_" func (c *PostgresConnector) GetQRepPartitions( ctx context.Context, @@ -624,9 +625,9 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi return fmt.Errorf("failed to parse destination table identifier: %w", err) } dstTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, dstSchemaTable.Table} - tempTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, dstSchemaTable.Table + "_overwrite"} + tempTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, QRepOverwriteTempTablePrefix + dstSchemaTable.Table} - _, err = tx.Exec(ctx, fmt.Sprintf("DROP TABLE %s CASCADE", dstTableIdentifier.Sanitize())) + _, err = tx.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", dstTableIdentifier.Sanitize())) if err != nil { return fmt.Errorf("failed to drop %s: %v", dstTableName, err) } @@ -637,13 +638,6 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err) } - _, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s SET LOGGED", - dstTableIdentifier.Sanitize())) - if err != nil { - return fmt.Errorf("failed to rename %s to %s: %v", - tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err) - } - if config.SyncedAtColName != "" { updateSyncedAtStmt := fmt.Sprintf( `UPDATE %s SET %s = CURRENT_TIMESTAMP`, diff --git a/flow/connectors/postgres/qrep_sql_sync.go b/flow/connectors/postgres/qrep_sql_sync.go index 009c299b81..868db7de26 100644 --- a/flow/connectors/postgres/qrep_sql_sync.go +++ b/flow/connectors/postgres/qrep_sql_sync.go @@ -108,16 +108,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( } } } else if writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { - dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table} - tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table + "_overwrite"} - _, err := tx.Exec(ctx, fmt.Sprintf("CREATE UNLOGGED TABLE %s(LIKE %s);", - tempTableIdentifier.Sanitize(), - dstTableIdentifier.Sanitize(), - )) - if err != nil { - return -1, fmt.Errorf("failed to create %s: %v", tempTableIdentifier, err) - } - + tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table} _, err = tx.CopyFrom(ctx, tempTableIdentifier, schema.GetColumnNames(), copySource) if err != nil { return -1, fmt.Errorf("failed to copy records into %s: %v", tempTableIdentifier, err) diff --git a/flow/connectors/utils/postgres.go b/flow/connectors/utils/postgres.go index 0a606c1feb..a2e6412fca 100644 --- a/flow/connectors/utils/postgres.go +++ b/flow/connectors/utils/postgres.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/url" + "strings" "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5" @@ -73,3 +74,15 @@ func RegisterHStore(ctx context.Context, conn *pgx.Conn) error { return nil } + +func PostgresIdentifierNormalize(identifier string) string { + if IsUpper(identifier) { + return strings.ToLower(identifier) + } + return identifier +} + +func PostgresSchemaTableNormalize(schemaTable *SchemaTable) string { + return fmt.Sprintf(`%s.%s`, PostgresIdentifierNormalize(schemaTable.Schema), + PostgresIdentifierNormalize(schemaTable.Table)) +} diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 3ed8a1f9e2..4b85b9fff8 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -12,6 +12,8 @@ import ( "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" @@ -462,6 +464,21 @@ func QRepFlowWorkflow( maxParallelWorkers = int(config.MaxParallelWorkers) } + oldDstTableIdentifier := config.DestinationTableIdentifier + if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + dstTables := strings.Split(q.config.DestinationTableIdentifier, ";") + transformedTables := make([]string, 0, len(dstTables)) + for _, table := range dstTables { + dstSchemaTable, err := utils.ParseSchemaTable(table) + if err != nil { + return fmt.Errorf("failed to parse table name %s: %w", table, err) + } + dstTransformedTable := dstSchemaTable.Schema + "." + connpostgres.QRepOverwriteTempTablePrefix + dstSchemaTable.Table + transformedTables = append(transformedTables, dstTransformedTable) + } + config.DestinationTableIdentifier = strings.Join(transformedTables, ";") + } + err = q.SetupWatermarkTableOnDestination(ctx) if err != nil { return fmt.Errorf("failed to setup watermark table: %w", err) @@ -489,6 +506,7 @@ func QRepFlowWorkflow( return err } + q.config.DestinationTableIdentifier = oldDstTableIdentifier logger.Info("consolidating partitions for peer flow") if err := q.consolidatePartitions(ctx); err != nil { return err diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts index b60a316cee..e6b2e03b20 100644 --- a/ui/app/api/mirrors/qrep/route.ts +++ b/ui/app/api/mirrors/qrep/route.ts @@ -7,14 +7,8 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); - const { config, tableMapping } = body; - const flowServiceAddr = GetFlowHttpAddressFromEnv(); - const req: CreateQRepFlowRequest = { - qrepConfig: config, - createCatalogEntry: true, - tableMapping: tableMapping, - }; + const req: CreateQRepFlowRequest = body; try { const createStatus: CreateQRepFlowResponse = await fetch( `${flowServiceAddr}/v1/flows/qrep/create`, @@ -31,6 +25,7 @@ export async function POST(request: Request) { return new Response(JSON.stringify(response)); } catch (e) { + return new Response(JSON.stringify({ created: false })); console.log(e); } } diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 0b86c745e5..592c6d83d1 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -18,6 +18,7 @@ import { cdcSchema, flowNameSchema, qrepSchema, + sfQrepSchema, tableMappingSchema, } from './schema'; @@ -206,16 +207,27 @@ export const handleCreateQRep = async ( return; } + let tableMapping: TableMapping[] = []; + let reqBody: CreateQRepFlowRequest = { + qrepConfig: config, + createCatalogEntry: true, + tableMapping: tableMapping, + }; + + tableMapping = reformattedTableMapping(rows ?? []); if (config.sourcePeer?.snowflakeConfig) { - config.query = 'SELECT * FROM ' + config.watermarkTable; - if (config.watermarkTable == '') { - notify('Please fill in the source table'); + const sfQRepFieldErr = sfQrepSchema.safeParse(config); + if (!sfQRepFieldErr.success) { + notify(sfQRepFieldErr.error.issues[0].message); return; } - if (config.destinationTableIdentifier == '') { - notify('Please fill in the destination table'); + console.log('rows in handler:', rows); + const tableValidity = tableMappingSchema.safeParse(tableMapping); + if (!tableValidity.success) { + notify(tableValidity.error.issues[0].message); return; } + reqBody.tableMapping = tableMapping; } else { const fieldErr = validateQRepFields(config.query, config); if (fieldErr) { @@ -224,16 +236,6 @@ export const handleCreateQRep = async ( } } config.flowJobName = flowJobName; - let reqBody: CreateQRepFlowRequest = { - qrepConfig: config, - createCatalogEntry: true, - tableMapping: [], - }; - if (rows) { - const tableMapping = reformattedTableMapping(rows); - reqBody.tableMapping = tableMapping; - } - setLoading(true); const statusMessage: UCreateMirrorResponse = await fetch( '/api/mirrors/qrep', diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 51d4a2b5ab..a81b5151ec 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -60,7 +60,7 @@ export const blankQRepSetting = { export const blankSnowflakeQRepSetting = { destinationTableIdentifier: '', - query: '', + query: '-- nothing', watermarkTable: '', watermarkColumn: '', maxParallelWorkers: 4, diff --git a/ui/app/mirrors/create/schema.ts b/ui/app/mirrors/create/schema.ts index 3ca4408315..2a30f1f62a 100644 --- a/ui/app/mirrors/create/schema.ts +++ b/ui/app/mirrors/create/schema.ts @@ -15,10 +15,10 @@ export const tableMappingSchema = z .array( z.object({ sourceTableIdentifier: z - .string() + .string({ required_error: 'Source table name is required' }) .min(1, 'source table names, if added, must be non-empty'), destinationTableIdentifier: z - .string() + .string({ required_error: 'Destination table name is required' }) .min(1, 'destination table names, if added, must be non-empty'), exclude: z.array(z.string()).optional(), partitionKey: z.string().optional(), @@ -182,3 +182,29 @@ export const qrepSchema = z.object({ .min(1, 'Batch wait must be a non-negative integer') .optional(), }); + +export const sfQrepSchema = z.object({ + sourcePeer: z.object( + { + name: z.string().min(1), + type: z.any(), + config: z.any(), + }, + { required_error: 'Source peer is required' } + ), + destinationPeer: z.object( + { + name: z.string().min(1), + type: z.any(), + config: z.any(), + }, + { required_error: 'Destination peer is required' } + ), + waitBetweenBatchesSeconds: z + .number({ + invalid_type_error: 'Batch wait must be a number', + }) + .int() + .min(1, 'Batch wait must be a non-negative integer') + .optional(), +});