Merge branch 'main' into update-jose2go
serprex authored Dec 22, 2023
2 parents e09f669 + c478a37 commit f3bfe16
Showing 24 changed files with 986 additions and 643 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 2400s
gotestsum --format testname -- -p 8 ./... -timeout 1200s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
1 change: 0 additions & 1 deletion flow/activities/flowable.go
@@ -176,7 +176,6 @@ func (a *FlowableActivity) CreateNormalizedTable(
return setupNormalizedTablesOutput, nil
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput,
) (*model.SyncResponse, error) {
9 changes: 4 additions & 5 deletions flow/connectors/bigquery/bigquery.go
@@ -500,16 +500,15 @@ func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
}

// SyncRecords pushes records to the destination.
// currently only supports inserts,updates and deletes
// more record types will be added in the future.
// Currently only supports inserts, updates, and deletes.
// More record types will be added in the future.
func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

c.logger.Info(fmt.Sprintf("pushing records to %s.%s...", c.datasetID, rawTableName))

// generate a sequential number for the last synced batch
// this sequence will be used to keep track of records that are normalized
// in the NormalizeFlowWorkflow
// generate a sequential number for the last synced batch; this sequence will
// be used to keep track of records that are normalized in NormalizeFlowWorkflow
syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName)
if err != nil {
return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err)
2 changes: 1 addition & 1 deletion flow/e2e/congen.go
@@ -115,7 +115,7 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) {
$$ language sql;
CREATE OR REPLACE FUNCTION random_bytea(bytea_length integer)
RETURNS bytea AS $body$
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0') ,''), 'hex')
SELECT decode(string_agg(lpad(to_hex(width_bucket(random(), 0, 1, 256)-1),2,'0'), ''), 'hex')
FROM generate_series(1, $1);
$body$
LANGUAGE 'sql'
1,153 changes: 582 additions & 571 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions flow/workflows/cdc_flow.go
@@ -279,6 +279,10 @@ func CDCFlowWorkflowWithConfig(

state.SnapshotComplete = true
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")
// if initial_copy_only is opted for, we end the flow here.
if cfg.InitialCopyOnly {
return nil, nil
}
}

syncFlowOptions := &protos.SyncFlowOptions{
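For context, a simplified sketch of where the new early return sits in the workflow's control flow (an assumption pieced together from this hunk, not the full CDCFlowWorkflowWithConfig body):

	// sketch: once the snapshot finishes, an initial-copy-only mirror
	// returns instead of entering the CDC sync/normalize loop
	if !state.SnapshotComplete {
		// ... execute setup flow and snapshot flow ...
		state.SnapshotComplete = true
		// if initial_copy_only is opted for, we end the flow here.
		if cfg.InitialCopyOnly {
			return nil, nil
		}
	}
	// ... otherwise fall through to the sync flow options and loop ...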
66 changes: 66 additions & 0 deletions flow/workflows/normalize_flow.go
@@ -0,0 +1,66 @@
package peerflow

import (
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)

type NormalizeFlowState struct {
CDCFlowName string
Progress []string
}

type NormalizeFlowExecution struct {
NormalizeFlowState
executionID string
logger log.Logger
}

func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
return &NormalizeFlowExecution{
NormalizeFlowState: *state,
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
}
}

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
})

return s.executeNormalizeFlow(ctx, config)
}

func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartNormalize on the destination peer to normalize synced records
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return nil, fmt.Errorf("failed to execute normalize flow: %w", err)
}

return normalizeResponse, nil
}
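With NormalizeFlowWorkflow now a standalone workflow, a parent workflow can start it as a Temporal child workflow. A minimal sketch of that wiring, assuming the standard Temporal Go SDK child-workflow API (this is not code from this commit, and the WorkflowID scheme is hypothetical):

	childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
		WorkflowID: "normalize-" + config.FlowJobName, // hypothetical ID scheme
	})
	future := workflow.ExecuteChildWorkflow(childCtx, NormalizeFlowWorkflow, config)

	var resp *model.NormalizeResponse
	if err := future.Get(childCtx, &resp); err != nil {
		return nil, fmt.Errorf("normalize child workflow failed: %w", err)
	}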
55 changes: 0 additions & 55 deletions flow/workflows/sync_flow.go
@@ -21,17 +21,6 @@ type SyncFlowExecution struct {
logger log.Logger
}

type NormalizeFlowState struct {
CDCFlowName string
Progress []string
}

type NormalizeFlowExecution struct {
NormalizeFlowState
executionID string
logger log.Logger
}

// NewSyncFlowExecution creates a new instance of SyncFlowExecution.
func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowExecution {
return &SyncFlowExecution{
@@ -41,14 +30,6 @@ func NewSyncFlowExecution(ctx workflow.Context, state *SyncFlowState) *SyncFlowE
}
}

func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
return &NormalizeFlowExecution{
NormalizeFlowState: *state,
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
}
}

// executeSyncFlow executes the sync flow.
func (s *SyncFlowExecution) executeSyncFlow(
ctx workflow.Context,
@@ -131,39 +112,3 @@ func SyncFlowWorkflow(ctx workflow.Context,

return s.executeSyncFlow(ctx, config, options, options.RelationMessageMapping)
}

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
})

return s.executeNormalizeFlow(ctx, config)
}

func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeResponse, error) {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
HeartbeatTimeout: 5 * time.Minute,
})

// execute StartFlow on the peers to start the flow
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

var normalizeResponse *model.NormalizeResponse
if err := fStartNormalize.Get(normalizeFlowCtx, &normalizeResponse); err != nil {
return nil, fmt.Errorf("failed to flow: %w", err)
}

return normalizeResponse, nil
}
10 changes: 10 additions & 0 deletions nexus/analyzer/src/lib.rs
@@ -306,6 +306,11 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
_ => None,
};

let initial_copy_only = match raw_options.remove("initial_copy_only") {
Some(sqlparser::ast::Value::Boolean(b)) => *b,
_ => false,
};

let flow_job = FlowJob {
name: cdc.mirror_name.to_string().to_lowercase(),
source_peer: cdc.source_peer.to_string().to_lowercase(),
@@ -327,8 +332,13 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
resync,
soft_delete_col_name,
synced_at_col_name,
initial_copy_only,
};

if initial_copy_only && !do_initial_copy {
anyhow::bail!("initial_copy_only is set to true, but do_initial_copy is set to false");
}

Ok(Some(PeerDDL::CreateMirrorForCDC {
if_not_exists: *if_not_exists,
flow_job,
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Expand Up @@ -195,6 +195,7 @@ impl FlowGrpcClient {
resync: job.resync,
soft_delete_col_name: job.soft_delete_col_name.clone().unwrap_or_default(),
synced_at_col_name: job.synced_at_col_name.clone().unwrap_or_default(),
initial_copy_only: job.initial_copy_only,
..Default::default()
};

1 change: 1 addition & 0 deletions nexus/pt/src/flow_model.rs
Expand Up @@ -70,6 +70,7 @@ pub struct FlowJob {
pub resync: bool,
pub soft_delete_col_name: Option<String>,
pub synced_at_col_name: Option<String>,
pub initial_copy_only: bool,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Expand Up @@ -100,6 +100,8 @@ pub struct FlowConnectionConfigs {
pub soft_delete_col_name: ::prost::alloc::string::String,
#[prost(string, tag="25")]
pub synced_at_col_name: ::prost::alloc::string::String,
#[prost(bool, tag="26")]
pub initial_copy_only: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
@@ -1229,6 +1229,9 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.synced_at_col_name.is_empty() {
len += 1;
}
if self.initial_copy_only {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.FlowConnectionConfigs", len)?;
if let Some(v) = self.source.as_ref() {
struct_ser.serialize_field("source", v)?;
@@ -1309,6 +1312,9 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.synced_at_col_name.is_empty() {
struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
}
if self.initial_copy_only {
struct_ser.serialize_field("initialCopyOnly", &self.initial_copy_only)?;
}
struct_ser.end()
}
}
@@ -1366,6 +1372,8 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"softDeleteColName",
"synced_at_col_name",
"syncedAtColName",
"initial_copy_only",
"initialCopyOnly",
];

#[allow(clippy::enum_variant_names)]
@@ -1395,6 +1403,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
Resync,
SoftDeleteColName,
SyncedAtColName,
InitialCopyOnly,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -1442,6 +1451,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"resync" => Ok(GeneratedField::Resync),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
"initialCopyOnly" | "initial_copy_only" => Ok(GeneratedField::InitialCopyOnly),
_ => Ok(GeneratedField::__SkipField__),
}
}
@@ -1486,6 +1496,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
let mut resync__ = None;
let mut soft_delete_col_name__ = None;
let mut synced_at_col_name__ = None;
let mut initial_copy_only__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Source => {
@@ -1655,6 +1666,12 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
}
synced_at_col_name__ = Some(map.next_value()?);
}
GeneratedField::InitialCopyOnly => {
if initial_copy_only__.is_some() {
return Err(serde::de::Error::duplicate_field("initialCopyOnly"));
}
initial_copy_only__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
@@ -1686,6 +1703,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
resync: resync__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
initial_copy_only: initial_copy_only__.unwrap_or_default(),
})
}
}
2 changes: 2 additions & 0 deletions protos/flow.proto
@@ -71,6 +71,8 @@ message FlowConnectionConfigs {

string soft_delete_col_name = 24;
string synced_at_col_name = 25;

bool initial_copy_only = 26;
}

message RenameTableOption {
21 changes: 21 additions & 0 deletions ui/app/api/mirrors/alerts/route.ts
@@ -0,0 +1,21 @@
import prisma from '@/app/utils/prisma';

export const dynamic = 'force-dynamic';

export async function POST(request: Request) {
const { flowName } = await request.json();
const errCount = await prisma.flow_errors.count({
where: {
flow_name: flowName,
error_type: 'error',
ack: false,
},
});
let mirrorStatus: 'healthy' | 'failed';
if (errCount > 0) {
mirrorStatus = 'failed';
} else {
mirrorStatus = 'healthy';
}
return new Response(JSON.stringify(mirrorStatus));
}
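A quick way to exercise this route by hand, sketched in Go under the assumption that the UI is served on localhost:3000 and a mirror named my_mirror exists (both hypothetical):

	package main

	import (
		"fmt"
		"io"
		"net/http"
		"strings"
	)

	func main() {
		// POST a flow name; the route replies with a JSON-encoded status
		// string, i.e. "healthy" or "failed" (quotes included)
		resp, err := http.Post(
			"http://localhost:3000/api/mirrors/alerts",
			"application/json",
			strings.NewReader(`{"flowName":"my_mirror"}`),
		)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		body, _ := io.ReadAll(resp.Body)
		fmt.Println(string(body))
	}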
9 changes: 9 additions & 0 deletions ui/app/dto/MirrorsDTO.ts
@@ -28,3 +28,12 @@ export type SyncStatusRow = {
endTime: Date | null;
numRows: number;
};

export type AlertErr = {
id: bigint;
flow_name: string;
error_message: string;
error_type: string;
error_timestamp: Date;
ack: boolean;
};
9 changes: 9 additions & 0 deletions ui/app/mirrors/create/handlers.ts
@@ -170,6 +170,15 @@ export const handleCreateCDC = async (
config.cdcSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
config.snapshotSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
}

if (!config.doInitialCopy && config.initialCopyOnly) {
setMsg({
ok: false,
msg: 'Initial Copy Only cannot be true if Initial Copy is false.',
});
return;
}

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch('/api/mirrors/cdc', {
method: 'POST',
11 changes: 11 additions & 0 deletions ui/app/mirrors/create/helpers/cdc.ts
@@ -103,4 +103,15 @@ export const cdcSettings: MirrorSetting[] = [
default: 'SQL',
type: 'switch',
},
{
label: 'Initial Copy Only',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
initialCopyOnly: (value as boolean) || false,
})),
tips: 'If set, PeerDB will only perform the initial load and will not perform CDC sync.',
type: 'switch',
advanced: true,
},
];