Skip to content

Commit

Permalink
support refresh overwrite, clean up UI
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 11, 2024
1 parent 66bd43c commit 735abae
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 46 deletions.
9 changes: 7 additions & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -211,7 +212,6 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
slog.Info("QRep endpoint request", slog.Any("req", req))
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -265,8 +265,13 @@ func (h *FlowRequestHandler) CreateQRepFlow(
sourceTables := make([]string, 0, len(req.TableMapping))
destinationTables := make([]string, 0, len(req.TableMapping))
for _, mapping := range req.TableMapping {
destinationSchemaTable, err := utils.ParseSchemaTable(mapping.DestinationTableIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to parse destination table identifier: %w", err)
}

sourceTables = append(sourceTables, mapping.SourceTableIdentifier)
destinationTables = append(destinationTables, mapping.DestinationTableIdentifier)
destinationTables = append(destinationTables, utils.PostgresSchemaTableNormalize(destinationSchemaTable))
}
cfg.WatermarkTable = strings.Join(sourceTables, ";")
cfg.DestinationTableIdentifier = strings.Join(destinationTables, ";")
Expand Down
12 changes: 3 additions & 9 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

const qRepMetadataTableName = "_peerdb_query_replication_metadata"
const QRepOverwriteTempTablePrefix = "_peerdb_overwrite_"

func (c *PostgresConnector) GetQRepPartitions(
ctx context.Context,
Expand Down Expand Up @@ -624,9 +625,9 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
return fmt.Errorf("failed to parse destination table identifier: %w", err)
}
dstTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, dstSchemaTable.Table}
tempTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, dstSchemaTable.Table + "_overwrite"}
tempTableIdentifier := pgx.Identifier{dstSchemaTable.Schema, QRepOverwriteTempTablePrefix + dstSchemaTable.Table}

_, err = tx.Exec(ctx, fmt.Sprintf("DROP TABLE %s CASCADE", dstTableIdentifier.Sanitize()))
_, err = tx.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", dstTableIdentifier.Sanitize()))
if err != nil {
return fmt.Errorf("failed to drop %s: %v", dstTableName, err)
}
Expand All @@ -637,13 +638,6 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err)
}

_, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s SET LOGGED",
dstTableIdentifier.Sanitize()))
if err != nil {
return fmt.Errorf("failed to rename %s to %s: %v",
tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err)
}

if config.SyncedAtColName != "" {
updateSyncedAtStmt := fmt.Sprintf(
`UPDATE %s SET %s = CURRENT_TIMESTAMP`,
Expand Down
11 changes: 1 addition & 10 deletions flow/connectors/postgres/qrep_sql_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
}
} else if writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table + "_overwrite"}
_, err := tx.Exec(ctx, fmt.Sprintf("CREATE UNLOGGED TABLE %s(LIKE %s);",
tempTableIdentifier.Sanitize(),
dstTableIdentifier.Sanitize(),
))
if err != nil {
return -1, fmt.Errorf("failed to create %s: %v", tempTableIdentifier, err)
}

tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
_, err = tx.CopyFrom(ctx, tempTableIdentifier, schema.GetColumnNames(), copySource)
if err != nil {
return -1, fmt.Errorf("failed to copy records into %s: %v", tempTableIdentifier, err)
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -73,3 +74,15 @@ func RegisterHStore(ctx context.Context, conn *pgx.Conn) error {

return nil
}

// PostgresIdentifierNormalize mirrors Postgres's case-folding of unquoted
// identifiers: an identifier that is entirely uppercase is folded to
// lowercase, while any other casing is returned unchanged.
func PostgresIdentifierNormalize(identifier string) string {
	if !IsUpper(identifier) {
		return identifier
	}
	return strings.ToLower(identifier)
}

// PostgresSchemaTableNormalize renders a schema-qualified table name,
// normalizing each part via PostgresIdentifierNormalize.
func PostgresSchemaTableNormalize(schemaTable *SchemaTable) string {
	schema := PostgresIdentifierNormalize(schemaTable.Schema)
	table := PostgresIdentifierNormalize(schemaTable.Table)
	return fmt.Sprintf("%s.%s", schema, table)
}
18 changes: 18 additions & 0 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -462,6 +464,21 @@ func QRepFlowWorkflow(
maxParallelWorkers = int(config.MaxParallelWorkers)
}

oldDstTableIdentifier := config.DestinationTableIdentifier
if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
dstTables := strings.Split(q.config.DestinationTableIdentifier, ";")
transformedTables := make([]string, 0, len(dstTables))
for _, table := range dstTables {
dstSchemaTable, err := utils.ParseSchemaTable(table)
if err != nil {
return fmt.Errorf("failed to parse table name %s: %w", table, err)
}
dstTransformedTable := dstSchemaTable.Schema + "." + connpostgres.QRepOverwriteTempTablePrefix + dstSchemaTable.Table
transformedTables = append(transformedTables, dstTransformedTable)
}
config.DestinationTableIdentifier = strings.Join(transformedTables, ";")
}

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
return fmt.Errorf("failed to setup watermark table: %w", err)
Expand Down Expand Up @@ -489,6 +506,7 @@ func QRepFlowWorkflow(
return err
}

