Skip to content

Commit

Permalink
Prefer HasPrefix(type, "array_") to Contains(type, "array") (#829)
Browse files Browse the repository at this point in the history
Change qvalue.QValueKindIsArray to IsArray method using HasPrefix,
update code calling `Contains(type, "array")` to use method
  • Loading branch information
serprex authored Dec 15, 2023
1 parent 99ccad7 commit ba27f24
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 28 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
columns[idx] = &bigquery.FieldSchema{
Name: colName,
Type: qValueKindToBigQueryType(genericColType),
Repeated: strings.Contains(genericColType, "array"),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
}
idx++
}
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/bigquery/qrecord_value_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) {
for i, v := range q.Record.Entries {
k := q.ColumnNames[i]
if v.Value == nil {
bqValues[k] = nil
if qvalue.QValueKindIsArray(v.Kind) {
if v.Kind.IsArray() {
bqValues[k] = make([]interface{}, 0)
} else {
bqValues[k] = nil
}
continue
}
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
Expand Down Expand Up @@ -535,7 +536,7 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
for columnName, genericColumnType := range normalizedTableSchema.Columns {
columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName))
pgType := qValueKindToPostgresType(genericColumnType)
if strings.Contains(genericColumnType, "array") {
if qvalue.QValueKind(genericColumnType).IsArray() {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
Expand Down Expand Up @@ -589,7 +590,7 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
primaryKeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
for columnName, genericColumnType := range normalizedTableSchema.Columns {
pgType := qValueKindToPostgresType(genericColumnType)
if strings.Contains(genericColumnType, "array") {
if qvalue.QValueKind(genericColumnType).IsArray() {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
Expand Down
4 changes: 1 addition & 3 deletions flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 6 additions & 12 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package qvalue

import "fmt"
import (
"fmt"
"strings"
)

type QValueKind string

Expand Down Expand Up @@ -38,17 +41,8 @@ const (
QValueKindArrayString QValueKind = "array_string"
)

func QValueKindIsArray(kind QValueKind) bool {
switch kind {
case QValueKindArrayFloat32,
QValueKindArrayFloat64,
QValueKindArrayInt32,
QValueKindArrayInt64,
QValueKindArrayString:
return true
default:
return false
}
func (kind QValueKind) IsArray() bool {
return strings.HasPrefix(string(kind), "array_")
}

var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
Expand Down
2 changes: 0 additions & 2 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,6 @@ pub struct CreateRawTableOutput {
pub struct TableSchema {
#[prost(string, tag="1")]
pub table_identifier: ::prost::alloc::string::String,
/// list of column names and types, types can be one of the following:
/// "string", "int", "float", "bool", "timestamp".
#[prost(map="string, string", tag="2")]
pub columns: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
#[prost(string, repeated, tag="3")]
Expand Down
2 changes: 0 additions & 2 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ message CreateRawTableOutput { string table_identifier = 1; }

message TableSchema {
string table_identifier = 1;
// list of column names and types, types can be one of the following:
// "string", "int", "float", "bool", "timestamp".
map<string, string> columns = 2;
repeated string primary_key_columns = 3;
bool is_replica_identity_full = 4;
Expand Down
4 changes: 0 additions & 4 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,6 @@ export interface CreateRawTableOutput {

export interface TableSchema {
tableIdentifier: string;
/**
* list of column names and types, types can be one of the following:
* "string", "int", "float", "bool", "timestamp".
*/
columns: { [key: string]: string };
primaryKeyColumns: string[];
isReplicaIdentityFull: boolean;
Expand Down

0 comments on commit ba27f24

Please sign in to comment.