Skip to content

Commit

Permalink
creates flow job entry
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 9, 2023
1 parent 33c3858 commit bf2be46
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 115 deletions.
59 changes: 59 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
Expand Down Expand Up @@ -31,6 +32,57 @@ func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *Fl
}
}

func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (int32, int32, error) {
var id int32
var peerType int32
err := h.pool.QueryRow(ctx, "SELECT id,type FROM peers WHERE name = $1", peerName).Scan(&id, &peerType)
if err != nil {
log.Errorf("unable to query peer id for peer %s: %s", peerName, err.Error())
return -1, -1, fmt.Errorf("unable to query peer id for peer %s: %s", peerName, err)
}
return id, peerType, nil
}

func schemaForTableIdentifier(tableIdentifier string, peerDbType int32) string {
tableIdentifierParts := strings.Split(tableIdentifier, ".")
if len(tableIdentifierParts) == 1 && peerDbType != int32(protos.DBType_BIGQUERY) {
tableIdentifierParts = append([]string{"public"}, tableIdentifierParts...)
}
return strings.Join(tableIdentifierParts, ".")
}

func (h *FlowRequestHandler) createFlowJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
req.ConnectionConfigs.Source.Name, srcErr)
}

destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.Destination.Name)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
req.ConnectionConfigs.Destination.Name, srcErr)
}

for sourceTableIdentifier := range req.ConnectionConfigs.TableNameMapping {
destinationTableIdentifier := req.ConnectionConfigs.TableNameMapping[sourceTableIdentifier]
_, err := h.pool.Exec(ctx, `
INSERT INTO flows (workflow_id, name, source_peer, destination_peer, description,
source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6, $7)
`, workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID,
"Mirror created via GRPC",
schemaForTableIdentifier(sourceTableIdentifier, sourePeerType),
schemaForTableIdentifier(destinationTableIdentifier, destinationPeerType))
if err != nil {
return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
req.ConnectionConfigs.FlowJobName, sourceTableIdentifier, err)
}
}

return nil
}

