UI - Create Mirror #490

Merged · 9 commits · Oct 9, 2023
59 changes: 59 additions & 0 deletions flow/cmd/handler.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
@@ -31,6 +32,57 @@ func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *Fl
}
}

// getPeerID looks up the catalog id and peer type for the given peer name.
func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (int32, int32, error) {
var id int32
var peerType int32
err := h.pool.QueryRow(ctx, "SELECT id,type FROM peers WHERE name = $1", peerName).Scan(&id, &peerType)
if err != nil {
log.Errorf("unable to query peer id for peer %s: %s", peerName, err.Error())
return -1, -1, fmt.Errorf("unable to query peer id for peer %s: %s", peerName, err)
}
return id, peerType, nil
}

// schemaForTableIdentifier prefixes a bare table name with the default "public" schema; BigQuery identifiers are left as-is since they are dataset-qualified.
func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
tableIdentifierParts := strings.Split(tableIdentifier, ".")
if len(tableIdentifierParts) == 1 && peerDBType != int32(protos.DBType_BIGQUERY) {
tableIdentifierParts = append([]string{"public"}, tableIdentifierParts...)
}
return strings.Join(tableIdentifierParts, ".")
}

// createFlowJobEntry records the new mirror and its table mappings in the catalog's flows table.
func (h *FlowRequestHandler) createFlowJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string) error {
sourcePeerID, sourcePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
req.ConnectionConfigs.Source.Name, srcErr)
}

destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.Destination.Name)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
req.ConnectionConfigs.Destination.Name, dstErr)
}

for sourceTableIdentifier := range req.ConnectionConfigs.TableNameMapping {
destinationTableIdentifier := req.ConnectionConfigs.TableNameMapping[sourceTableIdentifier]
_, err := h.pool.Exec(ctx, `
INSERT INTO flows (workflow_id, name, source_peer, destination_peer, description,
source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6, $7)
`, workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID,
"Mirror created via GRPC",
schemaForTableIdentifier(sourceTableIdentifier, sourcePeerType),
schemaForTableIdentifier(destinationTableIdentifier, destinationPeerType))
if err != nil {
return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
req.ConnectionConfigs.FlowJobName, sourceTableIdentifier, err)
}
}

return nil
}

// Close closes the connection pool
func (h *FlowRequestHandler) Close() {
if h.pool != nil {
@@ -59,6 +111,13 @@ func (h *FlowRequestHandler) CreateCDCFlow(
MaxBatchSize: maxBatchSize,
}

if req.CreateCatalogEntry {
err := h.createFlowJobEntry(ctx, req, workflowID)
if err != nil {
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}

state := peerflow.NewCDCFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
235 changes: 123 additions & 112 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions nexus/analyzer/src/lib.rs
@@ -152,14 +152,14 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
match create_mirror {
CDC(cdc) => {
let mut flow_job_table_mappings = vec![];
for table_mapping in &cdc.table_mappings {
for table_mapping in &cdc.mappings {
flow_job_table_mappings.push(FlowJobTableMapping {
source_table_identifier: table_mapping
.source_table_identifier
.to_string()
.to_lowercase(),
target_table_identifier: table_mapping
.target_table_identifier
.target_identifier
.to_string()
.to_lowercase(),
});
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
@@ -111,6 +111,7 @@ impl FlowGrpcClient {
) -> anyhow::Result<String> {
let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest {
connection_configs: Some(peer_flow_config),
create_catalog_entry: false
};
let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
let workflow_id = response.into_inner().worflow_id;
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_route.rs
@@ -4,6 +4,8 @@
pub struct CreateCdcFlowRequest {
#[prost(message, optional, tag="1")]
pub connection_configs: ::core::option::Option<super::peerdb_flow::FlowConnectionConfigs>,
#[prost(bool, tag="2")]
pub create_catalog_entry: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_route.serde.rs
@@ -10,10 +10,16 @@ impl serde::Serialize for CreateCdcFlowRequest {
if self.connection_configs.is_some() {
len += 1;
}
if self.create_catalog_entry {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.CreateCDCFlowRequest", len)?;
if let Some(v) = self.connection_configs.as_ref() {
struct_ser.serialize_field("connectionConfigs", v)?;
}
if self.create_catalog_entry {
struct_ser.serialize_field("createCatalogEntry", &self.create_catalog_entry)?;
}
struct_ser.end()
}
}
@@ -26,11 +32,14 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
const FIELDS: &[&str] = &[
"connection_configs",
"connectionConfigs",
"create_catalog_entry",
"createCatalogEntry",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
ConnectionConfigs,
CreateCatalogEntry,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -54,6 +63,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
{
match value {
"connectionConfigs" | "connection_configs" => Ok(GeneratedField::ConnectionConfigs),
"createCatalogEntry" | "create_catalog_entry" => Ok(GeneratedField::CreateCatalogEntry),
_ => Ok(GeneratedField::__SkipField__),
}
}
@@ -74,6 +84,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
V: serde::de::MapAccess<'de>,
{
let mut connection_configs__ = None;
let mut create_catalog_entry__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::ConnectionConfigs => {
@@ -82,13 +93,20 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
}
connection_configs__ = map.next_value()?;
}
GeneratedField::CreateCatalogEntry => {
if create_catalog_entry__.is_some() {
return Err(serde::de::Error::duplicate_field("createCatalogEntry"));
}
create_catalog_entry__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
Ok(CreateCdcFlowRequest {
connection_configs: connection_configs__,
create_catalog_entry: create_catalog_entry__.unwrap_or_default(),
})
}
}
3 changes: 1 addition & 2 deletions nexus/server/src/main.rs
@@ -455,8 +455,7 @@ impl NexusBackend {
"got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);
if workflow_details.is_some() {
let workflow_details = workflow_details.unwrap();
if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
.shutdown_flow_job(flow_job_name, workflow_details)
2 changes: 1 addition & 1 deletion nexus/sqlparser-rs
1 change: 1 addition & 0 deletions protos/route.proto
@@ -8,6 +8,7 @@ package peerdb_route;

message CreateCDCFlowRequest {
peerdb_flow.FlowConnectionConfigs connection_configs = 1;
bool create_catalog_entry = 2;
}

message CreateCDCFlowResponse {
21 changes: 21 additions & 0 deletions ui/app/api/mirrors/cdc/route.ts
@@ -0,0 +1,21 @@
import {
CreateCDCFlowRequest,
CreateCDCFlowResponse,
} from '@/grpc_generated/route';
import { GetFlowServiceClientFromEnv } from '@/rpc/rpc';

export async function POST(request: Request) {
const body = await request.json();
const { config } = body;
const flowServiceClient = GetFlowServiceClientFromEnv();
const req: CreateCDCFlowRequest = {
connectionConfigs: config,
createCatalogEntry: true,
};
const createStatus: CreateCDCFlowResponse =
await flowServiceClient.createCdcFlow(req);
if (!createStatus.worflowId) {
return new Response('Failed to create CDC mirror');
}
return new Response('created');
}
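For context, here is a minimal sketch of how the UI could invoke this new route handler. The helper name, the import path for the generated FlowConnectionConfigs type, and the form wiring are assumptions and are not part of this diff:

import { FlowConnectionConfigs } from '@/grpc_generated/flow'; // assumed import path for the generated type

// Hypothetical caller of the new POST /api/mirrors/cdc route.
async function createMirror(config: FlowConnectionConfigs): Promise<boolean> {
  const res = await fetch('/api/mirrors/cdc', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ config }),
  });
  // The handler responds with plain text: 'created' on success, an error message otherwise.
  return (await res.text()) === 'created';
}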
8 changes: 8 additions & 0 deletions ui/app/api/peers/route.ts
@@ -9,6 +9,7 @@ import {
CreatePeerRequest,
CreatePeerResponse,
CreatePeerStatus,
ListPeersRequest,
ValidatePeerRequest,
ValidatePeerResponse,
ValidatePeerStatus,
@@ -63,3 +64,10 @@ export async function POST(request: Request) {
} else return new Response('status of peer creation is unknown');
} else return new Response('mode of peer creation is unknown');
}

export async function GET(request: Request) {
let flowServiceClient = GetFlowServiceClientFromEnv();
let req: ListPeersRequest = {};
let peers = await flowServiceClient.listPeers(req);
return new Response(JSON.stringify(peers));
}
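Similarly, a rough sketch of a client-side consumer for the new GET /api/peers endpoint; the response is the JSON-serialized reply from listPeers, so the peers field below is an assumption about the generated ListPeersResponse shape:

// Hypothetical consumer of GET /api/peers, e.g. to populate a peer dropdown in the create-mirror form.
async function fetchPeers(): Promise<unknown[]> {
  const res = await fetch('/api/peers', { cache: 'no-store' });
  const data = await res.json();
  // 'peers' is assumed to be the repeated field on the ListPeers response; adjust to the generated shape.
  return data.peers ?? [];
}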