From c2dc800111a264ebd967ece1e2d5085a2e90be42 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 5 Oct 2023 01:37:08 +0530 Subject: [PATCH] Fix UUID mapping, Rust bugs --- flow/connectors/sql/query_executor.go | 17 +++++++++++++++++ nexus/catalog/src/lib.rs | 9 ++++++++- nexus/flow-rs/src/grpc.rs | 13 +++++-------- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index b3e5803a32..5d4c154cbb 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -10,6 +10,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/google/uuid" "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" "go.temporal.io/sdk/activity" @@ -198,6 +199,8 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa case qvalue.QValueKindNumeric: var s sql.NullString values[i] = &s + case qvalue.QValueKindUUID: + values[i] = new([]byte) default: values[i] = new(interface{}) } @@ -373,6 +376,20 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: kind, Value: *v}, nil } + case qvalue.QValueKindUUID: + if v, ok := val.(*[]byte); ok && v != nil { + // convert byte array to string + uuidVal, err := uuid.FromBytes(*v) + if err != nil { + return qvalue.QValue{}, fmt.Errorf("failed to parse uuid: %v", *v) + } + return qvalue.QValue{Kind: qvalue.QValueKindString, Value: uuidVal.String()}, nil + } + + if v, ok := val.(*[16]byte); ok && v != nil { + return qvalue.QValue{Kind: qvalue.QValueKindString, Value: *v}, nil + } + case qvalue.QValueKindJSON: vraw := val.(*interface{}) vstring := (*vraw).(string) diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index dbce51dfaf..cd20d86452 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -527,7 +527,14 @@ impl Catalog { } let first_row = rows.get(0).unwrap(); - let workflow_id: String = first_row.get(0); + let workflow_id: Option = first_row.get(0); + if workflow_id.is_none() { + return Err(anyhow!( + "workflow id not found for existing flow job {}", + flow_job_name + )); + } + let workflow_id = workflow_id.unwrap(); let source_peer_id: i32 = first_row.get(1); let destination_peer_id: i32 = first_row.get(2); diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index d2df4c7b7e..9c5d4a14e7 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -241,13 +241,6 @@ impl FlowGrpcClient { } "append" => cfg.write_mode = Some(wm), "overwrite" => { - if !cfg.initial_copy_only { - return anyhow::Result::Err( - anyhow::anyhow!( - "write mode overwrite can only be set with initial_copy_only = true" - ) - ); - } wm.write_type = QRepWriteType::QrepWriteModeOverwrite as i32; cfg.write_mode = Some(wm); } @@ -297,7 +290,11 @@ impl FlowGrpcClient { } } } - + if !cfg.initial_copy_only { + return anyhow::Result::Err(anyhow::anyhow!( + "write mode overwrite can only be set with initial_copy_only = true" + )); + } self.start_query_replication_flow(&cfg).await }