Skip to content

Commit

Permalink
Merge branch 'main' into less-arc
Browse files: browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 15, 2023
2 parents 9ee5b7c + ba27f24 commit bca3f0a
Show file tree
Hide file tree
Showing 11 changed files with 22 additions and 36 deletions.
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func setupPyroscope(opts *WorkerOptions) {
ServerAddress: opts.PyroscopeServer,

// you can disable logging by setting this to nil
Logger: pyroscope.StandardLogger,
Logger: nil,

// you can provide static tags via a map:
Tags: map[string]string{"hostname": os.Getenv("HOSTNAME")},
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, erro
}

if row[0] == nil {
c.logger.Info("no normalize_batch_id foundreturning 0")
c.logger.Info("no normalize_batch_id found returning 0")
return 0, nil
} else {
return row[0].(int64), nil
Expand Down Expand Up @@ -963,7 +963,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
columns[idx] = &bigquery.FieldSchema{
Name: colName,
Type: qValueKindToBigQueryType(genericColType),
Repeated: strings.Contains(genericColType, "array"),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
}
idx++
}
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
//if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS `%s`",
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, colName)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS `%s`",
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`",
colName, colName)
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_EXTRACT_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
bqType, colName, colName)
// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
Expand All @@ -89,7 +89,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// " AS int64))) AS %s",
// colName, colName)
default:
castStmt = fmt.Sprintf("CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s') AS %s) AS `%s`",
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, colName)
}
flattenedProjs = append(flattenedProjs, castStmt)
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/bigquery/qrecord_value_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) {
for i, v := range q.Record.Entries {
k := q.ColumnNames[i]
if v.Value == nil {
bqValues[k] = nil
if qvalue.QValueKindIsArray(v.Kind) {
if v.Kind.IsArray() {
bqValues[k] = make([]interface{}, 0)
} else {
bqValues[k] = nil
}
continue
}
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// log lsn and relation id for debugging
p.logger.Warn(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
Expand Down Expand Up @@ -561,7 +561,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// log lsn and relation id for debugging
p.logger.Warn(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
Expand Down Expand Up @@ -535,7 +536,7 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
for columnName, genericColumnType := range normalizedTableSchema.Columns {
columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName))
pgType := qValueKindToPostgresType(genericColumnType)
if strings.Contains(genericColumnType, "array") {
if qvalue.QValueKind(genericColumnType).IsArray() {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
Expand Down Expand Up @@ -589,7 +590,7 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
primaryKeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
for columnName, genericColumnType := range normalizedTableSchema.Columns {
pgType := qValueKindToPostgresType(genericColumnType)
if strings.Contains(genericColumnType, "array") {
if qvalue.QValueKind(genericColumnType).IsArray() {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
Expand Down
4 changes: 1 addition & 3 deletions flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 6 additions & 12 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package qvalue

import "fmt"
import (
"fmt"
"strings"
)

type QValueKind string

Expand Down Expand Up @@ -38,17 +41,8 @@ const (
QValueKindArrayString QValueKind = "array_string"
)

func QValueKindIsArray(kind QValueKind) bool {
switch kind {
case QValueKindArrayFloat32,
QValueKindArrayFloat64,
QValueKindArrayInt32,
QValueKindArrayInt64,
QValueKindArrayString:
return true
default:
return false
}
// IsArray reports whether the kind's string value begins with "array_",
// the prefix carried by array kinds such as QValueKindArrayString
// ("array_string"). The check is a case-sensitive prefix match, so any
// kind whose value starts with "array_" is treated as an array.
func (kind QValueKind) IsArray() bool {
	const arrayPrefix = "array_"
	name := string(kind)
	return strings.HasPrefix(name, arrayPrefix)
}

var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
Expand Down
2 changes: 0 additions & 2 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,6 @@ pub struct CreateRawTableOutput {
pub struct TableSchema {
#[prost(string, tag="1")]
pub table_identifier: ::prost::alloc::string::String,
/// list of column names and types, types can be one of the following:
/// "string", "int", "float", "bool", "timestamp".
#[prost(map="string, string", tag="2")]
pub columns: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
#[prost(string, repeated, tag="3")]
Expand Down
2 changes: 0 additions & 2 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ message CreateRawTableOutput { string table_identifier = 1; }

message TableSchema {
string table_identifier = 1;
// list of column names and types, types can be one of the following:
// "string", "int", "float", "bool", "timestamp".
map<string, string> columns = 2;
repeated string primary_key_columns = 3;
bool is_replica_identity_full = 4;
Expand Down
4 changes: 0 additions & 4 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,6 @@ export interface CreateRawTableOutput {

export interface TableSchema {
tableIdentifier: string;
/**
* list of column names and types, types can be one of the following:
* "string", "int", "float", "bool", "timestamp".
*/
columns: { [key: string]: string };
primaryKeyColumns: string[];
isReplicaIdentityFull: boolean;
Expand Down

0 comments on commit bca3f0a

Please sign in to comment.