From 99ccad7a15f8d8cb3bafe203ca5c70de58876e1e Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 15 Dec 2023 17:57:36 +0530 Subject: [PATCH 1/2] fixes merge statement casting for JSONB and array types (#830) 1) Replaces the legacy `JSON_EXTRACT_*` functions with `JSON_VALUE_*` equivalents which fixes quoting issues in certain types. 2) Changes some logging and Pyroscope to be less noisy. --- flow/cmd/worker.go | 2 +- flow/connectors/bigquery/bigquery.go | 2 +- flow/connectors/bigquery/merge_statement_generator.go | 8 ++++---- flow/connectors/postgres/cdc.go | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index d41794a409..40d2486277 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -45,7 +45,7 @@ func setupPyroscope(opts *WorkerOptions) { ServerAddress: opts.PyroscopeServer, // you can disable logging by setting this to nil - Logger: pyroscope.StandardLogger, + Logger: nil, // you can provide static tags via a map: Tags: map[string]string{"hostname": os.Getenv("HOSTNAME")}, diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index e1f57e9c28..fe53263d2a 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -382,7 +382,7 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, erro } if row[0] == nil { - c.logger.Info("no normalize_batch_id foundreturning 0") + c.logger.Info("no normalize_batch_id found returning 0") return 0, nil } else { return row[0].(int64), nil diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index e6082cd8ac..7e35aadc44 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -62,16 +62,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: //if the type is JSON, then just extract JSON - castStmt = fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`", colName, bqType, colName) // expecting data in BASE64 format case qvalue.QValueKindBytes, qvalue.QValueKindBit: - castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS `%s`", + castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`", colName, colName) case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ - "UNNEST(CAST(JSON_EXTRACT_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`", + "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`", bqType, colName, colName) // MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64) // Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true} @@ -89,7 +89,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // " AS int64))) AS %s", // colName, colName) default: - castStmt = fmt.Sprintf("CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s') AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`", colName, bqType, colName) } flattenedProjs = append(flattenedProjs, castStmt) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index a56ee49edf..20b5a2f2e5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -526,7 +526,7 @@ func (p *PostgresCDCSource) processInsertMessage( } // log lsn and relation id for debugging - p.logger.Warn(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", + p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relID, tableName)) rel, ok := p.relationMessageMapping[relID] @@ -561,7 +561,7 @@ func (p *PostgresCDCSource) processUpdateMessage( } // log lsn and relation id for debugging - p.logger.Warn(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s", + p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s", lsn, relID, tableName)) rel, ok := p.relationMessageMapping[relID] From ba27f24a08f4d56154c7d1985603dae5cbb2dcb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 15 Dec 2023 15:57:37 +0000 Subject: [PATCH 2/2] Prefer `HasPrefix(type, "array_")` to `Contains(type, "array")` (#829) Change qvalue.QValueKindIsArray to IsArray method using HasPrefix, update code calling `Contains(type, "array")` to use method --- flow/connectors/bigquery/bigquery.go | 2 +- .../connectors/bigquery/qrecord_value_saver.go | 5 +++-- flow/connectors/postgres/client.go | 5 +++-- flow/generated/protos/flow.pb.go | 4 +--- flow/model/qvalue/kind.go | 18 ++++++------------ nexus/pt/src/peerdb_flow.rs | 2 -- protos/flow.proto | 2 -- ui/grpc_generated/flow.ts | 4 ---- 8 files changed, 14 insertions(+), 28 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index fe53263d2a..9b32e30b43 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -963,7 +963,7 @@ func (c *BigQueryConnector) SetupNormalizedTables( columns[idx] = &bigquery.FieldSchema{ Name: colName, Type: qValueKindToBigQueryType(genericColType), - Repeated: strings.Contains(genericColType, "array"), + Repeated: qvalue.QValueKind(genericColType).IsArray(), } idx++ } diff --git a/flow/connectors/bigquery/qrecord_value_saver.go b/flow/connectors/bigquery/qrecord_value_saver.go index e724cc4a55..202ac3df4d 100644 --- a/flow/connectors/bigquery/qrecord_value_saver.go +++ b/flow/connectors/bigquery/qrecord_value_saver.go @@ -49,9 +49,10 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) { for i, v := range q.Record.Entries { k := q.ColumnNames[i] if v.Value == nil { - bqValues[k] = nil - if qvalue.QValueKindIsArray(v.Kind) { + if v.Kind.IsArray() { bqValues[k] = make([]interface{}, 0) + } else { + bqValues[k] = nil } continue } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index fd4a162e15..1eec3c5bf7 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -11,6 +11,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" @@ -535,7 +536,7 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie for columnName, genericColumnType := range normalizedTableSchema.Columns { columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName)) pgType := qValueKindToPostgresType(genericColumnType) - if strings.Contains(genericColumnType, "array") { + if qvalue.QValueKind(genericColumnType).IsArray() { flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"", strings.Trim(columnName, "\""), pgType, columnName)) @@ -589,7 +590,7 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st primaryKeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns)) for columnName, genericColumnType := range normalizedTableSchema.Columns { pgType := qValueKindToPostgresType(genericColumnType) - if strings.Contains(genericColumnType, "array") { + if qvalue.QValueKind(genericColumnType).IsArray() { flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"", strings.Trim(columnName, "\""), pgType, columnName)) diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index bf29f33454..a4a8ff581c 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -1854,9 +1854,7 @@ type TableSchema struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TableIdentifier string `protobuf:"bytes,1,opt,name=table_identifier,json=tableIdentifier,proto3" json:"table_identifier,omitempty"` - // list of column names and types, types can be one of the following: - // "string", "int", "float", "bool", "timestamp". + TableIdentifier string `protobuf:"bytes,1,opt,name=table_identifier,json=tableIdentifier,proto3" json:"table_identifier,omitempty"` Columns map[string]string `protobuf:"bytes,2,rep,name=columns,proto3" json:"columns,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` PrimaryKeyColumns []string `protobuf:"bytes,3,rep,name=primary_key_columns,json=primaryKeyColumns,proto3" json:"primary_key_columns,omitempty"` IsReplicaIdentityFull bool `protobuf:"varint,4,opt,name=is_replica_identity_full,json=isReplicaIdentityFull,proto3" json:"is_replica_identity_full,omitempty"` diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 4b728f3dad..24ea597ad2 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -1,6 +1,9 @@ package qvalue -import "fmt" +import ( + "fmt" + "strings" +) type QValueKind string @@ -38,17 +41,8 @@ const ( QValueKindArrayString QValueKind = "array_string" ) -func QValueKindIsArray(kind QValueKind) bool { - switch kind { - case QValueKindArrayFloat32, - QValueKindArrayFloat64, - QValueKindArrayInt32, - QValueKindArrayInt64, - QValueKindArrayString: - return true - default: - return false - } +func (kind QValueKind) IsArray() bool { + return strings.HasPrefix(string(kind), "array_") } var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index dc308131e6..798e09c99b 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -298,8 +298,6 @@ pub struct CreateRawTableOutput { pub struct TableSchema { #[prost(string, tag="1")] pub table_identifier: ::prost::alloc::string::String, - /// list of column names and types, types can be one of the following: - /// "string", "int", "float", "bool", "timestamp". #[prost(map="string, string", tag="2")] pub columns: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, #[prost(string, repeated, tag="3")] diff --git a/protos/flow.proto b/protos/flow.proto index 281f609993..88d87b9835 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -188,8 +188,6 @@ message CreateRawTableOutput { string table_identifier = 1; } message TableSchema { string table_identifier = 1; - // list of column names and types, types can be one of the following: - // "string", "int", "float", "bool", "timestamp". map columns = 2; repeated string primary_key_columns = 3; bool is_replica_identity_full = 4; diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 3e8f36e97f..845b4d627f 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -299,10 +299,6 @@ export interface CreateRawTableOutput { export interface TableSchema { tableIdentifier: string; - /** - * list of column names and types, types can be one of the following: - * "string", "int", "float", "bool", "timestamp". - */ columns: { [key: string]: string }; primaryKeyColumns: string[]; isReplicaIdentityFull: boolean;