Skip to content

Commit

Permalink
Prefer HasPrefix(type, "array_") to Contains(type, "array")
Browse files Browse the repository at this point in the history
Change qvalue.QValueKindIsArray to IsArray method using HasPrefix,
update code calling `Contains(type, "array")` to use method
  • Loading branch information
serprex committed Dec 15, 2023
1 parent f62f3e2 commit e267a22
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 17 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
columns[idx] = &bigquery.FieldSchema{
Name: colName,
Type: qValueKindToBigQueryType(genericColType),
Repeated: strings.Contains(genericColType, "array"),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
}
idx++
}
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/bigquery/qrecord_value_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) {
for i, v := range q.Record.Entries {
k := q.ColumnNames[i]
if v.Value == nil {
bqValues[k] = nil
if qvalue.QValueKindIsArray(v.Kind) {
if v.Kind.IsArray() {
bqValues[k] = make([]interface{}, 0)
} else {
bqValues[k] = nil
}
continue
}
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
Expand Down Expand Up @@ -535,7 +536,7 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
for columnName, genericColumnType := range normalizedTableSchema.Columns {
columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName))
pgType := qValueKindToPostgresType(genericColumnType)
if strings.Contains(genericColumnType, "array") {
if qvalue.QValueKind(genericColumnType).IsArray() {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
Expand Down Expand Up @@ -589,7 +590,7 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
primaryKeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
for columnName, genericColumnType := range normalizedTableSchema.Columns {
pgType := qValueKindToPostgresType(genericColumnType)
if strings.Contains(genericColumnType, "array") {
if qvalue.QValueKind(genericColumnType).IsArray() {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
Expand Down
18 changes: 6 additions & 12 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package qvalue

import "fmt"
import (
"fmt"
"strings"
)

type QValueKind string

Expand Down Expand Up @@ -38,17 +41,8 @@ const (
QValueKindArrayString QValueKind = "array_string"
)

func QValueKindIsArray(kind QValueKind) bool {
switch kind {
case QValueKindArrayFloat32,
QValueKindArrayFloat64,
QValueKindArrayInt32,
QValueKindArrayInt64,
QValueKindArrayString:
return true
default:
return false
}
func (kind QValueKind) IsArray() bool {
return strings.HasPrefix(string(kind), "array_")
}

var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
Expand Down

0 comments on commit e267a22

Please sign in to comment.