From a3a3b4c6da0404dcfc1402b5f78a89c0c0aeb203 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 11 Mar 2024 19:07:42 +0530 Subject: [PATCH] wire and clean up UI --- flow/cmd/handler.go | 13 + flow/cmd/peer_data.go | 169 ++++++------ flow/connectors/snowflake/client.go | 43 +++- nexus/flow-rs/src/grpc.rs | 1 + protos/route.proto | 1 + ui/app/api/mirrors/qrep/route.ts | 3 +- ui/app/api/peers/schemas/route.ts | 4 +- ui/app/mirrors/create/cdc/schemabox.tsx | 4 +- ui/app/mirrors/create/cdc/tablemapping.tsx | 9 +- ui/app/mirrors/create/handlers.ts | 21 +- ui/app/mirrors/create/helpers/common.ts | 3 +- ui/app/mirrors/create/helpers/qrep.ts | 48 ---- ui/app/mirrors/create/page.tsx | 5 +- ui/app/mirrors/create/qrep/snowflakeQrep.tsx | 256 +++++-------------- 14 files changed, 249 insertions(+), 331 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 554e177dc1..dc4899bad1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -211,6 +211,7 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog( func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { + slog.Info("QRep endpoint request", slog.Any("req", req)) cfg := req.QrepConfig workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ @@ -259,6 +260,18 @@ func (h *FlowRequestHandler) CreateQRepFlow( // make them all uppercase cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName) } + + if req.QrepConfig.SourcePeer.Type == protos.DBType_SNOWFLAKE { + sourceTables := make([]string, 0, len(req.TableMapping)) + destinationTables := make([]string, 0, len(req.TableMapping)) + for _, mapping := range req.TableMapping { + sourceTables = append(sourceTables, mapping.SourceTableIdentifier) + destinationTables = append(destinationTables, mapping.DestinationTableIdentifier) + } + cfg.WatermarkTable = strings.Join(sourceTables, ";") + cfg.DestinationTableIdentifier = strings.Join(destinationTables, ";") + } + _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state) if err != nil { slog.Error("unable to start QRepFlow workflow", diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 4bde101258..287d0b0228 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -89,38 +89,76 @@ func (h *FlowRequestHandler) GetSchemas( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.PeerSchemasResponse, error) { - tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) - if err != nil { - return &protos.PeerSchemasResponse{Schemas: nil}, err - } - defer tunnel.Close() - defer peerConn.Close(ctx) + slog.Info("GET /schemas", slog.Any("req", req)) + switch int32(req.PeerType) { + case int32(protos.DBType_SNOWFLAKE): + sfConn, err := h.getConnForSFPeer(ctx, req.PeerName) + if err != nil { + slog.Error("Failed to get snowflake client", slog.Any("error", err)) + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + defer sfConn.Close() + sfSchemas, err := sfConn.GetAllSchemas(ctx) + if err != nil { + slog.Error("Failed to get all Snowflake schemas", slog.Any("error", err)) + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + return &protos.PeerSchemasResponse{Schemas: sfSchemas}, nil + default: + tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + if err != nil { + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + defer tunnel.Close() + defer peerConn.Close(ctx) - rows, err := peerConn.Query(ctx, "SELECT nspname"+ - " FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';") - if err != nil { - return &protos.PeerSchemasResponse{Schemas: nil}, err - } + rows, err := peerConn.Query(ctx, "SELECT nspname"+ + " FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';") + if err != nil { + return &protos.PeerSchemasResponse{Schemas: nil}, err + } - schemas, err := pgx.CollectRows[string](rows, pgx.RowTo) - if err != nil { - return &protos.PeerSchemasResponse{Schemas: nil}, err + schemas, err := pgx.CollectRows[string](rows, pgx.RowTo) + if err != nil { + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + return &protos.PeerSchemasResponse{Schemas: schemas}, nil } - return &protos.PeerSchemasResponse{Schemas: schemas}, nil } func (h *FlowRequestHandler) GetTablesInSchema( ctx context.Context, req *protos.SchemaTablesRequest, ) (*protos.SchemaTablesResponse, error) { - tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + _, peerType, err := h.getPeerID(ctx, req.PeerName) if err != nil { - return &protos.SchemaTablesResponse{Tables: nil}, err + slog.Info("failed to get peer type in /schema tables fetch", slog.Any("error", err)) + return &protos.SchemaTablesResponse{Tables: nil}, nil } - defer tunnel.Close() - defer peerConn.Close(ctx) + switch peerType { + case int32(protos.DBType_SNOWFLAKE): + sfConn, err := h.getConnForSFPeer(ctx, req.PeerName) + if err != nil { + slog.Error("Failed to get snowflake client", slog.Any("error", err)) + return &protos.SchemaTablesResponse{Tables: nil}, err + } + defer sfConn.Close() + sfTables, err := sfConn.GetTablesInSchema(ctx, req.SchemaName) + if err != nil { + slog.Error("Failed to get Snowflake tables in schema "+req.SchemaName, slog.Any("error", err)) + return &protos.SchemaTablesResponse{Tables: nil}, err + } + return &protos.SchemaTablesResponse{Tables: sfTables}, nil - rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname) + default: + tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + if err != nil { + return &protos.SchemaTablesResponse{Tables: nil}, err + } + defer tunnel.Close() + defer peerConn.Close(ctx) + + rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname) t.relname, CASE WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true @@ -140,30 +178,31 @@ func (h *FlowRequestHandler) GetTablesInSchema( t.relname, can_mirror DESC; `, req.SchemaName) - if err != nil { - return &protos.SchemaTablesResponse{Tables: nil}, err - } - - defer rows.Close() - var tables []*protos.TableResponse - for rows.Next() { - var table pgtype.Text - var hasPkeyOrReplica pgtype.Bool - err := rows.Scan(&table, &hasPkeyOrReplica) if err != nil { return &protos.SchemaTablesResponse{Tables: nil}, err } - canMirror := false - if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool { - canMirror = true - } - tables = append(tables, &protos.TableResponse{ - TableName: table.String, - CanMirror: canMirror, - }) + defer rows.Close() + var tables []*protos.TableResponse + for rows.Next() { + var table pgtype.Text + var hasPkeyOrReplica pgtype.Bool + err := rows.Scan(&table, &hasPkeyOrReplica) + if err != nil { + return &protos.SchemaTablesResponse{Tables: nil}, err + } + canMirror := false + if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool { + canMirror = true + } + + tables = append(tables, &protos.TableResponse{ + TableName: table.String, + CanMirror: canMirror, + }) + } + return &protos.SchemaTablesResponse{Tables: tables}, nil } - return &protos.SchemaTablesResponse{Tables: tables}, nil } // Returns list of tables across schema in schema.table format @@ -171,44 +210,28 @@ func (h *FlowRequestHandler) GetAllTables( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.AllTablesResponse, error) { - switch req.PeerType { - case protos.DBType_SNOWFLAKE: - sfConn, err := h.getConnForSFPeer(ctx, req.PeerName) - if err != nil { - slog.Error("Failed to get snowflake client", slog.Any("error", err)) - return &protos.AllTablesResponse{Tables: nil}, err - } - defer sfConn.Close() - sfTables, err := sfConn.GetAllTables(ctx) - if err != nil { - slog.Error("Failed to get all Snowflake tables", slog.Any("error", err)) - return &protos.AllTablesResponse{Tables: nil}, err - } - return &protos.AllTablesResponse{Tables: sfTables}, nil - default: - tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) - if err != nil { - return &protos.AllTablesResponse{Tables: nil}, err - } - defer tunnel.Close() - defer peerConn.Close(ctx) + tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + if err != nil { + return &protos.AllTablesResponse{Tables: nil}, err + } + defer tunnel.Close() + defer peerConn.Close(ctx) - rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+ - "FROM pg_class c "+ - "JOIN pg_namespace n ON c.relnamespace = n.oid "+ - "WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';") - if err != nil { - return &protos.AllTablesResponse{Tables: nil}, err - } - defer rows.Close() + rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+ + "FROM pg_class c "+ + "JOIN pg_namespace n ON c.relnamespace = n.oid "+ + "WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';") + if err != nil { + return &protos.AllTablesResponse{Tables: nil}, err + } + defer rows.Close() - tables, err := pgx.CollectRows[string](rows, pgx.RowTo) - if err != nil { - return nil, fmt.Errorf("failed to fetch all tables: %w", err) - } - return &protos.AllTablesResponse{Tables: tables}, nil + tables, err := pgx.CollectRows[string](rows, pgx.RowTo) + if err != nil { + return nil, fmt.Errorf("failed to fetch all tables: %w", err) } + return &protos.AllTablesResponse{Tables: tables}, nil } func (h *FlowRequestHandler) GetColumns( diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 6e66a6123b..19aba16bb2 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -85,11 +85,11 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string return totalRecords, nil } -func (c *SnowflakeConnector) GetAllTables(ctx context.Context) ([]string, error) { +func (c *SnowflakeConnector) GetTablesInSchema(ctx context.Context, schemaName string) ([]*protos.TableResponse, error) { rows, err := c.database.QueryContext(ctx, ` - SELECT table_schema, table_name - FROM information_schema.tables - WHERE table_type = 'BASE TABLE';`) + SELECT table_name + FROM information_schema.tables + WHERE TABLE_SCHEMA=?;`, schemaName) if err != nil { return nil, fmt.Errorf("failed to get tables from Snowflake: %w", err) } @@ -97,14 +97,41 @@ func (c *SnowflakeConnector) GetAllTables(ctx context.Context) ([]string, error) if rows.Err() != nil { return nil, fmt.Errorf("failed to get tables from Snowflake: %w", rows.Err()) } - var tables []string + var tables []*protos.TableResponse for rows.Next() { - var schema, table string - err := rows.Scan(&schema, &table) + var table string + err := rows.Scan(&table) if err != nil { return nil, fmt.Errorf("failed to scan table from Snowflake: %w", err) } - tables = append(tables, fmt.Sprintf(`%s.%s`, schema, table)) + tables = append(tables, &protos.TableResponse{ + TableName: table, + CanMirror: true, + }) + } + + return tables, nil +} + +func (c *SnowflakeConnector) GetAllSchemas(ctx context.Context) ([]string, error) { + rows, err := c.database.QueryContext(ctx, ` + SELECT SCHEMA_NAME + FROM INFORMATION_SCHEMA.SCHEMATA;`) + if err != nil { + return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", err) + } + defer rows.Close() + if rows.Err() != nil { + return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", rows.Err()) + } + var tables []string + for rows.Next() { + var schema string + err := rows.Scan(&schema) + if err != nil { + return nil, fmt.Errorf("failed to scan schema from Snowflake: %w", err) + } + tables = append(tables, schema) } return tables, nil diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index a962c4ec4f..56220586dc 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -49,6 +49,7 @@ impl FlowGrpcClient { let create_qrep_flow_req = pt::peerdb_route::CreateQRepFlowRequest { qrep_config: Some(qrep_config.clone()), create_catalog_entry: false, + table_mapping: vec![] }; let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?; let workflow_id = response.into_inner().workflow_id; diff --git a/protos/route.proto b/protos/route.proto index 85dc0666d9..684647f932 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -20,6 +20,7 @@ message CreateCDCFlowResponse { message CreateQRepFlowRequest { peerdb_flow.QRepConfig qrep_config = 1; bool create_catalog_entry = 2; + repeated peerdb_flow.TableMapping table_mapping = 3; } message CreateQRepFlowResponse { diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts index 7febf60140..b60a316cee 100644 --- a/ui/app/api/mirrors/qrep/route.ts +++ b/ui/app/api/mirrors/qrep/route.ts @@ -7,12 +7,13 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); - const { config } = body; + const { config, tableMapping } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); const req: CreateQRepFlowRequest = { qrepConfig: config, createCatalogEntry: true, + tableMapping: tableMapping, }; try { const createStatus: CreateQRepFlowResponse = await fetch( diff --git a/ui/app/api/peers/schemas/route.ts b/ui/app/api/peers/schemas/route.ts index 0c701cb713..32f6ce7438 100644 --- a/ui/app/api/peers/schemas/route.ts +++ b/ui/app/api/peers/schemas/route.ts @@ -3,11 +3,11 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); - const { peerName } = body; + const { peerName, peerType } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); try { const schemaList = await fetch( - `${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}` + `${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}&peer_type=${peerType}` ).then((res) => { return res.json(); }); diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx index 0c1d7e9a19..8170820c33 100644 --- a/ui/app/mirrors/create/cdc/schemabox.tsx +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -36,6 +36,7 @@ interface SchemaBoxProps { >; peerType?: DBType; omitAdditionalTables: string[] | undefined; + disableColumnView?: boolean; } const SchemaBox = ({ sourcePeer, @@ -46,6 +47,7 @@ const SchemaBox = ({ tableColumns, setTableColumns, omitAdditionalTables, + disableColumnView, }: SchemaBoxProps) => { const [tablesLoading, setTablesLoading] = useState(false); const [columnsLoading, setColumnsLoading] = useState(false); @@ -275,7 +277,7 @@ const SchemaBox = ({ {/* COLUMN BOX */} - {row.selected && ( + {row.selected && !disableColumnView && (
-
+
{searchedSchemas ? ( searchedSchemas.map((schema) => ( )) ) : ( diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 01c8a11b8f..0b86c745e5 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -11,6 +11,7 @@ import { QRepWriteType, } from '@/grpc_generated/flow'; import { DBType, Peer, dBTypeToJSON } from '@/grpc_generated/peers'; +import { CreateQRepFlowRequest } from '@/grpc_generated/route'; import { Dispatch, SetStateAction } from 'react'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; import { @@ -178,7 +179,8 @@ export const handleCreateQRep = async ( notify: (msg: string) => void, setLoading: Dispatch>, route: RouteCallback, - xmin?: boolean + xmin?: boolean, + rows?: TableMapRow[] ) => { const flowNameValid = flowNameSchema.safeParse(flowJobName); if (!flowNameValid.success) { @@ -222,14 +224,22 @@ export const handleCreateQRep = async ( } } config.flowJobName = flowJobName; + let reqBody: CreateQRepFlowRequest = { + qrepConfig: config, + createCatalogEntry: true, + tableMapping: [], + }; + if (rows) { + const tableMapping = reformattedTableMapping(rows); + reqBody.tableMapping = tableMapping; + } + setLoading(true); const statusMessage: UCreateMirrorResponse = await fetch( '/api/mirrors/qrep', { method: 'POST', - body: JSON.stringify({ - config, - }), + body: JSON.stringify(reqBody), } ).then((res) => res.json()); if (!statusMessage.created) { @@ -242,11 +252,12 @@ export const handleCreateQRep = async ( setLoading(false); }; -export const fetchSchemas = async (peerName: string) => { +export const fetchSchemas = async (peerName: string, peerType?: DBType) => { const schemasRes: USchemasResponse = await fetch('/api/peers/schemas', { method: 'POST', body: JSON.stringify({ peerName, + peerType, }), cache: 'no-store', }).then((res) => res.json()); diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 04db03cad8..51d4a2b5ab 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -60,8 +60,7 @@ export const blankQRepSetting = { export const blankSnowflakeQRepSetting = { destinationTableIdentifier: '', - query: `-- Here's a sample template you can fill in: - SELECT * FROM `, + query: '', watermarkTable: '', watermarkColumn: '', maxParallelWorkers: 4, diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index 64472d94c0..abcd8e5983 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -138,51 +138,3 @@ export const qrepSettings: MirrorSetting[] = [ type: 'number', }, ]; - -export const snowflakeQRepSettings: MirrorSetting[] = [ - { - label: 'Table', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => ({ - ...curr, - watermarkTable: (value as string) || '', - })), - type: 'select', - tips: 'The source table of the replication and the table to which the watermark column belongs.', - required: true, - }, - { - label: 'Create Destination Table', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => ({ - ...curr, - setupWatermarkTableOnDestination: (value as boolean) || false, - })), - tips: 'Specify if you want to create the watermark table on the destination as-is, can be used for some queries.', - type: 'switch', - }, - { - label: 'Destination Table Name', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => ({ - ...curr, - destinationTableIdentifier: value as string, - })), - tips: 'Name of the destination. For any destination peer apart from BigQuery, this must be schema-qualified. Example: public.users', - required: true, - }, - { - label: 'Write Type', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => { - let currWriteMode = curr.writeMode || { writeType: undefined }; - currWriteMode.writeType = value as QRepWriteType; - return { - ...curr, - writeMode: currWriteMode, - }; - }), - tips: `Overwrite mode overwrites the destination table data every sync.`, - type: 'select', - }, -]; diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index fbf49286f7..bb8d7a7ed4 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -207,6 +207,8 @@ export default function CreateMirrors() { ) : ( diff --git a/ui/app/mirrors/create/qrep/snowflakeQrep.tsx b/ui/app/mirrors/create/qrep/snowflakeQrep.tsx index 1a83420eb9..0d535ccc79 100644 --- a/ui/app/mirrors/create/qrep/snowflakeQrep.tsx +++ b/ui/app/mirrors/create/qrep/snowflakeQrep.tsx @@ -1,80 +1,29 @@ 'use client'; -import { RequiredIndicator } from '@/components/RequiredIndicator'; -import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow'; +import { TableMapRow } from '@/app/dto/MirrorsDTO'; +import { QRepConfig } from '@/grpc_generated/flow'; import { Label } from '@/lib/Label'; -import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; -import { Switch } from '@/lib/Switch'; +import { RowWithTextField } from '@/lib/Layout'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; -import { useEffect, useState } from 'react'; -import ReactSelect from 'react-select'; -import { InfoPopover } from '../../../../components/InfoPopover'; +import { Callout } from '@tremor/react'; +import { Dispatch, SetStateAction, useEffect } from 'react'; import { MirrorSetter } from '../../types'; -import { fetchAllTables } from '../handlers'; -import { MirrorSetting, blankSnowflakeQRepSetting } from '../helpers/common'; -import { snowflakeQRepSettings } from '../helpers/qrep'; +import TableMapping from '../cdc/tablemapping'; +import { blankSnowflakeQRepSetting } from '../helpers/common'; interface SnowflakeQRepProps { mirrorConfig: QRepConfig; setter: MirrorSetter; + rows: TableMapRow[]; + setRows: Dispatch>; } export default function SnowflakeQRepForm({ mirrorConfig, setter, + rows, + setRows, }: SnowflakeQRepProps) { - const [sourceTables, setSourceTables] = useState< - { value: string; label: string }[] - >([]); - const [loading, setLoading] = useState(false); - const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: string | boolean | QRepWriteType | string[] = val; - if (setting.label.includes('Write Type')) { - switch (val) { - case 'Append': - stateVal = QRepWriteType.QREP_WRITE_MODE_APPEND; - break; - case 'Overwrite': - stateVal = QRepWriteType.QREP_WRITE_MODE_OVERWRITE; - break; - default: - stateVal = QRepWriteType.QREP_WRITE_MODE_APPEND; - break; - } - } - setting.stateHandler(stateVal, setter); - }; - - const handleSourceChange = (val: string, setting: MirrorSetting) => { - setter((curr) => ({ - ...curr, - destinationTableIdentifier: val.toLowerCase(), - })); - handleChange(val, setting); - }; - - const paramDisplayCondition = (setting: MirrorSetting) => { - const label = setting.label.toLowerCase(); - if ( - setting.label === 'Upsert Key Columns' || - setting.label === 'Watermark Column' - ) { - return false; - } - return true; - }; - - useEffect(() => { - setLoading(true); - fetchAllTables( - mirrorConfig.sourcePeer?.name ?? '', - mirrorConfig.sourcePeer?.type - ).then((tables) => { - setSourceTables(tables?.map((table) => ({ value: table, label: table }))); - setLoading(false); - }); - }, [mirrorConfig.sourcePeer]); - useEffect(() => { // set defaults setter((curr) => ({ ...curr, ...blankSnowflakeQRepSetting })); @@ -82,134 +31,67 @@ export default function SnowflakeQRepForm({ return ( <> {mirrorConfig.sourcePeer?.name ? ( - snowflakeQRepSettings.map((setting, id) => { - return ( - paramDisplayCondition(setting) && - (setting.type === 'switch' ? ( - {setting.label}} - action={ -
- - handleChange(state, setting) - } - /> - {setting.tips && ( - - )} -
- } - /> - ) : setting.type === 'select' ? ( - - {setting.label} - {RequiredIndicator(setting.required)} - - } - action={ -
-
- {setting.label.includes('Write') ? ( - - ) : ( - - val && handleSourceChange(val.value, setting) - } - isLoading={loading} - options={sourceTables} - /> - )} -
- - {setting.tips && ( - - )} -
- } - /> - ) : ( - - {setting.label} - {setting.required && ( - - - - )} - - } - action={ -
+ + Query replication mirrors with Snowflake source supports only + overwrite, full-refresh mode. + +
+ + + Refresh Interval + - ) => - handleChange(e.target.value, setting) - } - /> - {setting.tips && ( - - )} -
- } - /> - )) - ); - }) + + + + } + action={ +
+ ) => + setter((curr) => ({ + ...curr, + waitBetweenBatchesSeconds: e.target.valueAsNumber, + })) + } + /> +
+ } + /> +
+ ()} + disableColumnView={true} + peerType={mirrorConfig.sourcePeer?.type} + /> +
) : ( )}