Skip to content

Commit

Permalink
Fix UUID mapping, Rust bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 4, 2023
1 parent ed3f0d2 commit c2dc800
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
17 changes: 17 additions & 0 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
Expand Down Expand Up @@ -198,6 +199,8 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
case qvalue.QValueKindNumeric:
var s sql.NullString
values[i] = &s
case qvalue.QValueKindUUID:
values[i] = new([]byte)
default:
values[i] = new(interface{})
}
Expand Down Expand Up @@ -373,6 +376,20 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
return qvalue.QValue{Kind: kind, Value: *v}, nil
}

case qvalue.QValueKindUUID:
if v, ok := val.(*[]byte); ok && v != nil {
// convert byte array to string
uuidVal, err := uuid.FromBytes(*v)
if err != nil {
return qvalue.QValue{}, fmt.Errorf("failed to parse uuid: %v", *v)
}
return qvalue.QValue{Kind: qvalue.QValueKindString, Value: uuidVal.String()}, nil
}

if v, ok := val.(*[16]byte); ok && v != nil {
return qvalue.QValue{Kind: qvalue.QValueKindString, Value: *v}, nil
}

case qvalue.QValueKindJSON:
vraw := val.(*interface{})
vstring := (*vraw).(string)
Expand Down
9 changes: 8 additions & 1 deletion nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,14 @@ impl Catalog {
}

let first_row = rows.get(0).unwrap();
let workflow_id: String = first_row.get(0);
let workflow_id: Option<String> = first_row.get(0);
if workflow_id.is_none() {
return Err(anyhow!(
"workflow id not found for existing flow job {}",
flow_job_name
));
}
let workflow_id = workflow_id.unwrap();
let source_peer_id: i32 = first_row.get(1);
let destination_peer_id: i32 = first_row.get(2);

Expand Down
13 changes: 5 additions & 8 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,6 @@ impl FlowGrpcClient {
}
"append" => cfg.write_mode = Some(wm),
"overwrite" => {
if !cfg.initial_copy_only {
return anyhow::Result::Err(
anyhow::anyhow!(
"write mode overwrite can only be set with initial_copy_only = true"
)
);
}
wm.write_type = QRepWriteType::QrepWriteModeOverwrite as i32;
cfg.write_mode = Some(wm);
}
Expand Down Expand Up @@ -297,7 +290,11 @@ impl FlowGrpcClient {
}
}
}

if !cfg.initial_copy_only {
return anyhow::Result::Err(anyhow::anyhow!(
"write mode overwrite can only be set with initial_copy_only = true"
));
}
self.start_query_replication_flow(&cfg).await
}

Expand Down

0 comments on commit c2dc800

Please sign in to comment.