diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index d54e6fa4d3..defc2f492f 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "net/netip" "strings" "time" @@ -299,20 +300,24 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( return qvalue.QValue{}, fmt.Errorf("failed to parse UUID: %v", value) } case qvalue.QValueKindINET: - switch value.(type) { + switch v := value.(type) { case string: val = qvalue.QValue{Kind: qvalue.QValueKindINET, Value: value} case [16]byte: val = qvalue.QValue{Kind: qvalue.QValueKindINET, Value: value} + case netip.Prefix: + val = qvalue.QValue{Kind: qvalue.QValueKindINET, Value: v.String()} default: - return qvalue.QValue{}, fmt.Errorf("failed to parse INET: %v", value) + return qvalue.QValue{}, fmt.Errorf("failed to parse INET: %v", v) } case qvalue.QValueKindCIDR: - switch value.(type) { + switch v := value.(type) { case string: val = qvalue.QValue{Kind: qvalue.QValueKindCIDR, Value: value} case [16]byte: val = qvalue.QValue{Kind: qvalue.QValueKindCIDR, Value: value} + case netip.Prefix: + val = qvalue.QValue{Kind: qvalue.QValueKindCIDR, Value: v.String()} default: return qvalue.QValue{}, fmt.Errorf("failed to parse CIDR: %v", value) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index cdf7f1d021..9d0b8f6a57 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -588,7 +588,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', true,random_bytea(32),'s','test','1.1.10.2'::cidr, - CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1, + CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1, '5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval, '{"sai":-8.02139037433155}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr, 1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz, diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 36a0e4d7c9..598297106b 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -288,6 +288,8 @@ func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error { "myreal REAL", "myreal2 REAL", "myreal3 REAL", + "myinet INET", + "mycidr CIDR", } tblFieldStr := strings.Join(tblFields, ",") var pgErr *pgconn.PgError @@ -351,7 +353,9 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou 'NaN', 3.14159, 1, - 1.0 + 1.0, + '10.0.0.0/32', + '1.1.10.2'::cidr )`, id, uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String()) @@ -372,7 +376,9 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou nannu, myreal, myreal2, - myreal3 + myreal3, + myinet, + mycidr ) VALUES %s; `, suffix, tableName, strings.Join(rows, ","))) if err != nil { diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 77700b7a89..4cf4a11017 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -154,6 +154,14 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { } values[i] = v + case qvalue.QValueKindCIDR, qvalue.QValueKindINET: + v, ok := qValue.Value.(string) + if !ok { + src.err = errors.New("invalid INET/CIDR value") + return nil, src.err + } + values[i] = v + case qvalue.QValueKindTime: t, ok := qValue.Value.(time.Time) if !ok { diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index d5aef18113..ea90af00ed 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -58,7 +58,7 @@ type AvroSchemaField struct { // will return an error. func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) { switch kind { - case QValueKindString, QValueKindQChar: + case QValueKindString, QValueKindQChar, QValueKindCIDR, QValueKindINET: return "string", nil case QValueKindUUID: return AvroSchemaLogical{ diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 1ad07150b6..972063d494 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -56,7 +56,7 @@ func (q QValue) Equals(other QValue) bool { } else { return false } - case QValueKindString: + case QValueKindString, QValueKindINET, QValueKindCIDR: return compareString(q.Value, other.Value) // all internally represented as a Golang time.Time case QValueKindDate,