Skip to content

Commit

Permalink
fix all the errors
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 21, 2023
1 parent 8153697 commit 1e111f4
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions flow/connectors/postgres/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -84,14 +85,17 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
} else {
// Step 2.1: Create a temp staging table
stagingTableName := fmt.Sprintf("%s_%s", dstTableName.Table, partitionID)
stagingTableName := fmt.Sprintf("_peerdb_staging_%s", util.RandomString(8))
stagingTableIdentifier := pgx.Identifier{dstTableName.Schema, stagingTableName}
dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}

createStagingTableStmt := fmt.Sprintf(
"CREATE TABLE %s LIKE %s;",
pgx.Identifier{dstTableName.Schema, stagingTableName},
pgx.Identifier{dstTableName.Schema, dstTableName.Table},
"CREATE UNLOGGED TABLE %s (LIKE %s);",
stagingTableIdentifier.Sanitize(),
dstTableIdentifier.Sanitize(),
)

log.Infof("Creating staging table %s", stagingTableName)
log.Infof("Creating staging table %s - '%s'", stagingTableName, createStagingTableStmt)
_, err = tx.Exec(context.Background(), createStagingTableStmt)

if err != nil {
Expand All @@ -101,7 +105,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
// Step 2.2: Insert records into the staging table
numRowsSynced, err = tx.CopyFrom(
context.Background(),
pgx.Identifier{dstTableName.Schema, stagingTableName},
stagingTableIdentifier,
schema.GetColumnNames(),
copySource,
)
Expand All @@ -120,16 +124,20 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
for _, col := range schema.GetColumnNames() {
_, ok := upsertMatchCols[col]
if !ok {
setClause += fmt.Sprintf("%s = %s.%s, ", col, stagingTableName, col)
setClause += fmt.Sprintf("%s = EXCLUDED.%s,", col, col)
}
}
setClause = setClause[:len(setClause)-2]

setClause = strings.TrimSuffix(setClause, ",")
selectStr := strings.Join(schema.GetColumnNames(), ", ")

// Step 2.3: Perform the upsert operation, ON CONFLICT UPDATE
upsertStmt := fmt.Sprintf(
"INSERT INTO %s SELECT * FROM %s ON CONFLICT (%s) DO UPDATE SET %s;",
pgx.Identifier{dstTableName.Schema, dstTableName.Table},
pgx.Identifier{dstTableName.Schema, stagingTableName},
"INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) DO UPDATE SET %s;",
dstTableIdentifier.Sanitize(),
selectStr,
selectStr,
stagingTableIdentifier.Sanitize(),
strings.Join(writeMode.UpsertKeyColumns, ", "),
setClause,
)
Expand All @@ -144,7 +152,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
// Step 2.4: Drop the staging table
dropStagingTableStmt := fmt.Sprintf(
"DROP TABLE %s;",
pgx.Identifier{dstTableName.Schema, stagingTableName},
stagingTableIdentifier.Sanitize(),
)
log.Infof("Dropping staging table %s", stagingTableName)
_, err = tx.Exec(context.Background(), dropStagingTableStmt)
Expand Down

0 comments on commit 1e111f4

Please sign in to comment.