Skip to content

Commit

Permalink
Datatypes: fix inet and cidr (#1412)
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Mar 1, 2024
1 parent 449b86f commit 904b513
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 8 deletions.
11 changes: 8 additions & 3 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"net/netip"
"strings"
"time"

Expand Down Expand Up @@ -299,20 +300,24 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
return qvalue.QValue{}, fmt.Errorf("failed to parse UUID: %v", value)
}
case qvalue.QValueKindINET:
switch value.(type) {
switch v := value.(type) {
case string:
val = qvalue.QValue{Kind: qvalue.QValueKindINET, Value: value}
case [16]byte:
val = qvalue.QValue{Kind: qvalue.QValueKindINET, Value: value}
case netip.Prefix:
val = qvalue.QValue{Kind: qvalue.QValueKindINET, Value: v.String()}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse INET: %v", value)
return qvalue.QValue{}, fmt.Errorf("failed to parse INET: %v", v)
}
case qvalue.QValueKindCIDR:
switch value.(type) {
switch v := value.(type) {
case string:
val = qvalue.QValue{Kind: qvalue.QValueKindCIDR, Value: value}
case [16]byte:
val = qvalue.QValue{Kind: qvalue.QValueKindCIDR, Value: value}
case netip.Prefix:
val = qvalue.QValue{Kind: qvalue.QValueKindCIDR, Value: v.String()}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse CIDR: %v", value)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s SELECT 2,2,b'1',b'101',
true,random_bytea(32),'s','test','1.1.10.2'::cidr,
CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1,
CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
Expand Down
10 changes: 8 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error {
"myreal REAL",
"myreal2 REAL",
"myreal3 REAL",
"myinet INET",
"mycidr CIDR",
}
tblFieldStr := strings.Join(tblFields, ",")
var pgErr *pgconn.PgError
Expand Down Expand Up @@ -351,7 +353,9 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou
'NaN',
3.14159,
1,
1.0
1.0,
'10.0.0.0/32',
'1.1.10.2'::cidr
)`,
id, uuid.New().String(), uuid.New().String(),
uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String())
Expand All @@ -372,7 +376,9 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou
nannu,
myreal,
myreal2,
myreal3
myreal3,
myinet,
mycidr
) VALUES %s;
`, suffix, tableName, strings.Join(rows, ",")))
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {
}
values[i] = v

case qvalue.QValueKindCIDR, qvalue.QValueKindINET:
v, ok := qValue.Value.(string)
if !ok {
src.err = errors.New("invalid INET/CIDR value")
return nil, src.err
}
values[i] = v

case qvalue.QValueKindTime:
t, ok := qValue.Value.(time.Time)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type AvroSchemaField struct {
// will return an error.
func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) {
switch kind {
case QValueKindString, QValueKindQChar:
case QValueKindString, QValueKindQChar, QValueKindCIDR, QValueKindINET:
return "string", nil
case QValueKindUUID:
return AvroSchemaLogical{
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (q QValue) Equals(other QValue) bool {
} else {
return false
}
case QValueKindString:
case QValueKindString, QValueKindINET, QValueKindCIDR:
return compareString(q.Value, other.Value)
// all internally represented as a Golang time.Time
case QValueKindDate,
Expand Down

0 comments on commit 904b513

Please sign in to comment.