From ed67a63ec9c0bfee98dc8501ac6194acce781837 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 15:28:19 +0000 Subject: [PATCH 1/3] BQ merge: don't be transactional (#895) Discussed with Kevin BQ lock contention, he made #889 to remove temp table Merge is idempotent, so no need to have transaction, which removes need to have advisory lock on catalog --- flow/connectors/bigquery/bigquery.go | 63 ++-------------------- flow/connectors/bigquery/qrep_avro_sync.go | 26 ++++----- 2 files changed, 18 insertions(+), 71 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 3c0787527e..0a220ef424 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -781,25 +781,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err) } - stmts := []string{} + stmts := make([]string, 0, len(distinctTableNames)+1) // append all the statements to one list c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v", c.datasetID, rawTableName, distinctTableNames)) - release, err := c.grabJobsUpdateLock() - if err != nil { - return nil, fmt.Errorf("failed to grab lock: %v", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - - stmts = append(stmts, "BEGIN TRANSACTION;") - for _, tableName := range distinctTableNames { mergeGen := &mergeStmtGenerator{ Dataset: c.datasetID, @@ -824,11 +810,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) + query := strings.Join(stmts, "\n") + _, err = c.client.Query(query).Read(c.ctx) if err != nil { - return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err) + return nil, fmt.Errorf("failed to execute statements %s: %v", query, err) } return &model.NormalizeResponse{ @@ -1023,21 +1009,9 @@ func (c *BigQueryConnector) SetupNormalizedTables( } func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error { - release, err := c.grabJobsUpdateLock() - if err != nil { - return fmt.Errorf("failed to grab lock: %w", err) - } - - defer func() { - err := release() - if err != nil { - c.logger.Error("failed to release lock", slog.Any("error", err)) - } - }() - dataset := c.client.Dataset(c.datasetID) // deleting PeerDB specific tables - err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) + err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx) if err != nil { return fmt.Errorf("failed to delete raw table: %w", err) } @@ -1069,33 +1043,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string { return fmt.Sprintf("_peerdb_staging_%s", flowJobName) } -// Bigquery doesn't allow concurrent updates to the same table. -// we grab a lock on catalog to ensure that only one job is updating -// bigquery tables at a time. -// returns a function to release the lock. -func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) { - tx, err := c.catalogPool.Begin(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to begin transaction: %w", err) - } - - // grab an advisory lock based on the mirror jobs table hash - mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable) - _, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl) - if err != nil { - err = tx.Rollback(c.ctx) - return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err) - } - - return func() error { - err = tx.Commit(c.ctx) - if err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - return nil - }, nil -} - func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { for _, renameRequest := range req.RenameTableOptions { src := renameRequest.CurrentName diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d52e7c42e3..7ed87b0c06 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) - // execute the statements in a transaction - stmts := []string{} - stmts = append(stmts, "BEGIN TRANSACTION;") - stmts = append(stmts, insertStmt) - stmts = append(stmts, updateMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + updateMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) @@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( ) bqClient := s.connector.client datasetID := s.connector.datasetID - // Start a transaction - stmts := []string{"BEGIN TRANSACTION;"} selector := "*" if softDeleteCol != "" { // PeerDB column @@ -150,16 +148,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", datasetID, dstTableName, selector, datasetID, stagingTable) - stmts = append(stmts, insertStmt) - insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) if err != nil { return -1, fmt.Errorf("failed to create metadata insert statement: %v", err) } slog.Info("Performing transaction inside QRep sync function", flowLog) - stmts = append(stmts, insertMetadataStmt) - stmts = append(stmts, "COMMIT TRANSACTION;") - // Execute the statements in a transaction + + stmts := []string{ + "BEGIN TRANSACTION;", + insertStmt, + insertMetadataStmt, + "COMMIT TRANSACTION;", + } _, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx) if err != nil { return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err) From f1038ba4613edc66850e8b14e5610dc9c00595e2 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 26 Dec 2023 01:09:10 +0530 Subject: [PATCH 2/3] Errors UI: Acknowledge functionality (#901) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Users can now mark errors as 'Acknowledged'. If there are no unacknowledged errors for a mirror, it's status is said to be 'Active' Screenshot 2023-12-25 at 10 51 03 PM Users can click on the status and be taken to the page above. Screenshot 2023-12-25 at 10 22 34 PM - Loading indicator for acknowledge button - Error toast incase acknowledge operation fails --- ui/app/api/mirrors/alerts/route.ts | 19 +++ ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx | 7 +- .../mirrors/errors/[mirrorName]/ackbutton.tsx | 57 +++++++++ ui/app/mirrors/errors/[mirrorName]/page.tsx | 119 +++++++++++------- ui/app/mirrors/mirror-status.tsx | 41 +++--- ui/app/mirrors/tables.tsx | 2 +- 6 files changed, 174 insertions(+), 71 deletions(-) create mode 100644 ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx diff --git a/ui/app/api/mirrors/alerts/route.ts b/ui/app/api/mirrors/alerts/route.ts index ecb9891cbd..13cc612503 100644 --- a/ui/app/api/mirrors/alerts/route.ts +++ b/ui/app/api/mirrors/alerts/route.ts @@ -19,3 +19,22 @@ export async function POST(request: Request) { } return new Response(JSON.stringify(mirrorStatus)); } + +// We accept a list here in preparation for a Select All feature in UI +export async function PUT(request: Request) { + const { mirrorIDStringList } = await request.json(); + const mirrorIDList: bigint[] = mirrorIDStringList.map((id: string) => + BigInt(id) + ); + const success = await prisma.flow_errors.updateMany({ + where: { + id: { + in: mirrorIDList, + }, + }, + data: { + ack: true, + }, + }); + return new Response(JSON.stringify(success.count)); +} diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index e7729c487d..738218c64f 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -32,10 +32,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
- +
diff --git a/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx b/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx new file mode 100644 index 0000000000..b6d14eed89 --- /dev/null +++ b/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx @@ -0,0 +1,57 @@ +'use client'; +import { Button } from '@/lib/Button'; +import { Label } from '@/lib/Label'; +import { ProgressCircle } from '@/lib/ProgressCircle'; +import { useState } from 'react'; +import { toast } from 'react-toastify'; + +const notifyErr = (errMsg: string) => { + toast.error(errMsg, { + position: toast.POSITION.BOTTOM_CENTER, + }); +}; + +const AckButton = ({ ack, id }: { ack: boolean; id: number | bigint }) => { + const [loading, setLoading] = useState(false); + const [updated, setUpdated] = useState(false); + // handleAck updates ack to true for the given mirrorID + const handleAck = async (mirrorID: bigint | number) => { + setLoading(true); + const updateResResult = await fetch('/api/mirrors/alerts', { + method: 'PUT', + body: JSON.stringify({ + mirrorIDStringList: [mirrorID.toString()], + }), + }); + const updateRes = await updateResResult.json(); + setLoading(false); + if (!updateRes) { + notifyErr('Something went wrong when trying to acknowledge'); + return; + } + setUpdated(true); + }; + return ( + <> + {ack !== true && updated !== true ? ( + + ) : ( + + )} + + ); +}; + +export default AckButton; diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 97efca644a..899b25a496 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -3,6 +3,9 @@ import prisma from '@/app/utils/prisma'; import TimeLabel from '@/components/TimeComponent'; import { Label } from '@/lib/Label'; import { Table, TableCell, TableRow } from '@/lib/Table'; +import { ToastContainer } from 'react-toastify'; +import 'react-toastify/dist/ReactToastify.css'; +import AckButton from './ackbutton'; type MirrorErrorProps = { params: { mirrorName: string }; @@ -14,62 +17,84 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => { flow_name: mirrorName, error_type: 'error', }, - distinct: ['error_message'], orderBy: { error_timestamp: 'desc', }, }); return ( -
- -
-
- - -
- - Type - - Message - - } + <> +
+ +
+
+ + + +
+ +
+ +
- {mirrorErrors.map((mirrorError) => ( - - - {mirrorError.error_type.toUpperCase()} - - - - - - {mirrorError.error_message} - - - ))} -
+ + Type + + + + Message + + + } + > + {mirrorErrors.map((mirrorError) => ( + + + {mirrorError.error_type.toUpperCase()} + + + + + + {mirrorError.error_message} + + + + + + ))} +
+
-
+ + ); }; diff --git a/ui/app/mirrors/mirror-status.tsx b/ui/app/mirrors/mirror-status.tsx index 27d797e389..b64d87ff23 100644 --- a/ui/app/mirrors/mirror-status.tsx +++ b/ui/app/mirrors/mirror-status.tsx @@ -13,26 +13,21 @@ export const ErrorModal = ({ flowName }: { flowName: string }) => { ); }; -export const MirrorError = ({ - flowName, - detailed, -}: { - flowName: string; - detailed: boolean; -}) => { +export const MirrorError = ({ flowName }: { flowName: string }) => { const [flowStatus, setFlowStatus] = useState(); const [isLoading, setIsLoading] = useState(true); const [error, setError] = useState(null); @@ -81,15 +76,23 @@ export const MirrorError = ({ ); } - if (flowStatus == 'healthy') { - if (detailed) - return ( -
+ if (flowStatus === 'healthy') { + return ( + +
- ); - return ; + + + + ); } return ; diff --git a/ui/app/mirrors/tables.tsx b/ui/app/mirrors/tables.tsx index 106c7cd22d..6c1289befc 100644 --- a/ui/app/mirrors/tables.tsx +++ b/ui/app/mirrors/tables.tsx @@ -90,7 +90,7 @@ export function CDCFlows({ cdcFlows }: { cdcFlows: any }) { - + Date: Tue, 26 Dec 2023 01:37:18 +0530 Subject: [PATCH 3/3] Fix flow err log and cleanup flowable.go (#887) A few places we were logging peer names instead of flow name for the `peerdb_stats.flow_errors` table. Also some long lines have been split in this PR Co-authored-by: Kaushik Iska --- flow/activities/flowable.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9591cab251..66a4a2033f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot } defer connectors.CloseConnector(dstConn) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) if err := dstConn.SetupMetadataTables(); err != nil { - a.Alerter.LogFlowError(ctx, config.Name, err) + a.Alerter.LogFlowError(ctx, flowName, err) return fmt.Errorf("failed to setup metadata tables: %w", err) } @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable( setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config) if err != nil { - a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err) + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + a.Alerter.LogFlowError(ctx, flowName, err) return nil, fmt.Errorf("failed to setup normalized tables: %w", err) } @@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, slog.Error("failed to pull records", slog.Any("error", err)) goroutineErr = err } else { - err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords) + err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, + a.CatalogPool, runUUID, partition, numRecords) if err != nil { slog.Error(fmt.Sprintf("%v", err)) goroutineErr = err @@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }, } } - updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) + updateErr := monitoring.InitializeQRepRun( + ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}) if updateErr != nil { return updateErr } @@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, return fmt.Errorf("failed to update start time for partition: %w", err) } - err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) + err = monitoring.UpdatePullEndTimeAndRowsForPartition( + errCtx, a.CatalogPool, runUUID, partition, int64(numRecords)) if err != nil { slog.Error(fmt.Sprintf("%v", err)) return err