Skip to content

Commit

Permalink
Merge branch 'main' into move-nil-check-earlier
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 17, 2023
2 parents 85a4c4b + b413367 commit 9b0e3a2
Show file tree
Hide file tree
Showing 14 changed files with 2,020 additions and 642 deletions.
26 changes: 20 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,13 +671,27 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.
return fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(srcConn)

err = srcConn.SendWALHeartbeat()
if err != nil {
return fmt.Errorf("failed to send WAL heartbeat: %w", err)
log.WithFields(log.Fields{"flowName": config.FlowJobName}).Info("sending walheartbeat every 10 minutes")
ticker := time.NewTicker(10 * time.Minute)
for {
select {
case <-ctx.Done():
log.WithFields(
log.Fields{
"flowName": config.FlowJobName,
}).Info("context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
err = srcConn.SendWALHeartbeat()
if err != nil {
return fmt.Errorf("failed to send WAL heartbeat: %w", err)
}
log.WithFields(
log.Fields{
"flowName": config.FlowJobName,
}).Info("sent wal heartbeat")
}
}

return nil
}

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
Expand Down
27 changes: 20 additions & 7 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
Expand All @@ -23,18 +24,30 @@ type PostgresMetadataStore struct {

func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
connectionString := utils.GetPGConnectionString(pgConfig)
pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
log.Errorf("failed to create connection pool: %v", err)
return nil, err
var storePool *pgxpool.Pool
var poolErr error
if pgConfig == nil {
storePool, poolErr = cc.GetCatalogConnectionPoolFromEnv()
if poolErr != nil {
return nil, fmt.Errorf("failed to create catalog connection pool: %v", poolErr)
}

log.Info("obtained catalog connection pool for metadata store")
} else {
connectionString := utils.GetPGConnectionString(pgConfig)
storePool, poolErr = pgxpool.New(ctx, connectionString)
if poolErr != nil {
log.Errorf("failed to create connection pool: %v", poolErr)
return nil, poolErr
}

log.Info("created connection pool for metadata store")
}
log.Info("created connection pool for metadata store")

return &PostgresMetadataStore{
ctx: ctx,
config: pgConfig,
pool: pool,
pool: storePool,
schemaName: schemaName,
}, nil
}
Expand Down
19 changes: 19 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,25 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto
}
}()

if req.SyncedAtColName != nil {
for _, renameRequest := range req.RenameTableOptions {
resyncTblName := renameRequest.CurrentName

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("setting synced at column for table '%s'...", resyncTblName)

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...",
resyncTblName))

_, err = renameTablesTx.ExecContext(c.ctx,
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP", resyncTblName, *req.SyncedAtColName))
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w", resyncTblName, err)
}
}
}

if req.SoftDeleteColName != nil {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
Expand Down
1,002 changes: 507 additions & 495 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

25 changes: 9 additions & 16 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,6 @@ func (s *CDCFlowWorkflowState) TruncateProgress() {
}
}

func (s *CDCFlowWorkflowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error {
walHeartbeatCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})

if err := workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg).Get(ctx, nil); err != nil {
return fmt.Errorf("failed to send WAL heartbeat: %w", err)
}

return nil
}

// CDCFlowWorkflowExecution represents the state for execution of a peer flow.
type CDCFlowWorkflowExecution struct {
flowExecutionID string
Expand Down Expand Up @@ -247,6 +235,7 @@ func CDCFlowWorkflowWithConfig(
if cfg.SoftDelete {
renameOpts.SoftDeleteColName = &cfg.SoftDeleteColName
}
renameOpts.SyncedAtColName = &cfg.SyncedAtColName
correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema)
for _, mapping := range cfg.TableMappings {
oldName := mapping.DestinationTableIdentifier
Expand Down Expand Up @@ -277,6 +266,12 @@ func CDCFlowWorkflowWithConfig(
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")
}

heartbeatCancelCtx, cancelHeartbeat := workflow.WithCancel(ctx)
walHeartbeatCtx := workflow.WithActivityOptions(heartbeatCancelCtx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
})
workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg)

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: int32(limits.MaxBatchSize),
}
Expand Down Expand Up @@ -419,10 +414,8 @@ func CDCFlowWorkflowWithConfig(
selector.Select(ctx)
}

