From 7e517b7d4a7c6e60ddace2d58262720d4d6d72d9 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 28 Feb 2024 02:12:21 +0530 Subject: [PATCH] changes --- flow/connectors/postgres/qrep_sql_sync.go | 28 +++++++++++------------ ui/app/mirrors/create/handlers.ts | 4 ---- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/flow/connectors/postgres/qrep_sql_sync.go b/flow/connectors/postgres/qrep_sql_sync.go index bc97fcf93d..42e113d518 100644 --- a/flow/connectors/postgres/qrep_sql_sync.go +++ b/flow/connectors/postgres/qrep_sql_sync.go @@ -70,7 +70,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( return 0, fmt.Errorf("failed to begin transaction: %w", err) } defer func() { - if err := tx.Rollback(context.Background()); err != nil { + if err := tx.Rollback(ctx); err != nil { if err != pgx.ErrTxClosed { logger.LoggerFromCtx(ctx).Error("failed to rollback transaction tx2", slog.Any("error", err), syncLog) } @@ -87,7 +87,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_APPEND { // Perform the COPY FROM operation numRowsSynced, err = tx.CopyFrom( - context.Background(), + ctx, pgx.Identifier{dstTableName.Schema, dstTableName.Table}, schema.GetColumnNames(), copySource, @@ -103,7 +103,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( QuoteIdentifier(syncedAtCol), QuoteIdentifier(syncedAtCol), ) - _, err = tx.Exec(context.Background(), updateSyncedAtStmt) + _, err = tx.Exec(ctx, updateSyncedAtStmt) if err != nil { return -1, fmt.Errorf("failed to update synced_at column: %v", err) } @@ -119,7 +119,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( } newColumns = append(newColumns, fmt.Sprintf("%s TIMESTAMP DEFAULT CURRENT_TIMESTAMP", QuoteIdentifier(syncedAtCol))) - _, err := tx.Exec(context.Background(), fmt.Sprintf("CREATE UNLOGGED TABLE %s (%s);", + _, err := tx.Exec(ctx, fmt.Sprintf("CREATE UNLOGGED TABLE %s (%s);", overwriteTempTableIdentifier, strings.Join(newColumns, ", "), )) @@ -127,17 +127,17 @@ func (s *QRepStagingTableSync) SyncQRepRecords( return -1, fmt.Errorf("failed to create %s: %v", overwriteTempTableIdentifier, err) } - _, err = tx.CopyFrom(context.Background(), overwriteTempTable, schema.GetColumnNames(), copySource) + _, err = tx.CopyFrom(ctx, overwriteTempTable, schema.GetColumnNames(), copySource) if err != nil { return -1, fmt.Errorf("failed to copy records into %s: %v", overwriteTempTableIdentifier, err) } - _, err = tx.Exec(context.Background(), fmt.Sprintf("DROP TABLE %s;", dstTableIdentifier)) + _, err = tx.Exec(ctx, fmt.Sprintf("DROP TABLE %s;", dstTableIdentifier)) if err != nil { return -1, fmt.Errorf("failed to drop %s: %v", dstTableIdentifier, err) } - _, err = tx.Exec(context.Background(), fmt.Sprintf("ALTER TABLE %s RENAME TO %s;", + _, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s;", overwriteTempTableIdentifier, QuoteIdentifier(dstTableName.Table))) if err != nil { return -1, fmt.Errorf("failed to rename %s to %s: %v", @@ -151,7 +151,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( QuoteIdentifier(syncedAtCol), QuoteIdentifier(syncedAtCol), ) - _, err = tx.Exec(context.Background(), updateSyncedAtStmt) + _, err = tx.Exec(ctx, updateSyncedAtStmt) if err != nil { return -1, fmt.Errorf("failed to update synced_at column: %v", err) } @@ -170,14 +170,14 @@ func (s *QRepStagingTableSync) SyncQRepRecords( s.connector.logger.Info(fmt.Sprintf("Creating staging table %s - '%s'", stagingTableName, createStagingTableStmt), syncLog) - _, err = tx.Exec(context.Background(), createStagingTableStmt) + _, err = tx.Exec(ctx, createStagingTableStmt) if err != nil { return -1, fmt.Errorf("failed to create staging table: %v", err) } // Step 2.2: Insert records into the staging table numRowsSynced, err = tx.CopyFrom( - context.Background(), + ctx, stagingTableIdentifier, schema.GetColumnNames(), copySource, @@ -220,7 +220,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( setClause, ) s.connector.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog) - res, err := tx.Exec(context.Background(), upsertStmt) + res, err := tx.Exec(ctx, upsertStmt) if err != nil { return -1, fmt.Errorf("failed to perform upsert operation: %v", err) } @@ -233,7 +233,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( stagingTableIdentifier.Sanitize(), ) s.connector.logger.Info("Dropping staging table", slog.String("stagingTable", stagingTableName), syncLog) - _, err = tx.Exec(context.Background(), dropStagingTableStmt) + _, err = tx.Exec(ctx, dropStagingTableStmt) if err != nil { return -1, fmt.Errorf("failed to drop staging table: %v", err) } @@ -254,7 +254,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( ) s.connector.logger.Info("Executing transaction inside Qrep sync", syncLog) _, err = tx.Exec( - context.Background(), + ctx, insertMetadataStmt, flowJobName, partitionID, @@ -266,7 +266,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) } - err = tx.Commit(context.Background()) + err = tx.Commit(ctx) if err != nil { return -1, fmt.Errorf("failed to commit transaction: %v", err) } diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 6e1ee21d76..ac63e50488 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -205,10 +205,6 @@ export const handleCreateQRep = async ( } if (config.sourcePeer?.snowflakeConfig) { - if (config.query.includes('')) { - notify('Please fill in the query correctly'); - return; - } if (config.watermarkTable == '') { notify('Please fill in the source table'); return;