diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 1d30a7e8c6..91214f31e2 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -550,10 +550,6 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } - // override for debugging. - batchIDs.SyncBatchID = 364 - batchIDs.NormalizeBatchID = 363 - hasJob, err := c.metadataHasJob(req.FlowJobName) if err != nil { return nil, fmt.Errorf("failed to check if job exists: %w", err) @@ -615,19 +611,13 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) for i, mergeStmt := range mergeStmts { c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..", i+1, len(mergeStmts), tableName), slog.String("mergeStmt", mergeStmt)) - // TODO (re-enable later) - // q := c.client.Query(mergeStmt) - // _, err = q.Read(c.ctx) - // if err != nil { - // return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) - // } + q := c.client.Query(mergeStmt) + _, err = q.Read(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err) + } } } - - if 1 > 0 { - return nil, fmt.Errorf("normalizeRecords will error out temporarily") - } - // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",