// send WAL heartbeat
if err := state.SendWALHeartbeat(ctx, cfg); err != nil {
return state, err
}
// cancel the SendWalHeartbeat activity
defer cancelHeartbeat()

state.TruncateProgress()
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ pub struct RenameTablesInput {
pub rename_table_options: ::prost::alloc::vec::Vec<RenameTableOption>,
#[prost(string, optional, tag="4")]
pub soft_delete_col_name: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag="5")]
pub synced_at_col_name: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4146,6 +4146,9 @@ impl serde::Serialize for RenameTablesInput {
if self.soft_delete_col_name.is_some() {
len += 1;
}
if self.synced_at_col_name.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.RenameTablesInput", len)?;
if !self.flow_job_name.is_empty() {
struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
Expand All @@ -4159,6 +4162,9 @@ impl serde::Serialize for RenameTablesInput {
if let Some(v) = self.soft_delete_col_name.as_ref() {
struct_ser.serialize_field("softDeleteColName", v)?;
}
if let Some(v) = self.synced_at_col_name.as_ref() {
struct_ser.serialize_field("syncedAtColName", v)?;
}
struct_ser.end()
}
}
Expand All @@ -4176,6 +4182,8 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
"renameTableOptions",
"soft_delete_col_name",
"softDeleteColName",
"synced_at_col_name",
"syncedAtColName",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -4184,6 +4192,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
Peer,
RenameTableOptions,
SoftDeleteColName,
SyncedAtColName,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -4210,6 +4219,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
"peer" => Ok(GeneratedField::Peer),
"renameTableOptions" | "rename_table_options" => Ok(GeneratedField::RenameTableOptions),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -4233,6 +4243,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
let mut peer__ = None;
let mut rename_table_options__ = None;
let mut soft_delete_col_name__ = None;
let mut synced_at_col_name__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::FlowJobName => {
Expand All @@ -4259,6 +4270,12 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
}
soft_delete_col_name__ = map.next_value()?;
}
GeneratedField::SyncedAtColName => {
if synced_at_col_name__.is_some() {
return Err(serde::de::Error::duplicate_field("syncedAtColName"));
}
synced_at_col_name__ = map.next_value()?;
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -4269,6 +4286,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
peer: peer__,
rename_table_options: rename_table_options__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__,
synced_at_col_name: synced_at_col_name__,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ message RenameTablesInput {
peerdb_peers.Peer peer = 2;
repeated RenameTableOption rename_table_options = 3;
optional string soft_delete_col_name = 4;
optional string synced_at_col_name = 5;
}

message RenameTablesOutput {
Expand Down
20 changes: 12 additions & 8 deletions ui/app/peers/create/[peerType]/helpers/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,23 @@ export const s3Setting: PeerSetting[] = [
},
];

// Placeholder metadata-DB connection settings for newly created storage peers.
// Host and password are intentionally blank; port/user/database mirror the
// common Postgres defaults. NOTE(review): presumably the backend substitutes
// the catalog DB when these blanks are submitted — confirm against the server.
export const blankMetadata = {
  host: '',
  port: 5432,
  user: 'postgres',
  password: '',
  database: 'postgres',
  transactionSnapshot: '',
};

// Default S3 peer configuration shown in the create-peer form.
// Optional credential fields start undefined; the URL is a template the user
// is expected to overwrite.
export const blankS3Setting: S3Config = {
  url: 's3://<bucket_name>/<prefix_name>',
  accessKeyId: undefined,
  secretAccessKey: undefined,
  roleArn: undefined,
  region: undefined,
  endpoint: '',
  // For Storage peers created in UI
  // we use catalog as the metadata DB
  metadataDb: blankMetadata,
};
2 changes: 1 addition & 1 deletion ui/app/peers/create/[peerType]/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,5 @@ export const s3Schema = z.object({
invalid_type_error: 'Endpoint must be a string',
})
.optional(),
metadataDb: pgSchema,
metadataDb: pgSchema.optional(),
});
Loading

0 comments on commit 9b0e3a2

Please sign in to comment.