diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 554e177dc1..dc4899bad1 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -211,6 +211,7 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog( func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { + slog.Info("QRep endpoint request", slog.Any("req", req)) cfg := req.QrepConfig workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ @@ -259,6 +260,18 @@ func (h *FlowRequestHandler) CreateQRepFlow( // make them all uppercase cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName) } + + if req.QrepConfig.SourcePeer.Type == protos.DBType_SNOWFLAKE { + sourceTables := make([]string, 0, len(req.TableMapping)) + destinationTables := make([]string, 0, len(req.TableMapping)) + for _, mapping := range req.TableMapping { + sourceTables = append(sourceTables, mapping.SourceTableIdentifier) + destinationTables = append(destinationTables, mapping.DestinationTableIdentifier) + } + cfg.WatermarkTable = strings.Join(sourceTables, ";") + cfg.DestinationTableIdentifier = strings.Join(destinationTables, ";") + } + _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state) if err != nil { slog.Error("unable to start QRepFlow workflow", diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 4bde101258..287d0b0228 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -89,38 +89,76 @@ func (h *FlowRequestHandler) GetSchemas( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.PeerSchemasResponse, error) { - tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) - if err != nil { - return &protos.PeerSchemasResponse{Schemas: nil}, err - } - defer tunnel.Close() - defer peerConn.Close(ctx) + slog.Info("GET /schemas", slog.Any("req", req)) + switch int32(req.PeerType) { + case int32(protos.DBType_SNOWFLAKE): + sfConn, err := h.getConnForSFPeer(ctx, req.PeerName) + if err != nil { + slog.Error("Failed to get snowflake client", slog.Any("error", err)) + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + defer sfConn.Close() + sfSchemas, err := sfConn.GetAllSchemas(ctx) + if err != nil { + slog.Error("Failed to get all Snowflake schemas", slog.Any("error", err)) + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + return &protos.PeerSchemasResponse{Schemas: sfSchemas}, nil + default: + tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + if err != nil { + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + defer tunnel.Close() + defer peerConn.Close(ctx) - rows, err := peerConn.Query(ctx, "SELECT nspname"+ - " FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';") - if err != nil { - return &protos.PeerSchemasResponse{Schemas: nil}, err - } + rows, err := peerConn.Query(ctx, "SELECT nspname"+ + " FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';") + if err != nil { + return &protos.PeerSchemasResponse{Schemas: nil}, err + } - schemas, err := pgx.CollectRows[string](rows, pgx.RowTo) - if err != nil { - return &protos.PeerSchemasResponse{Schemas: nil}, err + schemas, err := pgx.CollectRows[string](rows, pgx.RowTo) + if err != nil { + return &protos.PeerSchemasResponse{Schemas: nil}, err + } + return &protos.PeerSchemasResponse{Schemas: schemas}, nil } - return &protos.PeerSchemasResponse{Schemas: schemas}, nil } func (h *FlowRequestHandler) GetTablesInSchema( ctx context.Context, req *protos.SchemaTablesRequest, ) (*protos.SchemaTablesResponse, error) { - tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + _, peerType, err := h.getPeerID(ctx, req.PeerName) if err != nil { - return &protos.SchemaTablesResponse{Tables: nil}, err + slog.Info("failed to get peer type in /schema tables fetch", slog.Any("error", err)) + return &protos.SchemaTablesResponse{Tables: nil}, nil } - defer tunnel.Close() - defer peerConn.Close(ctx) + switch peerType { + case int32(protos.DBType_SNOWFLAKE): + sfConn, err := h.getConnForSFPeer(ctx, req.PeerName) + if err != nil { + slog.Error("Failed to get snowflake client", slog.Any("error", err)) + return &protos.SchemaTablesResponse{Tables: nil}, err + } + defer sfConn.Close() + sfTables, err := sfConn.GetTablesInSchema(ctx, req.SchemaName) + if err != nil { + slog.Error("Failed to get Snowflake tables in schema "+req.SchemaName, slog.Any("error", err)) + return &protos.SchemaTablesResponse{Tables: nil}, err + } + return &protos.SchemaTablesResponse{Tables: sfTables}, nil - rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname) + default: + tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + if err != nil { + return &protos.SchemaTablesResponse{Tables: nil}, err + } + defer tunnel.Close() + defer peerConn.Close(ctx) + + rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname) t.relname, CASE WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true @@ -140,30 +178,31 @@ func (h *FlowRequestHandler) GetTablesInSchema( t.relname, can_mirror DESC; `, req.SchemaName) - if err != nil { - return &protos.SchemaTablesResponse{Tables: nil}, err - } - - defer rows.Close() - var tables []*protos.TableResponse - for rows.Next() { - var table pgtype.Text - var hasPkeyOrReplica pgtype.Bool - err := rows.Scan(&table, &hasPkeyOrReplica) if err != nil { return &protos.SchemaTablesResponse{Tables: nil}, err } - canMirror := false - if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool { - canMirror = true - } - tables = append(tables, &protos.TableResponse{ - TableName: table.String, - CanMirror: canMirror, - }) + defer rows.Close() + var tables []*protos.TableResponse + for rows.Next() { + var table pgtype.Text + var hasPkeyOrReplica pgtype.Bool + err := rows.Scan(&table, &hasPkeyOrReplica) + if err != nil { + return &protos.SchemaTablesResponse{Tables: nil}, err + } + canMirror := false + if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool { + canMirror = true + } + + tables = append(tables, &protos.TableResponse{ + TableName: table.String, + CanMirror: canMirror, + }) + } + return &protos.SchemaTablesResponse{Tables: tables}, nil } - return &protos.SchemaTablesResponse{Tables: tables}, nil } // Returns list of tables across schema in schema.table format @@ -171,44 +210,28 @@ func (h *FlowRequestHandler) GetAllTables( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.AllTablesResponse, error) { - switch req.PeerType { - case protos.DBType_SNOWFLAKE: - sfConn, err := h.getConnForSFPeer(ctx, req.PeerName) - if err != nil { - slog.Error("Failed to get snowflake client", slog.Any("error", err)) - return &protos.AllTablesResponse{Tables: nil}, err - } - defer sfConn.Close() - sfTables, err := sfConn.GetAllTables(ctx) - if err != nil { - slog.Error("Failed to get all Snowflake tables", slog.Any("error", err)) - return &protos.AllTablesResponse{Tables: nil}, err - } - return &protos.AllTablesResponse{Tables: sfTables}, nil - default: - tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) - if err != nil { - return &protos.AllTablesResponse{Tables: nil}, err - } - defer tunnel.Close() - defer peerConn.Close(ctx) + tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName) + if err != nil { + return &protos.AllTablesResponse{Tables: nil}, err + } + defer tunnel.Close() + defer peerConn.Close(ctx) - rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+ - "FROM pg_class c "+ - "JOIN pg_namespace n ON c.relnamespace = n.oid "+ - "WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';") - if err != nil { - return &protos.AllTablesResponse{Tables: nil}, err - } - defer rows.Close() + rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+ + "FROM pg_class c "+ + "JOIN pg_namespace n ON c.relnamespace = n.oid "+ + "WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';") + if err != nil { + return &protos.AllTablesResponse{Tables: nil}, err + } + defer rows.Close() - tables, err := pgx.CollectRows[string](rows, pgx.RowTo) - if err != nil { - return nil, fmt.Errorf("failed to fetch all tables: %w", err) - } - return &protos.AllTablesResponse{Tables: tables}, nil + tables, err := pgx.CollectRows[string](rows, pgx.RowTo) + if err != nil { + return nil, fmt.Errorf("failed to fetch all tables: %w", err) } + return &protos.AllTablesResponse{Tables: tables}, nil } func (h *FlowRequestHandler) GetColumns( diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 6e66a6123b..19aba16bb2 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -85,11 +85,11 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string return totalRecords, nil } -func (c *SnowflakeConnector) GetAllTables(ctx context.Context) ([]string, error) { +func (c *SnowflakeConnector) GetTablesInSchema(ctx context.Context, schemaName string) ([]*protos.TableResponse, error) { rows, err := c.database.QueryContext(ctx, ` - SELECT table_schema, table_name - FROM information_schema.tables - WHERE table_type = 'BASE TABLE';`) + SELECT table_name + FROM information_schema.tables + WHERE TABLE_SCHEMA=?;`, schemaName) if err != nil { return nil, fmt.Errorf("failed to get tables from Snowflake: %w", err) } @@ -97,14 +97,41 @@ func (c *SnowflakeConnector) GetAllTables(ctx context.Context) ([]string, error) if rows.Err() != nil { return nil, fmt.Errorf("failed to get tables from Snowflake: %w", rows.Err()) } - var tables []string + var tables []*protos.TableResponse for rows.Next() { - var schema, table string - err := rows.Scan(&schema, &table) + var table string + err := rows.Scan(&table) if err != nil { return nil, fmt.Errorf("failed to scan table from Snowflake: %w", err) } - tables = append(tables, fmt.Sprintf(`%s.%s`, schema, table)) + tables = append(tables, &protos.TableResponse{ + TableName: table, + CanMirror: true, + }) + } + + return tables, nil +} + +func (c *SnowflakeConnector) GetAllSchemas(ctx context.Context) ([]string, error) { + rows, err := c.database.QueryContext(ctx, ` + SELECT SCHEMA_NAME + FROM INFORMATION_SCHEMA.SCHEMATA;`) + if err != nil { + return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", err) + } + defer rows.Close() + if rows.Err() != nil { + return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", rows.Err()) + } + var tables []string + for rows.Next() { + var schema string + err := rows.Scan(&schema) + if err != nil { + return nil, fmt.Errorf("failed to scan schema from Snowflake: %w", err) + } + tables = append(tables, schema) } return tables, nil diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index a962c4ec4f..56220586dc 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -49,6 +49,7 @@ impl FlowGrpcClient { let create_qrep_flow_req = pt::peerdb_route::CreateQRepFlowRequest { qrep_config: Some(qrep_config.clone()), create_catalog_entry: false, + table_mapping: vec![] }; let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?; let workflow_id = response.into_inner().workflow_id; diff --git a/protos/route.proto b/protos/route.proto index 85dc0666d9..684647f932 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -20,6 +20,7 @@ message CreateCDCFlowResponse { message CreateQRepFlowRequest { peerdb_flow.QRepConfig qrep_config = 1; bool create_catalog_entry = 2; + repeated peerdb_flow.TableMapping table_mapping = 3; } message CreateQRepFlowResponse { diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts index 7febf60140..b60a316cee 100644 --- a/ui/app/api/mirrors/qrep/route.ts +++ b/ui/app/api/mirrors/qrep/route.ts @@ -7,12 +7,13 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); - const { config } = body; + const { config, tableMapping } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); const req: CreateQRepFlowRequest = { qrepConfig: config, createCatalogEntry: true, + tableMapping: tableMapping, }; try { const createStatus: CreateQRepFlowResponse = await fetch( diff --git a/ui/app/api/peers/schemas/route.ts b/ui/app/api/peers/schemas/route.ts index 0c701cb713..32f6ce7438 100644 --- a/ui/app/api/peers/schemas/route.ts +++ b/ui/app/api/peers/schemas/route.ts @@ -3,11 +3,11 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http'; export async function POST(request: Request) { const body = await request.json(); - const { peerName } = body; + const { peerName, peerType } = body; const flowServiceAddr = GetFlowHttpAddressFromEnv(); try { const schemaList = await fetch( - `${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}` + `${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}&peer_type=${peerType}` ).then((res) => { return res.json(); }); diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx index 0c1d7e9a19..8170820c33 100644 --- a/ui/app/mirrors/create/cdc/schemabox.tsx +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -36,6 +36,7 @@ interface SchemaBoxProps { >; peerType?: DBType; omitAdditionalTables: string[] | undefined; + disableColumnView?: boolean; } const SchemaBox = ({ sourcePeer, @@ -46,6 +47,7 @@ const SchemaBox = ({ tableColumns, setTableColumns, omitAdditionalTables, + disableColumnView, }: SchemaBoxProps) => { const [tablesLoading, setTablesLoading] = useState(false); const [columnsLoading, setColumnsLoading] = useState(false); @@ -275,7 +277,7 @@ const SchemaBox = ({ {/* COLUMN BOX */} - {row.selected && ( + {row.selected && !disableColumnView && (