diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 438994a800..4d63a97657 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -793,7 +793,7 @@ func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string) // getRawTableName returns the raw table name for the given table identifier. func (c *BigQueryConnector) getRawTableName(flowJobName string) string { // replace all non-alphanumeric characters with _ - flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_") + flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_") return fmt.Sprintf("_peerdb_raw_%s", flowJobName) } diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 8e2a48a877..24c9a8f0f5 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -24,7 +24,7 @@ const ( // getRawTableName returns the raw table name for the given table identifier. func (c *ClickhouseConnector) getRawTableName(flowJobName string) string { // replace all non-alphanumeric characters with _ - flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_") + flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_") return fmt.Sprintf("_peerdb_raw_%s", flowJobName) } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 333f0516a4..72a5691fe5 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -425,7 +425,7 @@ func (c *PostgresConnector) createMetadataSchema(ctx context.Context) error { } func getRawTableIdentifier(jobName string) string { - jobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(jobName, "_") + jobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(jobName, "_") return fmt.Sprintf("%s_%s", rawTablePrefix, strings.ToLower(jobName)) } diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 0cab79c942..81f19e1ec9 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -194,6 +195,20 @@ func parseJSON(value interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil } +func convertToArray[T any](kind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) { + switch v := value.(type) { + case pgtype.Array[T]: + if v.Valid { + return qvalue.QValue{Kind: kind, Value: v.Elements}, nil + } + case []T: + return qvalue.QValue{Kind: kind, Value: v}, nil + case []interface{}: + return qvalue.QValue{Kind: kind, Value: utils.ArrayCastElements[T](v)}, nil + } + return qvalue.QValue{}, fmt.Errorf("failed to parse array %s from %T: %v", kind, value, value) +} + func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) { val := qvalue.QValue{} @@ -325,179 +340,21 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( val = qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat} } case qvalue.QValueKindArrayFloat32: - switch v := value.(type) { - case pgtype.Array[float32]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements} - } - case []float32: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v} - case []interface{}: - float32Array := make([]float32, len(v)) - for i, val := range v { - float32Array[i] = val.(float32) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array float32: %v", value) - } + return convertToArray[float32](qvalueKind, value) case qvalue.QValueKindArrayFloat64: - switch v := value.(type) { - case pgtype.Array[float64]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements} - } - case []float64: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v} - case []interface{}: - float64Array := make([]float64, len(v)) - for i, val := range v { - float64Array[i] = val.(float64) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array float64: %v", value) - } + return convertToArray[float64](qvalueKind, value) case qvalue.QValueKindArrayInt16: - switch v := value.(type) { - case pgtype.Array[int16]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v.Elements} - } - case []int16: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v} - case []interface{}: - int16Array := make([]int16, len(v)) - for i, val := range v { - int16Array[i] = val.(int16) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: int16Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int16: %v", value) - } + return convertToArray[int16](qvalueKind, value) case qvalue.QValueKindArrayInt32: - switch v := value.(type) { - case pgtype.Array[int32]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements} - } - case []int32: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v} - case []interface{}: - int32Array := make([]int32, len(v)) - for i, val := range v { - if val == nil { - int32Array[i] = 0 - } else { - int32Array[i] = val.(int32) - } - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int32: %v", value) - } + return convertToArray[int32](qvalueKind, value) case qvalue.QValueKindArrayInt64: - switch v := value.(type) { - case pgtype.Array[int64]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements} - } - case []int64: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v} - case []interface{}: - int64Array := make([]int64, len(v)) - for i, val := range v { - int64Array[i] = val.(int64) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int64: %v", value) - } - case qvalue.QValueKindArrayDate: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v} - case []interface{}: - dateArray := make([]time.Time, len(v)) - for i, val := range v { - dateArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: dateArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array date: %v", value) - } - case qvalue.QValueKindArrayTimestamp: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v} - case []interface{}: - timestampArray := make([]time.Time, len(v)) - for i, val := range v { - timestampArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: timestampArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamp: %v", value) - } - case qvalue.QValueKindArrayTimestampTZ: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v} - case []interface{}: - timestampTZArray := make([]time.Time, len(v)) - for i, val := range v { - timestampTZArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: timestampTZArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamptz: %v", value) - } + return convertToArray[int64](qvalueKind, value) + case qvalue.QValueKindArrayDate, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ: + return convertToArray[time.Time](qvalueKind, value) case qvalue.QValueKindArrayBoolean: - switch v := value.(type) { - case pgtype.Array[bool]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v.Elements} - } - case []bool: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v} - case []interface{}: - boolArray := make([]bool, len(v)) - for i, val := range v { - boolArray[i] = val.(bool) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: boolArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array boolean: %v", value) - } + return convertToArray[bool](qvalueKind, value) case qvalue.QValueKindArrayString: - switch v := value.(type) { - case pgtype.Array[string]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements} - } - case []string: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v} - case []interface{}: - stringArray := make([]string, len(v)) - for i, val := range v { - stringArray[i] = val.(string) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value) - } + return convertToArray[string](qvalueKind, value) case qvalue.QValueKindPoint: xCoord := value.(pgtype.Point).P.X yCoord := value.(pgtype.Point).P.Y diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 1644787119..cb86bc4389 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -724,7 +724,7 @@ func generateCreateTableSQLForNormalizedTable( } func getRawTableIdentifier(jobName string) string { - jobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(jobName, "_") + jobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(jobName, "_") return fmt.Sprintf("%s_%s", rawTablePrefix, jobName) } diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go index d203beacc2..2633153ae6 100644 --- a/flow/connectors/utils/array.go +++ b/flow/connectors/utils/array.go @@ -52,3 +52,16 @@ func ArraysHaveOverlap[T comparable](first, second []T) bool { return false } + +func ArrayCastElements[T any](arr []any) []T { + res := make([]T, 0, len(arr)) + for _, val := range arr { + if v, ok := val.(T); ok { + res = append(res, v) + } else { + var none T + res = append(res, none) + } + } + return res +} diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 8397cb61d3..f20dd2b778 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -466,10 +466,8 @@ func (c *QValueAvroConverter) processNullableUnion( if value == nil { return nil, nil } - return goavro.Union(avroType, value), nil } - return value, nil }