diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 91214f31e2..1d30a7e8c6 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -550,6 +550,10 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } + // override for debugging. + batchIDs.SyncBatchID = 364 + batchIDs.NormalizeBatchID = 363 + hasJob, err := c.metadataHasJob(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to check if job exists: %w", err) @@ -611,13 +615,19 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) for i, mergeStmt := range mergeStmts { c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..", i+1, len(mergeStmts), tableName), slog.String("mergeStmt", mergeStmt)) - q := c.client.Query(mergeStmt) - _, err = q.Read(c.ctx) - if err != nil { - return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) - } + // TODO (re-enable later) + // q := c.client.Query(mergeStmt) + // _, err = q.Read(c.ctx) + // if err != nil { + // return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) + // } } } + + if 1 > 0 { + return nil, fmt.Errorf("normalizeRecords will error out temporarily") + } + // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",