Skip to content

Commit

Permalink
Merge branch 'main' into use-prebuilt-artifacts-for-docker-compose
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 18, 2023
2 parents eaa0a8a + 47078b8 commit de1109c
Show file tree
Hide file tree
Showing 26 changed files with 1,143 additions and 239 deletions.
6 changes: 3 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
) (*protos.QRepParitionResult, error) {
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
return nil, fmt.Errorf("failed to get qrep pull connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

Expand Down Expand Up @@ -483,13 +483,13 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
return fmt.Errorf("failed to get qrep destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

Expand Down
43 changes: 40 additions & 3 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
return strings.Join(tableIdentifierParts, ".")
}

func (h *FlowRequestHandler) createFlowJobEntry(ctx context.Context,
func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
if srcErr != nil {
Expand Down Expand Up @@ -82,6 +82,37 @@ func (h *FlowRequestHandler) createFlowJobEntry(ctx context.Context,
return nil
}

func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
req *protos.CreateQRepFlowRequest, workflowID string) error {
sourcePeerName := req.QrepConfig.SourcePeer.Name
sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
sourcePeerName, srcErr)
}

destinationPeerName := req.QrepConfig.DestinationPeer.Name
destinationPeerID, _, dstErr := h.getPeerID(ctx, destinationPeerName)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
destinationPeerName, srcErr)
}
flowName := req.QrepConfig.FlowJobName
_, err := h.pool.Exec(ctx, `INSERT INTO flows (workflow_id,name, source_peer, destination_peer, description,
destination_table_identifier, query_string) VALUES ($1, $2, $3, $4, $5, $6, $7)
`, workflowID, flowName, sourcePeerID, destinationPeerID,
"Mirror created via GRPC",
req.QrepConfig.DestinationTableIdentifier,
req.QrepConfig.Query,
)
if err != nil {
return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
flowName, req.QrepConfig.WatermarkTable, err)
}

return nil
}

// Close closes the connection pool
func (h *FlowRequestHandler) Close() {
if h.pool != nil {
Expand Down Expand Up @@ -111,7 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

if req.CreateCatalogEntry {
err := h.createFlowJobEntry(ctx, req, workflowID)
err := h.createCdcJobEntry(ctx, req, workflowID)
if err != nil {
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
Expand Down Expand Up @@ -170,12 +201,18 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

cfg := req.QrepConfig
log.Infof("Config for QRepFlow: %+v", cfg)
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: shared.PeerFlowTaskQueue,
}

if req.CreateCatalogEntry {
err := h.createQrepJobEntry(ctx, req, workflowID)
if err != nil {
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
numPartitionsProcessed := 0
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
Expand Down
357 changes: 185 additions & 172 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

85 changes: 85 additions & 0 deletions flow/generated/protos/route.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl FlowGrpcClient {
) -> anyhow::Result<String> {
let create_qrep_flow_req = pt::peerdb_route::CreateQRepFlowRequest {
qrep_config: Some(qrep_config.clone()),
create_catalog_entry:false
};
let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?;
let workflow_id = response.into_inner().worflow_id;
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub struct CreateCdcFlowResponse {
pub struct CreateQRepFlowRequest {
#[prost(message, optional, tag="1")]
pub qrep_config: ::core::option::Option<super::peerdb_flow::QRepConfig>,
#[prost(bool, tag="2")]
pub create_catalog_entry: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_route.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,10 +810,16 @@ impl serde::Serialize for CreateQRepFlowRequest {
if self.qrep_config.is_some() {
len += 1;
}
if self.create_catalog_entry {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.CreateQRepFlowRequest", len)?;
if let Some(v) = self.qrep_config.as_ref() {
struct_ser.serialize_field("qrepConfig", v)?;
}
if self.create_catalog_entry {
struct_ser.serialize_field("createCatalogEntry", &self.create_catalog_entry)?;
}
struct_ser.end()
}
}
Expand All @@ -826,11 +832,14 @@ impl<'de> serde::Deserialize<'de> for CreateQRepFlowRequest {
const FIELDS: &[&str] = &[
"qrep_config",
"qrepConfig",
"create_catalog_entry",
"createCatalogEntry",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
QrepConfig,
CreateCatalogEntry,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -854,6 +863,7 @@ impl<'de> serde::Deserialize<'de> for CreateQRepFlowRequest {
{
match value {
"qrepConfig" | "qrep_config" => Ok(GeneratedField::QrepConfig),
"createCatalogEntry" | "create_catalog_entry" => Ok(GeneratedField::CreateCatalogEntry),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -874,6 +884,7 @@ impl<'de> serde::Deserialize<'de> for CreateQRepFlowRequest {
V: serde::de::MapAccess<'de>,
{
let mut qrep_config__ = None;
let mut create_catalog_entry__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::QrepConfig => {
Expand All @@ -882,13 +893,20 @@ impl<'de> serde::Deserialize<'de> for CreateQRepFlowRequest {
}
qrep_config__ = map.next_value()?;
}
GeneratedField::CreateCatalogEntry => {
if create_catalog_entry__.is_some() {
return Err(serde::de::Error::duplicate_field("createCatalogEntry"));
}
create_catalog_entry__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
Ok(CreateQRepFlowRequest {
qrep_config: qrep_config__,
create_catalog_entry: create_catalog_entry__.unwrap_or_default(),
})
}
}
Expand Down
8 changes: 7 additions & 1 deletion protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message CreateCDCFlowResponse {

message CreateQRepFlowRequest {
peerdb_flow.QRepConfig qrep_config = 1;
bool create_catalog_entry = 2;
}

message CreateQRepFlowResponse {
Expand Down Expand Up @@ -131,7 +132,12 @@ service FlowService {
body: "*"
};
}
rpc CreateQRepFlow(CreateQRepFlowRequest) returns (CreateQRepFlowResponse) {}
rpc CreateQRepFlow(CreateQRepFlowRequest) returns (CreateQRepFlowResponse) {
option (google.api.http) = {
post: "/v1/flows/qrep/create",
body: "*"
};
}
rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {}
rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) {
option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" };
Expand Down
2 changes: 1 addition & 1 deletion ui/app/api/mirrors/cdc/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export async function POST(request: Request) {
createCatalogEntry: true,
};
const createStatus: CreateCDCFlowResponse = await fetch(
`${flowServiceAddr}/v1/cdc/create`,
`${flowServiceAddr}/v1/flows/cdc/create`,
{
method: 'POST',
body: JSON.stringify(req),
Expand Down
31 changes: 31 additions & 0 deletions ui/app/api/mirrors/qrep/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO';
import {
CreateQRepFlowRequest,
CreateQRepFlowResponse,
} from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { config } = body;
console.log('/qrep/post config:', config);
const flowServiceAddr = GetFlowHttpAddressFromEnv();
const req: CreateQRepFlowRequest = {
qrepConfig: config,
createCatalogEntry: true,
};
const createStatus: CreateQRepFlowResponse = await fetch(
`${flowServiceAddr}/v1/flows/qrep/create`,
{
method: 'POST',
body: JSON.stringify(req),
}
).then((res) => {
return res.json();
});
let response: UCreateMirrorResponse = {
created: !!createStatus.worflowId,
};

return new Response(JSON.stringify(response));
}
Loading

0 comments on commit de1109c

Please sign in to comment.