Merge branch 'main' into schema-changes-tests
heavycrystal authored Oct 5, 2023
2 parents c456fd7 + f0cc278 commit 44fa3fb
Showing 8 changed files with 200 additions and 43 deletions.
72 changes: 63 additions & 9 deletions flow/cmd/handler.go
@@ -110,7 +110,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
}

func (h *FlowRequestHandler) ShutdownFlow(
ctx context.Context, req *protos.ShutdownRequest) (*protos.ShutdownResponse, error) {
ctx context.Context,
req *protos.ShutdownRequest,
) (*protos.ShutdownResponse, error) {
err := h.temporalClient.SignalWorkflow(
ctx,
req.WorkflowId,
@@ -142,8 +144,24 @@ func (h *FlowRequestHandler) ShutdownFlow(
return nil, fmt.Errorf("unable to start DropFlow workflow: %w", err)
}

if err = dropFlowHandle.Get(ctx, nil); err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

errChan := make(chan error, 1)
go func() {
errChan <- dropFlowHandle.Get(cancelCtx, nil)
}()

select {
case err := <-errChan:
if err != nil {
return nil, fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
}
case <-time.After(1 * time.Minute):
err := h.handleWorkflowNotClosed(ctx, workflowID, "")
if err != nil {
return nil, fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
}
}

return &protos.ShutdownResponse{
@@ -153,9 +171,9 @@ func (h *FlowRequestHandler) ShutdownFlow(

func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowID string) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 5 * time.Second
expBackoff.MaxInterval = 30 * time.Second
expBackoff.MaxElapsedTime = 5 * time.Minute
expBackoff.InitialInterval = 3 * time.Second
expBackoff.MaxInterval = 10 * time.Second
expBackoff.MaxElapsedTime = 1 * time.Minute

// empty will terminate the latest run
runID := ""
@@ -176,10 +194,39 @@ func (h *FlowRequestHandler) waitForWorkflowClose(ctx context.Context, workflowI

err := backoff.Retry(operation, expBackoff)
if err != nil {
// terminate workflow if it is still running
reason := "PeerFlow workflow did not close in time"
err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, reason)
return h.handleWorkflowNotClosed(ctx, workflowID, runID)
}

return nil
}

func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workflowID, runID string) error {
errChan := make(chan error, 1)

// Create a new context with timeout for CancelWorkflow
ctxWithTimeout, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()

// Call CancelWorkflow in a goroutine
go func() {
err := h.temporalClient.CancelWorkflow(ctxWithTimeout, workflowID, runID)
errChan <- err
}()

select {
case err := <-errChan:
if err != nil {
log.Errorf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error())
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err = h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}
case <-time.After(1 * time.Minute):
// If 1 minute has passed and we haven't received an error, terminate the workflow
log.Errorf("Timeout reached while trying to cancel PeerFlow workflow. Attempting to terminate.")
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
}
}
@@ -362,6 +409,13 @@ func (h *FlowRequestHandler) CreatePeer(
}
sfConfig := sfConfigObject.SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)
case protos.DBType_SQLSERVER:
sqlServerConfigObject, ok := config.(*protos.Peer_SqlserverConfig)
if !ok {
return wrongConfigResponse, nil
}
sqlServerConfig := sqlServerConfigObject.SqlserverConfig
encodedConfig, encodingErr = proto.Marshal(sqlServerConfig)

default:
return wrongConfigResponse, nil
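
Note on the ShutdownFlow change above: the handler now waits on the DropFlow workflow handle with a bounded timeout and, if the run does not close in time, falls back to cancelling and ultimately terminating it. A minimal standalone sketch of that wait-then-cancel pattern against the Temporal Go SDK follows; waitOrCancel and its timeout handling are illustrative, not code from this commit.

package sample

import (
	"context"
	"fmt"
	"time"

	"go.temporal.io/sdk/client"
)

// waitOrCancel waits up to timeout for a workflow run to close; if it does not,
// it requests cancellation and, should that fail, terminates the workflow.
// Illustrative helper only, not the PeerDB handler code.
func waitOrCancel(ctx context.Context, c client.Client, run client.WorkflowRun, timeout time.Duration) error {
	waitCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	errChan := make(chan error, 1)
	go func() {
		// Get blocks until the workflow closes or the context expires.
		errChan <- run.Get(waitCtx, nil)
	}()

	select {
	case err := <-errChan:
		if err != nil {
			return fmt.Errorf("workflow did not complete cleanly: %w", err)
		}
		return nil
	case <-time.After(timeout):
		// Ask Temporal to cancel; fall back to a hard terminate if that fails.
		if err := c.CancelWorkflow(ctx, run.GetID(), run.GetRunID()); err != nil {
			return c.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "workflow did not cancel in time")
		}
		return nil
	}
}
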
9 changes: 7 additions & 2 deletions flow/connectors/core.go
@@ -219,12 +219,17 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)

case protos.DBType_SQLSERVER:
sqlServerConfig := peer.GetSqlserverConfig()
if sqlServerConfig == nil {
return nil, fmt.Errorf("missing sqlserver config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsqlserver.NewSQLServerConnector(ctx, sqlServerConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
// case protos.DBType_SQLSERVER:
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
25 changes: 18 additions & 7 deletions flow/connectors/snowflake/snowflake.go
@@ -76,6 +76,7 @@ const (
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=?"
isDeletedColumnName = "_PEERDB_IS_DELETED"
checkSchemaExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"

syncRecordsChunkSize = 1024
)
@@ -1005,16 +1006,26 @@ func (c *SnowflakeConnector) SyncFlowCleanup(jobName string) error {
}
}()

_, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema,
getRawTableIdentifier(jobName)))
row := syncFlowCleanupTx.QueryRowContext(c.ctx, checkSchemaExistsSQL, peerDBInternalSchema)
var schemaExists bool
err = row.Scan(&schemaExists)
if err != nil {
return fmt.Errorf("unable to drop raw table: %w", err)
return fmt.Errorf("unable to check if internal schema exists: %w", err)
}
_, err = syncFlowCleanupTx.ExecContext(c.ctx,
fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)

if schemaExists {
_, err = syncFlowCleanupTx.ExecContext(c.ctx, fmt.Sprintf(dropTableIfExistsSQL, peerDBInternalSchema,
getRawTableIdentifier(jobName)))
if err != nil {
return fmt.Errorf("unable to drop raw table: %w", err)
}
_, err = syncFlowCleanupTx.ExecContext(c.ctx,
fmt.Sprintf(deleteJobMetadataSQL, peerDBInternalSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)
}
}

err = syncFlowCleanupTx.Commit()
if err != nil {
return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err)
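
Note on the SyncFlowCleanup change above: the DROP TABLE and metadata DELETE are now guarded by an INFORMATION_SCHEMA.SCHEMATA lookup, so cleanup no longer fails when the internal schema was never created. A small standalone sketch of that existence check using database/sql follows; the function name schemaExists and the example schema name are placeholders, not code from this commit.

package sample

import (
	"context"
	"database/sql"
	"fmt"
)

// schemaExists mirrors the TO_BOOLEAN(COUNT(1)) check added in snowflake.go.
// Illustrative only; assumes an open Snowflake database/sql connection.
func schemaExists(ctx context.Context, db *sql.DB, schema string) (bool, error) {
	const query = "SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME=?"
	var exists bool
	if err := db.QueryRowContext(ctx, query, schema).Scan(&exists); err != nil {
		return false, fmt.Errorf("unable to check if schema exists: %w", err)
	}
	return exists, nil
}

// Callers can then skip the destructive statements when the schema is absent,
// e.g. only dropping the raw table and deleting job metadata when
// schemaExists(ctx, db, "MY_INTERNAL_SCHEMA") reports true ("MY_INTERNAL_SCHEMA" is a placeholder).
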
17 changes: 17 additions & 0 deletions flow/connectors/sql/query_executor.go
@@ -10,6 +10,7 @@ import (

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
@@ -198,6 +199,8 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
case qvalue.QValueKindNumeric:
var s sql.NullString
values[i] = &s
case qvalue.QValueKindUUID:
values[i] = new([]byte)
default:
values[i] = new(interface{})
}
@@ -373,6 +376,20 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
return qvalue.QValue{Kind: kind, Value: *v}, nil
}

case qvalue.QValueKindUUID:
if v, ok := val.(*[]byte); ok && v != nil {
// convert byte array to string
uuidVal, err := uuid.FromBytes(*v)
if err != nil {
return qvalue.QValue{}, fmt.Errorf("failed to parse uuid: %v", *v)
}
return qvalue.QValue{Kind: qvalue.QValueKindString, Value: uuidVal.String()}, nil
}

if v, ok := val.(*[16]byte); ok && v != nil {
return qvalue.QValue{Kind: qvalue.QValueKindString, Value: *v}, nil
}

case qvalue.QValueKindJSON:
vraw := val.(*interface{})
vstring := (*vraw).(string)
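
Note on the query_executor.go change above: UUID columns are now scanned into a byte slice and converted to their canonical string form with github.com/google/uuid. A short standalone sketch of that conversion follows; uuidBytesToString is an illustrative helper, not the executor code itself.

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// uuidBytesToString converts a scanned 16-byte UUID value into its canonical
// string form, as the new QValueKindUUID branch does. Illustrative only.
func uuidBytesToString(raw []byte) (string, error) {
	id, err := uuid.FromBytes(raw) // errors unless len(raw) == 16
	if err != nil {
		return "", fmt.Errorf("failed to parse uuid: %w", err)
	}
	return id.String(), nil
}

func main() {
	s, err := uuidBytesToString([]byte{
		0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef,
		0x12, 0x34, 0x56, 0x78, 0x90, 0xab, 0xcd, 0xef,
	})
	fmt.Println(s, err) // 12345678-90ab-cdef-1234-567890abcdef <nil>
}
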
14 changes: 9 additions & 5 deletions flow/workflows/qrep_flow.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
@@ -231,14 +232,17 @@ func QRepFlowWorkflow(

// register a signal handler to terminate the workflow
terminateWorkflow := false
signalChan := workflow.GetSignalChannel(ctx, "terminate")
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
var signal string
c.Receive(ctx, &signal)
logger.Info("Received signal to terminate workflow", "Signal", signal)
terminateWorkflow = true
var signalVal shared.CDCFlowSignal
c.Receive(ctx, &signalVal)
logger.Info("received signal", "signal", signalVal)
if signalVal == shared.ShutdownSignal {
logger.Info("received shutdown signal")
terminateWorkflow = true
}
})

// register a query to get the number of partitions processed
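
Note on the QRepFlowWorkflow change above: the workflow now listens on the shared CDC flow signal channel and only sets its terminate flag when an explicit shutdown value arrives, rather than on any "terminate" signal. A minimal standalone sketch of the underlying signal/selector pattern follows; the signal name and string payload are placeholders rather than shared.CDCFlowSignalName / shared.ShutdownSignal.

package sample

import (
	"go.temporal.io/sdk/workflow"
)

// SampleWorkflow blocks until a "shutdown" value arrives on its signal channel.
// Illustrative of the selector pattern used in QRepFlowWorkflow, not the real workflow.
func SampleWorkflow(ctx workflow.Context) error {
	logger := workflow.GetLogger(ctx)
	shutdown := false

	signalChan := workflow.GetSignalChannel(ctx, "example-flow-signal") // placeholder signal name
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) {
		var signalVal string
		c.Receive(ctx, &signalVal)
		logger.Info("received signal", "signal", signalVal)
		if signalVal == "shutdown" { // placeholder for the shared shutdown value
			shutdown = true
		}
	})

	for !shutdown {
		selector.Select(ctx) // blocks until a signal is received
	}
	return nil
}
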
9 changes: 8 additions & 1 deletion nexus/catalog/src/lib.rs
@@ -527,7 +527,14 @@ impl Catalog {
}

let first_row = rows.get(0).unwrap();
let workflow_id: String = first_row.get(0);
let workflow_id: Option<String> = first_row.get(0);
if workflow_id.is_none() {
return Err(anyhow!(
"workflow id not found for existing flow job {}",
flow_job_name
));
}
let workflow_id = workflow_id.unwrap();
let source_peer_id: i32 = first_row.get(1);
let destination_peer_id: i32 = first_row.get(2);

36 changes: 28 additions & 8 deletions nexus/flow-rs/src/grpc.rs
@@ -10,6 +10,11 @@ use pt::{
use serde_json::Value;
use tonic_health::pb::health_client;

pub enum PeerValidationResult {
Valid,
Invalid(String),
}

pub struct FlowGrpcClient {
client: peerdb_route::flow_service_client::FlowServiceClient<tonic::transport::Channel>,
health_client: health_client::HealthClient<tonic::transport::Channel>,
@@ -82,6 +87,24 @@ impl FlowGrpcClient {
Ok(workflow_id)
}

pub async fn validate_peer(
&mut self,
validate_request: &pt::peerdb_route::ValidatePeerRequest,
) -> anyhow::Result<PeerValidationResult> {
let validate_peer_req = pt::peerdb_route::ValidatePeerRequest {
peer: validate_request.peer.clone(),
};
let response = self.client.validate_peer(validate_peer_req).await?;
let response_body = &response.into_inner();
let message = response_body.message.clone();
let status = response_body.status;
if status == pt::peerdb_route::ValidatePeerStatus::Valid as i32 {
Ok(PeerValidationResult::Valid)
} else {
Ok(PeerValidationResult::Invalid(message))
}
}

async fn start_peer_flow(
&mut self,
peer_flow_config: pt::peerdb_flow::FlowConnectionConfigs,
@@ -218,13 +241,6 @@ impl FlowGrpcClient {
}
"append" => cfg.write_mode = Some(wm),
"overwrite" => {
if !cfg.initial_copy_only {
return anyhow::Result::Err(
anyhow::anyhow!(
"write mode overwrite can only be set with initial_copy_only = true"
)
);
}
wm.write_type = QRepWriteType::QrepWriteModeOverwrite as i32;
cfg.write_mode = Some(wm);
}
@@ -274,7 +290,11 @@ impl FlowGrpcClient {
}
}
}

if !cfg.initial_copy_only {
return anyhow::Result::Err(anyhow::anyhow!(
"write mode overwrite can only be set with initial_copy_only = true"
));
}
self.start_query_replication_flow(&cfg).await
}
