Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerDB: For queues, move sync interval to advanced, default 10 mins #1672

Merged
merged 6 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Icon } from '@/lib/Icon';
import { Dispatch, SetStateAction, useEffect, useMemo, useState } from 'react';
import { CDCConfig, MirrorSetter, TableMapRow } from '../../../dto/MirrorsDTO';
import { IsQueuePeer, fetchPublications } from '../handlers';
import { MirrorSetting } from '../helpers/common';
import { AdvancedSettingType, MirrorSetting } from '../helpers/common';
import CDCField from './fields';
import TableMapping from './tablemapping';

Expand Down Expand Up @@ -46,12 +46,32 @@ export default function CDCConfigForm({
};

const normalSettings = useMemo(() => {
return settings.filter((setting) => setting.advanced != true);
}, [settings]);
return settings!.filter(
(setting) =>
!(
(IsQueuePeer(mirrorConfig.destination?.type) &&
setting.advanced === AdvancedSettingType.QUEUE) ||
setting.advanced === AdvancedSettingType.ALL
)
);
}, [settings, mirrorConfig.destination?.type]);

const advancedSettings = useMemo(() => {
return settings.filter((setting) => setting.advanced == true);
}, [settings]);
return settings!
.map((setting) => {
if (
IsQueuePeer(mirrorConfig.destination?.type) &&
setting.advanced === AdvancedSettingType.QUEUE
) {
setting.stateHandler(600, setter);
return { ...setting, default: 600 };
}
if (setting.advanced === AdvancedSettingType.ALL) {
return setting;
}
})
.filter((setting) => setting !== undefined);
}, [settings, mirrorConfig.destination?.type, setter]);

const paramDisplayCondition = (setting: MirrorSetting) => {
const label = setting.label.toLowerCase();
Expand Down Expand Up @@ -91,15 +111,15 @@ export default function CDCConfigForm({
if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined)
return (
<>
{normalSettings.map((setting, id) => {
{normalSettings!.map((setting, id) => {
return (
paramDisplayCondition(setting) && (
paramDisplayCondition(setting!) && (
<CDCField
key={id}
handleChange={handleChange}
setting={setting}
setting={setting!}
options={
setting.label === 'Publication Name'
setting?.label === 'Publication Name'
? publications
: undefined
}
Expand Down Expand Up @@ -128,13 +148,13 @@ export default function CDCConfigForm({
</Button>

{show &&
advancedSettings.map((setting, id) => {
advancedSettings!.map((setting, id) => {
return (
paramDisplayCondition(setting) && (
paramDisplayCondition(setting!) && (
<CDCField
key={setting.label}
key={setting?.label}
handleChange={handleChange}
setting={setting}
setting={setting!}
/>
)
);
Expand Down
23 changes: 12 additions & 11 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TypeSystem } from '@/grpc_generated/flow';
import { CDCConfig } from '../../../dto/MirrorsDTO';
import { MirrorSetting } from './common';
import { AdvancedSettingType, MirrorSetting } from './common';
export const cdcSettings: MirrorSetting[] = [
{
label: 'Initial Copy',
Expand All @@ -24,7 +24,7 @@ export const cdcSettings: MirrorSetting[] = [
tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 1,000,000 rows.',
type: 'number',
default: '1000000',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Sync Interval (Seconds)',
Expand All @@ -38,6 +38,7 @@ export const cdcSettings: MirrorSetting[] = [
type: 'number',
default: '60',
required: true,
advanced: AdvancedSettingType.QUEUE,
},
{
label: 'Publication Name',
Expand Down Expand Up @@ -71,7 +72,7 @@ export const cdcSettings: MirrorSetting[] = [
tips: 'PeerDB splits up table data into partitions for increased performance. This setting controls the number of rows per partition. The default value is 1000000.',
default: '1000000',
type: 'number',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Parallelism for Initial Load',
Expand All @@ -95,7 +96,7 @@ export const cdcSettings: MirrorSetting[] = [
tips: 'Specify the number of tables to sync perform initial load for, in parallel. The default value is 1.',
default: '1',
type: 'number',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Snapshot Staging Path',
Expand All @@ -105,7 +106,7 @@ export const cdcSettings: MirrorSetting[] = [
snapshotStagingPath: value as string | '',
})),
tips: 'You can specify staging path for Snapshot sync mode AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket URL. For BigQuery, this must be either empty or an existing GCS bucket name. In both cases, if empty, the local filesystem will be used.',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'CDC Staging Path',
Expand All @@ -115,7 +116,7 @@ export const cdcSettings: MirrorSetting[] = [
cdcStagingPath: (value as string) || '',
})),
tips: 'You can specify staging path for CDC sync mode AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket URL. For BigQuery, this must be either empty or an existing GCS bucket name. In both cases, if empty, the local filesystem will be used.',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Soft Delete',
Expand All @@ -138,7 +139,7 @@ export const cdcSettings: MirrorSetting[] = [
})),
tips: 'If set, PeerDB will only perform initial load and will not perform CDC sync.',
type: 'switch',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Script',
Expand All @@ -148,7 +149,7 @@ export const cdcSettings: MirrorSetting[] = [
script: (value as string) || '',
})),
tips: 'Associate PeerDB script with this mirror.',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Use Postgres type system',
Expand All @@ -160,7 +161,7 @@ export const cdcSettings: MirrorSetting[] = [
type: 'switch',
default: false,
tips: 'Decide if PeerDB should use native Postgres types directly',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Synced-At Column Name',
Expand All @@ -170,7 +171,7 @@ export const cdcSettings: MirrorSetting[] = [
syncedAtColName: value as string | '',
})),
tips: 'A field to set the name of PeerDBs synced_at column. If not set, a default name will be set',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
{
label: 'Soft Delete Column Name',
Expand All @@ -180,6 +181,6 @@ export const cdcSettings: MirrorSetting[] = [
softDeleteColName: value as string | '',
})),
tips: 'A field to set the name of PeerDBs soft delete column.',
advanced: true,
advanced: AdvancedSettingType.ALL,
},
];
8 changes: 7 additions & 1 deletion ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { FlowConnectionConfigs, TypeSystem } from '@/grpc_generated/flow';

export enum AdvancedSettingType {
QUEUE = 'queue',
ALL = 'all',
NONE = 'none',
}

export interface MirrorSetting {
label: string;
stateHandler: (value: any, setter: any) => void;
Expand All @@ -8,7 +14,7 @@ export interface MirrorSetting {
tips?: string;
helpfulLink?: string;
default?: string | number | boolean;
advanced?: boolean; // whether it should come under an 'Advanced' section
advanced?: AdvancedSettingType; // whether it should come under an 'Advanced' section
command?: string;
}

Expand Down
Loading