diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9591cab251..66a4a2033f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot } defer connectors.CloseConnector(dstConn) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) if err := dstConn.SetupMetadataTables(); err != nil { - a.Alerter.LogFlowError(ctx, config.Name, err) + a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("failed to setup metadata tables: %w", err) } @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable( setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to setup normalized tables: %w", err) } @@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, slog.Error("failed to pull records", slog.Any("error", err)) goroutineErr = err } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, + a.CatalogPool, runUUID, partition, numRecords) if err != nil { slog.Error(fmt.Sprintf("%v", err)) goroutineErr = err @@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }, } } - updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) + updateErr := monitoring.InitializeQRepRun( + ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) if updateErr != nil { return updateErr } @@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, return fmt.Errorf("failed to update start time for partition: %w", err) } - err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) + err = monitoring.UpdatePullEndTimeAndRowsForPartition( + errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) if err != nil { slog.Error(fmt.Sprintf("%v", err)) return err diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3c0787527e..0a220ef424 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -781,25 +781,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - stmts := []string{} + stmts := make([]string, 0, len(distinctTableNames)+1) // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) - release, err := c.grabJobsUpdateLock() - if err != nil { - return nil, fmt.Errorf("failed to grab lock: %v", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - - stmts = append(stmts, "BEGIN TRANSACTION;") - for _, tableName := range distinctTableNames { mergeGen := &mergeStmtGenerator{ Dataset: c.datasetID, @@ -824,11 +810,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) + query := strings.Join(stmts, "\n") + _, err = c.client.Query(query).Read(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err) + return nil, fmt.Errorf("failed to execute statements %s: %v", query, err) } return &model.NormalizeResponse{ @@ -1023,21 +1009,9 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { - release, err := c.grabJobsUpdateLock() - if err != nil { - return fmt.Errorf("failed to grab lock: %w", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - dataset := c.client.Dataset(c.datasetID) // deleting PeerDB specific tables - err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } @@ -1069,33 +1043,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { return fmt.Sprintf("_peerdb_staging_%s", flowJobName) } -// Bigquery doesn't allow concurrent updates to the same table. -// we grab a lock on catalog to ensure that only one job is updating -// bigquery tables at a time. -// returns a function to release the lock. -func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { - tx, err := c.catalogPool.Begin(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) - } - - // grab an advisory lock based on the mirror jobs table hash - mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) - if err != nil { - err = tx.Rollback(c.ctx) - return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) - } - - return func() error { - err = tx.Commit(c.ctx) - if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return nil - }, nil -} - func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { src := renameRequest.CurrentName diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d52e7c42e3..7ed87b0c06 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) - // execute the statements in a transaction - stmts := []string{} - stmts = append(stmts, "BEGIN TRANSACTION;") - stmts = append(stmts, insertStmt) - stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + updateMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) @@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( ) bqClient := s.connector.client datasetID := s.connector.datasetID - // Start a transaction - stmts := []string{"BEGIN TRANSACTION;"} selector := "*" if softDeleteCol != "" { // PeerDB column @@ -150,16 +148,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", datasetID, dstTableName, selector, datasetID, stagingTable) - stmts = append(stmts, insertStmt) - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) if err != nil { return -1, fmt.Errorf("failed to create metadata insert statement: %v", err) } slog.Info("Performing transaction inside QRep sync function", flowLog) - stmts = append(stmts, insertMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - // Execute the statements in a transaction + + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + insertMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) diff --git a/ui/app/api/mirrors/alerts/route.ts b/ui/app/api/mirrors/alerts/route.ts index ecb9891cbd..13cc612503 100644 --- a/ui/app/api/mirrors/alerts/route.ts +++ b/ui/app/api/mirrors/alerts/route.ts @@ -19,3 +19,22 @@ export async function POST(request: Request) { } return new Response(JSON.stringify(mirrorStatus)); } + +// We accept a list here in preparation for a Select All feature in UI +export async function PUT(request: Request) { + const { mirrorIDStringList } = await request.json(); + const mirrorIDList: bigint[] = mirrorIDStringList.map((id: string) => + BigInt(id) + ); + const success = await prisma.flow_errors.updateMany({ + where: { + id: { + in: mirrorIDList, + }, + }, + data: { + ack: true, + }, + }); + return new Response(JSON.stringify(success.count)); +} diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index e7729c487d..738218c64f 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -32,10 +32,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {