Skip to content

Commit

Permalink
ui: dropdown for table select
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 27, 2024
1 parent 8864f94 commit 45cf16b
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 37 deletions.
90 changes: 70 additions & 20 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/protobuf/proto"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

Expand All @@ -31,6 +32,23 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin
return &pgPeerConfig, nil
}

func (h *FlowRequestHandler) getSFPeerConfig(ctx context.Context, peerName string) (*protos.SnowflakeConfig, error) {
var sfPeerOptions sql.RawBytes
var sfPeerConfig protos.SnowflakeConfig
err := h.pool.QueryRow(ctx,
"SELECT options FROM peers WHERE name = $1 AND type=1", peerName).Scan(&sfPeerOptions)
if err != nil {
return nil, err
}

unmarshalErr := proto.Unmarshal(sfPeerOptions, &sfPeerConfig)
if err != nil {
return nil, unmarshalErr
}

return &sfPeerConfig, nil
}

func (h *FlowRequestHandler) getConnForPGPeer(ctx context.Context, peerName string) (*connpostgres.SSHTunnel, *pgx.Conn, error) {
pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName)
if err != nil {
Expand All @@ -52,6 +70,21 @@ func (h *FlowRequestHandler) getConnForPGPeer(ctx context.Context, peerName stri
return tunnel, conn, nil
}

func (h *FlowRequestHandler) getConnForSFPeer(ctx context.Context, peerName string) (*connsnowflake.SnowflakeConnector, error) {
sfPeerConfig, err := h.getSFPeerConfig(ctx, peerName)
if err != nil {
return nil, err
}

sfConn, err := connsnowflake.NewSnowflakeConnector(ctx, sfPeerConfig)
if err != nil {
slog.Error("Failed to create snowflake client", slog.Any("error", err))
return nil, err
}

return sfConn, nil
}

func (h *FlowRequestHandler) GetSchemas(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
Expand Down Expand Up @@ -138,33 +171,50 @@ func (h *FlowRequestHandler) GetAllTables(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.AllTablesResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)
switch req.PeerType {
case protos.DBType_SNOWFLAKE:
sfConn, err := h.getConnForSFPeer(ctx, req.PeerName)
if err != nil {
slog.Error("Failed to get snowflake client", slog.Any("error", err))
return &protos.AllTablesResponse{Tables: nil}, err
}
defer sfConn.Close()
sfTables, err := sfConn.GetAllTables(ctx)
if err != nil {
slog.Error("Failed to get all Snowflake tables", slog.Any("error", err))
return &protos.AllTablesResponse{Tables: nil}, err
}
return &protos.AllTablesResponse{Tables: sfTables}, nil

rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+
"FROM pg_class c "+
"JOIN pg_namespace n ON c.relnamespace = n.oid "+
"WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
default:
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

defer rows.Close()
var tables []string
for rows.Next() {
var table pgtype.Text
err := rows.Scan(&table)
rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+
"FROM pg_class c "+
"JOIN pg_namespace n ON c.relnamespace = n.oid "+
"WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}

tables = append(tables, table.String)
defer rows.Close()
var tables []string
for rows.Next() {
var table pgtype.Text
err := rows.Scan(&table)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}

tables = append(tables, table.String)
}
return &protos.AllTablesResponse{Tables: tables}, nil
}
return &protos.AllTablesResponse{Tables: tables}, nil
}

func (h *FlowRequestHandler) GetColumns(
Expand Down
27 changes: 27 additions & 0 deletions flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,33 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string
return totalRecords, nil
}

func (c *SnowflakeConnector) GetAllTables(ctx context.Context) ([]string, error) {
// return all tables in database in schema.table form
// get it from information schema columns
rows, err := c.database.QueryContext(ctx, `
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE';`)
if err != nil {
return nil, fmt.Errorf("failed to get tables from Snowflake: %w", err)
}
defer rows.Close()
if rows.Err() != nil {
return nil, fmt.Errorf("failed to get tables from Snowflake: %w", rows.Err())
}
var tables []string
for rows.Next() {
var schema, table string
err := rows.Scan(&schema, &table)
if err != nil {
return nil, fmt.Errorf("failed to scan table from Snowflake: %w", err)
}
tables = append(tables, fmt.Sprintf(`%s.%s`, schema, table))
}

return tables, nil
}

func SnowflakeIdentifierNormalize(identifier string) string {
// https://www.alberton.info/dbms_identifiers_and_case_sensitivity.html
// Snowflake follows the SQL standard, but Postgres does the opposite.
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ message TableColumnsResponse {

message PostgresPeerActivityInfoRequest {
string peer_name = 1;
peerdb_peers.DBType peer_type = 2;
}

message SlotInfo {
Expand Down
4 changes: 2 additions & 2 deletions ui/app/api/peers/tables/all/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { peerName } = body;
const { peerName, peerType } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
try {
const tableList: AllTablesResponse = await fetch(
`${flowServiceAddr}/v1/peers/tables/all?peer_name=${peerName}`
`${flowServiceAddr}/v1/peers/tables/all?peer_name=${peerName}&peer_type=${peerType}`
).then((res) => {
return res.json();
});
Expand Down
3 changes: 2 additions & 1 deletion ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,13 @@ export const fetchColumns = async (
return columnsRes.columns;
};

export const fetchAllTables = async (peerName: string) => {
export const fetchAllTables = async (peerName: string, peerType?: DBType) => {
if (peerName?.length === 0) return [];
const tablesRes: UTablesAllResponse = await fetch('/api/peers/tables/all', {
method: 'POST',
body: JSON.stringify({
peerName,
peerType,
}),
cache: 'no-store',
}).then((res) => res.json());
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/helpers/qrep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export const snowflakeQRepSettings: MirrorSetting[] = [
...curr,
watermarkTable: (value as string) || '',
})),
type: 'text',
type: 'select',
tips: 'The source table of the replication and the table to which the watermark column belongs.',
required: true,
},
Expand Down
5 changes: 4 additions & 1 deletion ui/app/mirrors/create/qrep/qrep.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ export default function QRepConfigForm({
};

useEffect(() => {
fetchAllTables(mirrorConfig.sourcePeer?.name ?? '').then((tables) =>
fetchAllTables(
mirrorConfig.sourcePeer?.name ?? '',
mirrorConfig.sourcePeer?.type
).then((tables) =>
setSourceTables(tables?.map((table) => ({ value: table, label: table })))
);
}, [mirrorConfig.sourcePeer]);
Expand Down
58 changes: 46 additions & 12 deletions ui/app/mirrors/create/qrep/snowflakeQrep.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout';
import { Switch } from '@/lib/Switch';
import { TextField } from '@/lib/TextField';
import { Tooltip } from '@/lib/Tooltip';
import { useEffect } from 'react';
import { useEffect, useState } from 'react';
import ReactSelect from 'react-select';
import { InfoPopover } from '../../../../components/InfoPopover';
import { MirrorSetter } from '../../types';
import { fetchAllTables } from '../handlers';
import { MirrorSetting, blankSnowflakeQRepSetting } from '../helpers/common';
import { snowflakeQRepSettings } from '../helpers/qrep';
import QRepQuery from './query';
Expand All @@ -23,11 +24,10 @@ export default function SnowflakeQRepForm({
mirrorConfig,
setter,
}: SnowflakeQRepProps) {
const WriteModes = ['Overwrite'].map((value) => ({
label: value,
value,
}));

const [sourceTables, setSourceTables] = useState<
{ value: string; label: string }[]
>([]);
const [loading, setLoading] = useState(false);
const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean | QRepWriteType | string[] = val;
if (setting.label.includes('Write Type')) {
Expand All @@ -46,6 +46,14 @@ export default function SnowflakeQRepForm({
setting.stateHandler(stateVal, setter);
};

const handleSourceChange = (val: string, setting: MirrorSetting) => {
setter((curr) => ({
...curr,
destinationTableIdentifier: val.toLowerCase(),
}));
handleChange(val, setting);
};

const paramDisplayCondition = (setting: MirrorSetting) => {
const label = setting.label.toLowerCase();
if (
Expand All @@ -57,6 +65,17 @@ export default function SnowflakeQRepForm({
return true;
};

useEffect(() => {
setLoading(true);
fetchAllTables(
mirrorConfig.sourcePeer?.name ?? '',
mirrorConfig.sourcePeer?.type
).then((tables) => {
setSourceTables(tables?.map((table) => ({ value: table, label: table })));
setLoading(false);
});
}, [mirrorConfig.sourcePeer]);

useEffect(() => {
// set defaults
setter((curr) => ({ ...curr, ...blankSnowflakeQRepSetting }));
Expand Down Expand Up @@ -126,11 +145,22 @@ export default function SnowflakeQRepForm({
}}
>
<div style={{ width: '100%' }}>
<ReactSelect
isDisabled={true}
placeholder='Select a write mode'
value={{ value: 'Overwrite', label: 'Overwrite' }}
/>
{setting.label.includes('Write') ? (
<ReactSelect
isDisabled={true}
placeholder='Select a write mode'
value={{ value: 'Overwrite', label: 'Overwrite' }}
/>
) : (
<ReactSelect
placeholder={'Select a table'}
onChange={(val, action) =>
val && handleSourceChange(val.value, setting)
}
isLoading={loading}
options={sourceTables}
/>
)}
</div>

{setting.tips && (
Expand Down Expand Up @@ -171,7 +201,11 @@ export default function SnowflakeQRepForm({
<TextField
variant='simple'
type={setting.type}
defaultValue={setting.default as string}
defaultValue={
setting.label === 'Destination Table Name'
? mirrorConfig.destinationTableIdentifier
: (setting.default as string)
}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
handleChange(e.target.value, setting)
}
Expand Down

0 comments on commit 45cf16b

Please sign in to comment.