diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 872c859304..628d5a5cb3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,7 +30,7 @@ jobs: --health-timeout 5s --health-retries 5 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/customer-docker.yml b/.github/workflows/customer-docker.yml index 026e70c7f8..f8c6ae2781 100644 --- a/.github/workflows/customer-docker.yml +++ b/.github/workflows/customer-docker.yml @@ -19,7 +19,7 @@ jobs: packages: write steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/dev-docker.yml b/.github/workflows/dev-docker.yml index 1a8e64a876..8d4352004e 100644 --- a/.github/workflows/dev-docker.yml +++ b/.github/workflows/dev-docker.yml @@ -17,7 +17,7 @@ jobs: packages: write steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 66cb684937..54887adeac 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -31,11 +31,11 @@ jobs: --health-retries 5 steps: - name: checkout sources - uses: actions/checkout@v3 + uses: actions/checkout@v4 - uses: actions/setup-go@v3 with: - go-version: ">=1.19.0" + go-version: ">=1.21.0" - name: install gotestsum run: | diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index fd685d8c44..65da8f7e4d 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -17,7 +17,7 @@ jobs: runs-on: ${{ matrix.runner }} steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/rust-lint.yml b/.github/workflows/rust-lint.yml index 01c3dde174..3e67d54be5 100644 --- a/.github/workflows/rust-lint.yml +++ b/.github/workflows/rust-lint.yml @@ -17,7 +17,7 @@ jobs: runs-on: ${{ matrix.runner }} steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/stable-docker.yml b/.github/workflows/stable-docker.yml index 79e9017cea..48f9faa113 100644 --- a/.github/workflows/stable-docker.yml +++ b/.github/workflows/stable-docker.yml @@ -16,7 +16,7 @@ jobs: packages: write steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: submodules: recursive diff --git a/.github/workflows/ui-build.yml b/.github/workflows/ui-build.yml index c21c1e1fad..3a6228e2dc 100644 --- a/.github/workflows/ui-build.yml +++ b/.github/workflows/ui-build.yml @@ -17,7 +17,7 @@ jobs: runs-on: ${{ matrix.runner }} steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Node.js dependencies working-directory: ui diff --git a/.github/workflows/ui-lint.yml b/.github/workflows/ui-lint.yml index 213ebef34b..ba895eb782 100644 --- a/.github/workflows/ui-lint.yml +++ b/.github/workflows/ui-lint.yml @@ -21,7 +21,7 @@ jobs: runs-on: ${{ matrix.runner }} steps: - name: checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Install Node.js dependencies working-directory: ui @@ -34,3 +34,5 @@ jobs: prettier: true eslint_dir: ui prettier_dir: ui + eslint_args: "--max-warnings 0" + eslint_extensions: js,ts,jsx,tsx diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 30be24eba6..f2faccf13a 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -38,7 +38,7 @@ func NewEventHubConnector( return nil, err } - hubManager := NewEventHubManager(ctx, defaultAzureCreds, config) + hubManager := NewEventHubManager(defaultAzureCreds, config) metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101 pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(), metadataSchemaName) @@ -124,7 +124,6 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error func (c *EventHubConnector) processBatch( flowJobName string, batch *model.CDCRecordStream, - eventsPerBatch int, maxParallelism int64, ) (uint32, error) { ctx := context.Background() @@ -133,6 +132,18 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns) + flushBatch := func() error { + err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) + if err != nil { + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("failed to send event batch: %v", err) + return err + } + batchPerTopic.Clear() + return nil + } + numRecords := 0 for record := range batch.GetRecords() { numRecords++ @@ -144,18 +155,6 @@ func (c *EventHubConnector) processBatch( return 0, err } - flushBatch := func() error { - err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - log.WithFields(log.Fields{ - "flowName": flowJobName, - }).Infof("failed to send event batch: %v", err) - return err - } - batchPerTopic.Clear() - return nil - } - topicName, err := NewScopedEventhub(record.GetTableName()) if err != nil { log.WithFields(log.Fields{ @@ -172,19 +171,20 @@ func (c *EventHubConnector) processBatch( return 0, err } - if (numRecords)%eventsPerBatch == 0 { - err := flushBatch() - if err != nil { - return 0, err - } + if numRecords%1000 == 0 { + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("processed %d records for sending", numRecords) } } - if batchPerTopic.Len() > 0 { - err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping) - if err != nil { - return 0, err - } + log.WithFields(log.Fields{ + "flowName": flowJobName, + }).Infof("processed %d records for sending", numRecords) + + err := flushBatch() + if err != nil { + return 0, err } log.WithFields(log.Fields{ @@ -203,10 +203,6 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S shutdown <- true }() - eventsPerBatch := int(req.PushBatchSize) - if eventsPerBatch <= 0 { - eventsPerBatch = 10000 - } maxParallelism := req.PushParallelism if maxParallelism <= 0 { maxParallelism = 10 @@ -221,13 +217,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S // otherwise, we block until processBatch is done. if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) { go func() { - numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism) + numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) if err != nil { log.Errorf("[async] failed to process batch: %v", err) } }() } else { - numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism) + numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) if err != nil { log.Errorf("failed to process batch: %v", err) return nil, err @@ -316,7 +312,7 @@ func (c *EventHubConnector) sendEventBatch( return firstErr } - log.Infof("successfully sent %d events to event hub", numEventsPushed) + log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed) return nil } @@ -328,7 +324,7 @@ func (c *EventHubConnector) sendBatch( subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - hub, err := c.hubManager.GetOrCreateHubClient(tblName) + hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName) if err != nil { return err } diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 1241c0ec7a..a30f94d162 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" @@ -24,7 +25,6 @@ type EventHubManager struct { } func NewEventHubManager( - ctx context.Context, creds *azidentity.DefaultAzureCredential, groupConfig *protos.EventHubGroupConfig, ) *EventHubManager { @@ -40,7 +40,8 @@ func NewEventHubManager( } } -func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhubs.ProducerClient, error) { +func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) ( + *azeventhubs.ProducerClient, error) { ehConfig, ok := m.peerConfig.Get(name.PeerName) if !ok { return nil, fmt.Errorf("eventhub '%s' not configured", name) @@ -53,9 +54,27 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace) } - hub, ok := m.hubs.Load(name) - if !ok { - opts := &azeventhubs.ProducerClientOptions{} + var hubConnectOK bool + var hub any + hub, hubConnectOK = m.hubs.Load(name) + if hubConnectOK { + hubTmp := hub.(*azeventhubs.ProducerClient) + _, err := hubTmp.GetEventHubProperties(ctx, nil) + if err != nil { + log.Infof("eventhub %s not reachable. Will re-establish connection and re-create it. Err: %v", name, err) + m.hubs.Delete(name) + hubConnectOK = false + } + } + + if !hubConnectOK { + opts := &azeventhubs.ProducerClientOptions{ + RetryOptions: azeventhubs.RetryOptions{ + MaxRetries: 32, + RetryDelay: 2 * time.Second, + MaxRetryDelay: 16 * time.Second, + }, + } hub, err := azeventhubs.NewProducerClient(namespace, name.Eventhub, m.creds, opts) if err != nil { return nil, fmt.Errorf("failed to create eventhub client: %v", err) @@ -67,8 +86,9 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub return hub.(*azeventhubs.ProducerClient), nil } -func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) { - hub, err := m.GetOrCreateHubClient(name) +func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) ( + *azeventhubs.EventDataBatch, error) { + hub, err := m.GetOrCreateHubClient(ctx, name) if err != nil { return nil, err } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 533b4ee0c5..278f0ed863 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -191,6 +191,7 @@ func (p *PostgresCDCSource) consumeStream( records.SignalAsEmpty() } records.RelationMessageMapping <- &p.relationMessageMapping + log.Infof("[finished] PullRecords streamed %d records", len(localRecords)) }() shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string { diff --git a/ui/app/dto/MirrorsDTO.ts b/ui/app/dto/MirrorsDTO.ts index e0f6486b10..b2561a4dda 100644 --- a/ui/app/dto/MirrorsDTO.ts +++ b/ui/app/dto/MirrorsDTO.ts @@ -17,4 +17,5 @@ export type TableMapRow = { source: string; destination: string; partitionKey: string; + selected: boolean; }; diff --git a/ui/app/globalstate/time.tsx b/ui/app/globalstate/time.tsx new file mode 100644 index 0000000000..04934e5272 --- /dev/null +++ b/ui/app/globalstate/time.tsx @@ -0,0 +1,13 @@ +import { create } from 'zustand'; + +interface TZState { + timezone: string; + setZone: (tz: string) => void; +} + +const useTZStore = create()((set) => ({ + timezone: 'UTC', + setZone: (tz) => set(() => ({ timezone: tz })), +})); + +export default useTZStore; diff --git a/ui/app/mirrors/create/cdc.tsx b/ui/app/mirrors/create/cdc.tsx index 45595c7fa9..226167b2d6 100644 --- a/ui/app/mirrors/create/cdc.tsx +++ b/ui/app/mirrors/create/cdc.tsx @@ -45,29 +45,8 @@ export default function CDCConfigForm(props: MirrorConfigProps) { ); }; const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: string | boolean | Peer | QRepSyncMode = val; - if (setting.label.includes('Peer')) { - stateVal = props.peers.find((peer) => peer.name === val)!; - if (setting.label === 'Destination Peer') { - if (stateVal.type === DBType.POSTGRES) { - props.setter((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - }; - }); - } else if (stateVal.type === DBType.SNOWFLAKE) { - props.setter((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - }; - }); - } - } - } else if (setting.label.includes('Sync Mode')) { + let stateVal: string | boolean | QRepSyncMode = val; + if (setting.label.includes('Sync Mode')) { stateVal = val === 'AVRO' ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO @@ -92,55 +71,6 @@ export default function CDCConfigForm(props: MirrorConfigProps) { return ( <> - - Source Peer - {RequiredIndicator(true)} - - } - action={ -
- - -
- } - /> {props.mirrorConfig.source && ( - {(setting.label.includes('Peer') - ? (props.peers ?? []).map((peer) => peer.name) - : ['AVRO', 'Copy with Binary'] - ).map((item, id) => { + {['AVRO', 'Copy with Binary'].map((item, id) => { return ( {item.toString()} diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index cd08210f26..ffafaa2602 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -52,14 +52,23 @@ const validateQRepFields = ( return true; }; +interface TableMapping { + sourceTableIdentifier: string; + destinationTableIdentifier: string; + partitionKey: string; +} const reformattedTableMapping = (tableMapping: TableMapRow[]) => { - const mapping = tableMapping.map((row) => { - return { - sourceTableIdentifier: row.source, - destinationTableIdentifier: row.destination, - partitionKey: row.partitionKey, - }; - }); + const mapping = tableMapping + .map((row) => { + if (row.selected === true) { + return { + sourceTableIdentifier: row.source, + destinationTableIdentifier: row.destination, + partitionKey: row.partitionKey, + }; + } + }) + .filter(Boolean); return mapping; }; @@ -83,7 +92,7 @@ export const handleCreateCDC = async ( const isValid = validateCDCFields(rows, setMsg, config); if (!isValid) return; const tableNameMapping = reformattedTableMapping(rows); - config['tableMappings'] = tableNameMapping; + config['tableMappings'] = tableNameMapping as TableMapping[]; config['flowJobName'] = flowJobName; setLoading(true); const statusMessage: UCreateMirrorResponse = await fetch('/api/mirrors/cdc', { diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index b1b03e8bd0..af79f83c52 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -1,16 +1,7 @@ import { QRepSyncMode } from '@/grpc_generated/flow'; -import { Peer } from '@/grpc_generated/peers'; import { CDCConfig } from '../../../dto/MirrorsDTO'; import { MirrorSetting } from './common'; export const cdcSettings: MirrorSetting[] = [ - { - label: 'Destination Peer', - stateHandler: (value, setter) => - setter((curr: CDCConfig) => ({ ...curr, destination: value as Peer })), - tips: 'The peer to which data will be replicated.', - type: 'select', - required: true, - }, { label: 'Initial Copy', stateHandler: (value, setter) => @@ -55,10 +46,10 @@ export const cdcSettings: MirrorSetting[] = [ stateHandler: (value, setter) => setter((curr: CDCConfig) => ({ ...curr, - snapshotMaxParallelWorkers: parseInt(value as string, 10) || 8, + snapshotMaxParallelWorkers: parseInt(value as string, 10) || 1, })), tips: 'PeerDB spins up parallel threads for each partition. This setting controls the number of partitions to sync in parallel. The default value is 8.', - default: '8', + default: '1', type: 'number', }, { diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index f09ef1fbcf..1c03e084b6 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -32,7 +32,7 @@ export const blankCDCSetting: FlowConnectionConfigs = { doInitialCopy: false, publicationName: '', snapshotNumRowsPerPartition: 500000, - snapshotMaxParallelWorkers: 8, + snapshotMaxParallelWorkers: 1, snapshotNumTablesInParallel: 1, snapshotSyncMode: 0, cdcSyncMode: 0, @@ -57,7 +57,7 @@ export const blankQRepSetting: QRepConfig = { syncMode: 0, batchSizeInt: 0, batchDurationSeconds: 0, - maxParallelWorkers: 8, + maxParallelWorkers: 1, waitBetweenBatchesSeconds: 30, writeMode: undefined, stagingPath: '', diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index f0f594bd5f..9d518cd987 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -4,30 +4,8 @@ import { QRepWriteMode, QRepWriteType, } from '@/grpc_generated/flow'; -import { Peer } from '@/grpc_generated/peers'; import { MirrorSetting } from './common'; export const qrepSettings: MirrorSetting[] = [ - { - label: 'Source Peer', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => ({ ...curr, sourcePeer: value as Peer })), - tips: 'The peer from which we will be replicating data. Ensure the prerequisites for this peer are met.', - helpfulLink: - 'https://docs.peerdb.io/usecases/Real-time%20CDC/postgres-to-snowflake#prerequisites', - type: 'select', - required: true, - }, - { - label: 'Destination Peer', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => ({ - ...curr, - destinationPeer: value as Peer, - })), - tips: 'The peer to which data will be replicated.', - type: 'select', - required: true, - }, { label: 'Table', stateHandler: (value, setter) => @@ -84,10 +62,10 @@ export const qrepSettings: MirrorSetting[] = [ stateHandler: (value, setter) => setter((curr: QRepConfig) => ({ ...curr, - maxParallelWorkers: parseInt(value as string, 10) || 8, + maxParallelWorkers: parseInt(value as string, 10) || 1, })), tips: 'PeerDB spins up parallel threads for each partition. This setting controls the number of partitions to sync in parallel. The default value is 8.', - default: '8', + default: '1', type: 'number', }, { diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index a58c54c370..b689bffbb9 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -1,22 +1,31 @@ 'use client'; -import { QRepConfig } from '@/grpc_generated/flow'; -import { Peer } from '@/grpc_generated/peers'; +import { DBTypeToImageMapping } from '@/components/PeerComponent'; +import { RequiredIndicator } from '@/components/RequiredIndicator'; +import { QRepConfig, QRepSyncMode } from '@/grpc_generated/flow'; +import { DBType, Peer } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; -import { RowWithRadiobutton, RowWithTextField } from '@/lib/Layout'; +import { + RowWithRadiobutton, + RowWithSelect, + RowWithTextField, +} from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; +import { Select, SelectItem } from '@/lib/Select'; import { TextField } from '@/lib/TextField'; import { Divider } from '@tremor/react'; +import Image from 'next/image'; import Link from 'next/link'; import { useRouter } from 'next/navigation'; import { useEffect, useState } from 'react'; +import { InfoPopover } from '../../../components/InfoPopover'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; import CDCConfigForm from './cdc'; import { handleCreateCDC, handleCreateQRep } from './handlers'; import { cdcSettings } from './helpers/cdc'; -import { blankCDCSetting, blankQRepSetting } from './helpers/common'; +import { blankCDCSetting } from './helpers/common'; import { qrepSettings } from './helpers/qrep'; import QRepConfigForm from './qrep'; import QRepQuery from './query'; @@ -24,7 +33,7 @@ import QRepQuery from './query'; export default function CreateMirrors() { const router = useRouter(); const [mirrorName, setMirrorName] = useState(''); - const [mirrorType, setMirrorType] = useState('CDC'); + const [mirrorType, setMirrorType] = useState(''); const [formMessage, setFormMessage] = useState<{ ok: boolean; msg: string }>({ ok: true, msg: '', @@ -34,7 +43,11 @@ export default function CreateMirrors() { const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); const [sourceSchema, setSourceSchema] = useState('public'); - const [qrepQuery, setQrepQuery] = useState(''); + const [qrepQuery, setQrepQuery] = + useState(`-- Here's a sample template: + SELECT * FROM + WHERE + BETWEEN {{.start}} AND {{.end}}`); useEffect(() => { fetch('/api/peers') @@ -44,7 +57,6 @@ export default function CreateMirrors() { }); if (mirrorType === 'Query Replication' || mirrorType === 'XMIN') { - setConfig(blankQRepSetting); if (mirrorType === 'XMIN') { setConfig((curr) => { return { ...curr, setupWatermarkTableOnDestination: true }; @@ -53,13 +65,49 @@ export default function CreateMirrors() { setConfig((curr) => { return { ...curr, setupWatermarkTableOnDestination: false }; }); - } else setConfig(blankCDCSetting); + } }, [mirrorType]); let listMirrorsPage = () => { router.push('/mirrors'); }; + const handlePeer = (val: string, peerEnd: 'src' | 'dst') => { + const stateVal = peers.find((peer) => peer.name === val)!; + if (peerEnd === 'dst') { + if (stateVal.type === DBType.POSTGRES) { + setConfig((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + }; + }); + } else if (stateVal.type === DBType.SNOWFLAKE) { + setConfig((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + }; + }); + } + setConfig((curr) => ({ + ...curr, + destination: stateVal, + destinationPeer: stateVal, + })); + } else { + setConfig((curr) => ({ + ...curr, + source: stateVal, + sourcePeer: stateVal, + })); + } + }; + return (
@@ -78,7 +126,9 @@ export default function CreateMirrors() { > Mirror type - setMirrorType(value)}> + setMirrorType(value)} + >
@@ -111,8 +160,8 @@ export default function CreateMirrors() {
@@ -131,12 +180,11 @@ export default function CreateMirrors() { width: '35%', marginLeft: '0.5rem', marginRight: '0.5rem', - height: '20vh', + height: '22vh', display: 'flex', flexDirection: 'column', justifyContent: 'space-between', boxShadow: '2px 2px 4px rgba(0, 0, 0, 0.1)', - backgroundColor: 'ghostwhite', borderRadius: '1rem', }} > @@ -171,12 +219,11 @@ export default function CreateMirrors() { style={{ padding: '1rem', width: '35%', - height: '20vh', + height: '22vh', display: 'flex', flexDirection: 'column', justifyContent: 'space-between', boxShadow: '2px 2px 4px rgba(0, 0, 0, 0.1)', - backgroundColor: 'ghostwhite', borderRadius: '1rem', }} > @@ -217,13 +264,113 @@ export default function CreateMirrors() { /> } /> + + + Source Peer + {RequiredIndicator(true)} + + } + action={ +
+ + +
+ } + /> + + + Destination Peer + {RequiredIndicator(true)} + + } + action={ +
+ + +
+ } + /> {mirrorType === 'Query Replication' && ( )} - + {mirrorType && } {!loading && formMessage.msg.length > 0 && (
); diff --git a/ui/app/mirrors/create/qrep.tsx b/ui/app/mirrors/create/qrep.tsx index ed8677bcb8..58806a9942 100644 --- a/ui/app/mirrors/create/qrep.tsx +++ b/ui/app/mirrors/create/qrep.tsx @@ -31,33 +31,9 @@ export default function QRepConfigForm(props: QRepConfigProps) { }; const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: - | string - | boolean - | Peer - | QRepSyncMode - | QRepWriteType - | string[] = val; - if (setting.label.includes('Peer')) { - stateVal = props.peers.find((peer) => peer.name === val)!; - if (setting.label === 'Destination Peer') { - if (stateVal.type === DBType.POSTGRES) { - props.setter((curr) => { - return { - ...curr, - syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - }; - }); - } else if (stateVal.type === DBType.SNOWFLAKE) { - props.setter((curr) => { - return { - ...curr, - syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - }; - }); - } - } - } else if (setting.label.includes('Sync Mode')) { + let stateVal: string | boolean | QRepSyncMode | QRepWriteType | string[] = + val; + if (setting.label.includes('Sync Mode')) { stateVal = val === 'AVRO' ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO @@ -150,9 +126,7 @@ export default function QRepConfigForm(props: QRepConfigProps) { }} > } /> - -
- {allTables ? ( - allTables.map((sourceTableName, index) => ( +
+
+ handleSelectAll(e)} /> + +
+
+ ) => + setSearchQuery(e.target.value) + } + /> +
+
+
+ {filteredRows ? ( + filteredRows.map((row, index) => (
- handleSwitch(state, sourceTableName) + handleSwitch(state, row.source) } />
- {sourceTableName} + {row.source}
- {rows.find( - (row) => row.source === `${schema}.${sourceTableName}` - )?.destination && ( + {row.selected && (
- row.source === - `${schema}.${sourceTableName}` - )?.destination - } + defaultValue={row.destination} onChange={( e: React.ChangeEvent ) => - updateDestination( - `${schema}.${sourceTableName}`, - e.target.value - ) + updateDestination(row.source, e.target.value) } />
@@ -230,10 +279,7 @@ const TableMapping = ({ onChange={( e: React.ChangeEvent ) => - updatePartitionKey( - `${schema}.${sourceTableName}`, - e.target.value - ) + updatePartitionKey(row.source, e.target.value) } />
@@ -249,7 +295,7 @@ const TableMapping = ({ diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index b0fa880fc6..ff8263dc6b 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -1,21 +1,22 @@ 'use client'; +import SearchBar from '@/components/Search'; +import TimeLabel from '@/components/TimeComponent'; import { CDCMirrorStatus, QRepMirrorStatus, SnapshotStatus, } from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; -import { Checkbox } from '@/lib/Checkbox'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { ProgressBar } from '@/lib/ProgressBar'; -import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; import * as Tabs from '@radix-ui/react-tabs'; import moment, { Duration, Moment } from 'moment'; import { useQueryState } from 'next-usequerystate'; import Link from 'next/link'; +import { useState } from 'react'; import styled from 'styled-components'; import CDCDetails from './cdcDetails'; @@ -94,73 +95,92 @@ function summarizeTableClone(clone: QRepMirrorStatus): TableCloneSummary { type SnapshotStatusProps = { status: SnapshotStatus; }; -const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => ( - Initial Copy} - toolbar={{ - left: ( - <> - - - - - ), - right: , - }} - header={ - - - - - Table Identifier - Start Time - Progress Partitions - Num Rows Synced - Avg Time Per Partition - - } - > - {status.clones.map(summarizeTableClone).map((clone, index) => ( - - - - - -
Initial Copy} + toolbar={{ + left: ( + <> + + +
-); + + + + ), + right: ( + + status.clones.map(summarizeTableClone).filter((row: any) => { + return row.tableName + .toLowerCase() + .includes(query.toLowerCase()); + }) + } + /> + ), + }} + header={ + + Table Identifier + Start Time + Progress Partitions + Num Rows Synced + Avg Time Per Partition + + } + > + {snapshotRows.map((clone, index) => ( + + + + + + + + + + {clone.completedNumPartitions} / {clone.totalNumPartitions} + + {clone.completedNumRows} + + + + + ))} + + ); +}; const Trigger = styled( ({ isActive, ...props }: { isActive?: boolean } & Tabs.TabsTriggerProps) => ( diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx index 20f44ea903..752b24e8c3 100644 --- a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx @@ -1,11 +1,11 @@ 'use client'; +import SearchBar from '@/components/Search'; +import TimeLabel from '@/components/TimeComponent'; import { Button } from '@/lib/Button'; -import { Checkbox } from '@/lib/Checkbox'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; import moment from 'moment'; import { useState } from 'react'; @@ -30,10 +30,14 @@ function TimeWithDurationOrRunning({ }) { if (endTime) { return ( - + <> + + + ); } else { return ( @@ -52,9 +56,10 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { const startRow = (currentPage - 1) * ROWS_PER_PAGE; const endRow = startRow + ROWS_PER_PAGE; - - const displayedRows = rows.slice(startRow, endRow); - + const allRows = rows.slice(startRow, endRow); + const [displayedRows, setDisplayedRows] = useState( + rows.slice(startRow, endRow) + ); const handlePrevPage = () => { if (currentPage > 1) setCurrentPage(currentPage - 1); }; @@ -65,7 +70,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { return ( Initial Copy} + title={} toolbar={{ left: ( <> @@ -84,30 +89,45 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { ), - right: , + right: ( + + allRows.filter((row: any) => { + return row.batchId == parseInt(query, 10); + }) + } + /> + ), }} header={ - - - - Batch ID - Start Time - End Time (Duration) - Num Rows Synced + {['Batch ID', 'Start Time', 'End Time (Duration)', 'Rows Synced'].map( + (heading, index) => ( + + + + ) + )} } > {displayedRows.map((row, index) => ( - - - - + { + flowArray.forEach((flow) => { + if (flow.config_proto) { + flow.config_proto = new TextDecoder().decode(flow.config_proto); + } + }); +}; + +export default async function Mirrors() { + let mirrors = await prisma.flows.findMany({ include: { sourcePeer: true, destinationPeer: true, }, }); + const flows = mirrors.map((mirror) => { + let newMirror: any = { + ...mirror, + sourcePeer: getTruePeer(mirror.sourcePeer), + destinationPeer: getTruePeer(mirror.destinationPeer), + }; + return newMirror; + }); + let cdcFlows = flows.filter((flow) => { return !flow.query_string; }); - return ( - <> - -
-
- - - - - - - ), - right: , - }} - header={ - - Name - Source - Destination - Start Time - - - } - > - {cdcFlows.map((flow) => ( - - - - - -
- -
-
- -
- -
-
- - - - - - -
- ))} -
-
- - ); -} + let qrepFlows = flows.filter((flow) => { + if (flow.config_proto && flow.query_string) { + let config = QRepConfig.decode(flow.config_proto); + const watermarkCol = config.watermarkColumn.toLowerCase(); + return watermarkCol !== 'xmin' && watermarkCol !== 'ctid'; + } + return false; + }); -// query replication flows table like CDC flows table -async function QRepFlows() { - const flows = await prisma.flows.findMany({ - include: { - sourcePeer: true, - destinationPeer: true, - }, + let xminFlows = flows.filter((flow) => { + if (flow.config_proto && flow.query_string) { + let config = QRepConfig.decode(flow.config_proto); + return config.watermarkColumn.toLowerCase() === 'xmin'; + } + return false; }); - let qrepFlows = flows.filter((flow) => { - return flow.query_string; + let snapshotFlows = flows.filter((flow) => { + if (flow.config_proto && flow.query_string) { + let config = QRepConfig.decode(flow.config_proto); + return config.watermarkColumn.toLowerCase() === 'ctid'; + } + return false; }); - return ( - <> - -
- - - - - - - - ), - right: , - }} - header={ - - Name - Source - Destination - Start Time - - - } - > - {qrepFlows.map((flow) => ( - - - - - -
- -
-
- -
- -
-
- - - - - - -
- ))} -
-
- - ); -} + stringifyConfig(cdcFlows); + stringifyConfig(qrepFlows); + stringifyConfig(xminFlows); + stringifyConfig(snapshotFlows); -export default async function Mirrors() { return (
- New mirror + } > @@ -219,10 +97,16 @@ export default async function Mirrors() {
- + + + + + + + - +
); diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx index e90038d45c..9d0651757d 100644 --- a/ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx +++ b/ui/app/mirrors/status/qrep/[mirrorId]/qrepStatusTable.tsx @@ -1,11 +1,11 @@ 'use client'; +import SearchBar from '@/components/Search'; +import TimeLabel from '@/components/TimeComponent'; import { Button } from '@/lib/Button'; -import { Checkbox } from '@/lib/Checkbox'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; import moment from 'moment'; import { useState } from 'react'; @@ -23,7 +23,7 @@ function TimeOrProgressBar({ time }: { time: Date | null }) { if (time === null) { return ; } else { - return ; + return ; } } @@ -44,9 +44,6 @@ function RowPerPartition({ return ( - - - @@ -57,7 +54,7 @@ function RowPerPartition({ - +
), - right: , + right: ( + + visiblePartitions.filter((partition: QRepPartitionStatus) => { + return partition.partitionId + .toLowerCase() + .includes(query.toLowerCase()); + }) + } + /> + ), }} header={ - - - - Partition UUID - Run UUID - Duration - Start Time - End Time - Num Rows Synced + {[ + 'Partition UUID', + 'Run UUID', + 'Duration', + 'Start Time', + 'End Time', + 'Num Rows Synced', + ].map((heading, index) => ( + + + + ))} } > - {visiblePartitions.map((partition, index) => ( + {displayedPartitions.map((partition, index) => ( ))} diff --git a/ui/app/mirrors/tables.tsx b/ui/app/mirrors/tables.tsx new file mode 100644 index 0000000000..b23866fad7 --- /dev/null +++ b/ui/app/mirrors/tables.tsx @@ -0,0 +1,188 @@ +'use client'; +import { DropDialog } from '@/components/DropDialog'; +import PeerButton from '@/components/PeerComponent'; +import SearchBar from '@/components/Search'; +import TimeLabel from '@/components/TimeComponent'; +import { Label } from '@/lib/Label'; +import { Table, TableCell, TableRow } from '@/lib/Table'; +import Link from 'next/link'; +import { useState } from 'react'; + +export function CDCFlows({ cdcFlows }: { cdcFlows: any }) { + const [mirrors, setMirrors] = useState(cdcFlows); + + return ( + <> + +
+ , + right: ( + + cdcFlows.filter((flow: any) => { + return flow.name + .toLowerCase() + .includes(query.toLowerCase()); + }) + } + /> + ), + }} + header={ + + {['Name', 'Source', 'Destination', 'Start Time', ''].map( + (heading, index) => ( + + + + ) + )} + + } + > + {mirrors.map((flow: any) => ( + + + + + + + + + + + + + + + + + + ))} +
+
+ + ); +} + +// query replication flows table like CDC flows table +export function QRepFlows({ + qrepFlows, + title, +}: { + qrepFlows: any; + title: string; +}) { + const [mirrors, setMirrors] = useState(qrepFlows); + + return ( + <> + +
+ , + right: ( + + qrepFlows.filter((flow: any) => { + return flow.name + .toLowerCase() + .includes(query.toLowerCase()); + }) + } + /> + ), + }} + header={ + + {['Name', 'Source', 'Destination', 'Start Time', ''].map( + (heading, index) => ( + + + + ) + )} + + } + > + {mirrors.map((flow: any) => ( + + + + + + + + + + + + + + + + + + ))} +
+
+ + ); +} diff --git a/ui/app/peers/[peerName]/datatables.tsx b/ui/app/peers/[peerName]/datatables.tsx index a9f5d9d46d..e06a3cec17 100644 --- a/ui/app/peers/[peerName]/datatables.tsx +++ b/ui/app/peers/[peerName]/datatables.tsx @@ -1,4 +1,5 @@ import { CopyButton } from '@/components/CopyButton'; +import TimeLabel from '@/components/TimeComponent'; import { SlotInfo, StatInfo } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { Table, TableCell, TableRow } from '@/lib/Table'; @@ -7,15 +8,18 @@ import { DurationDisplay, SlotNameDisplay } from './helpers'; export const SlotTable = ({ data }: { data: SlotInfo[] }) => { return (
-