Skip to content

Commit

Permalink
Add basic ui for CDC mirrors (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Oct 14, 2023
1 parent d295566 commit a204e6a
Show file tree
Hide file tree
Showing 23 changed files with 2,731 additions and 2,530 deletions.
7 changes: 2 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,10 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
if len(partitions) > 0 {
err = a.CatalogMirrorMonitor.InitializeQRepRun(
ctx,
config.FlowJobName,
config,
runUUID,
startTime,
partitions,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -488,10 +489,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
var wg sync.WaitGroup
var numRecords int64

err = a.CatalogMirrorMonitor.AddPartitionToQRepRun(ctx, config.FlowJobName, runUUID, partition)
if err != nil {
return err
}
var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
stream = model.NewQRecordStream(bufferSize)
Expand Down
39 changes: 29 additions & 10 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,41 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
return &config, nil
}

func (h *FlowRequestHandler) getQRepConfigFromCatalog(
flowJobName string,
) *protos.QRepConfig {
func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *protos.QRepConfig {
var configBytes []byte
var err error
var config protos.QRepConfig

err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
logrus.Warnf("unable to query qrep config from catalog: %s", err.Error())
queryInfos := []struct {
Query string
Warning string
}{
{
Query: "SELECT config_proto FROM flows WHERE name = $1",
Warning: "unable to query qrep config from catalog",
},
{
Query: "SELECT config_proto FROM peerdb_stats.qrep_runs WHERE flow_name = $1",
Warning: "unable to query qrep config from qrep_runs",
},
}

// Iterate over queries and attempt to fetch the config
for _, qInfo := range queryInfos {
err := h.pool.QueryRow(context.Background(), qInfo.Query, flowJobName).Scan(&configBytes)
if err == nil {
break
}
logrus.Warnf("%s - %s: %s", qInfo.Warning, flowJobName, err.Error())
}

// If no config was fetched, return nil
if len(configBytes) == 0 {
return nil
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
// Try unmarshaling
if err := proto.Unmarshal(configBytes, &config); err != nil {
logrus.Warnf("failed to unmarshal config for %s: %s", flowJobName, err.Error())
return nil
}

Expand Down
31 changes: 28 additions & 3 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type CatalogMirrorMonitor struct {
Expand Down Expand Up @@ -152,19 +153,43 @@ func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flo
return nil
}

func (c *CatalogMirrorMonitor) InitializeQRepRun(ctx context.Context, flowJobName string, runUUID string,
startTime time.Time) error {
func (c *CatalogMirrorMonitor) InitializeQRepRun(
ctx context.Context,
config *protos.QRepConfig,
runUUID string,
startTime time.Time,
partitions []*protos.QRepPartition,
) error {
if c == nil || c.catalogConn == nil {
return nil
}

flowJobName := config.GetFlowJobName()
_, err := c.catalogConn.Exec(ctx,
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid,start_time) VALUES($1,$2,$3) ON CONFLICT DO NOTHING",
flowJobName, runUUID, startTime)
if err != nil {
return fmt.Errorf("error while inserting qrep run in qrep_runs: %w", err)
}

cfgBytes, err := proto.Marshal(config)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = c.catalogConn.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET config_proto = $1 WHERE flow_name = $2",
cfgBytes, flowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}

for _, partition := range partitions {
if err := c.addPartitionToQRepRun(ctx, flowJobName, runUUID, partition); err != nil {
return fmt.Errorf("unable to add partition to qrep run: %w", err)
}
}

return nil
}

Expand All @@ -183,7 +208,7 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
return nil
}

func (c *CatalogMirrorMonitor) AddPartitionToQRepRun(ctx context.Context, flowJobName string,
func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJobName string,
runUUID string, partition *protos.QRepPartition) error {
if c == nil || c.catalogConn == nil {
return nil
Expand Down
2 changes: 2 additions & 0 deletions nexus/catalog/migrations/V8__qrep_runs_config.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE peerdb_stats.qrep_runs
ADD COLUMN config_proto BYTEA;
14 changes: 14 additions & 0 deletions ui/app/api/mirrors/status/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

function getMirrorStatusUrl(mirrorId: string) {
let base = GetFlowHttpAddressFromEnv();
return `${base}/v1/mirrors/${mirrorId}`;
}

export async function POST(request: Request) {
const { flowJobName } = await request.json();
const url = getMirrorStatusUrl(flowJobName);
const resp = await fetch(url);
const json = await resp.json();
return new Response(JSON.stringify(json));
}
8 changes: 6 additions & 2 deletions ui/app/mirrors/create/config.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ export default function MirrorConfig(props: MirrorConfigProps) {
}}
>
<Switch
onCheckedChange={(state) => handleChange(state, setting)}
onCheckedChange={(state: boolean) =>
handleChange(state, setting)
}
/>
{setting.tips && (
<InfoPopover
Expand Down Expand Up @@ -143,7 +145,9 @@ export default function MirrorConfig(props: MirrorConfigProps) {
variant='simple'
type={setting.type}
defaultValue={setting.default}
onChange={(e) => handleChange(e.target.value, setting)}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleChange(e.target.value, setting)
}
/>
{setting.tips && (
<InfoPopover
Expand Down
4 changes: 3 additions & 1 deletion ui/app/mirrors/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ export default function CreateMirrors() {
<TextField
variant='simple'
value={mirrorName}
onChange={(e) => setMirrorName(e.target.value)}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setMirrorName(e.target.value)
}
/>
}
/>
Expand Down
4 changes: 2 additions & 2 deletions ui/app/mirrors/create/tablemapping.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const TableMapping = ({ rows, setRows }: TableMappingProps) => {
<TextField
variant='simple'
value={row.source}
onChange={(e) =>
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleTableChange(index, 'source', e.target.value)
}
/>
Expand All @@ -61,7 +61,7 @@ const TableMapping = ({ rows, setRows }: TableMappingProps) => {
<TextField
variant='simple'
value={row.destination}
onChange={(e) =>
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleTableChange(index, 'destination', e.target.value)
}
/>
Expand Down
181 changes: 181 additions & 0 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
'use client';

import {
CDCMirrorStatus,
QRepMirrorStatus,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Badge } from '@/lib/Badge';
import { Button } from '@/lib/Button';
import { Checkbox } from '@/lib/Checkbox';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressBar } from '@/lib/ProgressBar';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import moment, { Duration, Moment } from 'moment';

const Badges = [
<Badge variant='positive' key={1}>
<Icon name='play_circle' />
Active
</Badge>,
<Badge variant='warning' key={1}>
<Icon name='pause_circle' />
Paused
</Badge>,
<Badge variant='destructive' key={1}>
<Icon name='dangerous' />
Broken
</Badge>,
<Badge variant='normal' key={1}>
<Icon name='pending' />
Incomplete
</Badge>,
];

class TableCloneSummary {
tableName: string;
totalNumPartitions: number;
totalNumRows: number;
completedNumPartitions: number;
completedNumRows: number;
avgTimePerPartition: Duration | null;
cloneStartTime: Moment | null;

constructor(clone: QRepMirrorStatus) {
this.tableName = clone.config?.watermarkTable || '';
this.totalNumPartitions = 0;
this.totalNumRows = 0;
this.completedNumPartitions = 0;
this.completedNumRows = 0;
this.avgTimePerPartition = null;
this.cloneStartTime = null;

this.calculate(clone);
}

private calculate(clone: QRepMirrorStatus): void {
let totalTime = moment.duration(0);
clone.partitions?.forEach((partition) => {
this.totalNumPartitions++;
this.totalNumRows += partition.numRows;

if (partition.startTime) {
let st = moment(partition.startTime);
if (!this.cloneStartTime || st.isBefore(this.cloneStartTime)) {
this.cloneStartTime = st;
}
}

if (partition.endTime) {
this.completedNumPartitions++;
this.completedNumRows += partition.numRows;
let st = moment(partition.startTime);
let et = moment(partition.endTime);
let duration = moment.duration(et.diff(st));
totalTime = totalTime.add(duration);
}
});

if (this.completedNumPartitions > 0) {
this.avgTimePerPartition = moment.duration(
totalTime.asMilliseconds() / this.completedNumPartitions
);
}
}

getRowProgressPercentage(): number {
if (this.totalNumRows === 0) {
return 0;
}
return (this.completedNumRows / this.totalNumRows) * 100;
}

getPartitionProgressPercentage(): number {
if (this.totalNumPartitions === 0) {
return 0;
}
return (this.completedNumPartitions / this.totalNumPartitions) * 100;
}
}

function summarizeTableClone(clone: QRepMirrorStatus): TableCloneSummary {
return new TableCloneSummary(clone);
}

type SnapshotStatusProps = {
status: SnapshotStatus;
};
const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => (
<Table
title={<Label variant='headline'>Initial Snapshot</Label>}
toolbar={{
left: (
<>
<Button variant='normalBorderless'>
<Icon name='chevron_left' />
</Button>
<Button variant='normalBorderless'>
<Icon name='chevron_right' />
</Button>
<Button variant='normalBorderless'>
<Icon name='refresh' />
</Button>
<Button variant='normalBorderless'>
<Icon name='help' />
</Button>
<Button variant='normalBorderless' disabled>
<Icon name='download' />
</Button>
</>
),
right: <SearchField placeholder='Search' />,
}}
header={
<TableRow>
<TableCell as='th' variant='button'>
<Checkbox variant='mixed' defaultChecked />
</TableCell>
<TableCell as='th'>Table Identifier</TableCell>
<TableCell as='th'>Start Time</TableCell>
<TableCell as='th'>Progress Partitions</TableCell>
<TableCell as='th'>Num Rows Synced</TableCell>
<TableCell as='th'>Avg Time Per Partition</TableCell>
</TableRow>
}
>
{status.clones.map(summarizeTableClone).map((clone, index) => (
<TableRow key={index}>
<TableCell variant='button'>
<Checkbox />
</TableCell>
<TableCell>
<Label>{clone.tableName}</Label>
</TableCell>
<TableCell>
<Label>{clone.cloneStartTime?.format('YYYY-MM-DD HH:mm:ss')}</Label>
</TableCell>
<TableCell>
<ProgressBar progress={clone.getPartitionProgressPercentage()} />
{clone.completedNumPartitions} / {clone.totalNumPartitions}
</TableCell>
<TableCell>{clone.completedNumRows}</TableCell>
<TableCell>
<Label>{clone.avgTimePerPartition?.humanize() || 'N/A'}</Label>
</TableCell>
</TableRow>
))}
</Table>
);

type CDCMirrorStatusProps = {
cdc: CDCMirrorStatus;
};
export function CDCMirror({ cdc }: CDCMirrorStatusProps) {
let snapshot = <></>;
if (cdc.snapshotStatus) {
snapshot = <SnapshotStatusTable status={cdc.snapshotStatus} />;
}
return <>{snapshot}</>;
}
Loading

0 comments on commit a204e6a

Please sign in to comment.