// Close closes the connection pool
func (h *FlowRequestHandler) Close() {
if h.pool != nil {
Expand Down Expand Up @@ -59,6 +111,13 @@ func (h *FlowRequestHandler) CreateCDCFlow(
MaxBatchSize: maxBatchSize,
}

if req.CreateCatalogEntry {
err := h.createFlowJobEntry(ctx, req, workflowID)
if err != nil {
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}

state := peerflow.NewCDCFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
Expand Down
235 changes: 123 additions & 112 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl FlowGrpcClient {
) -> anyhow::Result<String> {
let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest {
connection_configs: Some(peer_flow_config),
create_catalog_entry: false
};
let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
let workflow_id = response.into_inner().worflow_id;
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub struct CreateCdcFlowRequest {
#[prost(message, optional, tag="1")]
pub connection_configs: ::core::option::Option<super::peerdb_flow::FlowConnectionConfigs>,
#[prost(bool, tag="2")]
pub create_catalog_entry: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_route.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ impl serde::Serialize for CreateCdcFlowRequest {
if self.connection_configs.is_some() {
len += 1;
}
if self.create_catalog_entry {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.CreateCDCFlowRequest", len)?;
if let Some(v) = self.connection_configs.as_ref() {
struct_ser.serialize_field("connectionConfigs", v)?;
}
if self.create_catalog_entry {
struct_ser.serialize_field("createCatalogEntry", &self.create_catalog_entry)?;
}
struct_ser.end()
}
}
Expand All @@ -26,11 +32,14 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
const FIELDS: &[&str] = &[
"connection_configs",
"connectionConfigs",
"create_catalog_entry",
"createCatalogEntry",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
ConnectionConfigs,
CreateCatalogEntry,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -54,6 +63,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
{
match value {
"connectionConfigs" | "connection_configs" => Ok(GeneratedField::ConnectionConfigs),
"createCatalogEntry" | "create_catalog_entry" => Ok(GeneratedField::CreateCatalogEntry),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -74,6 +84,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
V: serde::de::MapAccess<'de>,
{
let mut connection_configs__ = None;
let mut create_catalog_entry__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::ConnectionConfigs => {
Expand All @@ -82,13 +93,20 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
}
connection_configs__ = map.next_value()?;
}
GeneratedField::CreateCatalogEntry => {
if create_catalog_entry__.is_some() {
return Err(serde::de::Error::duplicate_field("createCatalogEntry"));
}
create_catalog_entry__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
Ok(CreateCdcFlowRequest {
connection_configs: connection_configs__,
create_catalog_entry: create_catalog_entry__.unwrap_or_default(),
})
}
}
Expand Down
3 changes: 1 addition & 2 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,7 @@ impl NexusBackend {
"got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);
if workflow_details.is_some() {
let workflow_details = workflow_details.unwrap();
if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
.shutdown_flow_job(flow_job_name, workflow_details)
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package peerdb_route;

message CreateCDCFlowRequest {
peerdb_flow.FlowConnectionConfigs connection_configs = 1;
bool create_catalog_entry = 2;
}

message CreateCDCFlowResponse {
Expand Down
1 change: 1 addition & 0 deletions ui/app/api/mirrors/cdc/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export async function POST(request: Request) {
const flowServiceClient = GetFlowServiceClientFromEnv();
const req: CreateCDCFlowRequest = {
connectionConfigs: config,
createCatalogEntry: true
};
const createStatus: CreateCDCFlowResponse =
await flowServiceClient.createCdcFlow(req);
Expand Down
18 changes: 17 additions & 1 deletion ui/grpc_generated/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export function createPeerStatusToJSON(object: CreatePeerStatus): string {

export interface CreateCDCFlowRequest {
connectionConfigs: FlowConnectionConfigs | undefined;
createCatalogEntry: boolean;
}

export interface CreateCDCFlowResponse {
Expand Down Expand Up @@ -149,14 +150,17 @@ export interface CreatePeerResponse {
}

function createBaseCreateCDCFlowRequest(): CreateCDCFlowRequest {
return { connectionConfigs: undefined };
return { connectionConfigs: undefined, createCatalogEntry: false };
}

export const CreateCDCFlowRequest = {
encode(message: CreateCDCFlowRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.connectionConfigs !== undefined) {
FlowConnectionConfigs.encode(message.connectionConfigs, writer.uint32(10).fork()).ldelim();
}
if (message.createCatalogEntry === true) {
writer.uint32(16).bool(message.createCatalogEntry);
}
return writer;
},

Expand All @@ -174,6 +178,13 @@ export const CreateCDCFlowRequest = {

message.connectionConfigs = FlowConnectionConfigs.decode(reader, reader.uint32());
continue;
case 2:
if (tag !== 16) {
break;
}

message.createCatalogEntry = reader.bool();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -188,6 +199,7 @@ export const CreateCDCFlowRequest = {
connectionConfigs: isSet(object.connectionConfigs)
? FlowConnectionConfigs.fromJSON(object.connectionConfigs)
: undefined,
createCatalogEntry: isSet(object.createCatalogEntry) ? Boolean(object.createCatalogEntry) : false,
};
},

Expand All @@ -196,6 +208,9 @@ export const CreateCDCFlowRequest = {
if (message.connectionConfigs !== undefined) {
obj.connectionConfigs = FlowConnectionConfigs.toJSON(message.connectionConfigs);
}
if (message.createCatalogEntry === true) {
obj.createCatalogEntry = message.createCatalogEntry;
}
return obj;
},

Expand All @@ -207,6 +222,7 @@ export const CreateCDCFlowRequest = {
message.connectionConfigs = (object.connectionConfigs !== undefined && object.connectionConfigs !== null)
? FlowConnectionConfigs.fromPartial(object.connectionConfigs)
: undefined;
message.createCatalogEntry = object.createCatalogEntry ?? false;
return message;
},
};
Expand Down

0 comments on commit bf2be46

Please sign in to comment.