Skip to content

Commit

Permalink
wire and clean up UI
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 11, 2024
1 parent bf3e135 commit a3a3b4c
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 331 deletions.
13 changes: 13 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
slog.Info("QRep endpoint request", slog.Any("req", req))
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -259,6 +260,18 @@ func (h *FlowRequestHandler) CreateQRepFlow(
// make them all uppercase
cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
}

if req.QrepConfig.SourcePeer.Type == protos.DBType_SNOWFLAKE {
sourceTables := make([]string, 0, len(req.TableMapping))
destinationTables := make([]string, 0, len(req.TableMapping))
for _, mapping := range req.TableMapping {
sourceTables = append(sourceTables, mapping.SourceTableIdentifier)
destinationTables = append(destinationTables, mapping.DestinationTableIdentifier)
}
cfg.WatermarkTable = strings.Join(sourceTables, ";")
cfg.DestinationTableIdentifier = strings.Join(destinationTables, ";")
}

_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
slog.Error("unable to start QRepFlow workflow",
Expand Down
169 changes: 96 additions & 73 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,38 +89,76 @@ func (h *FlowRequestHandler) GetSchemas(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSchemasResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)
slog.Info("GET /schemas", slog.Any("req", req))
switch int32(req.PeerType) {
case int32(protos.DBType_SNOWFLAKE):
sfConn, err := h.getConnForSFPeer(ctx, req.PeerName)
if err != nil {
slog.Error("Failed to get snowflake client", slog.Any("error", err))
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
defer sfConn.Close()
sfSchemas, err := sfConn.GetAllSchemas(ctx)
if err != nil {
slog.Error("Failed to get all Snowflake schemas", slog.Any("error", err))
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
return &protos.PeerSchemasResponse{Schemas: sfSchemas}, nil
default:
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "SELECT nspname"+
" FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
rows, err := peerConn.Query(ctx, "SELECT nspname"+
" FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}

schemas, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
schemas, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
return &protos.PeerSchemasResponse{Schemas: schemas}, nil
}
return &protos.PeerSchemasResponse{Schemas: schemas}, nil
}

func (h *FlowRequestHandler) GetTablesInSchema(
ctx context.Context,
req *protos.SchemaTablesRequest,
) (*protos.SchemaTablesResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
_, peerType, err := h.getPeerID(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
slog.Info("failed to get peer type in /schema tables fetch", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, nil
}
defer tunnel.Close()
defer peerConn.Close(ctx)
switch peerType {
case int32(protos.DBType_SNOWFLAKE):
sfConn, err := h.getConnForSFPeer(ctx, req.PeerName)
if err != nil {
slog.Error("Failed to get snowflake client", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}
defer sfConn.Close()
sfTables, err := sfConn.GetTablesInSchema(ctx, req.SchemaName)
if err != nil {
slog.Error("Failed to get Snowflake tables in schema "+req.SchemaName, slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}
return &protos.SchemaTablesResponse{Tables: sfTables}, nil

rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname)
default:
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname)
t.relname,
CASE
WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true
Expand All @@ -140,75 +178,60 @@ func (h *FlowRequestHandler) GetTablesInSchema(
t.relname,
can_mirror DESC;
`, req.SchemaName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}

defer rows.Close()
var tables []*protos.TableResponse
for rows.Next() {
var table pgtype.Text
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
}

tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
})
defer rows.Close()
var tables []*protos.TableResponse
for rows.Next() {
var table pgtype.Text
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
}

tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
})
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}

// Returns list of tables across schema in schema.table format
func (h *FlowRequestHandler) GetAllTables(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.AllTablesResponse, error) {
switch req.PeerType {
case protos.DBType_SNOWFLAKE:
sfConn, err := h.getConnForSFPeer(ctx, req.PeerName)
if err != nil {
slog.Error("Failed to get snowflake client", slog.Any("error", err))
return &protos.AllTablesResponse{Tables: nil}, err
}
defer sfConn.Close()
sfTables, err := sfConn.GetAllTables(ctx)
if err != nil {
slog.Error("Failed to get all Snowflake tables", slog.Any("error", err))
return &protos.AllTablesResponse{Tables: nil}, err
}
return &protos.AllTablesResponse{Tables: sfTables}, nil

default:
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+
"FROM pg_class c "+
"JOIN pg_namespace n ON c.relnamespace = n.oid "+
"WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer rows.Close()
rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+
"FROM pg_class c "+
"JOIN pg_namespace n ON c.relnamespace = n.oid "+
"WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer rows.Close()

tables, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return nil, fmt.Errorf("failed to fetch all tables: %w", err)
}
return &protos.AllTablesResponse{Tables: tables}, nil
tables, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return nil, fmt.Errorf("failed to fetch all tables: %w", err)
}
return &protos.AllTablesResponse{Tables: tables}, nil
}

func (h *FlowRequestHandler) GetColumns(
Expand Down
43 changes: 35 additions & 8 deletions flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,53 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string
return totalRecords, nil
}

func (c *SnowflakeConnector) GetAllTables(ctx context.Context) ([]string, error) {
func (c *SnowflakeConnector) GetTablesInSchema(ctx context.Context, schemaName string) ([]*protos.TableResponse, error) {
rows, err := c.database.QueryContext(ctx, `
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE';`)
SELECT table_name
FROM information_schema.tables
WHERE TABLE_SCHEMA=?;`, schemaName)
if err != nil {
return nil, fmt.Errorf("failed to get tables from Snowflake: %w", err)
}
defer rows.Close()
if rows.Err() != nil {
return nil, fmt.Errorf("failed to get tables from Snowflake: %w", rows.Err())
}
var tables []string
var tables []*protos.TableResponse
for rows.Next() {
var schema, table string
err := rows.Scan(&schema, &table)
var table string
err := rows.Scan(&table)
if err != nil {
return nil, fmt.Errorf("failed to scan table from Snowflake: %w", err)
}
tables = append(tables, fmt.Sprintf(`%s.%s`, schema, table))
tables = append(tables, &protos.TableResponse{
TableName: table,
CanMirror: true,
})
}

return tables, nil
}

func (c *SnowflakeConnector) GetAllSchemas(ctx context.Context) ([]string, error) {
rows, err := c.database.QueryContext(ctx, `
SELECT SCHEMA_NAME
FROM INFORMATION_SCHEMA.SCHEMATA;`)
if err != nil {
return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", err)
}
defer rows.Close()
if rows.Err() != nil {
return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", rows.Err())
}
var tables []string
for rows.Next() {
var schema string
err := rows.Scan(&schema)
if err != nil {
return nil, fmt.Errorf("failed to scan schema from Snowflake: %w", err)
}
tables = append(tables, schema)
}

return tables, nil
Expand Down
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl FlowGrpcClient {
let create_qrep_flow_req = pt::peerdb_route::CreateQRepFlowRequest {
qrep_config: Some(qrep_config.clone()),
create_catalog_entry: false,
table_mapping: vec![]
};
let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?;
let workflow_id = response.into_inner().workflow_id;
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ message CreateCDCFlowResponse {
message CreateQRepFlowRequest {
peerdb_flow.QRepConfig qrep_config = 1;
bool create_catalog_entry = 2;
repeated peerdb_flow.TableMapping table_mapping = 3;
}

message CreateQRepFlowResponse {
Expand Down
3 changes: 2 additions & 1 deletion ui/app/api/mirrors/qrep/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { config } = body;
const { config, tableMapping } = body;

const flowServiceAddr = GetFlowHttpAddressFromEnv();
const req: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: tableMapping,
};
try {
const createStatus: CreateQRepFlowResponse = await fetch(
Expand Down
4 changes: 2 additions & 2 deletions ui/app/api/peers/schemas/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { peerName } = body;
const { peerName, peerType } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
try {
const schemaList = await fetch(
`${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}`
`${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}&peer_type=${peerType}`
).then((res) => {
return res.json();
});
Expand Down
4 changes: 3 additions & 1 deletion ui/app/mirrors/create/cdc/schemabox.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ interface SchemaBoxProps {
>;
peerType?: DBType;
omitAdditionalTables: string[] | undefined;
disableColumnView?: boolean;
}
const SchemaBox = ({
sourcePeer,
Expand All @@ -46,6 +47,7 @@ const SchemaBox = ({
tableColumns,
setTableColumns,
omitAdditionalTables,
disableColumnView,
}: SchemaBoxProps) => {
const [tablesLoading, setTablesLoading] = useState(false);
const [columnsLoading, setColumnsLoading] = useState(false);
Expand Down Expand Up @@ -275,7 +277,7 @@ const SchemaBox = ({
</div>

{/* COLUMN BOX */}
{row.selected && (
{row.selected && !disableColumnView && (
<div className='ml-5' style={{ width: '100%' }}>
<Label
as='label'
Expand Down
Loading

0 comments on commit a3a3b4c

Please sign in to comment.