Skip to content

Commit

Permalink
write mode support
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 19, 2023
1 parent c65e79a commit 1ef5f05
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 148 deletions.
22 changes: 6 additions & 16 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO';
import { QRepWriteMode } from '@/grpc_generated/flow';
import { QRepConfig } from '@/grpc_generated/flow';
import { Dispatch, SetStateAction } from 'react';
import { CDCConfig, QREPConfig, TableMapRow } from '../types';
import { CDCConfig, TableMapRow } from '../types';
import { cdcSchema, qrepSchema, tableMappingSchema } from './schema';

const validateCDCFields = (
Expand All @@ -28,23 +28,15 @@ const validateCDCFields = (

const validateQRepFields = (
query: string,
writeMode: QRepWriteMode,
setMsg: Dispatch<SetStateAction<{ ok: boolean; msg: string }>>,
config: QREPConfig
config: QRepConfig
): boolean => {
if (query.length < 5) {
setMsg({ ok: false, msg: 'Query is invalid' });
return false;
}
if (writeMode.writeType == 1 && writeMode.upsertKeyColumns.length == 0) {
setMsg({
ok: false,
msg: 'You must specify upsert key column when write mode is set to upsert',
});
return false;
}
let validationErr: string | undefined;

let validationErr: string | undefined;
const configValidity = qrepSchema.safeParse(config);
if (!configValidity.success) {
validationErr = configValidity.error.issues[0].message;
Expand Down Expand Up @@ -107,9 +99,8 @@ export const handleCreateCDC = async (

export const handleCreateQRep = async (
flowJobName: string,
writeMode: QRepWriteMode,
query: string,
config: QREPConfig,
config: QRepConfig,
setMsg: Dispatch<
SetStateAction<{
ok: boolean;
Expand All @@ -131,11 +122,10 @@ export const handleCreateQRep = async (
config.initialCopyOnly = false;
}

const isValid = validateQRepFields(query, writeMode, setMsg, config);
const isValid = validateQRepFields(query, setMsg, config);
if (!isValid) return;
config.flowJobName = flowJobName;
config.query = query;
config.writeMode = writeMode;
setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch(
'/api/mirrors/qrep',
Expand Down
36 changes: 23 additions & 13 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { QRepSyncMode } from '@/grpc_generated/flow';
import { Peer } from '@/grpc_generated/peers';
import { CDCConfig } from '../../types';
import { MirrorSetting } from './common';
export const cdcSettings: MirrorSetting[] = [
{
label: 'Source Peer',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, source: value as Peer })),
setter((curr: CDCConfig) => ({ ...curr, source: value as Peer })),
tips: 'The peer from which we will be replicating data. Ensure the prerequisites for this peer are met.',
helpfulLink:
'https://docs.peerdb.io/usecases/Real-time%20CDC/postgres-to-snowflake#prerequisites',
Expand All @@ -15,15 +16,15 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'Destination Peer',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, destination: value as Peer })),
setter((curr: CDCConfig) => ({ ...curr, destination: value as Peer })),
tips: 'The peer to which data will be replicated.',
type: 'select',
required: true,
},
{
label: 'Initial Copy',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
doInitialCopy: (value as boolean) || false,
})),
Expand All @@ -33,13 +34,16 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'Publication Name',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, publicationName: (value as string) || '' })),
setter((curr: CDCConfig) => ({
...curr,
publicationName: (value as string) || '',
})),
tips: 'If set, PeerDB will use this publication for the mirror.',
},
{
label: 'Replication Slot Name',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
replicationSlotName: (value as string) || '',
})),
Expand All @@ -48,7 +52,7 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'Snapshot Number of Rows Per Partition',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
snapshotNumRowsPerPartition: parseInt(value as string, 10) || 500000,
})),
Expand All @@ -59,7 +63,7 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'Snapshot Maximum Parallel Workers',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
snapshotMaxParallelWorkers: parseInt(value as string, 10) || 8,
})),
Expand All @@ -70,7 +74,7 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'Snapshot Number of Tables In Parallel',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
snapshotNumTablesInParallel: parseInt(value as string, 10) || 1,
})),
Expand All @@ -81,7 +85,7 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'Snapshot Sync Mode',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
snapshotSyncMode:
(value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
Expand All @@ -93,7 +97,7 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'CDC Sync Mode',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
cdcSyncMode:
(value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
Expand All @@ -105,7 +109,7 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'Snapshot Staging Path',
stateHandler: (value, setter) =>
setter((curr) => ({
setter((curr: CDCConfig) => ({
...curr,
snapshotStagingPath: value as string | '',
})),
Expand All @@ -114,13 +118,19 @@ export const cdcSettings: MirrorSetting[] = [
{
label: 'CDC Staging Path',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, cdcStagingPath: (value as string) || '' })),
setter((curr: CDCConfig) => ({
...curr,
cdcStagingPath: (value as string) || '',
})),
tips: 'You can specify staging path if you have set the CDC sync mode as AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket url',
},
{
label: 'Soft Delete',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, softDelete: (value as boolean) || false })),
setter((curr: CDCConfig) => ({
...curr,
softDelete: (value as boolean) || false,
})),
tips: 'Allows you to mark some records as deleted without actual erasure from the database',
default: 'SQL',
type: 'switch',
Expand Down
13 changes: 6 additions & 7 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ import {
FlowConnectionConfigs,
QRepConfig,
QRepSyncMode,
QRepWriteMode,
QRepWriteType,
} from '@/grpc_generated/flow';
import { Peer } from '@/grpc_generated/peers';
import { MirrorSetter } from '../../types';

export interface MirrorSetting {
label: string;
stateHandler: (
value: string | Peer | boolean | QRepSyncMode | QRepWriteMode,
setter: MirrorSetter
value: string | string[] | Peer | boolean | QRepSyncMode | QRepWriteType,
setter: any
) => void;
type?: string;
required?: boolean;
Expand Down Expand Up @@ -55,12 +54,12 @@ export const blankQRepSetting: QRepConfig = {
watermarkColumn: '',
initialCopyOnly: false,
syncMode: 0,
batchSizeInt: 1,
batchDurationSeconds: 3600,
batchSizeInt: 0,
batchDurationSeconds: 0,
maxParallelWorkers: 8,
waitBetweenBatchesSeconds: 0,
writeMode: undefined,
stagingPath: '',
numRowsPerPartition: 500000,
numRowsPerPartition: 0,
setupWatermarkTableOnDestination: false,
};
Loading

0 comments on commit 1ef5f05

Please sign in to comment.