From aa095a5a39ec7598e57761bef38008aa3d4e9edf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 04:08:30 +0000 Subject: [PATCH 1/3] raw table names: preserve underscores in names (#1286) avoids mirror `a__b` from having a name conflict with `a_b` --- flow/connectors/bigquery/bigquery.go | 2 +- flow/connectors/clickhouse/cdc.go | 2 +- flow/connectors/postgres/client.go | 2 +- flow/connectors/snowflake/snowflake.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 438994a800..4d63a97657 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -793,7 +793,7 @@ func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string) // getRawTableName returns the raw table name for the given table identifier. func (c *BigQueryConnector) getRawTableName(flowJobName string) string { // replace all non-alphanumeric characters with _ - flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_") + flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_") return fmt.Sprintf("_peerdb_raw_%s", flowJobName) } diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 8e2a48a877..24c9a8f0f5 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -24,7 +24,7 @@ const ( // getRawTableName returns the raw table name for the given table identifier. func (c *ClickhouseConnector) getRawTableName(flowJobName string) string { // replace all non-alphanumeric characters with _ - flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_") + flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_") return fmt.Sprintf("_peerdb_raw_%s", flowJobName) } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 333f0516a4..72a5691fe5 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -425,7 +425,7 @@ func (c *PostgresConnector) createMetadataSchema(ctx context.Context) error { } func getRawTableIdentifier(jobName string) string { - jobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(jobName, "_") + jobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(jobName, "_") return fmt.Sprintf("%s_%s", rawTablePrefix, strings.ToLower(jobName)) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 1644787119..cb86bc4389 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -724,7 +724,7 @@ func generateCreateTableSQLForNormalizedTable( } func getRawTableIdentifier(jobName string) string { - jobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(jobName, "_") + jobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(jobName, "_") return fmt.Sprintf("%s_%s", rawTablePrefix, jobName) } From 68a4fe4efc222961bd85d13ab3c812779a257895 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 13:23:27 +0000 Subject: [PATCH 2/3] Misc cleanup of qvalue code (#1288) Split out from #1285 --- flow/connectors/postgres/qvalue_convert.go | 5 ++-- .../snowflake/merge_stmt_generator.go | 28 +++++++++---------- flow/model/conversion_avro.go | 2 +- flow/model/qrecord_batch.go | 5 ---- flow/model/qvalue/avro_converter.go | 12 ++++---- 5 files changed, 23 insertions(+), 29 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 4823aa2ecc..72f51dff6b 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -501,10 +501,9 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( } default: textVal, ok := value.(string) - if !ok { - return qvalue.QValue{}, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind) + if ok { + val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} } - val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} } // parsing into pgtype failed. diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 4d8ea7dc5a..1ab579069c 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -40,7 +40,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) { } targetColumnName := SnowflakeIdentifierNormalize(column.Name) - switch qvalue.QValueKind(genericColumnType) { + switch qvKind { case qvalue.QValueKindBytes, qvalue.QValueKindBit: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("BASE64_DECODE_BINARY(%s:\"%s\") "+ "AS %s", toVariantColumnName, column.Name, targetColumnName)) @@ -61,21 +61,19 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) { // flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("TIME_FROM_PARTS(0,0,0,%s:%s:"+ // "Microseconds*1000) "+ // "AS %s", toVariantColumnName, columnName, columnName)) - default: - if qvKind == qvalue.QValueKindNumeric { - precision, scale := numeric.ParseNumericTypmod(column.TypeModifier) - if column.TypeModifier == -1 || precision > 38 || scale > 37 { - precision = numeric.PeerDBNumericPrecision - scale = numeric.PeerDBNumericScale - } - numericType := fmt.Sprintf("NUMERIC(%d,%d)", precision, scale) - flattenedCastsSQLArray = append(flattenedCastsSQLArray, - fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s", - toVariantColumnName, column.Name, numericType, targetColumnName)) - } else { - flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s", - toVariantColumnName, column.Name, sfType, targetColumnName)) + case qvalue.QValueKindNumeric: + precision, scale := numeric.ParseNumericTypmod(column.TypeModifier) + if column.TypeModifier == -1 || precision > 38 || scale > 37 { + precision = numeric.PeerDBNumericPrecision + scale = numeric.PeerDBNumericScale } + numericType := fmt.Sprintf("NUMERIC(%d,%d)", precision, scale) + flattenedCastsSQLArray = append(flattenedCastsSQLArray, + fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s", + toVariantColumnName, column.Name, numericType, targetColumnName)) + default: + flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s", + toVariantColumnName, column.Name, sfType, targetColumnName)) } } flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",") diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index b379abc09a..b26aeaf9d7 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -29,7 +29,7 @@ func NewQRecordAvroConverter( } func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) { - m := map[string]interface{}{} + m := make(map[string]interface{}, len(qac.QRecord)) for idx, val := range qac.QRecord { key := qac.ColNames[idx] diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index d7f25c7f5d..572c788630 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -173,11 +173,6 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { values[i] = timestampTZ case qvalue.QValueKindUUID: - if qValue.Value == nil { - values[i] = nil - break - } - v, ok := qValue.Value.([16]byte) // treat it as byte slice if !ok { src.err = fmt.Errorf("invalid UUID value %v", qValue.Value) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 7c902ffcb5..4f5488cbff 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -169,6 +169,10 @@ func NewQValueAvroConverter(value QValue, targetDWH QDWHType, nullable bool) *QV } func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { + if c.Nullable && c.Value.Value == nil { + return nil, nil + } + switch c.Value.Kind { case QValueKindInvalid: // we will attempt to convert invalid to a string @@ -457,14 +461,12 @@ func (c *QValueAvroConverter) processNullableUnion( avroType string, value interface{}, ) (interface{}, error) { - if value == nil && c.Nullable { - return nil, nil - } - if c.Nullable { + if value == nil { + return nil, nil + } return goavro.Union(avroType, value), nil } - return value, nil } From 0fb50d9069faa182794e3f51a1513bb95e076502 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 13:24:34 +0000 Subject: [PATCH 3/3] Generalize recent []int32 nil handling fix to rest of array types (#1287) Ideally we'd preserve nulls, but this'll do for now --- flow/connectors/postgres/qvalue_convert.go | 191 +++------------------ flow/connectors/utils/array.go | 13 ++ 2 files changed, 37 insertions(+), 167 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 72f51dff6b..bd39fd1048 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -190,6 +191,20 @@ func parseJSON(value interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil } +func convertToArray[T any](kind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) { + switch v := value.(type) { + case pgtype.Array[T]: + if v.Valid { + return qvalue.QValue{Kind: kind, Value: v.Elements}, nil + } + case []T: + return qvalue.QValue{Kind: kind, Value: v}, nil + case []interface{}: + return qvalue.QValue{Kind: kind, Value: utils.ArrayCastElements[T](v)}, nil + } + return qvalue.QValue{}, fmt.Errorf("failed to parse array %s from %T: %v", kind, value, value) +} + func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) { val := qvalue.QValue{} @@ -319,179 +334,21 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( val = qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat} } case qvalue.QValueKindArrayFloat32: - switch v := value.(type) { - case pgtype.Array[float32]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements} - } - case []float32: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v} - case []interface{}: - float32Array := make([]float32, len(v)) - for i, val := range v { - float32Array[i] = val.(float32) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array float32: %v", value) - } + return convertToArray[float32](qvalueKind, value) case qvalue.QValueKindArrayFloat64: - switch v := value.(type) { - case pgtype.Array[float64]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements} - } - case []float64: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v} - case []interface{}: - float64Array := make([]float64, len(v)) - for i, val := range v { - float64Array[i] = val.(float64) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array float64: %v", value) - } + return convertToArray[float64](qvalueKind, value) case qvalue.QValueKindArrayInt16: - switch v := value.(type) { - case pgtype.Array[int16]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v.Elements} - } - case []int16: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v} - case []interface{}: - int16Array := make([]int16, len(v)) - for i, val := range v { - int16Array[i] = val.(int16) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: int16Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int16: %v", value) - } + return convertToArray[int16](qvalueKind, value) case qvalue.QValueKindArrayInt32: - switch v := value.(type) { - case pgtype.Array[int32]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements} - } - case []int32: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v} - case []interface{}: - int32Array := make([]int32, len(v)) - for i, val := range v { - if val == nil { - int32Array[i] = 0 - } else { - int32Array[i] = val.(int32) - } - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int32: %v", value) - } + return convertToArray[int32](qvalueKind, value) case qvalue.QValueKindArrayInt64: - switch v := value.(type) { - case pgtype.Array[int64]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements} - } - case []int64: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v} - case []interface{}: - int64Array := make([]int64, len(v)) - for i, val := range v { - int64Array[i] = val.(int64) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int64: %v", value) - } - case qvalue.QValueKindArrayDate: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v} - case []interface{}: - dateArray := make([]time.Time, len(v)) - for i, val := range v { - dateArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: dateArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array date: %v", value) - } - case qvalue.QValueKindArrayTimestamp: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v} - case []interface{}: - timestampArray := make([]time.Time, len(v)) - for i, val := range v { - timestampArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: timestampArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamp: %v", value) - } - case qvalue.QValueKindArrayTimestampTZ: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v} - case []interface{}: - timestampTZArray := make([]time.Time, len(v)) - for i, val := range v { - timestampTZArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: timestampTZArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamptz: %v", value) - } + return convertToArray[int64](qvalueKind, value) + case qvalue.QValueKindArrayDate, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ: + return convertToArray[time.Time](qvalueKind, value) case qvalue.QValueKindArrayBoolean: - switch v := value.(type) { - case pgtype.Array[bool]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v.Elements} - } - case []bool: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v} - case []interface{}: - boolArray := make([]bool, len(v)) - for i, val := range v { - boolArray[i] = val.(bool) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: boolArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array boolean: %v", value) - } + return convertToArray[bool](qvalueKind, value) case qvalue.QValueKindArrayString: - switch v := value.(type) { - case pgtype.Array[string]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements} - } - case []string: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v} - case []interface{}: - stringArray := make([]string, len(v)) - for i, val := range v { - stringArray[i] = val.(string) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value) - } + return convertToArray[string](qvalueKind, value) case qvalue.QValueKindPoint: xCoord := value.(pgtype.Point).P.X yCoord := value.(pgtype.Point).P.Y diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go index d203beacc2..2633153ae6 100644 --- a/flow/connectors/utils/array.go +++ b/flow/connectors/utils/array.go @@ -52,3 +52,16 @@ func ArraysHaveOverlap[T comparable](first, second []T) bool { return false } + +func ArrayCastElements[T any](arr []any) []T { + res := make([]T, 0, len(arr)) + for _, val := range arr { + if v, ok := val.(T); ok { + res = append(res, v) + } else { + var none T + res = append(res, none) + } + } + return res +}