diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index fe53263d2a..9b32e30b43 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -963,7 +963,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( columns[idx] = &bigquery.FieldSchema{ Name: colName, Type: qValueKindToBigQueryType(genericColType), - Repeated: strings.Contains(genericColType, "array"), + Repeated: qvalue.QValueKind(genericColType).IsArray(), } idx++ } diff --git a/flow/connectors/bigquery/qrecord_value_saver.go b/flow/connectors/bigquery/qrecord_value_saver.go index e724cc4a55..202ac3df4d 100644 --- a/flow/connectors/bigquery/qrecord_value_saver.go +++ b/flow/connectors/bigquery/qrecord_value_saver.go @@ -49,9 +49,10 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) { for i, v := range q.Record.Entries { k := q.ColumnNames[i] if v.Value == nil { - bqValues[k] = nil - if qvalue.QValueKindIsArray(v.Kind) { + if v.Kind.IsArray() { bqValues[k] = make([]interface{}, 0) + } else { + bqValues[k] = nil } continue } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index fd4a162e15..1eec3c5bf7 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -11,6 +11,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" @@ -535,7 +536,7 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie for columnName, genericColumnType := range normalizedTableSchema.Columns { columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName)) pgType := qValueKindToPostgresType(genericColumnType) - if strings.Contains(genericColumnType, "array") { + if qvalue.QValueKind(genericColumnType).IsArray() { flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"", strings.Trim(columnName, "\""), pgType, columnName)) @@ -589,7 +590,7 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st primaryKeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns)) for columnName, genericColumnType := range normalizedTableSchema.Columns { pgType := qValueKindToPostgresType(genericColumnType) - if strings.Contains(genericColumnType, "array") { + if qvalue.QValueKind(genericColumnType).IsArray() { flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"", strings.Trim(columnName, "\""), pgType, columnName)) diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index bf29f33454..a4a8ff581c 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -1854,9 +1854,7 @@ type TableSchema struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TableIdentifier string `protobuf:"bytes,1,opt,name=table_identifier,json=tableIdentifier,proto3" json:"table_identifier,omitempty"` - // list of column names and types, types can be one of the following: - // "string", "int", "float", "bool", "timestamp". + TableIdentifier string `protobuf:"bytes,1,opt,name=table_identifier,json=tableIdentifier,proto3" json:"table_identifier,omitempty"` Columns map[string]string `protobuf:"bytes,2,rep,name=columns,proto3" json:"columns,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` PrimaryKeyColumns []string `protobuf:"bytes,3,rep,name=primary_key_columns,json=primaryKeyColumns,proto3" json:"primary_key_columns,omitempty"` IsReplicaIdentityFull bool `protobuf:"varint,4,opt,name=is_replica_identity_full,json=isReplicaIdentityFull,proto3" json:"is_replica_identity_full,omitempty"` diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 4b728f3dad..24ea597ad2 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -1,6 +1,9 @@ package qvalue -import "fmt" +import ( + "fmt" + "strings" +) type QValueKind string @@ -38,17 +41,8 @@ const ( QValueKindArrayString QValueKind = "array_string" ) -func QValueKindIsArray(kind QValueKind) bool { - switch kind { - case QValueKindArrayFloat32, - QValueKindArrayFloat64, - QValueKindArrayInt32, - QValueKindArrayInt64, - QValueKindArrayString: - return true - default: - return false - } +func (kind QValueKind) IsArray() bool { + return strings.HasPrefix(string(kind), "array_") } var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index dc308131e6..798e09c99b 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -298,8 +298,6 @@ pub struct CreateRawTableOutput { pub struct TableSchema { #[prost(string, tag="1")] pub table_identifier: ::prost::alloc::string::String, - /// list of column names and types, types can be one of the following: - /// "string", "int", "float", "bool", "timestamp". #[prost(map="string, string", tag="2")] pub columns: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, #[prost(string, repeated, tag="3")] diff --git a/protos/flow.proto b/protos/flow.proto index 281f609993..88d87b9835 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -188,8 +188,6 @@ message CreateRawTableOutput { string table_identifier = 1; } message TableSchema { string table_identifier = 1; - // list of column names and types, types can be one of the following: - // "string", "int", "float", "bool", "timestamp". map columns = 2; repeated string primary_key_columns = 3; bool is_replica_identity_full = 4; diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 3e8f36e97f..845b4d627f 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -299,10 +299,6 @@ export interface CreateRawTableOutput { export interface TableSchema { tableIdentifier: string; - /** - * list of column names and types, types can be one of the following: - * "string", "int", "float", "bool", "timestamp". - */ columns: { [key: string]: string }; primaryKeyColumns: string[]; isReplicaIdentityFull: boolean;