Skip to content

Commit

Permalink
fix date and time handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 24, 2024
1 parent ef14822 commit eb837f6
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 15 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
case bigquery.TimeFieldType:
return qvalue.AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
LogicalType: "time-micros",
}, nil
case bigquery.DateTimeFieldType:
return qvalue.AvroSchemaRecord{
Expand Down
7 changes: 6 additions & 1 deletion flow/connectors/snowflake/qrep_avro_consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func getTransformSQL(colNames []string, colTypes []string, syncedAtCol string) (
case "NUMBER":
transformations = append(transformations,
fmt.Sprintf("$1:\"%s\" AS %s", avroColName, normalizedColName))
case "DATE":
transformations = append(transformations,
fmt.Sprintf("TO_DATE($1:\"%s\") AS %s", avroColName, normalizedColName))
case "TIME":
transformations = append(transformations,
fmt.Sprintf("TO_TIME(SPLIT($1:\"%s\",'+')[0]) AS %s", avroColName, normalizedColName))
case "VARIANT":
transformations = append(transformations,
fmt.Sprintf("PARSE_JSON($1:\"%s\") AS %s", avroColName, normalizedColName))
Expand All @@ -114,7 +120,6 @@ func (s *SnowflakeAvroConsolidateHandler) getCopyTransformation(copyDstTable str
copyOpts := []string{
"FILE_FORMAT = (TYPE = AVRO)",
"PURGE = TRUE",
"ON_ERROR = 'CONTINUE'",
}
transformationSQL, columnsSQL := getTransformSQL(s.allColNames, s.allColTypes, s.config.SyncedAtColName)
return fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s",
Expand Down
2 changes: 2 additions & 0 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ func toQValue(bqValue bigquery.Value) (qvalue.QValue, error) {
return qvalue.QValue{Kind: qvalue.QValueKindBoolean, Value: v}, nil
case civil.Date:
return qvalue.QValue{Kind: qvalue.QValueKindDate, Value: bqValue.(civil.Date).In(time.UTC)}, nil
case civil.Time:
return qvalue.QValue{Kind: qvalue.QValueKindTime, Value: bqValue.(civil.Time)}, nil
case time.Time:
return qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: v}, nil
case *big.Rat:
Expand Down
9 changes: 7 additions & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err
"f12 boolean[]",
"f13 smallint[]",
"my_date DATE",
"my_time TIME",
"my_timetz TIMETZ",
"my_mood mood",
"myh HSTORE",
`"geometryPoint" geometry(point)`,
Expand Down Expand Up @@ -310,7 +312,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
'{"2026-01-17 10:00:00","2026-01-18 13:45:00"}',
'{true, false}',
'{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}',
CURRENT_DATE, 'happy', '"a"=>"b"','POINT(1 2)','POINT(40.7128 -74.0060)',
'1970-06-02', CURRENT_TIME, CURRENT_TIME,'happy', '"a"=>"b"','POINT(1 2)','POINT(40.7128 -74.0060)',
'LINESTRING(0 0, 1 1, 2 2)',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'
Expand All @@ -328,7 +330,8 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
deal_id, ethereum_transaction_id, ignore_price, card_eth_value,
paid_eth_price, card_bought_notified, address, account_id,
asset_id, status, transaction_id, settled_at, reference_id,
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13, my_date, my_mood, myh,
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13, my_date,
my_time, my_timetz, my_mood, myh,
"geometryPoint", geography_point,geometry_linestring, geography_linestring,geometry_polygon, geography_polygon
) VALUES %s;
`, suffix, tableName, strings.Join(rows, ",")))
Expand Down Expand Up @@ -451,6 +454,8 @@ func GetOwnersSchema() *model.QRecordSchema {
{Name: "f8", Type: qvalue.QValueKindInt16, Nullable: true},
{Name: "f13", Type: qvalue.QValueKindArrayInt16, Nullable: true},
{Name: "my_date", Type: qvalue.QValueKindDate, Nullable: true},
{Name: "my_time", Type: qvalue.QValueKindTime, Nullable: true},
{Name: "my_timetz", Type: qvalue.QValueKindTimeTZ, Nullable: true},
{Name: "my_mood", Type: qvalue.QValueKindString, Nullable: true},
{Name: "geometryPoint", Type: qvalue.QValueKindGeometry, Nullable: true},
{Name: "geometry_linestring", Type: qvalue.QValueKindGeometry, Nullable: true},
Expand Down
139 changes: 130 additions & 9 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"log/slog"
"math/big"
"strconv"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -151,7 +150,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
case QValueKindInvalid:
// we will attempt to convert invalid to a string
return c.processNullableUnion("string", c.Value.Value)
case QValueKindTime, QValueKindTimeTZ, QValueKindTimestamp, QValueKindTimestampTZ:
case QValueKindTime:
t, err := c.processGoTime()
if err != nil || t == nil {
return t, err
Expand All @@ -164,6 +163,54 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
}
}

if c.TargetDWH == QDWHTypeClickhouse {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
} else {
return t.(int64), nil
}
}
if c.Nullable {
return goavro.Union("long.time-micros", t.(int64)), nil
}
return t.(int64), nil
case QValueKindTimeTZ:
t, err := c.processGoTimeTZ()
if err != nil || t == nil {
return t, err
}
if c.TargetDWH == QDWHTypeSnowflake {
if c.Nullable {
return c.processNullableUnion("string", t.(string))
} else {
return t.(string), nil
}
}

if c.TargetDWH == QDWHTypeClickhouse {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
} else {
return t.(int64), nil
}
}
if c.Nullable {
return goavro.Union("long.time-micros", t.(int64)), nil
}
return t.(int64), nil
case QValueKindTimestamp:
t, err := c.processGoTimestamp()
if err != nil || t == nil {
return t, err
}
if c.TargetDWH == QDWHTypeSnowflake {
if c.Nullable {
return c.processNullableUnion("string", t.(string))
} else {
return t.(string), nil
}
}

if c.TargetDWH == QDWHTypeClickhouse {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
Expand All @@ -175,7 +222,30 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return goavro.Union("long.timestamp-micros", t.(int64)), nil
}
return t.(int64), nil
case QValueKindTimestampTZ:
t, err := c.processGoTimestampTZ()
if err != nil || t == nil {
return t, err
}
if c.TargetDWH == QDWHTypeSnowflake {
if c.Nullable {
return c.processNullableUnion("string", t.(string))
} else {
return t.(string), nil
}
}

if c.TargetDWH == QDWHTypeClickhouse {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
} else {
return t.(int64), nil
}
}
if c.Nullable {
return goavro.Union("long.timestamp-micros", t.(int64)), nil
}
return t.(int64), nil
case QValueKindDate:
t, err := c.processGoDate()
if err != nil || t == nil {
Expand All @@ -191,9 +261,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {

if c.Nullable {
return goavro.Union("int.date", t), nil
} else {
return t, nil
}
return t, nil

case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr:
if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil &&
Expand Down Expand Up @@ -263,6 +332,24 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
}
}

func (c *QValueAvroConverter) processGoTimeTZ() (interface{}, error) {
if c.Value.Value == nil && c.Nullable {
return nil, nil
}

t, ok := c.Value.Value.(time.Time)
if !ok {
return nil, fmt.Errorf("invalid TimeTZ value")
}

// Snowflake has issues with avro timestamp types, returning as string form of the int64
// See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
if c.TargetDWH == QDWHTypeSnowflake {
return t.Format("15:04:05.999999-0700"), nil
}
return t.UnixMicro(), nil
}

func (c *QValueAvroConverter) processGoTime() (interface{}, error) {
if c.Value.Value == nil && c.Nullable {
return nil, nil
Expand All @@ -273,13 +360,48 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) {
return nil, fmt.Errorf("invalid Time value")
}

ret := t.UnixMicro()
// Snowflake has issues with avro timestamp types, returning as string form of the int64
// See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
if c.TargetDWH == QDWHTypeSnowflake {
return strconv.FormatInt(ret, 10), nil
return t.Format("15:04:05.999999"), nil
}
return t.UnixMicro(), nil
}

func (c *QValueAvroConverter) processGoTimestampTZ() (interface{}, error) {
if c.Value.Value == nil && c.Nullable {
return nil, nil
}

t, ok := c.Value.Value.(time.Time)
if !ok {
return nil, fmt.Errorf("invalid TimestampTZ value")
}

// Snowflake has issues with avro timestamp types, returning as string form of the int64
// See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
if c.TargetDWH == QDWHTypeSnowflake {
return t.Format("2006-01-02 15:04:05.999999-0700"), nil
}
return t.UnixMicro(), nil
}

func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) {
if c.Value.Value == nil && c.Nullable {
return nil, nil
}

t, ok := c.Value.Value.(time.Time)
if !ok {
return nil, fmt.Errorf("invalid Timestamp value")
}

// Snowflake has issues with avro timestamp types, returning as string form of the int64
// See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
if c.TargetDWH == QDWHTypeSnowflake {
return t.Format("2006-01-02 15:04:05.999999"), nil
}
return ret, nil
return t.UnixMicro(), nil
}

func (c *QValueAvroConverter) processGoDate() (interface{}, error) {
Expand All @@ -295,8 +417,7 @@ func (c *QValueAvroConverter) processGoDate() (interface{}, error) {
// Snowflake has issues with avro timestamp types, returning as string form of the int64
// See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
if c.TargetDWH == QDWHTypeSnowflake {
ret := t.UnixMicro()
return strconv.FormatInt(ret, 10), nil
return t.Format("2006-01-02"), nil
}
return t, nil
}
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindTimestamp: "TIMESTAMP_NTZ",
QValueKindTimestampTZ: "TIMESTAMP_TZ",
QValueKindTime: "TIME",
QValueKindTimeTZ: "TIME",
QValueKindDate: "DATE",
QValueKindBit: "BINARY",
QValueKindBytes: "BINARY",
QValueKindStruct: "STRING",
QValueKindUUID: "STRING",
QValueKindTimeTZ: "STRING",
QValueKindInvalid: "STRING",
QValueKindHStore: "VARIANT",
QValueKindGeography: "GEOGRAPHY",
Expand Down
27 changes: 26 additions & 1 deletion flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ func (q QValue) Equals(other QValue) bool {
case QValueKindString:
return compareString(q.Value, other.Value)
// all internally represented as a Golang time.Time
case QValueKindTime, QValueKindTimeTZ, QValueKindDate,
case QValueKindDate,
QValueKindTimestamp, QValueKindTimestampTZ:
return compareGoTime(q.Value, other.Value)
case QValueKindTime, QValueKindTimeTZ:
return compareGoCivilTime(q.Value, other.Value)
case QValueKindNumeric:
return compareNumeric(q.Value, other.Value)
case QValueKindBytes:
Expand Down Expand Up @@ -162,6 +164,29 @@ func compareGoTime(value1, value2 interface{}) bool {
return t1 == t2
}

func compareGoCivilTime(value1, value2 interface{}) bool {
if value1 == nil && value2 == nil {
return true
}

t1, ok1 := value1.(time.Time)
t2, ok2 := value2.(time.Time)

if !ok1 || !ok2 {
if !ok2 {
// For BigQuery, we need to compare civil.Time with time.Time
ct2, ok3 := value2.(civil.Time)
if !ok3 {
return false
}
return t1.Hour() == ct2.Hour && t1.Minute() == ct2.Minute && t1.Second() == ct2.Second
}
return false
}

return t1.Hour() == t2.Hour() && t1.Minute() == t2.Minute() && t1.Second() == t2.Second()
}

func compareUUID(value1, value2 interface{}) bool {
if value1 == nil && value2 == nil {
return true
Expand Down

0 comments on commit eb837f6

Please sign in to comment.