Skip to content

Commit

Permalink
Merge branch 'main' into schema-delta-audit-log
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 18, 2023
2 parents b13b420 + 2500b34 commit ccb0816
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 94 deletions.
33 changes: 33 additions & 0 deletions ui/app/api/mirrors/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { getTruePeer } from '@/app/api/peers/getTruePeer';
import prisma from '@/app/utils/prisma';

export const dynamic = 'force-dynamic';

const stringifyConfig = (flowArray: any[]) => {
flowArray.forEach((flow) => {
if (flow.config_proto) {
flow.config_proto = new TextDecoder().decode(flow.config_proto);
}
});
return flowArray;
};

export async function GET(request: Request) {
const mirrors = await prisma.flows.findMany({
distinct: 'name',
include: {
sourcePeer: true,
destinationPeer: true,
},
});

const flows = mirrors?.map((mirror) => {
let newMirror: any = {
...mirror,
sourcePeer: getTruePeer(mirror.sourcePeer),
destinationPeer: getTruePeer(mirror.destinationPeer),
};
return newMirror;
});
return new Response(JSON.stringify(stringifyConfig(flows)));
}
60 changes: 60 additions & 0 deletions ui/app/api/peers/getTruePeer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { CatalogPeer } from '@/app/dto/PeersDTO';
import {
BigqueryConfig,
EventHubConfig,
EventHubGroupConfig,
Peer,
PostgresConfig,
S3Config,
SnowflakeConfig,
SqlServerConfig,
} from '@/grpc_generated/peers';

