Skip to content

Commit

Permalink
Merge branch 'main' into flatten-qvalue-avro
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 20, 2023
2 parents 4fcab72 + 8e4e7ba commit d99f7de
Show file tree
Hide file tree
Showing 47 changed files with 1,581 additions and 455 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ on:
push:
branches: [main, release/*]
pull_request:
branches:
- "main"
- "release/*"
branches: [main, release/*]

jobs:
build:
Expand Down
3 changes: 1 addition & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ name: Flow build and test

on:
pull_request:
branches:
- "main"
branches: [main]
push:
branches: [main]

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ name: GolangCI-Lint

on:
pull_request:
branches:
- "main"
branches: [main]
paths: [flow/**]

jobs:
golangci-lint:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/rust-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ name: clippy-action

on:
pull_request:
branches:
- "main"
branches: [main]
paths: [nexus/**]

jobs:
clippy:
Expand Down
7 changes: 3 additions & 4 deletions .github/workflows/ui-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ name: Build & Test UI

on:
push:
branches:
- main
branches: [main]
pull_request:
branches:
- main
branches: [main]
paths: [ui/**]

jobs:
build-test:
Expand Down
7 changes: 3 additions & 4 deletions .github/workflows/ui-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ name: Lint UI

on:
push:
branches:
- main
branches: [main]
pull_request:
branches:
- main
branches: [main]
paths: [ui/**]

permissions:
checks: write
Expand Down
7 changes: 7 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ func (h *FlowRequestHandler) CreateQRepFlow(
} else {
workflowFn = peerflow.QRepFlowWorkflow
}

if req.QrepConfig.SyncedAtColName == "" {
cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
} else {
// make them all uppercase
cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
}
_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
slog.Error("unable to start QRepFlow workflow",
Expand Down
23 changes: 22 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
SyncBatchID: syncBatchID,
NormalizeBatchID: normalizeBatchID,
UnchangedToastColumns: tableNametoUnchangedToastCols[tableName],
peerdbCols: &protos.PeerDBColumns{
SoftDeleteColName: req.SoftDeleteColName,
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
}
// normalize anything between last normalized batch id to last sync batchid
mergeStmts := mergeGen.generateMergeStmts()
Expand Down Expand Up @@ -961,7 +966,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns), len(tableSchema.Columns)+2)
idx := 0
for colName, genericColType := range tableSchema.Columns {
columns[idx] = &bigquery.FieldSchema{
Expand All @@ -972,6 +977,22 @@ func (c *BigQueryConnector) SetupNormalizedTables(
idx++
}

if req.SoftDeleteColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SoftDeleteColName,
Type: bigquery.BooleanFieldType,
Repeated: false,
})
}

if req.SyncedAtColName != "" {
columns = append(columns, &bigquery.FieldSchema{
Name: req.SyncedAtColName,
Type: bigquery.TimestampFieldType,
Repeated: false,
})
}

// create the table using the columns
schema := bigquery.Schema(columns)
err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
Expand Down
54 changes: 47 additions & 7 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type mergeStmtGenerator struct {
NormalizedTableSchema *protos.TableSchema
// array of toast column combinations that are unchanged
UnchangedToastColumns []string
// _PEERDB_IS_DELETED and _SYNCED_AT columns
peerdbCols *protos.PeerDBColumns
}

// GenerateMergeStmt generates a merge statements.
Expand All @@ -39,7 +41,7 @@ func (m *mergeStmtGenerator) generateMergeStmts() []string {
"CREATE TEMP TABLE %s AS (%s, %s);",
tempTable, flattenedCTE, deDupedCTE)

mergeStmt := m.generateMergeStmt(tempTable)
mergeStmt := m.generateMergeStmt(tempTable, m.peerdbCols)

dropTempTableStmt := fmt.Sprintf("DROP TABLE %s;", tempTable)

Expand Down Expand Up @@ -127,7 +129,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
}

// generateMergeStmt generates a merge statement.
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *protos.PeerDBColumns) string {
// comma separated list of column names
backtickColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
pureColNames := make([]string, 0, len(m.NormalizedTableSchema.Columns))
Expand All @@ -136,8 +138,19 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
pureColNames = append(pureColNames, colName)
}
csep := strings.Join(backtickColNames, ", ")

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns)
insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
insertValuesSQL := csep + ",CURRENT_TIMESTAMP"

updateStatementsforToastCols := m.generateUpdateStatements(pureColNames,
m.UnchangedToastColumns, peerdbCols)
if m.peerdbCols.SoftDelete {
softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"

updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns))
Expand All @@ -148,15 +161,26 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string) string {
// _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

deletePart := "DELETE"
if peerdbCols.SoftDelete {
colName := peerdbCols.SoftDeleteColName
deletePart = fmt.Sprintf("UPDATE SET %s = TRUE", colName)
if peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
deletePart, peerdbCols.SyncedAtColName)
}
}

return fmt.Sprintf(`
MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
ON %s
WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
INSERT (%s) VALUES (%s)
%s
WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
DELETE;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols)
%s;
`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, insertColumnsSQL, insertValuesSQL,
updateStringToastCols, deletePart)
}

/*
Expand All @@ -174,7 +198,11 @@ and updating the other columns (not the unchanged toast columns)
6. Repeat steps 1-5 for each unique unchanged toast column group.
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
func (m *mergeStmtGenerator) generateUpdateStatements(
allCols []string,
unchangedToastCols []string,
peerdbCols *protos.PeerDBColumns,
) []string {
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
Expand All @@ -184,6 +212,18 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchange
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
}

// set the synced at column to the current timestamp
if peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = CURRENT_TIMESTAMP",
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = FALSE",
peerdbCols.SoftDeleteColName))
}

ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
Expand Down
30 changes: 23 additions & 7 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"reflect"
"strings"
"testing"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
Expand All @@ -16,21 +18,28 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
" AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2, col3' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1",
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col3'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2",
" `col2` = _peerdb_deduped.col2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -53,10 +62,17 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
"THEN UPDATE SET " +
"`col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3",
" `col3` = _peerdb_deduped.col3," +
" `synced_at`=CURRENT_TIMESTAMP," +
"`deleted`=FALSE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols)
result := m.generateUpdateStatements(allCols, unchangedToastCols,
&protos.PeerDBColumns{
SoftDelete: true,
SoftDeleteColName: "deleted",
SyncedAtColName: "synced_at",
})

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
partition.PartitionId, destTable))

avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition,
tblMetadata, stream, config.SyncedAtColName)
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
Expand Down
17 changes: 13 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "")
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down Expand Up @@ -107,6 +107,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
partition *protos.QRepPartition,
dstTableMetadata *bigquery.TableMetadata,
stream *model.QRecordStream,
syncedAtCol string,
) (int, error) {
startTime := time.Now()
flowLog := slog.Group("sync_metadata",
Expand All @@ -115,7 +116,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
slog.String("destinationTable", dstTableName),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata)
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand All @@ -137,9 +138,13 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
// Start a transaction
stmts := []string{"BEGIN TRANSACTION;"}

selector := "*"
if syncedAtCol != "" { // PeerDB column
selector = "*, CURRENT_TIMESTAMP"
}
// Insert the records from the staging table into the destination table
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT * FROM `%s.%s`;",
datasetID, dstTableName, datasetID, stagingTable)
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
datasetID, dstTableName, selector, datasetID, stagingTable)

stmts = append(stmts, insertStmt)

Expand Down Expand Up @@ -181,11 +186,15 @@ type AvroSchema struct {

func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata,
syncedAtCol string,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
if bqField.Name == syncedAtCol {
continue
}
avroType, err := GetAvroType(bqField)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit d99f7de

Please sign in to comment.