Commit: stilgar improvements post demo (#1479)
heavycrystal authored Mar 13, 2024
1 parent 1514b3a commit 3dfe841
Showing 5 changed files with 33 additions and 31 deletions.
10 changes: 10 additions & 0 deletions flow/activities/flowable.go
@@ -190,6 +190,16 @@ func (a *FlowableActivity) CreateNormalizedTable(

tableExistsMapping := make(map[string]bool)
for tableIdentifier, tableSchema := range config.TableNameSchemaMapping {
if config.AttemptDrop && config.PeerConnectionConfig.Type == protos.DBType_POSTGRES {
parsedNormalizedTable, err := utils.ParseSchemaTable(tableIdentifier)
if err != nil {
return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
}
_, err = tx.(pgx.Tx).Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", parsedNormalizedTable.String()))
if err != nil {
return nil, fmt.Errorf("error while attempting to drop normalized table: %w", err)
}
}
existing, err := conn.SetupNormalizedTable(
ctx,
tx,
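Note on the new AttemptDrop branch above: it fires only when the destination peer is Postgres, and it drops any pre-existing normalized table inside the same setup transaction before SetupNormalizedTable recreates it. Below is a minimal standalone sketch of that guard, assuming a pgx.Tx and an already-qualified schema.table string; the function name is illustrative, not PeerDB's.

package sketch

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// dropNormalizedTableIfRequested is a no-op unless attemptDrop is set; otherwise
// it issues DROP TABLE IF EXISTS inside the setup transaction so the subsequent
// CREATE TABLE starts from a clean slate.
func dropNormalizedTableIfRequested(ctx context.Context, tx pgx.Tx, attemptDrop bool, qualifiedTable string) error {
	if !attemptDrop {
		return nil
	}
	if _, err := tx.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", qualifiedTable)); err != nil {
		return fmt.Errorf("error while attempting to drop normalized table: %w", err)
	}
	return nil
}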
46 changes: 16 additions & 30 deletions flow/connectors/postgres/qrep.go
@@ -607,7 +607,7 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
destinationTables := strings.Split(config.DestinationTableIdentifier, ";")

constraintsHookExists := true
_, err := c.conn.Exec(ctx, fmt.Sprintf("SELECT '_peerdb_post_run_hook_%s()'::regprocedure",
_, err := c.conn.Exec(ctx, fmt.Sprintf("SELECT '_peerdb_pre_run_hook_%s()'::regprocedure",
config.FlowJobName))
if err != nil {
constraintsHookExists = false
@@ -626,19 +626,31 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
}
}()

_, err = tx.Exec(ctx, "SET statement_timeout=0")
_, err = tx.Exec(ctx, "SET LOCAL statement_timeout=0")
if err != nil {
return fmt.Errorf("failed to set statement_timeout: %w", err)
}
_, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout=0")
_, err = tx.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
if err != nil {
return fmt.Errorf("failed to set idle_in_transaction_session_timeout: %w", err)
}
_, err = tx.Exec(ctx, "SET lock_timeout=0")
_, err = tx.Exec(ctx, "SET LOCAL lock_timeout='60s'")
if err != nil {
return fmt.Errorf("failed to set lock_timeout: %w", err)
}

if constraintsHookExists {
c.logger.Info("executing constraints hook", slog.String("procName",
fmt.Sprintf("_peerdb_pre_run_hook_%s()", config.FlowJobName)))
_, err = tx.Exec(ctx, fmt.Sprintf("SELECT _peerdb_pre_run_hook_%s()", config.FlowJobName))
if err != nil {
return fmt.Errorf("failed to execute stored procedure for applying constraints: %w", err)
}
} else {
c.logger.Info("no constraints hook found", slog.String("procName",
fmt.Sprintf("_peerdb_post_run_hook_%s()", config.FlowJobName)))
}

for _, dstTableName := range destinationTables {
dstSchemaTable, err := utils.ParseSchemaTable(dstTableName)
if err != nil {
@@ -651,38 +663,12 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
if err != nil {
return fmt.Errorf("failed to drop %s: %v", dstTableName, err)
}

if config.SyncedAtColName != "" {
updateSyncedAtStmt := fmt.Sprintf(
`UPDATE %s SET %s = CURRENT_TIMESTAMP`,
tempTableIdentifier.Sanitize(),
QuoteIdentifier(config.SyncedAtColName),
)
_, err = tx.Exec(ctx, updateSyncedAtStmt)
if err != nil {
return fmt.Errorf("failed to update synced_at column: %v", err)
}
}

_, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
tempTableIdentifier.Sanitize(), QuoteIdentifier(dstSchemaTable.Table)))
if err != nil {
return fmt.Errorf("failed to rename %s to %s: %v",
tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err)
}

}

if constraintsHookExists {
c.logger.Info("executing constraints hook", slog.String("procName",
fmt.Sprintf("_peerdb_post_run_hook_%s()", config.FlowJobName)))
_, err = tx.Exec(ctx, fmt.Sprintf("SELECT _peerdb_post_run_hook_%s()", config.FlowJobName))
if err != nil {
return fmt.Errorf("failed to execute stored procedure for applying constraints: %w", err)
}
} else {
c.logger.Info("no constraints hook found", slog.String("procName",
fmt.Sprintf("_peerdb_post_run_hook_%s()", config.FlowJobName)))
}

err = tx.Commit(ctx)
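Two things change in the consolidation path above: the hook probe and the hook call now target _peerdb_pre_run_hook_<flow>() and run before the temp-table renames, and the timeout overrides switch from SET to SET LOCAL so they expire with the transaction instead of lingering on the session. A rough sketch of the SET LOCAL pattern with pgx follows; the helper and its callback are illustrative, not the connector's actual structure.

package sketch

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// withLocalTimeouts runs fn inside a transaction whose timeout overrides are set
// with SET LOCAL, so the session defaults come back automatically at commit or
// rollback instead of leaking into later work on the same connection.
func withLocalTimeouts(ctx context.Context, conn *pgx.Conn, fn func(pgx.Tx) error) error {
	tx, err := conn.Begin(ctx)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback(ctx) // safe to call even after a successful commit

	// SET LOCAL, unlike plain SET, only lasts until this transaction ends.
	if _, err := tx.Exec(ctx, "SET LOCAL statement_timeout=0"); err != nil {
		return fmt.Errorf("failed to set statement_timeout: %w", err)
	}
	if _, err := tx.Exec(ctx, "SET LOCAL lock_timeout='60s'"); err != nil {
		return fmt.Errorf("failed to set lock_timeout: %w", err)
	}

	if err := fn(tx); err != nil {
		return err
	}
	return tx.Commit(ctx)
}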
6 changes: 5 additions & 1 deletion flow/connectors/postgres/qrep_sql_sync.go
@@ -109,7 +109,11 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
} else if writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
_, err = tx.CopyFrom(ctx, tempTableIdentifier, schema.GetColumnNames(), copySource)
normalizedColNames := make([]string, 0, len(schema.GetColumnNames()))
for _, colName := range schema.GetColumnNames() {
normalizedColNames = append(normalizedColNames, utils.PostgresIdentifierNormalize(colName))
}
_, err = tx.CopyFrom(ctx, tempTableIdentifier, normalizedColNames, copySource)
if err != nil {
return -1, fmt.Errorf("failed to copy records into %s: %v", tempTableIdentifier, err)
}
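The CopyFrom change above normalizes column names before copying into the temp table in OVERWRITE mode, since Postgres folds unquoted identifiers to lowercase while source schemas may report them in other casings. utils.PostgresIdentifierNormalize itself is not part of this diff; the snippet below is one plausible shape for such a helper, offered only as a guess at its intent.

package sketch

import "strings"

// postgresIdentifierNormalize is an illustrative stand-in for the helper used in
// the diff: it folds identifiers that arrive fully uppercase (typical of
// case-insensitive sources) down to Postgres's lowercase default and leaves
// deliberately mixed-case names untouched.
func postgresIdentifierNormalize(identifier string) string {
	if identifier == strings.ToUpper(identifier) {
		return strings.ToLower(identifier)
	}
	return identifier
}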
1 change: 1 addition & 0 deletions flow/workflows/qrep_flow.go
@@ -138,6 +138,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
TableNameSchemaMapping: dstTableNameSchemaMapping,
SyncedAtColName: q.config.SyncedAtColName,
FlowName: q.config.FlowJobName,
AttemptDrop: true,
}

future := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
1 change: 1 addition & 0 deletions protos/flow.proto
@@ -188,6 +188,7 @@ message SetupNormalizedTableBatchInput {
string soft_delete_col_name = 4;
string synced_at_col_name = 5;
string flow_name = 6;
bool attempt_drop = 7;
}

message SetupNormalizedTableOutput {
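Adding attempt_drop = 7 also implies regenerating the Go protobuf bindings, which are not among the files shown in this commit view. The regenerated message would gain a boolean field roughly like the sketch below; exact struct tags and accessors depend on the protoc-gen-go version in use.

package protosketch

// SetupNormalizedTableBatchInput is an illustrative excerpt of the regenerated
// message; the real generated file carries every field plus marshalling state.
type SetupNormalizedTableBatchInput struct {
	// ...existing fields elided...
	AttemptDrop bool `protobuf:"varint,7,opt,name=attempt_drop,json=attemptDrop,proto3" json:"attempt_drop,omitempty"`
}

// GetAttemptDrop mirrors the nil-safe accessor protoc-gen-go emits.
func (x *SetupNormalizedTableBatchInput) GetAttemptDrop() bool {
	if x != nil {
		return x.AttemptDrop
	}
	return false
}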