export const getTruePeer = (peer: CatalogPeer) => {
const newPeer: Peer = {
name: peer.name,
type: peer.type,
};
const options = peer.options;
let config:
| BigqueryConfig
| SnowflakeConfig
| PostgresConfig
| EventHubConfig
| S3Config
| SqlServerConfig
| EventHubGroupConfig;
switch (peer.type) {
case 0:
config = BigqueryConfig.decode(options);
newPeer.bigqueryConfig = config;
break;
case 1:
config = SnowflakeConfig.decode(options);
newPeer.snowflakeConfig = config;
break;
case 3:
config = PostgresConfig.decode(options);
newPeer.postgresConfig = config;
break;
case 4:
config = EventHubConfig.decode(options);
newPeer.eventhubConfig = config;
break;
case 5:
config = S3Config.decode(options);
newPeer.s3Config = config;
break;
case 6:
config = SqlServerConfig.decode(options);
newPeer.sqlserverConfig = config;
break;
case 7:
config = EventHubGroupConfig.decode(options);
newPeer.eventhubGroupConfig = config;
break;
default:
return newPeer;
}
return newPeer;
};
55 changes: 3 additions & 52 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { getTruePeer } from '@/app/api/peers/getTruePeer';
import {
CatalogPeer,
PeerConfig,
Expand All @@ -8,13 +9,10 @@ import prisma from '@/app/utils/prisma';
import {
BigqueryConfig,
DBType,
EventHubConfig,
EventHubGroupConfig,
Peer,
PostgresConfig,
S3Config,
SnowflakeConfig,
SqlServerConfig,
} from '@/grpc_generated/peers';
import {
CreatePeerRequest,
Expand Down Expand Up @@ -63,6 +61,8 @@ const constructPeer = (
}
};

export const dynamic = 'force-dynamic';

export async function POST(request: Request) {
const body = await request.json();
console.log('POST Validate Peer:', body);
Expand Down Expand Up @@ -117,55 +117,6 @@ export async function POST(request: Request) {
}
}

export const getTruePeer = (peer: CatalogPeer) => {
const newPeer: Peer = {
name: peer.name,
type: peer.type,
};
const options = peer.options;
let config:
| BigqueryConfig
| SnowflakeConfig
| PostgresConfig
| EventHubConfig
| S3Config
| SqlServerConfig
| EventHubGroupConfig;
switch (peer.type) {
case 0:
config = BigqueryConfig.decode(options);
newPeer.bigqueryConfig = config;
break;
case 1:
config = SnowflakeConfig.decode(options);
newPeer.snowflakeConfig = config;
break;
case 3:
config = PostgresConfig.decode(options);
newPeer.postgresConfig = config;
break;
case 4:
config = EventHubConfig.decode(options);
newPeer.eventhubConfig = config;
break;
case 5:
config = S3Config.decode(options);
newPeer.s3Config = config;
break;
case 6:
config = SqlServerConfig.decode(options);
newPeer.sqlserverConfig = config;
break;
case 7:
config = EventHubGroupConfig.decode(options);
newPeer.eventhubGroupConfig = config;
break;
default:
return newPeer;
}
return newPeer;
};

// GET all the peers from the database
export async function GET(request: Request) {
const peers = await prisma.peers.findMany();
Expand Down
81 changes: 39 additions & 42 deletions ui/app/mirrors/page.tsx
Original file line number Diff line number Diff line change
@@ -1,47 +1,35 @@
'use client';

import { QRepConfig } from '@/grpc_generated/flow';
import { Button } from '@/lib/Button';
import { Header } from '@/lib/Header';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { LayoutMain } from '@/lib/Layout';
import { Panel } from '@/lib/Panel';
import { ProgressCircle } from '@/lib/ProgressCircle';
import Link from 'next/link';
import { getTruePeer } from '../api/peers/route';
import prisma from '../utils/prisma';
import useSWR from 'swr';
import { CDCFlows, QRepFlows } from './tables';
export const dynamic = 'force-dynamic';

const stringifyConfig = (flowArray: any[]) => {
flowArray.forEach((flow) => {
if (flow.config_proto) {
flow.config_proto = new TextDecoder().decode(flow.config_proto);
}
});
};

export default async function Mirrors() {
let mirrors = await prisma.flows.findMany({
distinct: 'name',
include: {
sourcePeer: true,
destinationPeer: true,
},
});
const fetcher = (...args: [any]) => fetch(...args).then((res) => res.json());

const flows = mirrors.map((mirror) => {
let newMirror: any = {
...mirror,
sourcePeer: getTruePeer(mirror.sourcePeer),
destinationPeer: getTruePeer(mirror.destinationPeer),
};
return newMirror;
});
export default function Mirrors() {
const {
data: flows,
error,
isLoading,
}: { data: [any]; error: any; isLoading: boolean } = useSWR(
'/api/mirrors',
fetcher
);

let cdcFlows = flows.filter((flow) => {
let cdcFlows = flows?.filter((flow) => {
return !flow.query_string;
});

let qrepFlows = flows.filter((flow) => {
let qrepFlows = flows?.filter((flow) => {
if (flow.config_proto && flow.query_string) {
let config = QRepConfig.decode(flow.config_proto);
const watermarkCol = config.watermarkColumn.toLowerCase();
Expand All @@ -50,18 +38,14 @@ export default async function Mirrors() {
return false;
});

let xminFlows = flows.filter((flow) => {
let xminFlows = flows?.filter((flow) => {
if (flow.config_proto && flow.query_string) {
let config = QRepConfig.decode(flow.config_proto);
return config.watermarkColumn.toLowerCase() === 'xmin';
}
return false;
});

stringifyConfig(cdcFlows);
stringifyConfig(qrepFlows);
stringifyConfig(xminFlows);

return (
<LayoutMain alignSelf='flex-start' justifySelf='flex-start' width='full'>
<Panel>
Expand All @@ -84,15 +68,28 @@ export default async function Mirrors() {
Mirrors
</Header>
</Panel>
<Panel>
<CDCFlows cdcFlows={cdcFlows} />
</Panel>
<Panel className='mt-10'>
<QRepFlows title='Query Replication' qrepFlows={qrepFlows} />
</Panel>
<Panel className='mt-10'>
<QRepFlows title='XMIN Mirrors' qrepFlows={xminFlows} />
</Panel>
{isLoading && (
<Panel>
<div className='h-screen flex items-center justify-center'>
<ProgressCircle variant='determinate_progress_circle' />
</div>
</Panel>
)}
{!isLoading && (
<Panel>
<CDCFlows cdcFlows={cdcFlows} />
</Panel>
)}
{!isLoading && (
<Panel className='mt-10'>
<QRepFlows title='Query Replication' qrepFlows={qrepFlows} />
</Panel>
)}
{!isLoading && (
<Panel className='mt-10'>
<QRepFlows title='XMIN Mirrors' qrepFlows={xminFlows} />
</Panel>
)}
</LayoutMain>
);
}

0 comments on commit ccb0816

Please sign in to comment.