diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1c71c26b21..9591cab251 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -741,6 +741,11 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto } func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { + if !peerdbenv.PeerDBEnableWALHeartbeat() { + slog.InfoContext(ctx, "wal heartbeat is disabled") + return nil + } + sendTimeout := 10 * time.Minute ticker := time.NewTicker(sendTimeout) defer ticker.Stop() diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e61830db88..e47c27a5f4 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -824,7 +824,8 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns)) for columnName, genericColumnType := range normalizedTableSchema.Columns { - sfType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)) + qvKind := qvalue.QValueKind(genericColumnType) + sfType, err := qValueKindToSnowflakeType(qvKind) if err != nil { return 0, fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err) @@ -849,8 +850,14 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( // "Microseconds*1000) "+ // "AS %s,", toVariantColumnName, columnName, columnName)) default: - flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s,", - toVariantColumnName, columnName, sfType, targetColumnName)) + if qvKind == qvalue.QValueKindNumeric { + flattenedCastsSQLArray = append(flattenedCastsSQLArray, + fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s,", + toVariantColumnName, columnName, sfType, targetColumnName)) + } else { + flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s,", + toVariantColumnName, columnName, sfType, targetColumnName)) + } } } flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ""), ",") diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 33b1058066..cdefa6a376 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -80,3 +80,8 @@ func PeerDBAlertingGapMinutesAsDuration() time.Duration { func PeerDBOpenConnectionsAlertThreshold() uint32 { return getEnvUint32("PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) } + +// PEERDB_ENABLE_WAL_HEARTBEAT +func PeerDBEnableWALHeartbeat() bool { + return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) +} diff --git a/flow/peerdbenv/env.go b/flow/peerdbenv/env.go index 11e363d1eb..3bba77c46d 100644 --- a/flow/peerdbenv/env.go +++ b/flow/peerdbenv/env.go @@ -46,6 +46,23 @@ func getEnvUint32(name string, defaultValue uint32) uint32 { return uint32(i) } +// getEnvBool returns the value of the environment variable with the given name +// or defaultValue if the environment variable is not set or is not a valid +// boolean value. +func getEnvBool(name string, defaultValue bool) bool { + val, ok := getEnv(name) + if !ok { + return defaultValue + } + + b, err := strconv.ParseBool(val) + if err != nil { + return defaultValue + } + + return b +} + // GetEnvString returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set. func getEnvString(name string, defaultValue string) string {