From eb837f6711caa567acd4eea9670c9d699c8d8169 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 24 Jan 2024 18:40:57 +0530 Subject: [PATCH] fix date and time handling --- flow/connectors/bigquery/qrep_avro_sync.go | 2 +- .../snowflake/qrep_avro_consolidate.go | 7 +- flow/e2e/bigquery/bigquery_helper.go | 2 + flow/e2e/test_utils.go | 9 +- flow/model/qvalue/avro_converter.go | 139 ++++++++++++++++-- flow/model/qvalue/kind.go | 2 +- flow/model/qvalue/qvalue.go | 27 +++- 7 files changed, 173 insertions(+), 15 deletions(-) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 62759e32a3..bdf80311e3 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -342,7 +342,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { case bigquery.TimeFieldType: return qvalue.AvroSchemaField{ Type: "long", - LogicalType: "timestamp-micros", + LogicalType: "time-micros", }, nil case bigquery.DateTimeFieldType: return qvalue.AvroSchemaRecord{ diff --git a/flow/connectors/snowflake/qrep_avro_consolidate.go b/flow/connectors/snowflake/qrep_avro_consolidate.go index a77767eb53..5fd0b51792 100644 --- a/flow/connectors/snowflake/qrep_avro_consolidate.go +++ b/flow/connectors/snowflake/qrep_avro_consolidate.go @@ -94,6 +94,12 @@ func getTransformSQL(colNames []string, colTypes []string, syncedAtCol string) ( case "NUMBER": transformations = append(transformations, fmt.Sprintf("$1:\"%s\" AS %s", avroColName, normalizedColName)) + case "DATE": + transformations = append(transformations, + fmt.Sprintf("TO_DATE($1:\"%s\") AS %s", avroColName, normalizedColName)) + case "TIME": + transformations = append(transformations, + fmt.Sprintf("TO_TIME(SPLIT($1:\"%s\",'+')[0]) AS %s", avroColName, normalizedColName)) case "VARIANT": transformations = append(transformations, fmt.Sprintf("PARSE_JSON($1:\"%s\") AS %s", avroColName, normalizedColName)) @@ -114,7 +120,6 @@ func (s *SnowflakeAvroConsolidateHandler) getCopyTransformation(copyDstTable str copyOpts := []string{ "FILE_FORMAT = (TYPE = AVRO)", "PURGE = TRUE", - "ON_ERROR = 'CONTINUE'", } transformationSQL, columnsSQL := getTransformSQL(s.allColNames, s.allColTypes, s.config.SyncedAtColName) return fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s", diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 0d024f9dc7..d4b2735fab 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -220,6 +220,8 @@ func toQValue(bqValue bigquery.Value) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindBoolean, Value: v}, nil case civil.Date: return qvalue.QValue{Kind: qvalue.QValueKindDate, Value: bqValue.(civil.Date).In(time.UTC)}, nil + case civil.Time: + return qvalue.QValue{Kind: qvalue.QValueKindTime, Value: bqValue.(civil.Time)}, nil case time.Time: return qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: v}, nil case *big.Rat: diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 94ee47a8e2..c08c940099 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -248,6 +248,8 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err "f12 boolean[]", "f13 smallint[]", "my_date DATE", + "my_time TIME", + "my_timetz TIMETZ", "my_mood mood", "myh HSTORE", `"geometryPoint" geometry(point)`, @@ -310,7 +312,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro '{"2026-01-17 10:00:00","2026-01-18 13:45:00"}', '{true, false}', '{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}', - CURRENT_DATE, 'happy', '"a"=>"b"','POINT(1 2)','POINT(40.7128 -74.0060)', + '1970-06-02', CURRENT_TIME, CURRENT_TIME,'happy', '"a"=>"b"','POINT(1 2)','POINT(40.7128 -74.0060)', 'LINESTRING(0 0, 1 1, 2 2)', 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))' @@ -328,7 +330,8 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro deal_id, ethereum_transaction_id, ignore_price, card_eth_value, paid_eth_price, card_bought_notified, address, account_id, asset_id, status, transaction_id, settled_at, reference_id, - settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13, my_date, my_mood, myh, + settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13, my_date, + my_time, my_timetz, my_mood, myh, "geometryPoint", geography_point,geometry_linestring, geography_linestring,geometry_polygon, geography_polygon ) VALUES %s; `, suffix, tableName, strings.Join(rows, ","))) @@ -451,6 +454,8 @@ func GetOwnersSchema() *model.QRecordSchema { {Name: "f8", Type: qvalue.QValueKindInt16, Nullable: true}, {Name: "f13", Type: qvalue.QValueKindArrayInt16, Nullable: true}, {Name: "my_date", Type: qvalue.QValueKindDate, Nullable: true}, + {Name: "my_time", Type: qvalue.QValueKindTime, Nullable: true}, + {Name: "my_timetz", Type: qvalue.QValueKindTimeTZ, Nullable: true}, {Name: "my_mood", Type: qvalue.QValueKindString, Nullable: true}, {Name: "geometryPoint", Type: qvalue.QValueKindGeometry, Nullable: true}, {Name: "geometry_linestring", Type: qvalue.QValueKindGeometry, Nullable: true}, diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index b95cde93b8..7248d924ff 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -4,7 +4,6 @@ import ( "fmt" "log/slog" "math/big" - "strconv" "time" "github.com/google/uuid" @@ -151,7 +150,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { case QValueKindInvalid: // we will attempt to convert invalid to a string return c.processNullableUnion("string", c.Value.Value) - case QValueKindTime, QValueKindTimeTZ, QValueKindTimestamp, QValueKindTimestampTZ: + case QValueKindTime: t, err := c.processGoTime() if err != nil || t == nil { return t, err @@ -164,6 +163,54 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } } + if c.TargetDWH == QDWHTypeClickhouse { + if c.Nullable { + return c.processNullableUnion("long", t.(int64)) + } else { + return t.(int64), nil + } + } + if c.Nullable { + return goavro.Union("long.time-micros", t.(int64)), nil + } + return t.(int64), nil + case QValueKindTimeTZ: + t, err := c.processGoTimeTZ() + if err != nil || t == nil { + return t, err + } + if c.TargetDWH == QDWHTypeSnowflake { + if c.Nullable { + return c.processNullableUnion("string", t.(string)) + } else { + return t.(string), nil + } + } + + if c.TargetDWH == QDWHTypeClickhouse { + if c.Nullable { + return c.processNullableUnion("long", t.(int64)) + } else { + return t.(int64), nil + } + } + if c.Nullable { + return goavro.Union("long.time-micros", t.(int64)), nil + } + return t.(int64), nil + case QValueKindTimestamp: + t, err := c.processGoTimestamp() + if err != nil || t == nil { + return t, err + } + if c.TargetDWH == QDWHTypeSnowflake { + if c.Nullable { + return c.processNullableUnion("string", t.(string)) + } else { + return t.(string), nil + } + } + if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { return c.processNullableUnion("long", t.(int64)) @@ -175,7 +222,30 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return goavro.Union("long.timestamp-micros", t.(int64)), nil } return t.(int64), nil + case QValueKindTimestampTZ: + t, err := c.processGoTimestampTZ() + if err != nil || t == nil { + return t, err + } + if c.TargetDWH == QDWHTypeSnowflake { + if c.Nullable { + return c.processNullableUnion("string", t.(string)) + } else { + return t.(string), nil + } + } + if c.TargetDWH == QDWHTypeClickhouse { + if c.Nullable { + return c.processNullableUnion("long", t.(int64)) + } else { + return t.(int64), nil + } + } + if c.Nullable { + return goavro.Union("long.timestamp-micros", t.(int64)), nil + } + return t.(int64), nil case QValueKindDate: t, err := c.processGoDate() if err != nil || t == nil { @@ -191,9 +261,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.Nullable { return goavro.Union("int.date", t), nil - } else { - return t, nil } + return t, nil case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr: if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil && @@ -263,6 +332,24 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } } +func (c *QValueAvroConverter) processGoTimeTZ() (interface{}, error) { + if c.Value.Value == nil && c.Nullable { + return nil, nil + } + + t, ok := c.Value.Value.(time.Time) + if !ok { + return nil, fmt.Errorf("invalid TimeTZ value") + } + + // Snowflake has issues with avro timestamp types, returning as string form of the int64 + // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file + if c.TargetDWH == QDWHTypeSnowflake { + return t.Format("15:04:05.999999-0700"), nil + } + return t.UnixMicro(), nil +} + func (c *QValueAvroConverter) processGoTime() (interface{}, error) { if c.Value.Value == nil && c.Nullable { return nil, nil @@ -273,13 +360,48 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) { return nil, fmt.Errorf("invalid Time value") } - ret := t.UnixMicro() // Snowflake has issues with avro timestamp types, returning as string form of the int64 // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file if c.TargetDWH == QDWHTypeSnowflake { - return strconv.FormatInt(ret, 10), nil + return t.Format("15:04:05.999999"), nil + } + return t.UnixMicro(), nil +} + +func (c *QValueAvroConverter) processGoTimestampTZ() (interface{}, error) { + if c.Value.Value == nil && c.Nullable { + return nil, nil + } + + t, ok := c.Value.Value.(time.Time) + if !ok { + return nil, fmt.Errorf("invalid TimestampTZ value") + } + + // Snowflake has issues with avro timestamp types, returning as string form of the int64 + // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file + if c.TargetDWH == QDWHTypeSnowflake { + return t.Format("2006-01-02 15:04:05.999999-0700"), nil + } + return t.UnixMicro(), nil +} + +func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) { + if c.Value.Value == nil && c.Nullable { + return nil, nil + } + + t, ok := c.Value.Value.(time.Time) + if !ok { + return nil, fmt.Errorf("invalid Timestamp value") + } + + // Snowflake has issues with avro timestamp types, returning as string form of the int64 + // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file + if c.TargetDWH == QDWHTypeSnowflake { + return t.Format("2006-01-02 15:04:05.999999"), nil } - return ret, nil + return t.UnixMicro(), nil } func (c *QValueAvroConverter) processGoDate() (interface{}, error) { @@ -295,8 +417,7 @@ func (c *QValueAvroConverter) processGoDate() (interface{}, error) { // Snowflake has issues with avro timestamp types, returning as string form of the int64 // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file if c.TargetDWH == QDWHTypeSnowflake { - ret := t.UnixMicro() - return strconv.FormatInt(ret, 10), nil + return t.Format("2006-01-02"), nil } return t, nil } diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index bba156bb88..5c6b069639 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -68,12 +68,12 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindTimestamp: "TIMESTAMP_NTZ", QValueKindTimestampTZ: "TIMESTAMP_TZ", QValueKindTime: "TIME", + QValueKindTimeTZ: "TIME", QValueKindDate: "DATE", QValueKindBit: "BINARY", QValueKindBytes: "BINARY", QValueKindStruct: "STRING", QValueKindUUID: "STRING", - QValueKindTimeTZ: "STRING", QValueKindInvalid: "STRING", QValueKindHStore: "VARIANT", QValueKindGeography: "GEOGRAPHY", diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 7abe9b5ef4..d64059ca71 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -45,9 +45,11 @@ func (q QValue) Equals(other QValue) bool { case QValueKindString: return compareString(q.Value, other.Value) // all internally represented as a Golang time.Time - case QValueKindTime, QValueKindTimeTZ, QValueKindDate, + case QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ: return compareGoTime(q.Value, other.Value) + case QValueKindTime, QValueKindTimeTZ: + return compareGoCivilTime(q.Value, other.Value) case QValueKindNumeric: return compareNumeric(q.Value, other.Value) case QValueKindBytes: @@ -162,6 +164,29 @@ func compareGoTime(value1, value2 interface{}) bool { return t1 == t2 } +func compareGoCivilTime(value1, value2 interface{}) bool { + if value1 == nil && value2 == nil { + return true + } + + t1, ok1 := value1.(time.Time) + t2, ok2 := value2.(time.Time) + + if !ok1 || !ok2 { + if !ok2 { + // For BigQuery, we need to compare civil.Time with time.Time + ct2, ok3 := value2.(civil.Time) + if !ok3 { + return false + } + return t1.Hour() == ct2.Hour && t1.Minute() == ct2.Minute && t1.Second() == ct2.Second + } + return false + } + + return t1.Hour() == t2.Hour() && t1.Minute() == t2.Minute() && t1.Second() == t2.Second() +} + func compareUUID(value1, value2 interface{}) bool { if value1 == nil && value2 == nil { return true