From 62917de9c0f6e8a53f64999bdef929958e001462 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 27 Oct 2023 15:12:06 -0700 Subject: [PATCH] defaults for sync mode --- flow/cmd/peer_data.go | 48 +++++++++++++++++----- ui/app/api/mirrors/cdc/route.ts | 1 + ui/app/mirrors/create/cdc.tsx | 57 ++++++++++++++++++++++++-- ui/app/peers/[peerName]/datatables.tsx | 8 ++-- ui/app/peers/[peerName]/helpers.tsx | 1 + 5 files changed, 97 insertions(+), 18 deletions(-) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 9746d8da96..01c461deef 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -84,7 +84,8 @@ func (h *FlowRequestHandler) GetStatInfo( defer peerPool.Close() rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+ "EXTRACT(epoch FROM(now()-query_start)) AS dur"+ - " FROM pg_stat_activity WHERE usename=$1 AND state != 'idle' AND query_start IS NOT NULL;", peerUser) + " FROM pg_stat_activity WHERE "+ + "usename=$1 AND state != 'idle';", peerUser) if err != nil { return &protos.PeerStatResponse{StatData: nil}, err } @@ -92,24 +93,49 @@ func (h *FlowRequestHandler) GetStatInfo( var statInfoRows []*protos.StatInfo for rows.Next() { var pid int64 - var waitEvent string - var waitEventType string - var queryStart string - var query string - var duration float32 + var waitEvent sql.NullString + var waitEventType sql.NullString + var queryStart sql.NullString + var query sql.NullString + var duration sql.NullFloat64 err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration) if err != nil { return &protos.PeerStatResponse{StatData: nil}, err } + we := waitEvent.String + if !waitEvent.Valid { + we = "" + } + + wet := waitEventType.String + if !waitEventType.Valid { + wet = "" + } + + q := query.String + if !query.Valid { + q = "" + } + + qs := queryStart.String + if !queryStart.Valid { + qs = "" + } + + d := duration.Float64 + if !duration.Valid { + d = -1 + } + statInfoRows = append(statInfoRows, &protos.StatInfo{ Pid: pid, - WaitEvent: waitEvent, - WaitEventType: waitEventType, - QueryStart: queryStart, - Query: query, - Duration: duration, + WaitEvent: we, + WaitEventType: wet, + QueryStart: qs, + Query: q, + Duration: float32(d), }) } return &protos.PeerStatResponse{ diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts index 307faf45ba..025dfc149a 100644 --- a/ui/app/api/mirrors/cdc/route.ts +++ b/ui/app/api/mirrors/cdc/route.ts @@ -8,6 +8,7 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); const { config } = body; + console.log('/mirrors/cdc config: ', config); const flowServiceAddr = GetFlowHttpAddressFromEnv(); const req: CreateCDCFlowRequest = { connectionConfigs: config, diff --git a/ui/app/mirrors/create/cdc.tsx b/ui/app/mirrors/create/cdc.tsx index d21dacdf7b..acba1c03d7 100644 --- a/ui/app/mirrors/create/cdc.tsx +++ b/ui/app/mirrors/create/cdc.tsx @@ -1,7 +1,7 @@ 'use client'; import { RequiredIndicator } from '@/components/RequiredIndicator'; import { QRepSyncMode } from '@/grpc_generated/flow'; -import { Peer } from '@/grpc_generated/peers'; +import { DBType, Peer } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; import { Select, SelectItem } from '@/lib/Select'; @@ -18,13 +18,55 @@ interface MirrorConfigProps { } export default function CDCConfigForm(props: MirrorConfigProps) { + const defaultSyncMode = ( + dtype: DBType | undefined, + setting: MirrorSetting + ) => { + switch (dtype) { + case DBType.POSTGRES: + return 'Copy with Binary'; + case DBType.SNOWFLAKE: + return 'AVRO'; + default: + return 'Copy with Binary'; + } + }; + + const setToDefault = (setting: MirrorSetting) => { + const destinationPeerType = props.mirrorConfig.destination?.type; + return ( + setting.label.includes('Sync') && + (destinationPeerType === DBType.POSTGRES || + destinationPeerType === DBType.SNOWFLAKE) + ); + }; + const handleChange = (val: string | boolean, setting: MirrorSetting) => { let stateVal: string | boolean | Peer | QRepSyncMode = val; if (setting.label.includes('Peer')) { stateVal = props.peers.find((peer) => peer.name === val)!; + if (setting.label === 'Destination Peer') { + if (stateVal.type === DBType.POSTGRES) { + props.setter((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + }; + }); + } else if (stateVal.type === DBType.SNOWFLAKE) { + props.setter((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + }; + }); + } + } } else if (setting.label.includes('Sync Mode')) { stateVal = - val === 'avro' + val === 'AVRO' ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO : QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT; } @@ -98,10 +140,19 @@ export default function CDCConfigForm(props: MirrorConfigProps) { setting.label.includes('Peer') ? 'a peer' : 'a sync mode' }`} onValueChange={(val) => handleChange(val, setting)} + disabled={setToDefault(setting)} + value={ + setToDefault(setting) + ? defaultSyncMode( + props.mirrorConfig.destination?.type, + setting + ) + : undefined + } > {(setting.label.includes('Peer') ? (props.peers ?? []).map((peer) => peer.name) - : ['avro', 'sql'] + : ['AVRO', 'Copy with Binary'] ).map((item, id) => { return ( diff --git a/ui/app/peers/[peerName]/datatables.tsx b/ui/app/peers/[peerName]/datatables.tsx index f1385aa3f1..12aa77715a 100644 --- a/ui/app/peers/[peerName]/datatables.tsx +++ b/ui/app/peers/[peerName]/datatables.tsx @@ -51,10 +51,10 @@ export const StatTable = ({ data }: { data: StatInfo[] }) => { header={ PID + Duration Wait Event Wait Event Type Query Start Time - Duration Query } @@ -65,9 +65,9 @@ export const StatTable = ({ data }: { data: StatInfo[] }) => { - {stat.waitEvent} - {stat.waitEventType} - {stat.queryStart} + {stat.waitEvent || 'N/A'} + {stat.waitEventType || 'N/A'} + {stat.queryStart || 'N/A'}
{ }; export const DurationDisplay = ({ duration }: { duration: number }) => { + if (duration < 0) return 'N/A'; return duration >= 3600 ? `${Math.floor(duration / 3600)} hour(s) ${Math.floor( (duration % 3600) / 60