SF QRep: support schema changes on refresh, clean up UI #1469

Merged
merged 2 commits on Mar 11, 2024
9 changes: 7 additions & 2 deletions flow/cmd/handler.go
@@ -14,6 +14,7 @@ import (
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
@@ -211,7 +212,6 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
slog.Info("QRep endpoint request", slog.Any("req", req))
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
@@ -265,8 +265,13 @@ func (h *FlowRequestHandler) CreateQRepFlow(
sourceTables := make([]string, 0, len(req.TableMapping))
destinationTables := make([]string, 0, len(req.TableMapping))
for _, mapping := range req.TableMapping {
destinationSchemaTable, err := utils.ParseSchemaTable(mapping.DestinationTableIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to parse destination table identifier: %w", err)
}

sourceTables = append(sourceTables, mapping.SourceTableIdentifier)
destinationTables = append(destinationTables, mapping.DestinationTableIdentifier)
destinationTables = append(destinationTables, utils.PostgresSchemaTableNormalize(destinationSchemaTable))
}
cfg.WatermarkTable = strings.Join(sourceTables, ";")
cfg.DestinationTableIdentifier = strings.Join(destinationTables, ";")
12 changes: 3 additions & 9 deletions flow/connectors/postgres/qrep.go
@@ -24,6 +24,7 @@ import (
)

const qRepMetadataTableName = "_peerdb_query_replication_metadata"
const QRepOverwriteTempTablePrefix = "_peerdb_overwrite_"

func (c *PostgresConnector) GetQRepPartitions(
ctx context.Context,
@@ -624,9 +625,9 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
return fmt.Errorf("failed to parse destination table identifier: %w", err)
}
dstTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, dstSchemaTable.Table}
tempTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, dstSchemaTable.Table + "_overwrite"}
tempTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, QRepOverwriteTempTablePrefix + dstSchemaTable.Table}

_, err = tx.Exec(ctx, fmt.Sprintf("DROP TABLE %s CASCADE", dstTableIdentifier.Sanitize()))
_, err = tx.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", dstTableIdentifier.Sanitize()))
if err != nil {
return fmt.Errorf("failed to drop %s: %v", dstTableName, err)
}
@@ -637,13 +638,6 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err)
}

_, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s SET LOGGED",
dstTableIdentifier.Sanitize()))
if err != nil {
return fmt.Errorf("failed to rename %s to %s: %v",
tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err)
}

if config.SyncedAtColName != "" {
updateSyncedAtStmt := fmt.Sprintf(
`UPDATE %s SET %s = CURRENT_TIMESTAMP`,
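Note on the hunk above: after this change, the OVERWRITE consolidation reduces to dropping the live table (now guarded with IF EXISTS) and renaming the _peerdb_overwrite_-prefixed staging table into its place; the SET LOGGED step goes away because the staging table is no longer created UNLOGGED during sync (see the qrep_sql_sync.go hunk below). A minimal sketch of that sequence, assuming it sits in the same package as ConsolidateQRepPartitions and that the rename statement, which is outside the visible hunk, has the usual ALTER TABLE ... RENAME TO form:

```go
import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

// consolidateOverwrite sketches the two statements the OVERWRITE path runs inside
// the caller's transaction: drop the destination table if it already exists, then
// rename the staged _peerdb_overwrite_ table into its place.
func consolidateOverwrite(ctx context.Context, tx pgx.Tx, dst *utils.SchemaTable) error {
	dstIdent := pgx.Identifier{dst.Schema, dst.Table}
	tmpIdent := pgx.Identifier{dst.Schema, QRepOverwriteTempTablePrefix + dst.Table}

	if _, err := tx.Exec(ctx,
		fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", dstIdent.Sanitize())); err != nil {
		return fmt.Errorf("failed to drop %s: %w", dstIdent.Sanitize(), err)
	}

	// RENAME TO takes an unqualified name; the table stays in the same schema.
	if _, err := tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
		tmpIdent.Sanitize(), pgx.Identifier{dst.Table}.Sanitize())); err != nil {
		return fmt.Errorf("failed to rename %s to %s: %w",
			tmpIdent.Sanitize(), dstIdent.Sanitize(), err)
	}
	return nil
}
```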
11 changes: 1 addition & 10 deletions flow/connectors/postgres/qrep_sql_sync.go
@@ -108,16 +108,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
}
} else if writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table + "_overwrite"}
_, err := tx.Exec(ctx, fmt.Sprintf("CREATE UNLOGGED TABLE %s(LIKE %s);",
tempTableIdentifier.Sanitize(),
dstTableIdentifier.Sanitize(),
))
if err != nil {
return -1, fmt.Errorf("failed to create %s: %v", tempTableIdentifier, err)
}

tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
_, err = tx.CopyFrom(ctx, tempTableIdentifier, schema.GetColumnNames(), copySource)
if err != nil {
return -1, fmt.Errorf("failed to copy records into %s: %v", tempTableIdentifier, err)
13 changes: 13 additions & 0 deletions flow/connectors/utils/postgres.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
@@ -73,3 +74,15 @@ func RegisterHStore(ctx context.Context, conn *pgx.Conn) error {

return nil
}

func PostgresIdentifierNormalize(identifier string) string {
if IsUpper(identifier) {
return strings.ToLower(identifier)
}
return identifier
}

func PostgresSchemaTableNormalize(schemaTable *SchemaTable) string {
return fmt.Sprintf(`%s.%s`, PostgresIdentifierNormalize(schemaTable.Schema),
PostgresIdentifierNormalize(schemaTable.Table))
}
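A quick illustration of the two new helpers, as a sketch assuming IsUpper reports whether an identifier is entirely uppercase: all-caps identifiers (the Snowflake default) are folded to lowercase to match how Postgres folds unquoted identifiers, while mixed-case identifiers pass through unchanged.

```go
package main

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

func main() {
	// Snowflake-style all-caps identifiers fold to lowercase...
	upper, _ := utils.ParseSchemaTable("PUBLIC.ORDERS")
	fmt.Println(utils.PostgresSchemaTableNormalize(upper)) // public.orders

	// ...while mixed-case identifiers are left as-is, on the assumption
	// that the user quoted them deliberately.
	mixed, _ := utils.ParseSchemaTable("sales.LineItems")
	fmt.Println(utils.PostgresSchemaTableNormalize(mixed)) // sales.LineItems
}
```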
18 changes: 18 additions & 0 deletions flow/workflows/qrep_flow.go
@@ -12,6 +12,8 @@ import (
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
@@ -462,6 +464,21 @@ func QRepFlowWorkflow(
maxParallelWorkers = int(config.MaxParallelWorkers)
}

oldDstTableIdentifier := config.DestinationTableIdentifier
if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
dstTables := strings.Split(q.config.DestinationTableIdentifier, ";")
transformedTables := make([]string, 0, len(dstTables))
for _, table := range dstTables {
dstSchemaTable, err := utils.ParseSchemaTable(table)
if err != nil {
return fmt.Errorf("failed to parse table name %s: %w", table, err)
}
dstTransformedTable := dstSchemaTable.Schema + "." + connpostgres.QRepOverwriteTempTablePrefix + dstSchemaTable.Table
transformedTables = append(transformedTables, dstTransformedTable)
}
config.DestinationTableIdentifier = strings.Join(transformedTables, ";")
}

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
return fmt.Errorf("failed to setup watermark table: %w", err)
@@ -489,6 +506,7 @@ func QRepFlowWorkflow(
return err
}

q.config.DestinationTableIdentifier = oldDstTableIdentifier
logger.Info("consolidating partitions for peer flow")
if err := q.consolidatePartitions(ctx); err != nil {
return err
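Putting the workflow hunks together, the overwrite lifecycle is: rewrite each ';'-separated destination to its _peerdb_overwrite_-prefixed name before sync, let the sync phase create and fill those staging tables, then restore the original identifier so consolidation can swap each staging table into place. The rewrite, extracted as a standalone sketch (hypothetical helper name, not part of the PR):

```go
import (
	"fmt"
	"strings"

	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

// prefixOverwriteDestinations rewrites a ';'-separated destination list so the
// sync phase targets _peerdb_overwrite_ staging tables; the caller keeps the
// original string and restores it before ConsolidateQRepPartitions runs.
func prefixOverwriteDestinations(destinations string) (string, error) {
	tables := strings.Split(destinations, ";")
	prefixed := make([]string, 0, len(tables))
	for _, table := range tables {
		schemaTable, err := utils.ParseSchemaTable(table)
		if err != nil {
			return "", fmt.Errorf("failed to parse table name %s: %w", table, err)
		}
		prefixed = append(prefixed,
			schemaTable.Schema+"."+connpostgres.QRepOverwriteTempTablePrefix+schemaTable.Table)
	}
	return strings.Join(prefixed, ";"), nil
}
```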
9 changes: 2 additions & 7 deletions ui/app/api/mirrors/qrep/route.ts
@@ -7,14 +7,8 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { config, tableMapping } = body;

const flowServiceAddr = GetFlowHttpAddressFromEnv();
const req: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: tableMapping,
};
const req: CreateQRepFlowRequest = body;
try {
const createStatus: CreateQRepFlowResponse = await fetch(
`${flowServiceAddr}/v1/flows/qrep/create`,
@@ -31,6 +25,7 @@

return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
return new Response(JSON.stringify({ created: false }));
}
}
32 changes: 17 additions & 15 deletions ui/app/mirrors/create/handlers.ts
@@ -18,6 +18,7 @@ import {
cdcSchema,
flowNameSchema,
qrepSchema,
sfQrepSchema,
tableMappingSchema,
} from './schema';

@@ -206,16 +207,27 @@ export const handleCreateQRep = async (
return;
}

let tableMapping: TableMapping[] = [];
let reqBody: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: tableMapping,
};

tableMapping = reformattedTableMapping(rows ?? []);
if (config.sourcePeer?.snowflakeConfig) {
config.query = 'SELECT * FROM ' + config.watermarkTable;
if (config.watermarkTable == '') {
notify('Please fill in the source table');
const sfQRepFieldErr = sfQrepSchema.safeParse(config);
if (!sfQRepFieldErr.success) {
notify(sfQRepFieldErr.error.issues[0].message);
return;
}
if (config.destinationTableIdentifier == '') {
notify('Please fill in the destination table');

const tableValidity = tableMappingSchema.safeParse(tableMapping);
if (!tableValidity.success) {
notify(tableValidity.error.issues[0].message);
return;
}
reqBody.tableMapping = tableMapping;
} else {
const fieldErr = validateQRepFields(config.query, config);
if (fieldErr) {
@@ -224,16 +236,6 @@
}
}
config.flowJobName = flowJobName;
let reqBody: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: [],
};
if (rows) {
const tableMapping = reformattedTableMapping(rows);
reqBody.tableMapping = tableMapping;
}

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch(
'/api/mirrors/qrep',
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/helpers/common.ts
@@ -60,7 +60,7 @@ export const blankQRepSetting = {

export const blankSnowflakeQRepSetting = {
destinationTableIdentifier: '',
query: '',
query: '-- nothing',
watermarkTable: '',
watermarkColumn: '',
maxParallelWorkers: 4,
30 changes: 28 additions & 2 deletions ui/app/mirrors/create/schema.ts
@@ -15,10 +15,10 @@ export const tableMappingSchema = z
.array(
z.object({
sourceTableIdentifier: z
.string()
.string({ required_error: 'Source table name is required' })
.min(1, 'source table names, if added, must be non-empty'),
destinationTableIdentifier: z
.string()
.string({ required_error: 'Destination table name is required' })
.min(1, 'destination table names, if added, must be non-empty'),
exclude: z.array(z.string()).optional(),
partitionKey: z.string().optional(),
@@ -182,3 +182,29 @@ export const qrepSchema = z.object({
.min(1, 'Batch wait must be a non-negative integer')
.optional(),
});

export const sfQrepSchema = z.object({
sourcePeer: z.object(
{
name: z.string().min(1),
type: z.any(),
config: z.any(),
},
{ required_error: 'Source peer is required' }
),
destinationPeer: z.object(
{
name: z.string().min(1),
type: z.any(),
config: z.any(),
},
{ required_error: 'Destination peer is required' }
),
waitBetweenBatchesSeconds: z
.number({
invalid_type_error: 'Batch wait must be a number',
})
.int()
.min(1, 'Batch wait must be a non-negative integer')
.optional(),
});