From 1ef5f0538cefb45dd91a2456c677794df3bd3328 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 20 Oct 2023 00:07:41 +0530 Subject: [PATCH] write mode support --- ui/app/mirrors/create/handlers.ts | 22 ++--- ui/app/mirrors/create/helpers/cdc.ts | 36 +++++--- ui/app/mirrors/create/helpers/common.ts | 13 ++- ui/app/mirrors/create/helpers/qrep.ts | 107 +++++++++++++++--------- ui/app/mirrors/create/page.tsx | 17 ++-- ui/app/mirrors/create/qrep.tsx | 46 +++++----- ui/app/mirrors/create/schema.ts | 78 +++++++++-------- ui/app/mirrors/types.ts | 5 +- ui/app/peers/page.tsx | 6 +- 9 files changed, 182 insertions(+), 148 deletions(-) diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index a111af2189..c901c16734 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -1,7 +1,7 @@ import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO'; -import { QRepWriteMode } from '@/grpc_generated/flow'; +import { QRepConfig } from '@/grpc_generated/flow'; import { Dispatch, SetStateAction } from 'react'; -import { CDCConfig, QREPConfig, TableMapRow } from '../types'; +import { CDCConfig, TableMapRow } from '../types'; import { cdcSchema, qrepSchema, tableMappingSchema } from './schema'; const validateCDCFields = ( @@ -28,23 +28,15 @@ const validateCDCFields = ( const validateQRepFields = ( query: string, - writeMode: QRepWriteMode, setMsg: Dispatch>, - config: QREPConfig + config: QRepConfig ): boolean => { if (query.length < 5) { setMsg({ ok: false, msg: 'Query is invalid' }); return false; } - if (writeMode.writeType == 1 && writeMode.upsertKeyColumns.length == 0) { - setMsg({ - ok: false, - msg: 'You must specify upsert key column when write mode is set to upsert', - }); - return false; - } - let validationErr: string | undefined; + let validationErr: string | undefined; const configValidity = qrepSchema.safeParse(config); if (!configValidity.success) { validationErr = configValidity.error.issues[0].message; @@ -107,9 +99,8 @@ export const handleCreateCDC = async ( export const handleCreateQRep = async ( flowJobName: string, - writeMode: QRepWriteMode, query: string, - config: QREPConfig, + config: QRepConfig, setMsg: Dispatch< SetStateAction<{ ok: boolean; @@ -131,11 +122,10 @@ export const handleCreateQRep = async ( config.initialCopyOnly = false; } - const isValid = validateQRepFields(query, writeMode, setMsg, config); + const isValid = validateQRepFields(query, setMsg, config); if (!isValid) return; config.flowJobName = flowJobName; config.query = query; - config.writeMode = writeMode; setLoading(true); const statusMessage: UCreateMirrorResponse = await fetch( '/api/mirrors/qrep', diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index eb085bcb01..6e2eaec877 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -1,11 +1,12 @@ import { QRepSyncMode } from '@/grpc_generated/flow'; import { Peer } from '@/grpc_generated/peers'; +import { CDCConfig } from '../../types'; import { MirrorSetting } from './common'; export const cdcSettings: MirrorSetting[] = [ { label: 'Source Peer', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, source: value as Peer })), + setter((curr: CDCConfig) => ({ ...curr, source: value as Peer })), tips: 'The peer from which we will be replicating data. Ensure the prerequisites for this peer are met.', helpfulLink: 'https://docs.peerdb.io/usecases/Real-time%20CDC/postgres-to-snowflake#prerequisites', @@ -15,7 +16,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Destination Peer', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, destination: value as Peer })), + setter((curr: CDCConfig) => ({ ...curr, destination: value as Peer })), tips: 'The peer to which data will be replicated.', type: 'select', required: true, @@ -23,7 +24,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Initial Copy', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, doInitialCopy: (value as boolean) || false, })), @@ -33,13 +34,16 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Publication Name', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, publicationName: (value as string) || '' })), + setter((curr: CDCConfig) => ({ + ...curr, + publicationName: (value as string) || '', + })), tips: 'If set, PeerDB will use this publication for the mirror.', }, { label: 'Replication Slot Name', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, replicationSlotName: (value as string) || '', })), @@ -48,7 +52,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Snapshot Number of Rows Per Partition', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, snapshotNumRowsPerPartition: parseInt(value as string, 10) || 500000, })), @@ -59,7 +63,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Snapshot Maximum Parallel Workers', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, snapshotMaxParallelWorkers: parseInt(value as string, 10) || 8, })), @@ -70,7 +74,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Snapshot Number of Tables In Parallel', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, snapshotNumTablesInParallel: parseInt(value as string, 10) || 1, })), @@ -81,7 +85,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Snapshot Sync Mode', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, snapshotSyncMode: (value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, @@ -93,7 +97,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'CDC Sync Mode', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, cdcSyncMode: (value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, @@ -105,7 +109,7 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'Snapshot Staging Path', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: CDCConfig) => ({ ...curr, snapshotStagingPath: value as string | '', })), @@ -114,13 +118,19 @@ export const cdcSettings: MirrorSetting[] = [ { label: 'CDC Staging Path', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, cdcStagingPath: (value as string) || '' })), + setter((curr: CDCConfig) => ({ + ...curr, + cdcStagingPath: (value as string) || '', + })), tips: 'You can specify staging path if you have set the CDC sync mode as AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket url', }, { label: 'Soft Delete', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, softDelete: (value as boolean) || false })), + setter((curr: CDCConfig) => ({ + ...curr, + softDelete: (value as boolean) || false, + })), tips: 'Allows you to mark some records as deleted without actual erasure from the database', default: 'SQL', type: 'switch', diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 936f2af394..d85007c6f6 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -2,16 +2,15 @@ import { FlowConnectionConfigs, QRepConfig, QRepSyncMode, - QRepWriteMode, + QRepWriteType, } from '@/grpc_generated/flow'; import { Peer } from '@/grpc_generated/peers'; -import { MirrorSetter } from '../../types'; export interface MirrorSetting { label: string; stateHandler: ( - value: string | Peer | boolean | QRepSyncMode | QRepWriteMode, - setter: MirrorSetter + value: string | string[] | Peer | boolean | QRepSyncMode | QRepWriteType, + setter: any ) => void; type?: string; required?: boolean; @@ -55,12 +54,12 @@ export const blankQRepSetting: QRepConfig = { watermarkColumn: '', initialCopyOnly: false, syncMode: 0, - batchSizeInt: 1, - batchDurationSeconds: 3600, + batchSizeInt: 0, + batchDurationSeconds: 0, maxParallelWorkers: 8, waitBetweenBatchesSeconds: 0, writeMode: undefined, stagingPath: '', - numRowsPerPartition: 500000, + numRowsPerPartition: 0, setupWatermarkTableOnDestination: false, }; diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index b57ee487e7..cab1fbc081 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -1,11 +1,16 @@ -import { QRepSyncMode } from '@/grpc_generated/flow'; +import { + QRepConfig, + QRepSyncMode, + QRepWriteMode, + QRepWriteType, +} from '@/grpc_generated/flow'; import { Peer } from '@/grpc_generated/peers'; import { MirrorSetting } from './common'; export const qrepSettings: MirrorSetting[] = [ { label: 'Source Peer', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, sourcePeer: value as Peer })), + setter((curr: QRepConfig) => ({ ...curr, sourcePeer: value as Peer })), tips: 'The peer from which we will be replicating data. Ensure the prerequisites for this peer are met.', helpfulLink: 'https://docs.peerdb.io/usecases/Real-time%20CDC/postgres-to-snowflake#prerequisites', @@ -15,7 +20,10 @@ export const qrepSettings: MirrorSetting[] = [ { label: 'Destination Peer', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, destinationPeer: value as Peer })), + setter((curr: QRepConfig) => ({ + ...curr, + destinationPeer: value as Peer, + })), tips: 'The peer to which data will be replicated.', type: 'select', required: true, @@ -23,21 +31,27 @@ export const qrepSettings: MirrorSetting[] = [ { label: 'Table', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, watermarkTable: (value as string) || '' })), + setter((curr: QRepConfig) => ({ + ...curr, + watermarkTable: (value as string) || '', + })), tips: 'The source table of the replication and the table to which the watermark column belongs.', required: true, }, { label: 'Watermark Column', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, watermarkColumn: (value as string) || '' })), + setter((curr: QRepConfig) => ({ + ...curr, + watermarkColumn: (value as string) || '', + })), tips: 'Watermark column is used to track the progress of the replication. This column should be a unique column in the query. Example: id', required: true, }, { label: 'Create Destination Table', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: QRepConfig) => ({ ...curr, setupWatermarkTableOnDestination: (value as boolean) || false, })), @@ -47,7 +61,7 @@ export const qrepSettings: MirrorSetting[] = [ { label: 'Destination Table Name', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: QRepConfig) => ({ ...curr, destinationTableIdentifier: value as string, })), @@ -57,18 +71,18 @@ export const qrepSettings: MirrorSetting[] = [ { label: 'Rows Per Partition', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: QRepConfig) => ({ ...curr, - numRowsPerPartition: parseInt(value as string, 10) || 500000, + numRowsPerPartition: parseInt(value as string, 10), })), - tips: 'PeerDB splits up table data into partitions for increased performance. This setting controls the number of rows per partition. The default value is 500000.', - default: '500000', + tips: 'PeerDB splits up table data into partitions for increased performance. This setting controls the number of rows per partition.', type: 'number', + required: true, }, { label: 'Maximum Parallel Workers', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: QRepConfig) => ({ ...curr, maxParallelWorkers: parseInt(value as string, 10) || 8, })), @@ -76,53 +90,68 @@ export const qrepSettings: MirrorSetting[] = [ default: '8', type: 'number', }, - { - label: 'Batch Size', - stateHandler: (value, setter) => - setter((curr) => ({ - ...curr, - batchSizeInt: parseInt(value as string, 10) || 1, - })), - tips: 'Size of each batch which is being synced.', - default: '1', - type: 'number', - }, - { - label: 'Batch Duration (Seconds)', - stateHandler: (value, setter) => - setter((curr) => ({ - ...curr, - snapshotNumTablesInParallel: parseInt(value as string, 10) || 3600, - })), - tips: 'Size of a batch as seconds when the watermark column is a timestamp column.', - default: '3600', - type: 'number', - }, { label: 'Sync Mode', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: QRepConfig) => ({ ...curr, syncMode: (value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, })), - tips: 'Specify whether you want the sync mode for initial load to be via SQL or by staging AVRO files. The default mode is SQL.', + tips: 'Specify whether you want the sync mode to be via SQL or by staging AVRO files. The default mode is SQL.', default: 'SQL', type: 'select', }, + { label: 'Staging Path', stateHandler: (value, setter) => - setter((curr) => ({ ...curr, stagingPath: (value as string) || '' })), + setter((curr: QRepConfig) => ({ + ...curr, + stagingPath: (value as string) || '', + })), tips: `You can specify staging path if you have set the sync mode as AVRO. For Snowflake as destination peer. If this starts with gs:// then it will be written to GCS. If this starts with s3:// then it will be written to S3. If nothing is specified then it will be written to local disk.`, }, + { + label: 'Write Type', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => { + let currWriteMode = curr.writeMode || { writeType: undefined }; + currWriteMode.writeType = value as QRepWriteType; + return { + ...curr, + writeMode: currWriteMode, + }; + }), + tips: `Specify whether you want the write mode to be via APPEND, UPSERT or OVERWRITE. + Append mode is for insert-only workloads. Upsert mode is append mode but also supports updates. + Overwrite mode overwrites the destination table data every sync.`, + type: 'select', + }, + { + label: 'Upsert Key Columns', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => { + let defaultMode: QRepWriteMode = { + writeType: QRepWriteType.QREP_WRITE_MODE_APPEND, + upsertKeyColumns: [], + }; + let currWriteMode = curr.writeMode || defaultMode; + currWriteMode.upsertKeyColumns = value as string[]; + return { + ...curr, + writeMode: currWriteMode, + }; + }), + tips: `Needed when write mode is set to UPSERT. These columns need to be unique and are used for updates.`, + }, { label: 'Initial Copy Only', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: QRepConfig) => ({ ...curr, initialCopyOnly: (value as boolean) || false, })), @@ -132,7 +161,7 @@ export const qrepSettings: MirrorSetting[] = [ { label: 'Wait Time Between Batches', stateHandler: (value, setter) => - setter((curr) => ({ + setter((curr: QRepConfig) => ({ ...curr, waitBetweenBatchesSeconds: parseInt(value as string, 10) || 0, })), diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index b656ff79d3..120b29b611 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -1,5 +1,5 @@ 'use client'; -import { QRepWriteMode, QRepWriteType } from '@/grpc_generated/flow'; +import { QRepConfig } from '@/grpc_generated/flow'; import { Peer } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; @@ -12,7 +12,7 @@ import { Divider } from '@tremor/react'; import Link from 'next/link'; import { useRouter } from 'next/navigation'; import { useEffect, useState } from 'react'; -import { CDCConfig, QREPConfig, TableMapRow } from '../types'; +import { CDCConfig, TableMapRow } from '../types'; import CDCConfigForm from './cdc'; import { handleCreateCDC, handleCreateQRep } from './handlers'; import { cdcSettings } from './helpers/cdc'; @@ -35,16 +35,12 @@ export default function CreateMirrors() { msg: '', }); const [loading, setLoading] = useState(false); - const [config, setConfig] = useState(blankCDCSetting); + const [config, setConfig] = useState(blankCDCSetting); const [peers, setPeers] = useState([]); const [rows, setRows] = useState([ { source: '', destination: '' }, ]); const [qrepQuery, setQrepQuery] = useState(''); - const [writeMode, setWriteMode] = useState({ - writeType: QRepWriteType.QREP_WRITE_MODE_APPEND, - upsertKeyColumns: [], - }); useEffect(() => { fetch('/api/peers') @@ -145,11 +141,9 @@ export default function CreateMirrors() { ) : ( )} @@ -173,9 +167,8 @@ export default function CreateMirrors() { ) : handleCreateQRep( mirrorName, - writeMode, qrepQuery, - config as QREPConfig, + config as QRepConfig, setFormMessage, setLoading, listPeersPage, diff --git a/ui/app/mirrors/create/qrep.tsx b/ui/app/mirrors/create/qrep.tsx index 482fb66905..0d0d75c2f2 100644 --- a/ui/app/mirrors/create/qrep.tsx +++ b/ui/app/mirrors/create/qrep.tsx @@ -1,10 +1,6 @@ 'use client'; import { RequiredIndicator } from '@/components/RequiredIndicator'; -import { - QRepSyncMode, - QRepWriteMode, - QRepWriteType, -} from '@/grpc_generated/flow'; +import { QRepConfig, QRepSyncMode, QRepWriteType } from '@/grpc_generated/flow'; import { Peer } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; @@ -12,23 +8,26 @@ import { Select, SelectItem } from '@/lib/Select'; import { Switch } from '@/lib/Switch'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; -import { Dispatch, SetStateAction } from 'react'; import { InfoPopover } from '../../../components/InfoPopover'; -import { MirrorSetter, QREPConfig } from '../types'; +import { MirrorSetter } from '../types'; import { MirrorSetting } from './helpers/common'; interface QRepConfigProps { settings: MirrorSetting[]; - mirrorConfig: QREPConfig; + mirrorConfig: QRepConfig; peers: Peer[]; setter: MirrorSetter; - writeMode: QRepWriteMode; - WriteModeSetter: Dispatch>; xmin?: boolean; } export default function QRepConfigForm(props: QRepConfigProps) { const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: string | boolean | Peer | QRepSyncMode | QRepWriteType = val; + let stateVal: + | string + | boolean + | Peer + | QRepSyncMode + | QRepWriteType + | string[] = val; if (setting.label.includes('Peer')) { stateVal = props.peers.find((peer) => peer.name === val)!; } else if (setting.label.includes('Sync Mode')) { @@ -36,14 +35,22 @@ export default function QRepConfigForm(props: QRepConfigProps) { val === 'avro' ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO : QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT; - } else if (setting.label.includes('Write Mode')) { - if (val === 'Upsert') { - props.WriteModeSetter({ - writeType: QRepWriteType.QREP_WRITE_MODE_UPSERT, - upsertKeyColumns: [], - }); + } else if (setting.label.includes('Write Type')) { + console.log('Handling write type: ' + val); + switch (val) { + case 'Upsert': + stateVal = QRepWriteType.QREP_WRITE_MODE_UPSERT; + break; + case 'Overwrite': + stateVal = QRepWriteType.QREP_WRITE_MODE_OVERWRITE; + break; + default: + stateVal = QRepWriteType.QREP_WRITE_MODE_APPEND; + break; } - return; + } else if (setting.label === 'Upsert Key Columns') { + const columns = val as string; + stateVal = columns.split(',').map((item) => item.trim()); } setting.stateHandler(stateVal, props.setter); }; @@ -51,7 +58,8 @@ export default function QRepConfigForm(props: QRepConfigProps) { const label = setting.label.toLowerCase(); if ( (label.includes('upsert') && - props.writeMode.writeType != QRepWriteType.QREP_WRITE_MODE_UPSERT) || + props.mirrorConfig.writeMode?.writeType != + QRepWriteType.QREP_WRITE_MODE_UPSERT) || (label.includes('staging') && props.mirrorConfig.syncMode?.toString() !== '1') || (label.includes('watermark column') && props.xmin) || diff --git a/ui/app/mirrors/create/schema.ts b/ui/app/mirrors/create/schema.ts index 395b46fbe1..4437f39158 100644 --- a/ui/app/mirrors/create/schema.ts +++ b/ui/app/mirrors/create/schema.ts @@ -14,16 +14,22 @@ export const tableMappingSchema = z .nonempty('At least one table mapping is required'); export const cdcSchema = z.object({ - source: z.object({ - name: z.string().nonempty(), - type: z.any(), - config: z.any(), - }), - destination: z.object({ - name: z.string().nonempty(), - type: z.any(), - config: z.any(), - }), + source: z.object( + { + name: z.string().min(1), + type: z.any(), + config: z.any(), + }, + { required_error: 'Source peer is required' } + ), + destination: z.object( + { + name: z.string().min(1), + type: z.any(), + config: z.any(), + }, + { required_error: 'Destination peer is required' } + ), doInitialCopy: z.boolean().optional(), publicationName: z .string({ @@ -74,16 +80,22 @@ export const cdcSchema = z.object({ }); export const qrepSchema = z.object({ - sourcePeer: z.object({ - name: z.string().nonempty(), - type: z.any(), - config: z.any(), - }), - destinationPeer: z.object({ - name: z.string().nonempty(), - type: z.any(), - config: z.any(), - }), + sourcePeer: z.object( + { + name: z.string().min(1), + type: z.any(), + config: z.any(), + }, + { required_error: 'Source peer is required' } + ), + destinationPeer: z.object( + { + name: z.string().min(1), + type: z.any(), + config: z.any(), + }, + { required_error: 'Destination peer is required' } + ), initialCopyOnly: z.boolean().optional(), setupWatermarkTableOnDestination: z.boolean().optional(), destinationTableIdentifier: z @@ -110,10 +122,10 @@ export const qrepSchema = z.object({ numRowsPerPartition: z .number({ invalid_type_error: 'Rows per partition must be a number', + required_error: 'Rows per partition is required', }) .int() - .min(1, 'Rows per partition must be a positive integer') - .optional(), + .min(1, 'Rows per partition must be a positive integer'), maxParallelWorkers: z .number({ invalid_type_error: 'max workers must be a number', @@ -121,26 +133,20 @@ export const qrepSchema = z.object({ .int() .min(1, 'max workers must be a positive integer') .optional(), - batchSizeInt: z - .number({ - invalid_type_error: 'Batch size must be a number', - }) - .int() - .min(1, 'Batch size must be a positive integer') - .optional(), - batchDurationSeconds: z - .number({ - invalid_type_error: 'Batch duration must be a number', - }) - .int() - .min(1, 'Batch duration must be a positive integer') - .optional(), stagingPath: z .string({ invalid_type_error: 'Staging path must be a string', }) .max(255, 'Staging path must be less than 255 characters') .optional(), + writeMode: z.object({ + writeType: z + .number({ required_error: 'Write type is required' }) + .int() + .min(0) + .max(2), + upsert_key_columns: z.array(z.string()).optional(), + }), waitBetweenBatchesSeconds: z .number({ invalid_type_error: 'Batch wait must be a number', diff --git a/ui/app/mirrors/types.ts b/ui/app/mirrors/types.ts index 92a04ab02c..16054b2faf 100644 --- a/ui/app/mirrors/types.ts +++ b/ui/app/mirrors/types.ts @@ -2,7 +2,6 @@ import { FlowConnectionConfigs, QRepConfig } from '@/grpc_generated/flow'; import { Dispatch, SetStateAction } from 'react'; export type CDCConfig = FlowConnectionConfigs; -export type QREPConfig = QRepConfig; -export type MirrorConfig = CDCConfig | QREPConfig; -export type MirrorSetter = Dispatch>; +export type MirrorConfig = CDCConfig | QRepConfig; +export type MirrorSetter = Dispatch>; export type TableMapRow = { source: string; destination: string }; diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index 9f85665600..7fd26fa990 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -72,13 +72,13 @@ async function PeersTable({ title }: { title: string }) { - - - }