q.config.DestinationTableIdentifier = oldDstTableIdentifier
logger.Info("consolidating partitions for peer flow")
if err := q.consolidatePartitions(ctx); err != nil {
return err
Expand Down
9 changes: 2 additions & 7 deletions ui/app/api/mirrors/qrep/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,8 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { config, tableMapping } = body;

const flowServiceAddr = GetFlowHttpAddressFromEnv();
const req: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: tableMapping,
};
const req: CreateQRepFlowRequest = body;
try {
const createStatus: CreateQRepFlowResponse = await fetch(
`${flowServiceAddr}/v1/flows/qrep/create`,
Expand All @@ -31,6 +25,7 @@ export async function POST(request: Request) {

return new Response(JSON.stringify(response));
} catch (e) {
return new Response(JSON.stringify({ created: false }));
console.log(e);
}
}
32 changes: 17 additions & 15 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
cdcSchema,
flowNameSchema,
qrepSchema,
sfQrepSchema,
tableMappingSchema,
} from './schema';

Expand Down Expand Up @@ -206,16 +207,27 @@ export const handleCreateQRep = async (
return;
}

let tableMapping: TableMapping[] = [];
let reqBody: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: tableMapping,
};

tableMapping = reformattedTableMapping(rows ?? []);
if (config.sourcePeer?.snowflakeConfig) {
config.query = 'SELECT * FROM ' + config.watermarkTable;
if (config.watermarkTable == '') {
notify('Please fill in the source table');
const sfQRepFieldErr = sfQrepSchema.safeParse(config);
if (!sfQRepFieldErr.success) {
notify(sfQRepFieldErr.error.issues[0].message);
return;
}
if (config.destinationTableIdentifier == '') {
notify('Please fill in the destination table');
console.log('rows in handler:', rows);
const tableValidity = tableMappingSchema.safeParse(tableMapping);
if (!tableValidity.success) {
notify(tableValidity.error.issues[0].message);
return;
}
reqBody.tableMapping = tableMapping;
} else {
const fieldErr = validateQRepFields(config.query, config);
if (fieldErr) {
Expand All @@ -224,16 +236,6 @@ export const handleCreateQRep = async (
}
}
config.flowJobName = flowJobName;
let reqBody: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: [],
};
if (rows) {
const tableMapping = reformattedTableMapping(rows);
reqBody.tableMapping = tableMapping;
}

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch(
'/api/mirrors/qrep',
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export const blankQRepSetting = {

export const blankSnowflakeQRepSetting = {
destinationTableIdentifier: '',
query: '',
query: '-- nothing',
watermarkTable: '',
watermarkColumn: '',
maxParallelWorkers: 4,
Expand Down
30 changes: 28 additions & 2 deletions ui/app/mirrors/create/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ export const tableMappingSchema = z
.array(
z.object({
sourceTableIdentifier: z
.string()
.string({ required_error: 'Source table name is required' })
.min(1, 'source table names, if added, must be non-empty'),
destinationTableIdentifier: z
.string()
.string({ required_error: 'Destination table name is required' })
.min(1, 'destination table names, if added, must be non-empty'),
exclude: z.array(z.string()).optional(),
partitionKey: z.string().optional(),
Expand Down Expand Up @@ -182,3 +182,29 @@ export const qrepSchema = z.object({
.min(1, 'Batch wait must be a non-negative integer')
.optional(),
});

// Validation schema for a Snowflake-sourced QRep mirror: requires source and
// destination peers; batch wait, when provided, must be an integer >= 1.
export const sfQrepSchema = z.object({
  sourcePeer: z.object(
    {
      name: z.string().min(1),
      type: z.any(),
      config: z.any(),
    },
    { required_error: 'Source peer is required' }
  ),
  destinationPeer: z.object(
    {
      name: z.string().min(1),
      type: z.any(),
      config: z.any(),
    },
    { required_error: 'Destination peer is required' }
  ),
  waitBetweenBatchesSeconds: z
    .number({
      invalid_type_error: 'Batch wait must be a number',
    })
    .int()
    // .min(1) rejects 0, so the message must say "positive", not "non-negative"
    .min(1, 'Batch wait must be a positive integer')
    .optional(),
});

0 comments on commit 735abae

Please sign in to comment.