Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic ui for CDC mirrors #511

Merged
merged 5 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,10 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
if len(partitions) > 0 {
err = a.CatalogMirrorMonitor.InitializeQRepRun(
ctx,
config.FlowJobName,
config,
runUUID,
startTime,
partitions,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -488,10 +489,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
var wg sync.WaitGroup
var numRecords int64

err = a.CatalogMirrorMonitor.AddPartitionToQRepRun(ctx, config.FlowJobName, runUUID, partition)
if err != nil {
return err
}
var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
stream = model.NewQRecordStream(bufferSize)
Expand Down
39 changes: 29 additions & 10 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,41 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
return &config, nil
}

func (h *FlowRequestHandler) getQRepConfigFromCatalog(
flowJobName string,
) *protos.QRepConfig {
func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *protos.QRepConfig {
var configBytes []byte
var err error
var config protos.QRepConfig

err = h.pool.QueryRow(context.Background(),
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
logrus.Warnf("unable to query qrep config from catalog: %s", err.Error())
queryInfos := []struct {
Query string
Warning string
}{
{
Query: "SELECT config_proto FROM flows WHERE name = $1",
Warning: "unable to query qrep config from catalog",
},
{
Query: "SELECT config_proto FROM peerdb_stats.qrep_runs WHERE flow_name = $1",
Warning: "unable to query qrep config from qrep_runs",
},
}

// Iterate over queries and attempt to fetch the config
for _, qInfo := range queryInfos {
err := h.pool.QueryRow(context.Background(), qInfo.Query, flowJobName).Scan(&configBytes)
if err == nil {
break
}
logrus.Warnf("%s - %s: %s", qInfo.Warning, flowJobName, err.Error())
}

// If no config was fetched, return nil
if len(configBytes) == 0 {
return nil
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
// Try unmarshaling
if err := proto.Unmarshal(configBytes, &config); err != nil {
logrus.Warnf("failed to unmarshal config for %s: %s", flowJobName, err.Error())
return nil
}

Expand Down
31 changes: 28 additions & 3 deletions flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type CatalogMirrorMonitor struct {
Expand Down Expand Up @@ -152,19 +153,43 @@ func (c *CatalogMirrorMonitor) AddCDCBatchTablesForFlow(ctx context.Context, flo
return nil
}

func (c *CatalogMirrorMonitor) InitializeQRepRun(ctx context.Context, flowJobName string, runUUID string,
startTime time.Time) error {
func (c *CatalogMirrorMonitor) InitializeQRepRun(
ctx context.Context,
config *protos.QRepConfig,
runUUID string,
startTime time.Time,
partitions []*protos.QRepPartition,
) error {
if c == nil || c.catalogConn == nil {
return nil
}

flowJobName := config.GetFlowJobName()
_, err := c.catalogConn.Exec(ctx,
"INSERT INTO peerdb_stats.qrep_runs(flow_name,run_uuid,start_time) VALUES($1,$2,$3) ON CONFLICT DO NOTHING",
flowJobName, runUUID, startTime)
if err != nil {
return fmt.Errorf("error while inserting qrep run in qrep_runs: %w", err)
}

cfgBytes, err := proto.Marshal(config)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = c.catalogConn.Exec(ctx,
"UPDATE peerdb_stats.qrep_runs SET config_proto = $1 WHERE flow_name = $2",
cfgBytes, flowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}

for _, partition := range partitions {
if err := c.addPartitionToQRepRun(ctx, flowJobName, runUUID, partition); err != nil {
return fmt.Errorf("unable to add partition to qrep run: %w", err)
}
}

return nil
}

Expand All @@ -183,7 +208,7 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
return nil
}

func (c *CatalogMirrorMonitor) AddPartitionToQRepRun(ctx context.Context, flowJobName string,
func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJobName string,
runUUID string, partition *protos.QRepPartition) error {
if c == nil || c.catalogConn == nil {
return nil
Expand Down
2 changes: 2 additions & 0 deletions nexus/catalog/migrations/V8__qrep_runs_config.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE peerdb_stats.qrep_runs
ADD COLUMN config_proto BYTEA;
14 changes: 14 additions & 0 deletions ui/app/api/mirrors/status/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

function getMirrorStatusUrl(mirrorId: string) {
let base = GetFlowHttpAddressFromEnv();
return `${base}/v1/mirrors/${mirrorId}`;
}

export async function POST(request: Request) {
const { flowJobName } = await request.json();
const url = getMirrorStatusUrl(flowJobName);
const resp = await fetch(url);
const json = await resp.json();
return new Response(JSON.stringify(json));
}
8 changes: 6 additions & 2 deletions ui/app/mirrors/create/config.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ export default function MirrorConfig(props: MirrorConfigProps) {
}}
>
<Switch
onCheckedChange={(state) => handleChange(state, setting)}
onCheckedChange={(state: boolean) =>
handleChange(state, setting)
}
/>
{setting.tips && (
<InfoPopover
Expand Down Expand Up @@ -143,7 +145,9 @@ export default function MirrorConfig(props: MirrorConfigProps) {
variant='simple'
type={setting.type}
defaultValue={setting.default}
onChange={(e) => handleChange(e.target.value, setting)}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleChange(e.target.value, setting)
}
/>
{setting.tips && (
<InfoPopover
Expand Down
4 changes: 3 additions & 1 deletion ui/app/mirrors/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ export default function CreateMirrors() {
<TextField
variant='simple'
value={mirrorName}
onChange={(e) => setMirrorName(e.target.value)}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setMirrorName(e.target.value)
}
/>
}
/>
Expand Down
4 changes: 2 additions & 2 deletions ui/app/mirrors/create/tablemapping.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const TableMapping = ({ rows, setRows }: TableMappingProps) => {
<TextField
variant='simple'
value={row.source}
onChange={(e) =>
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleTableChange(index, 'source', e.target.value)
}
/>
Expand All @@ -61,7 +61,7 @@ const TableMapping = ({ rows, setRows }: TableMappingProps) => {
<TextField
variant='simple'
value={row.destination}
onChange={(e) =>
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleTableChange(index, 'destination', e.target.value)
}
/>
Expand Down
181 changes: 181 additions & 0 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
'use client';

import {
CDCMirrorStatus,
QRepMirrorStatus,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Badge } from '@/lib/Badge';
import { Button } from '@/lib/Button';
import { Checkbox } from '@/lib/Checkbox';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressBar } from '@/lib/ProgressBar';
import { SearchField } from '@/lib/SearchField';
import { Table, TableCell, TableRow } from '@/lib/Table';
import moment, { Duration, Moment } from 'moment';

const Badges = [
<Badge variant='positive' key={1}>
<Icon name='play_circle' />
Active
</Badge>,
<Badge variant='warning' key={1}>
<Icon name='pause_circle' />
Paused
</Badge>,
<Badge variant='destructive' key={1}>
<Icon name='dangerous' />
Broken
</Badge>,
<Badge variant='normal' key={1}>
<Icon name='pending' />
Incomplete
</Badge>,
];

class TableCloneSummary {
tableName: string;
totalNumPartitions: number;
totalNumRows: number;
completedNumPartitions: number;
completedNumRows: number;
avgTimePerPartition: Duration | null;
cloneStartTime: Moment | null;

constructor(clone: QRepMirrorStatus) {
this.tableName = clone.config?.watermarkTable || '';
this.totalNumPartitions = 0;
this.totalNumRows = 0;
this.completedNumPartitions = 0;
this.completedNumRows = 0;
this.avgTimePerPartition = null;
this.cloneStartTime = null;

this.calculate(clone);
}

private calculate(clone: QRepMirrorStatus): void {
let totalTime = moment.duration(0);
clone.partitions?.forEach((partition) => {
this.totalNumPartitions++;
this.totalNumRows += partition.numRows;

if (partition.startTime) {
let st = moment(partition.startTime);
if (!this.cloneStartTime || st.isBefore(this.cloneStartTime)) {
this.cloneStartTime = st;
}
}

if (partition.endTime) {
this.completedNumPartitions++;
this.completedNumRows += partition.numRows;
let st = moment(partition.startTime);
let et = moment(partition.endTime);
let duration = moment.duration(et.diff(st));
totalTime = totalTime.add(duration);
}
});

if (this.completedNumPartitions > 0) {
this.avgTimePerPartition = moment.duration(
totalTime.asMilliseconds() / this.completedNumPartitions
);
}
}

getRowProgressPercentage(): number {
if (this.totalNumRows === 0) {
return 0;
}
return (this.completedNumRows / this.totalNumRows) * 100;
}

getPartitionProgressPercentage(): number {
if (this.totalNumPartitions === 0) {
return 0;
}
return (this.completedNumPartitions / this.totalNumPartitions) * 100;
}
}

function summarizeTableClone(clone: QRepMirrorStatus): TableCloneSummary {
return new TableCloneSummary(clone);
}

type SnapshotStatusProps = {
status: SnapshotStatus;
};
const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => (
<Table
title={<Label variant='headline'>Initial Snapshot</Label>}
toolbar={{
left: (
<>
<Button variant='normalBorderless'>
<Icon name='chevron_left' />
</Button>
<Button variant='normalBorderless'>
<Icon name='chevron_right' />
</Button>
<Button variant='normalBorderless'>
<Icon name='refresh' />
</Button>
<Button variant='normalBorderless'>
<Icon name='help' />
</Button>
<Button variant='normalBorderless' disabled>
<Icon name='download' />
</Button>
</>
),
right: <SearchField placeholder='Search' />,
}}
header={
<TableRow>
<TableCell as='th' variant='button'>
<Checkbox variant='mixed' defaultChecked />
</TableCell>
<TableCell as='th'>Table Identifier</TableCell>
<TableCell as='th'>Start Time</TableCell>
<TableCell as='th'>Progress Partitions</TableCell>
<TableCell as='th'>Num Rows Synced</TableCell>
<TableCell as='th'>Avg Time Per Partition</TableCell>
</TableRow>
}
>
{status.clones.map(summarizeTableClone).map((clone, index) => (
<TableRow key={index}>
<TableCell variant='button'>
<Checkbox />
</TableCell>
<TableCell>
<Label>{clone.tableName}</Label>
</TableCell>
<TableCell>
<Label>{clone.cloneStartTime?.format('YYYY-MM-DD HH:mm:ss')}</Label>
</TableCell>
<TableCell>
<ProgressBar progress={clone.getPartitionProgressPercentage()} />
{clone.completedNumPartitions} / {clone.totalNumPartitions}
</TableCell>
<TableCell>{clone.completedNumRows}</TableCell>
<TableCell>
<Label>{clone.avgTimePerPartition?.humanize() || 'N/A'}</Label>
</TableCell>
</TableRow>
))}
</Table>
);

type CDCMirrorStatusProps = {
cdc: CDCMirrorStatus;
};
export function CDCMirror({ cdc }: CDCMirrorStatusProps) {
let snapshot = <></>;
if (cdc.snapshotStatus) {
snapshot = <SnapshotStatusTable status={cdc.snapshotStatus} />;
}
return <>{snapshot}</>;
}
Loading