diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go
index d41794a409..40d2486277 100644
--- a/flow/cmd/worker.go
+++ b/flow/cmd/worker.go
@@ -45,7 +45,7 @@ func setupPyroscope(opts *WorkerOptions) {
 		ServerAddress: opts.PyroscopeServer,
 
 		// you can disable logging by setting this to nil
-		Logger: pyroscope.StandardLogger,
+		Logger: nil,
 
 		// you can provide static tags via a map:
 		Tags: map[string]string{"hostname": os.Getenv("HOSTNAME")},
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 4ee7425962..9b32e30b43 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -382,7 +382,7 @@ func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, erro
 	}
 
 	if row[0] == nil {
-		c.logger.Info("no normalize_batch_id foundreturning 0")
+		c.logger.Info("no normalize_batch_id found returning 0")
 		return 0, nil
 	} else {
 		return row[0].(int64), nil
diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go
index e6082cd8ac..7e35aadc44 100644
--- a/flow/connectors/bigquery/merge_statement_generator.go
+++ b/flow/connectors/bigquery/merge_statement_generator.go
@@ -62,16 +62,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 		switch qvalue.QValueKind(colType) {
 		case qvalue.QValueKindJSON:
 			//if the type is JSON, then just extract JSON
-			castStmt = fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS `%s`",
+			castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
 				colName, bqType, colName)
 		// expecting data in BASE64 format
 		case qvalue.QValueKindBytes, qvalue.QValueKindBit:
-			castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS `%s`",
+			castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`",
 				colName, colName)
 		case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
 			qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
 			castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
-				"UNNEST(CAST(JSON_EXTRACT_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`",
+				"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY)) AS element) AS `%s`",
 				bqType, colName, colName)
 		// MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64)
 		// Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true}
@@ -89,7 +89,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 		// " AS int64))) AS %s",
 		// colName, colName)
 		default:
-			castStmt = fmt.Sprintf("CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s') AS %s) AS `%s`",
+			castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
 				colName, bqType, colName)
 		}
 		flattenedProjs = append(flattenedProjs, castStmt)
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index a56ee49edf..20b5a2f2e5 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -526,7 +526,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Warn(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
@@ -561,7 +561,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}
 
 	// log lsn and relation id for debugging
-	p.logger.Warn(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))
 
 	rel, ok := p.relationMessageMapping[relID]
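Note on the merge_statement_generator.go hunks: JSON_VALUE and JSON_VALUE_ARRAY are BigQuery's standard-SQL replacements for the legacy JSON_EXTRACT functions; JSON_VALUE returns the scalar as a plain STRING (rather than a JSON-formatted string), which CAST then coerces to the destination type. Below is a minimal standalone Go sketch of the kind of projection the generator emits after this change; castExpr is a hypothetical helper for illustration, not code from this repository.

package main

import "fmt"

// castExpr sketches a single flattened projection: JSON_VALUE pulls the
// scalar out of the _peerdb_data JSON blob as a STRING, and CAST coerces
// it to the destination BigQuery type. Hypothetical helper, not the
// actual generator code.
func castExpr(colName, bqType string) string {
	return fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
		colName, bqType, colName)
}

func main() {
	// Prints: CAST(JSON_VALUE(_peerdb_data, '$.id') AS INT64) AS `id`
	fmt.Println(castExpr("id", "INT64"))
}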