diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index b8831be301..ab1063dd28 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -816,15 +816,16 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) }, } // normalize anything between last normalized batch id to last sync batchid - mergeStmts := mergeGen.generateMergeStmts() - stmts = append(stmts, mergeStmts...) + mergeStmt := mergeGen.generateMergeStmt() + stmts = append(stmts, mergeStmt) } // update metadata to make the last normalized batch id to the recent last sync batch id. updateMetadataStmt := fmt.Sprintf( - "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name = '%s';", + "UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';", c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName) stmts = append(stmts, updateMetadataStmt) stmts = append(stmts, "COMMIT TRANSACTION;") + fmt.Println(stmts) // put this within a transaction // TODO - not truncating rows in staging table as of now. diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 22161c434b..22f876b8c3 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -8,7 +8,6 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared" ) type mergeStmtGenerator struct { @@ -30,24 +29,6 @@ type mergeStmtGenerator struct { peerdbCols *protos.PeerDBColumns } -// GenerateMergeStmt generates a merge statements. -func (m *mergeStmtGenerator) generateMergeStmts() []string { - // return an empty array for now - flattenedCTE := m.generateFlattenedCTE() - deDupedCTE := m.generateDeDupedCTE() - tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", shared.RandomString(5)) - // create temp table stmt - createTempTableStmt := fmt.Sprintf( - "CREATE TEMP TABLE %s AS (%s, %s);", - tempTable, flattenedCTE, deDupedCTE) - - mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols) - - dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable) - - return []string{createTempTableStmt, mergeStmt, dropTempTableStmt} -} - // generateFlattenedCTE generates a flattened CTE. func (m *mergeStmtGenerator) generateFlattenedCTE() string { // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR @@ -129,7 +110,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { } // generateMergeStmt generates a merge statement. -func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string { +func (m *mergeStmtGenerator) generateMergeStmt() string { // comma separated list of column names backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns)) @@ -138,13 +119,13 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro pureColNames = append(pureColNames, colName) } csep := strings.Join(backtickColNames, ", ") - insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName) + insertColumnsSQL := csep + fmt.Sprintf(", `%s`", m.peerdbCols.SyncedAtColName) insertValuesSQL := csep + ",CURRENT_TIMESTAMP" updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, - m.UnchangedToastColumns, peerdbCols) + m.UnchangedToastColumns, m.peerdbCols) if m.peerdbCols.SoftDelete { - softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName) + softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", m.peerdbCols.SoftDeleteColName) softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE" updateStatementsforToastCols = append(updateStatementsforToastCols, @@ -162,25 +143,25 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ") deletePart := "DELETE" - if peerdbCols.SoftDelete { - colName := peerdbCols.SoftDeleteColName + if m.peerdbCols.SoftDelete { + colName := m.peerdbCols.SoftDeleteColName deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName) - if peerdbCols.SyncedAtColName != "" { + if m.peerdbCols.SyncedAtColName != "" { deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP", - deletePart, peerdbCols.SyncedAtColName) + deletePart, m.peerdbCols.SyncedAtColName) } } return fmt.Sprintf(` - MERGE %s.%s _peerdb_target USING %s _peerdb_deduped + MERGE %s.%s _peerdb_target USING (%s,%s) _peerdb_deduped ON %s WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN INSERT (%s) VALUES (%s) %s WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN %s; - `, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL, - updateStringToastCols, deletePart) + `, m.Dataset, m.NormalizedTable, m.generateFlattenedCTE(), m.generateDeDupedCTE(), + pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) } /*