Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SF QRep: wire and clean up UI #1467

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
func (h *FlowRequestHandler) CreateQRepFlow(
ctx context.Context, req *protos.CreateQRepFlowRequest,
) (*protos.CreateQRepFlowResponse, error) {
slog.Info("QRep endpoint request", slog.Any("req", req))
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -259,6 +260,18 @@ func (h *FlowRequestHandler) CreateQRepFlow(
// make them all uppercase
cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
}

if req.QrepConfig.SourcePeer.Type == protos.DBType_SNOWFLAKE {
sourceTables := make([]string, 0, len(req.TableMapping))
destinationTables := make([]string, 0, len(req.TableMapping))
for _, mapping := range req.TableMapping {
sourceTables = append(sourceTables, mapping.SourceTableIdentifier)
destinationTables = append(destinationTables, mapping.DestinationTableIdentifier)
}
cfg.WatermarkTable = strings.Join(sourceTables, ";")
cfg.DestinationTableIdentifier = strings.Join(destinationTables, ";")
}

_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
slog.Error("unable to start QRepFlow workflow",
Expand Down
169 changes: 96 additions & 73 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,38 +89,76 @@ func (h *FlowRequestHandler) GetSchemas(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSchemasResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)
slog.Info("GET /schemas", slog.Any("req", req))
switch int32(req.PeerType) {
case int32(protos.DBType_SNOWFLAKE):
sfConn, err := h.getConnForSFPeer(ctx, req.PeerName)
if err != nil {
slog.Error("Failed to get snowflake client", slog.Any("error", err))
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
defer sfConn.Close()
sfSchemas, err := sfConn.GetAllSchemas(ctx)
if err != nil {
slog.Error("Failed to get all Snowflake schemas", slog.Any("error", err))
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
return &protos.PeerSchemasResponse{Schemas: sfSchemas}, nil
default:
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "SELECT nspname"+
" FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
rows, err := peerConn.Query(ctx, "SELECT nspname"+
" FROM pg_namespace WHERE nspname !~ '^pg_' AND nspname <> 'information_schema';")
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}

schemas, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
schemas, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
return &protos.PeerSchemasResponse{Schemas: schemas}, nil
}
return &protos.PeerSchemasResponse{Schemas: schemas}, nil
}

func (h *FlowRequestHandler) GetTablesInSchema(
ctx context.Context,
req *protos.SchemaTablesRequest,
) (*protos.SchemaTablesResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
_, peerType, err := h.getPeerID(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
slog.Info("failed to get peer type in /schema tables fetch", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, nil
}
defer tunnel.Close()
defer peerConn.Close(ctx)
switch peerType {
case int32(protos.DBType_SNOWFLAKE):
sfConn, err := h.getConnForSFPeer(ctx, req.PeerName)
if err != nil {
slog.Error("Failed to get snowflake client", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}
defer sfConn.Close()
sfTables, err := sfConn.GetTablesInSchema(ctx, req.SchemaName)
if err != nil {
slog.Error("Failed to get Snowflake tables in schema "+req.SchemaName, slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}
return &protos.SchemaTablesResponse{Tables: sfTables}, nil

rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname)
default:
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname)
t.relname,
CASE
WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true
Expand All @@ -140,75 +178,60 @@ func (h *FlowRequestHandler) GetTablesInSchema(
t.relname,
can_mirror DESC;
`, req.SchemaName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}

defer rows.Close()
var tables []*protos.TableResponse
for rows.Next() {
var table pgtype.Text
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
}

tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
})
defer rows.Close()
var tables []*protos.TableResponse
for rows.Next() {
var table pgtype.Text
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
}

tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
})
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}

// Returns list of tables across schema in schema.table format
func (h *FlowRequestHandler) GetAllTables(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.AllTablesResponse, error) {
switch req.PeerType {
case protos.DBType_SNOWFLAKE:
sfConn, err := h.getConnForSFPeer(ctx, req.PeerName)
if err != nil {
slog.Error("Failed to get snowflake client", slog.Any("error", err))
return &protos.AllTablesResponse{Tables: nil}, err
}
defer sfConn.Close()
sfTables, err := sfConn.GetAllTables(ctx)
if err != nil {
slog.Error("Failed to get all Snowflake tables", slog.Any("error", err))
return &protos.AllTablesResponse{Tables: nil}, err
}
return &protos.AllTablesResponse{Tables: sfTables}, nil

default:
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+
"FROM pg_class c "+
"JOIN pg_namespace n ON c.relnamespace = n.oid "+
"WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer rows.Close()
rows, err := peerConn.Query(ctx, "SELECT n.nspname || '.' || c.relname AS schema_table "+
"FROM pg_class c "+
"JOIN pg_namespace n ON c.relnamespace = n.oid "+
"WHERE n.nspname !~ '^pg_' AND n.nspname <> 'information_schema' AND c.relkind = 'r';")
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
defer rows.Close()

tables, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return nil, fmt.Errorf("failed to fetch all tables: %w", err)
}
return &protos.AllTablesResponse{Tables: tables}, nil
tables, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return nil, fmt.Errorf("failed to fetch all tables: %w", err)
}
return &protos.AllTablesResponse{Tables: tables}, nil
}

func (h *FlowRequestHandler) GetColumns(
Expand Down
43 changes: 35 additions & 8 deletions flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,53 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string
return totalRecords, nil
}

func (c *SnowflakeConnector) GetAllTables(ctx context.Context) ([]string, error) {
func (c *SnowflakeConnector) GetTablesInSchema(ctx context.Context, schemaName string) ([]*protos.TableResponse, error) {
rows, err := c.database.QueryContext(ctx, `
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE';`)
SELECT table_name
FROM information_schema.tables
WHERE TABLE_SCHEMA=?;`, schemaName)
if err != nil {
return nil, fmt.Errorf("failed to get tables from Snowflake: %w", err)
}
defer rows.Close()
if rows.Err() != nil {
return nil, fmt.Errorf("failed to get tables from Snowflake: %w", rows.Err())
}
var tables []string
var tables []*protos.TableResponse
for rows.Next() {
var schema, table string
err := rows.Scan(&schema, &table)
var table string
err := rows.Scan(&table)
if err != nil {
return nil, fmt.Errorf("failed to scan table from Snowflake: %w", err)
}
tables = append(tables, fmt.Sprintf(`%s.%s`, schema, table))
tables = append(tables, &protos.TableResponse{
TableName: table,
CanMirror: true,
})
}

return tables, nil
}

func (c *SnowflakeConnector) GetAllSchemas(ctx context.Context) ([]string, error) {
rows, err := c.database.QueryContext(ctx, `
SELECT SCHEMA_NAME
FROM INFORMATION_SCHEMA.SCHEMATA;`)
if err != nil {
return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", err)
}
defer rows.Close()
if rows.Err() != nil {
return nil, fmt.Errorf("failed to get schemas from Snowflake: %w", rows.Err())
}
var tables []string
for rows.Next() {
var schema string
err := rows.Scan(&schema)
if err != nil {
return nil, fmt.Errorf("failed to scan schema from Snowflake: %w", err)
}
tables = append(tables, schema)
}

return tables, nil
Expand Down
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl FlowGrpcClient {
let create_qrep_flow_req = pt::peerdb_route::CreateQRepFlowRequest {
qrep_config: Some(qrep_config.clone()),
create_catalog_entry: false,
table_mapping: vec![]
};
let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?;
let workflow_id = response.into_inner().workflow_id;
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ message CreateCDCFlowResponse {
message CreateQRepFlowRequest {
peerdb_flow.QRepConfig qrep_config = 1;
bool create_catalog_entry = 2;
repeated peerdb_flow.TableMapping table_mapping = 3;
}

message CreateQRepFlowResponse {
Expand Down
3 changes: 2 additions & 1 deletion ui/app/api/mirrors/qrep/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { config } = body;
const { config, tableMapping } = body;

const flowServiceAddr = GetFlowHttpAddressFromEnv();
const req: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
tableMapping: tableMapping,
};
try {
const createStatus: CreateQRepFlowResponse = await fetch(
Expand Down
4 changes: 2 additions & 2 deletions ui/app/api/peers/schemas/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { peerName } = body;
const { peerName, peerType } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
try {
const schemaList = await fetch(
`${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}`
`${flowServiceAddr}/v1/peers/schemas?peer_name=${peerName}&peer_type=${peerType}`
).then((res) => {
return res.json();
});
Expand Down
4 changes: 3 additions & 1 deletion ui/app/mirrors/create/cdc/schemabox.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ interface SchemaBoxProps {
>;
peerType?: DBType;
omitAdditionalTables: string[] | undefined;
disableColumnView?: boolean;
}
const SchemaBox = ({
sourcePeer,
Expand All @@ -46,6 +47,7 @@ const SchemaBox = ({
tableColumns,
setTableColumns,
omitAdditionalTables,
disableColumnView,
}: SchemaBoxProps) => {
const [tablesLoading, setTablesLoading] = useState(false);
const [columnsLoading, setColumnsLoading] = useState(false);
Expand Down Expand Up @@ -275,7 +277,7 @@ const SchemaBox = ({
</div>

{/* COLUMN BOX */}
{row.selected && (
{row.selected && !disableColumnView && (
<div className='ml-5' style={{ width: '100%' }}>
<Label
as='label'
Expand Down
Loading
Loading