From b16075360554c2ad2772fff084d5d653362e56d9 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Sun, 24 Dec 2023 11:47:22 +0530 Subject: [PATCH] making BQ merge a single statement without temp tables (#889) --- flow/connectors/bigquery/bigquery.go | 14 +++---- .../bigquery/merge_statement_generator.go | 41 +++++-------------- flow/connectors/bigquery/qrep_avro_sync.go | 3 +- 3 files changed, 17 insertions(+), 41 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index b8831be301..64d12057f6 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -233,11 +233,11 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc return nil } -func (c *BigQueryConnector) WaitForTableReady(tblName string) error { +func (c *BigQueryConnector) waitForTableReady(tblName string) error { table := c.client.Dataset(c.datasetID).Table(tblName) maxDuration := 5 * time.Minute deadline := time.Now().Add(maxDuration) - sleepInterval := 15 * time.Second + sleepInterval := 5 * time.Second attempt := 0 for { @@ -816,20 +816,16 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) }, } // normalize anything between last normalized batch id to last sync batchid - mergeStmts := mergeGen.generateMergeStmts() - stmts = append(stmts, mergeStmts...) + mergeStmt := mergeGen.generateMergeStmt() + stmts = append(stmts, mergeStmt) } // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( - "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';", + "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) stmts = append(stmts, updateMetadataStmt) stmts = append(stmts, "COMMIT TRANSACTION;") - // put this within a transaction - // TODO - not truncating rows in staging table as of now. - // err = c.truncateTable(staging...) - _, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx) if err != nil { return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 22161c434b..22f876b8c3 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -8,7 +8,6 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type mergeStmtGenerator struct { @@ -30,24 +29,6 @@ type mergeStmtGenerator struct { peerdbCols *protos.PeerDBColumns } -// GenerateMergeStmt generates a merge statements. -func (m *mergeStmtGenerator) generateMergeStmts() []string { - // return an empty array for now - flattenedCTE := m.generateFlattenedCTE() - deDupedCTE := m.generateDeDupedCTE() - tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", shared.RandomString(5)) - // create temp table stmt - createTempTableStmt := fmt.Sprintf( - "CREATE TEMP TABLE %s AS (%s, %s);", - tempTable, flattenedCTE, deDupedCTE) - - mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols) - - dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable) - - return []string{createTempTableStmt, mergeStmt, dropTempTableStmt} -} - // generateFlattenedCTE generates a flattened CTE. func (m *mergeStmtGenerator) generateFlattenedCTE() string { // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR @@ -129,7 +110,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { } // generateMergeStmt generates a merge statement. -func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string { +func (m *mergeStmtGenerator) generateMergeStmt() string { // comma separated list of column names backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) @@ -138,13 +119,13 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro pureColNames = append(pureColNames, colName) } csep := strings.Join(backtickColNames, ", ") - insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName) + insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName) insertValuesSQL := csep + ",CURRENT_TIMESTAMP" updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, - m.UnchangedToastColumns, peerdbCols) + m.UnchangedToastColumns, m.peerdbCols) if m.peerdbCols.SoftDelete { - softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName) + softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", m.peerdbCols.SoftDeleteColName) softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE" updateStatementsforToastCols = append(updateStatementsforToastCols, @@ -162,25 +143,25 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ") deletePart := "DELETE" - if peerdbCols.SoftDelete { - colName := peerdbCols.SoftDeleteColName + if m.peerdbCols.SoftDelete { + colName := m.peerdbCols.SoftDeleteColName deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName) - if peerdbCols.SyncedAtColName != "" { + if m.peerdbCols.SyncedAtColName != "" { deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP", - deletePart, peerdbCols.SyncedAtColName) + deletePart, m.peerdbCols.SyncedAtColName) } } return fmt.Sprintf(` - MERGE %s.%s _peerdb_target USING %s _peerdb_deduped + MERGE %s.%s _peerdb_target USING (%s,%s) _peerdb_deduped ON %s WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN INSERT (%s) VALUES (%s) %s WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN %s; - `, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL, - updateStringToastCols, deletePart) + `, m.Dataset, m.NormalizedTable, m.generateFlattenedCTE(), m.generateDeDupedCTE(), + pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) } /* diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 6a83d23ae8..d52e7c42e3 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -343,7 +343,6 @@ func (s *QRepAvroSyncMethod) writeToStage( slog.String("batchOrPartitionID", syncID), ) if s.gcsBucket != "" { - bucket := s.connector.storageClient.Bucket(s.gcsBucket) avroFilePath := fmt.Sprintf("%s/%s.avro", objectFolder, syncID) obj := bucket.Object(avroFilePath) @@ -415,7 +414,7 @@ func (s *QRepAvroSyncMethod) writeToStage( } slog.Info(fmt.Sprintf("Pushed into %s/%s", avroFile.FilePath, syncID)) - err = s.connector.WaitForTableReady(stagingTable) + err = s.connector.waitForTableReady(stagingTable) if err != nil { return 0, fmt.Errorf("failed to wait for table to be ready: %w", err) }