diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index d08935cf37..754a77829d 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -564,76 +564,86 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
 			EndBatchID:   batchIDs.SyncBatchID,
 		}, nil
 	}
-	distinctTableNames, err := c.getDistinctTableNamesInBatch(
-		req.FlowJobName,
-		batchIDs.SyncBatchID,
-		batchIDs.NormalizeBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
-	}
 
-	tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
-		req.FlowJobName,
-		batchIDs.SyncBatchID,
-		batchIDs.NormalizeBatchID,
-	)
-	if err != nil {
-		return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
-	}
-
-	// append all the statements to one list
-	c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
-		c.datasetID, rawTableName, distinctTableNames))
-
-	for _, tableName := range distinctTableNames {
-		dstDatasetTable, _ := c.convertToDatasetTable(tableName)
-		mergeGen := &mergeStmtGenerator{
-			rawDatasetTable: &datasetTable{
-				dataset: c.datasetID,
-				table:   rawTableName,
-			},
-			dstTableName:          tableName,
-			dstDatasetTable:       dstDatasetTable,
-			normalizedTableSchema: c.tableNameSchemaMapping[tableName],
-			syncBatchID:           batchIDs.SyncBatchID,
-			normalizeBatchID:      batchIDs.NormalizeBatchID,
-			unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
-			peerdbCols: &protos.PeerDBColumns{
-				SoftDeleteColName: req.SoftDeleteColName,
-				SyncedAtColName:   req.SyncedAtColName,
-				SoftDelete:        req.SoftDelete,
-			},
-			shortColumn: map[string]string{},
+	prevNormalizedUntil := batchIDs.NormalizeBatchID
+	normalizeUntil := batchIDs.NormalizeBatchID
+	for normalizeUntil < batchIDs.SyncBatchID {
+		normalizeUntil += 1
+		distinctTableNames, err := c.getDistinctTableNamesInBatch(
+			req.FlowJobName,
+			normalizeUntil,
+			prevNormalizedUntil,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get distinct table names to normalize: %w", err)
 		}
 
-		// normalize anything between last normalized batch id to last sync batchid
-		mergeStmts := mergeGen.generateMergeStmts()
+		tableNametoUnchangedToastCols, err := c.getTableNametoUnchangedCols(
+			req.FlowJobName,
+			normalizeUntil,
+			prevNormalizedUntil,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
+		}
 
-		// run the merge statement
-		for i, mergeStmt := range mergeStmts {
-			c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
-				i+1, len(mergeStmts), tableName))
-			q := c.client.Query(mergeStmt)
-			_, err = q.Read(c.ctx)
-			if err != nil {
-				return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+		// merge raw records for this batch into the corresponding destination tables
+		c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
+			c.datasetID, rawTableName, distinctTableNames))
+
+		for _, tableName := range distinctTableNames {
+			dstDatasetTable, _ := c.convertToDatasetTable(tableName)
+			mergeGen := &mergeStmtGenerator{
+				rawDatasetTable: &datasetTable{
+					dataset: c.datasetID,
+					table:   rawTableName,
+				},
+				dstTableName:          tableName,
+				dstDatasetTable:       dstDatasetTable,
+				normalizedTableSchema: c.tableNameSchemaMapping[tableName],
+				syncBatchID:           normalizeUntil,
+				normalizeBatchID:      prevNormalizedUntil,
+				unchangedToastColumns: tableNametoUnchangedToastCols[tableName],
+				peerdbCols: &protos.PeerDBColumns{
+					SoftDeleteColName: req.SoftDeleteColName,
+					SyncedAtColName:   req.SyncedAtColName,
+					SoftDelete:        req.SoftDelete,
+				},
+				shortColumn: map[string]string{},
+			}
+
+			// generate merge statements covering only this batch, i.e. (prevNormalizedUntil, normalizeUntil]
+			mergeStmts := mergeGen.generateMergeStmts()
+
+			// run the merge statement
+			for i, mergeStmt := range mergeStmts {
+				c.logger.Info(fmt.Sprintf("running merge statement [%d/%d] for table %s..",
+					i+1, len(mergeStmts), tableName))
+				q := c.client.Query(mergeStmt)
+				_, err = q.Read(c.ctx)
+				if err != nil {
+					return nil, fmt.Errorf("failed to execute merge statement %s: %v", mergeStmt, err)
+				}
 			}
 		}
-	}
-	// update metadata to make the last normalized batch id to the recent last sync batch id.
-	updateMetadataStmt := fmt.Sprintf(
-		"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
-		c.datasetID, MirrorJobsTable, batchIDs.SyncBatchID, req.FlowJobName)
-	_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
-	if err != nil {
-		return nil, fmt.Errorf("failed to update metadata table: %v", err)
+
+		// advance the stored normalize_batch_id to the batch that was just normalized.
+		updateMetadataStmt := fmt.Sprintf(
+			"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
+			c.datasetID, MirrorJobsTable, normalizeUntil, req.FlowJobName)
+		_, err = c.client.Query(updateMetadataStmt).Read(c.ctx)
+		if err != nil {
+			return nil, fmt.Errorf("failed to update metadata table: %v", err)
+		}
+
+		prevNormalizedUntil = normalizeUntil
+		c.logger.Info(fmt.Sprintf("normalized batch %d / %d", normalizeUntil, batchIDs.SyncBatchID))
 	}
 
 	return &model.NormalizeResponse{
 		Done:         true,
 		StartBatchID: batchIDs.NormalizeBatchID + 1,
-		EndBatchID:   batchIDs.SyncBatchID,
+		EndBatchID:   normalizeUntil,
 	}, nil
 }