Skip to content

Commit

Permalink
fix and remove wal heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 23, 2023
1 parent 0e8db19 commit e195105
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 4 deletions.
5 changes: 5 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,11 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
if !peerdbenv.PeerDBEnableWALHeartbeat() {
slog.InfoContext(ctx, "wal heartbeat is disabled")
return nil
}

sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()
Expand Down
10 changes: 6 additions & 4 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,8 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(

flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns))
for columnName, genericColumnType := range normalizedTableSchema.Columns {
sfType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType))
qvKind := qvalue.QValueKind(genericColumnType)
sfType, err := qValueKindToSnowflakeType(qvKind)
if err != nil {
return 0, fmt.Errorf("failed to convert column type %s to snowflake type: %w",
genericColumnType, err)
Expand All @@ -849,9 +850,10 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
// "Microseconds*1000) "+
// "AS %s,", toVariantColumnName, columnName, columnName))
default:
if sfType == "NUMBER(38, 9)" {
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s,",
toVariantColumnName, columnName, sfType, targetColumnName))
if qvKind == qvalue.QValueKindNumeric {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s,",
toVariantColumnName, columnName, sfType, targetColumnName))
} else {
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s,",
toVariantColumnName, columnName, sfType, targetColumnName))
Expand Down
5 changes: 5 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,8 @@ func PeerDBAlertingGapMinutesAsDuration() time.Duration {
func PeerDBOpenConnectionsAlertThreshold() uint32 {
return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5)
}

// PEERDB_ENABLE_WAL_HEARTBEAT
func PeerDBEnableWALHeartbeat() bool {
return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
}
17 changes: 17 additions & 0 deletions flow/peerdbenv/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ func getEnvUint32(name string, defaultValue uint32) uint32 {
return uint32(i)
}

// getEnvBool returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set or is not a valid
// boolean value.
func getEnvBool(name string, defaultValue bool) bool {
val, ok := getEnv(name)
if !ok {
return defaultValue
}

b, err := strconv.ParseBool(val)
if err != nil {
return defaultValue
}

return b
}

// GetEnvString returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set.
func getEnvString(name string, defaultValue string) string {
Expand Down

0 comments on commit e195105

Please sign in to comment.