Skip to content

Commit

Permalink
drop mirror and more ui
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 31, 2023
1 parent 243ff7b commit 3b1f4d6
Show file tree
Hide file tree
Showing 12 changed files with 337 additions and 86 deletions.
39 changes: 20 additions & 19 deletions flow/generated/protos/route.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions flow/generated/protos/route.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ service FlowService {
rpc GetStatInfo(PostgresPeerActivityInfoRequest) returns (PeerStatResponse) {
option (google.api.http) = { get: "/v1/peers/stats/{peer_name}" };
}
rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {}
rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {
option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" };
}
rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) {
option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" };
}
Expand Down
30 changes: 30 additions & 0 deletions ui/app/api/mirrors/drop/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { UDropMirrorResponse } from '@/app/dto/MirrorsDTO';
import { ShutdownRequest, ShutdownResponse } from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { workflowId, flowJobName, sourcePeer, destinationPeer } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
const req: ShutdownRequest = {
workflowId,
flowJobName,
sourcePeer,
destinationPeer,
};
console.log('/drop/mirror: req:', req);
const dropStatus: ShutdownResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/drop`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UDropMirrorResponse = {
dropped: dropStatus.ok,
};

return new Response(JSON.stringify(response));
}
99 changes: 51 additions & 48 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
CatalogPeer,
PeerConfig,
UCreatePeerResponse,
UValidatePeerResponse,
Expand Down Expand Up @@ -94,56 +95,58 @@ export async function POST(request: Request) {
}
}

export const getTruePeer = (peer: CatalogPeer) => {
const newPeer: Peer = {
name: peer.name,
type: peer.type,
};
const options = peer.options;
let config:
| BigqueryConfig
| SnowflakeConfig
| PostgresConfig
| EventHubConfig
| S3Config
| SqlServerConfig
| EventHubGroupConfig;
switch (peer.type) {
case 0:
config = BigqueryConfig.decode(options);
newPeer.bigqueryConfig = config;
break;
case 1:
config = SnowflakeConfig.decode(options);
newPeer.snowflakeConfig = config;
break;
case 3:
config = PostgresConfig.decode(options);
newPeer.postgresConfig = config;
break;
case 4:
config = EventHubConfig.decode(options);
newPeer.eventhubConfig = config;
break;
case 5:
config = S3Config.decode(options);
newPeer.s3Config = config;
break;
case 6:
config = SqlServerConfig.decode(options);
newPeer.sqlserverConfig = config;
break;
case 7:
config = EventHubGroupConfig.decode(options);
newPeer.eventhubGroupConfig = config;
break;
default:
return newPeer;
}
return newPeer;
};

// GET all the peers from the database
export async function GET(request: Request) {
const peers = await prisma.peers.findMany();
const truePeers: Peer[] = peers.map((peer) => {
const newPeer: Peer = {
name: peer.name,
type: peer.type,
};
const options = peer.options;
let config:
| BigqueryConfig
| SnowflakeConfig
| PostgresConfig
| EventHubConfig
| S3Config
| SqlServerConfig
| EventHubGroupConfig;
switch (peer.type) {
case 0:
config = BigqueryConfig.decode(options);
newPeer.bigqueryConfig = config;
break;
case 1:
config = SnowflakeConfig.decode(options);
newPeer.snowflakeConfig = config;
break;
case 3:
config = PostgresConfig.decode(options);
newPeer.postgresConfig = config;
break;
case 4:
config = EventHubConfig.decode(options);
newPeer.eventhubConfig = config;
break;
case 5:
config = S3Config.decode(options);
newPeer.s3Config = config;
break;
case 6:
config = SqlServerConfig.decode(options);
newPeer.sqlserverConfig = config;
break;
case 7:
config = EventHubGroupConfig.decode(options);
newPeer.eventhubGroupConfig = config;
break;
default:
return newPeer;
}
return newPeer;
});
const truePeers: Peer[] = peers.map((peer) => getTruePeer(peer));
return new Response(JSON.stringify(truePeers));
}
4 changes: 4 additions & 0 deletions ui/app/dto/MirrorsDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export type UCreateMirrorResponse = {
created: boolean;
};

export type UDropMirrorResponse = {
dropped: boolean;
};

export type CDCConfig = FlowConnectionConfigs;
export type MirrorConfig = CDCConfig | QRepConfig;
export type MirrorSetter = Dispatch<SetStateAction<CDCConfig | QRepConfig>>;
Expand Down
6 changes: 6 additions & 0 deletions ui/app/dto/PeersDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ export type UColumnsResponse = {
};

export type PeerConfig = PostgresConfig | SnowflakeConfig;
export type CatalogPeer = {
id: number;
name: string;
type: number;
options: Buffer;
};
1 change: 1 addition & 0 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ export function CDCMirror({ cdc, syncStatusChild }: CDCMirrorStatusProps) {
className='flex flex-col w-full'
defaultValue={selectedTab}
onValueChange={setSelectedTab}
style={{ marginTop: '2rem' }}
>
<Tabs.List className='flex border-b' aria-label='Details'>
<Trigger
Expand Down
Loading

0 comments on commit 3b1f4d6

Please sign in to comment.