Skip to content

Commit

Permalink
Merge branch 'main' into array-check-prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Dec 15, 2023
2 parents 8d0c9b5 + 99ccad7 commit 395cd9a
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func setupPyroscope(opts *WorkerOptions) {
ServerAddress: opts.PyroscopeServer,

// you can disable logging by setting this to nil
Logger: pyroscope.StandardLogger,
Logger: nil,

// you can provide static tags via a map:
Tags: map[string]string{"hostname": os.Getenv("HOSTNAME")},
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, erro
}

if row[0] == nil {
c.logger.Info("no normalize_batch_id foundreturning 0")
c.logger.Info("no normalize_batch_id found returning 0")
return 0, nil
} else {
return row[0].(int64), nil
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
//if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS `%s`",
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, colName)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS `%s`",
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`",
colName, colName)
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_EXTRACT_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`",
bqType, colName, colName)
// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
Expand All @@ -89,7 +89,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// " AS int64))) AS %s",
// colName, colName)
default:
castStmt = fmt.Sprintf("CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s') AS %s) AS `%s`",
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
colName, bqType, colName)
}
flattenedProjs = append(flattenedProjs, castStmt)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (p *PostgresCDCSource) processInsertMessage(
}

// log lsn and relation id for debugging
p.logger.Warn(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
Expand Down Expand Up @@ -561,7 +561,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
}

// log lsn and relation id for debugging
p.logger.Warn(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
lsn, relID, tableName))

rel, ok := p.relationMessageMapping[relID]
Expand Down

0 comments on commit 395cd9a

Please sign in to comment.