Skip to content

Commit

Permalink
defaults for sync mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 27, 2023
1 parent 00d206a commit 62917de
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 18 deletions.
48 changes: 37 additions & 11 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,32 +84,58 @@ func (h *FlowRequestHandler) GetStatInfo(
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE usename=$1 AND state != 'idle' AND query_start IS NOT NULL;", peerUser)
" FROM pg_stat_activity WHERE "+
"usename=$1 AND state != 'idle';", peerUser)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
defer rows.Close()
var statInfoRows []*protos.StatInfo
for rows.Next() {
var pid int64
var waitEvent string
var waitEventType string
var queryStart string
var query string
var duration float32
var waitEvent sql.NullString
var waitEventType sql.NullString
var queryStart sql.NullString
var query sql.NullString
var duration sql.NullFloat64

err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}

we := waitEvent.String
if !waitEvent.Valid {
we = ""
}

wet := waitEventType.String
if !waitEventType.Valid {
wet = ""
}

q := query.String
if !query.Valid {
q = ""
}

qs := queryStart.String
if !queryStart.Valid {
qs = ""
}

d := duration.Float64
if !duration.Valid {
d = -1
}

statInfoRows = append(statInfoRows, &protos.StatInfo{
Pid: pid,
WaitEvent: waitEvent,
WaitEventType: waitEventType,
QueryStart: queryStart,
Query: query,
Duration: duration,
WaitEvent: we,
WaitEventType: wet,
QueryStart: qs,
Query: q,
Duration: float32(d),
})
}
return &protos.PeerStatResponse{
Expand Down
1 change: 1 addition & 0 deletions ui/app/api/mirrors/cdc/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';
export async function POST(request: Request) {
const body = await request.json();
const { config } = body;
console.log('/mirrors/cdc config: ', config);
const flowServiceAddr = GetFlowHttpAddressFromEnv();
const req: CreateCDCFlowRequest = {
connectionConfigs: config,
Expand Down
57 changes: 54 additions & 3 deletions ui/app/mirrors/create/cdc.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use client';
import { RequiredIndicator } from '@/components/RequiredIndicator';
import { QRepSyncMode } from '@/grpc_generated/flow';
import { Peer } from '@/grpc_generated/peers';
import { DBType, Peer } from '@/grpc_generated/peers';
import { Label } from '@/lib/Label';
import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout';
import { Select, SelectItem } from '@/lib/Select';
Expand All @@ -18,13 +18,55 @@ interface MirrorConfigProps {
}

export default function CDCConfigForm(props: MirrorConfigProps) {
const defaultSyncMode = (
dtype: DBType | undefined,
setting: MirrorSetting
) => {
switch (dtype) {
case DBType.POSTGRES:
return 'Copy with Binary';
case DBType.SNOWFLAKE:
return 'AVRO';
default:
return 'Copy with Binary';
}
};

const setToDefault = (setting: MirrorSetting) => {
const destinationPeerType = props.mirrorConfig.destination?.type;
return (
setting.label.includes('Sync') &&
(destinationPeerType === DBType.POSTGRES ||
destinationPeerType === DBType.SNOWFLAKE)
);
};

const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean | Peer | QRepSyncMode = val;
if (setting.label.includes('Peer')) {
stateVal = props.peers.find((peer) => peer.name === val)!;
if (setting.label === 'Destination Peer') {
if (stateVal.type === DBType.POSTGRES) {
props.setter((curr) => {
return {
...curr,
cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
};
});
} else if (stateVal.type === DBType.SNOWFLAKE) {
props.setter((curr) => {
return {
...curr,
cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO,
snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO,
};
});
}
}
} else if (setting.label.includes('Sync Mode')) {
stateVal =
val === 'avro'
val === 'AVRO'
? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO
: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
}
Expand Down Expand Up @@ -98,10 +140,19 @@ export default function CDCConfigForm(props: MirrorConfigProps) {
setting.label.includes('Peer') ? 'a peer' : 'a sync mode'
}`}
onValueChange={(val) => handleChange(val, setting)}
disabled={setToDefault(setting)}
value={
setToDefault(setting)
? defaultSyncMode(
props.mirrorConfig.destination?.type,
setting
)
: undefined
}
>
{(setting.label.includes('Peer')
? (props.peers ?? []).map((peer) => peer.name)
: ['avro', 'sql']
: ['AVRO', 'Copy with Binary']
).map((item, id) => {
return (
<SelectItem key={id} value={item.toString()}>
Expand Down
8 changes: 4 additions & 4 deletions ui/app/peers/[peerName]/datatables.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ export const StatTable = ({ data }: { data: StatInfo[] }) => {
header={
<TableRow>
<TableCell as='th'>PID</TableCell>
<TableCell as='th'>Duration</TableCell>
<TableCell as='th'>Wait Event</TableCell>
<TableCell as='th'>Wait Event Type</TableCell>
<TableCell as='th'>Query Start Time</TableCell>
<TableCell as='th'>Duration</TableCell>
<TableCell as='th'>Query</TableCell>
</TableRow>
}
Expand All @@ -65,9 +65,9 @@ export const StatTable = ({ data }: { data: StatInfo[] }) => {
<TableCell>
<DurationDisplay duration={stat.duration} />
</TableCell>
<TableCell>{stat.waitEvent}</TableCell>
<TableCell>{stat.waitEventType}</TableCell>
<TableCell>{stat.queryStart}</TableCell>
<TableCell>{stat.waitEvent || 'N/A'}</TableCell>
<TableCell>{stat.waitEventType || 'N/A'}</TableCell>
<TableCell>{stat.queryStart || 'N/A'}</TableCell>
<TableCell>
<div
style={{
Expand Down
1 change: 1 addition & 0 deletions ui/app/peers/[peerName]/helpers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export const SlotNameDisplay = ({ slotName }: { slotName: string }) => {
};

export const DurationDisplay = ({ duration }: { duration: number }) => {
if (duration < 0) return 'N/A';
return duration >= 3600
? `${Math.floor(duration / 3600)} hour(s) ${Math.floor(
(duration % 3600) / 60
Expand Down

0 comments on commit 62917de

Please sign in to comment.