diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index ef12fd61c4..ff8af3905c 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/netip" + "strconv" "strings" "time" @@ -14,7 +15,6 @@ import ( "github.com/lib/pq/oid" "github.com/shopspring/decimal" - datatypes "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" ) @@ -254,25 +254,13 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( return qvalue.QValueTimestampTZ{Val: timestamp}, nil case qvalue.QValueKindInterval: intervalObject := value.(pgtype.Interval) - var interval datatypes.PeerDBInterval - interval.Hours = int(intervalObject.Microseconds / 3600000000) - interval.Minutes = int((intervalObject.Microseconds % 3600000000) / 60000000) - interval.Seconds = float64(intervalObject.Microseconds%60000000) / 1000000.0 - interval.Days = int(intervalObject.Days) - interval.Years = int(intervalObject.Months / 12) - interval.Months = int(intervalObject.Months % 12) - interval.Valid = intervalObject.Valid - - intervalJSON, err := json.Marshal(interval) - if err != nil { - return nil, fmt.Errorf("failed to parse interval: %w", err) - } - - if !interval.Valid { - return nil, fmt.Errorf("invalid interval: %v", value) - } - - return qvalue.QValueString{Val: string(intervalJSON)}, nil + // Construct the interval string manually + var intervalString string + intervalString += strconv.Itoa(int(intervalObject.Months)) + " months " + intervalString += strconv.Itoa(int(intervalObject.Days)) + " days " + intervalString += strconv.Itoa(int(intervalObject.Microseconds)) + " microseconds" + intervalString = strings.Trim(intervalString, " ") + return qvalue.QValueString{Val: intervalString}, nil case qvalue.QValueKindDate: date := value.(time.Time) return qvalue.QValueDate{Val: date}, nil diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 1f787b5c1c..ba49243857 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -215,6 +215,8 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { values[i] = a case qvalue.QValueJSON: values[i] = v.Val + case qvalue.QValueInterval: + values[i] = v.Val // And so on for the other types... default: