Skip to content

Commit

Permalink
Merge branch 'main' into add-qchar-type
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Feb 14, 2024
2 parents 86c8180 + 0fb50d9 commit 857ec44
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 173 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string)
// getRawTableName returns the raw table name for the given table identifier.
func (c *BigQueryConnector) getRawTableName(flowJobName string) string {
// replace all non-alphanumeric characters with _
flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_")
flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_")
return fmt.Sprintf("_peerdb_raw_%s", flowJobName)
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
// getRawTableName returns the raw table name for the given table identifier.
func (c *ClickhouseConnector) getRawTableName(flowJobName string) string {
// replace all non-alphanumeric characters with _
flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_")
flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_")
return fmt.Sprintf("_peerdb_raw_%s", flowJobName)
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (c *PostgresConnector) createMetadataSchema(ctx context.Context) error {
}

func getRawTableIdentifier(jobName string) string {
jobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(jobName, "_")
jobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(jobName, "_")
return fmt.Sprintf("%s_%s", rawTablePrefix, strings.ToLower(jobName))
}

Expand Down
191 changes: 24 additions & 167 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/lib/pq/oid"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

Expand Down Expand Up @@ -194,6 +195,20 @@ func parseJSON(value interface{}) (qvalue.QValue, error) {
return qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil
}

func convertToArray[T any](kind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) {
switch v := value.(type) {
case pgtype.Array[T]:
if v.Valid {
return qvalue.QValue{Kind: kind, Value: v.Elements}, nil
}
case []T:
return qvalue.QValue{Kind: kind, Value: v}, nil
case []interface{}:
return qvalue.QValue{Kind: kind, Value: utils.ArrayCastElements[T](v)}, nil
}
return qvalue.QValue{}, fmt.Errorf("failed to parse array %s from %T: %v", kind, value, value)
}

func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) {
val := qvalue.QValue{}

Expand Down Expand Up @@ -325,179 +340,21 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
val = qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat}
}
case qvalue.QValueKindArrayFloat32:
switch v := value.(type) {
case pgtype.Array[float32]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements}
}
case []float32:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v}
case []interface{}:
float32Array := make([]float32, len(v))
for i, val := range v {
float32Array[i] = val.(float32)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array float32: %v", value)
}
return convertToArray[float32](qvalueKind, value)
case qvalue.QValueKindArrayFloat64:
switch v := value.(type) {
case pgtype.Array[float64]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements}
}
case []float64:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v}
case []interface{}:
float64Array := make([]float64, len(v))
for i, val := range v {
float64Array[i] = val.(float64)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array float64: %v", value)
}
return convertToArray[float64](qvalueKind, value)
case qvalue.QValueKindArrayInt16:
switch v := value.(type) {
case pgtype.Array[int16]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v.Elements}
}
case []int16:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v}
case []interface{}:
int16Array := make([]int16, len(v))
for i, val := range v {
int16Array[i] = val.(int16)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: int16Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array int16: %v", value)
}
return convertToArray[int16](qvalueKind, value)
case qvalue.QValueKindArrayInt32:
switch v := value.(type) {
case pgtype.Array[int32]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements}
}
case []int32:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v}
case []interface{}:
int32Array := make([]int32, len(v))
for i, val := range v {
if val == nil {
int32Array[i] = 0
} else {
int32Array[i] = val.(int32)
}
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array int32: %v", value)
}
return convertToArray[int32](qvalueKind, value)
case qvalue.QValueKindArrayInt64:
switch v := value.(type) {
case pgtype.Array[int64]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements}
}
case []int64:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v}
case []interface{}:
int64Array := make([]int64, len(v))
for i, val := range v {
int64Array[i] = val.(int64)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array int64: %v", value)
}
case qvalue.QValueKindArrayDate:
switch v := value.(type) {
case pgtype.Array[time.Time]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v.Elements}
}
case []time.Time:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v}
case []interface{}:
dateArray := make([]time.Time, len(v))
for i, val := range v {
dateArray[i] = val.(time.Time)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: dateArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array date: %v", value)
}
case qvalue.QValueKindArrayTimestamp:
switch v := value.(type) {
case pgtype.Array[time.Time]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v.Elements}
}
case []time.Time:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v}
case []interface{}:
timestampArray := make([]time.Time, len(v))
for i, val := range v {
timestampArray[i] = val.(time.Time)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: timestampArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamp: %v", value)
}
case qvalue.QValueKindArrayTimestampTZ:
switch v := value.(type) {
case pgtype.Array[time.Time]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v.Elements}
}
case []time.Time:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v}
case []interface{}:
timestampTZArray := make([]time.Time, len(v))
for i, val := range v {
timestampTZArray[i] = val.(time.Time)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: timestampTZArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamptz: %v", value)
}
return convertToArray[int64](qvalueKind, value)
case qvalue.QValueKindArrayDate, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ:
return convertToArray[time.Time](qvalueKind, value)
case qvalue.QValueKindArrayBoolean:
switch v := value.(type) {
case pgtype.Array[bool]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v.Elements}
}
case []bool:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v}
case []interface{}:
boolArray := make([]bool, len(v))
for i, val := range v {
boolArray[i] = val.(bool)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: boolArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array boolean: %v", value)
}
return convertToArray[bool](qvalueKind, value)
case qvalue.QValueKindArrayString:
switch v := value.(type) {
case pgtype.Array[string]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements}
}
case []string:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v}
case []interface{}:
stringArray := make([]string, len(v))
for i, val := range v {
stringArray[i] = val.(string)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value)
}
return convertToArray[string](qvalueKind, value)
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func generateCreateTableSQLForNormalizedTable(
}

func getRawTableIdentifier(jobName string) string {
jobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(jobName, "_")
jobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(jobName, "_")
return fmt.Sprintf("%s_%s", rawTablePrefix, jobName)
}

Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,16 @@ func ArraysHaveOverlap[T comparable](first, second []T) bool {

return false
}

func ArrayCastElements[T any](arr []any) []T {
res := make([]T, 0, len(arr))
for _, val := range arr {
if v, ok := val.(T); ok {
res = append(res, v)
} else {
var none T
res = append(res, none)
}
}
return res
}
2 changes: 0 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,8 @@ func (c *QValueAvroConverter) processNullableUnion(
if value == nil {
return nil, nil
}

return goavro.Union(avroType, value), nil
}

return value, nil
}

Expand Down

0 comments on commit 857ec44

Please sign in to comment.