From d3a8e962d49fe593f82e2bd2220b0210a9ae66b6 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 01:26:44 +0530 Subject: [PATCH 01/16] support interval for snowflake --- flow/connectors/postgres/qvalue_convert.go | 24 ++++++++++ flow/e2e/snowflake/peer_flow_sf_test.go | 53 ++++++++++++++++++++++ flow/interval/interval.go | 11 +++++ flow/model/qvalue/avro_converter.go | 8 +++- flow/model/qvalue/kind.go | 1 + flow/model/qvalue/qvalue.go | 2 + 6 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 flow/interval/interval.go diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 79c817d28c..b42936cd5f 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -12,6 +12,7 @@ import ( "github.com/lib/pq/oid" "github.com/shopspring/decimal" + peerdb_interval "github.com/PeerDB-io/peer-flow/interval" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" ) @@ -80,6 +81,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu return qvalue.QValueKindArrayTimestampTZ case pgtype.TextArrayOID, pgtype.VarcharArrayOID, pgtype.BPCharArrayOID: return qvalue.QValueKindArrayString + case pgtype.IntervalOID: + return qvalue.QValueKindInterval default: typeName, ok := pgtype.NewMap().TypeForOID(recvOID) if !ok { @@ -225,6 +228,27 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindTimestampTZ: timestamp := value.(time.Time) val = qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp} + case qvalue.QValueKindInterval: + intervalObject := value.(pgtype.Interval) + var interval peerdb_interval.PeerDBInterval + interval.Hours = int(intervalObject.Microseconds / 3600000000) + interval.Minutes = int((intervalObject.Microseconds % 3600000000) / 60000000) + interval.Seconds = float64((intervalObject.Microseconds % 60000000) / 1000000) + interval.Days = int(intervalObject.Days) + interval.Years = int(intervalObject.Months / 12) + interval.Months = int(intervalObject.Months % 12) + interval.Valid = intervalObject.Valid + + intervalJSON, err := json.Marshal(interval) + if err != nil { + return qvalue.QValue{}, fmt.Errorf("failed to parse interval: %w", err) + } + + if !interval.Valid { + return qvalue.QValue{}, fmt.Errorf("invalid interval: %v", value) + } + + return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(intervalJSON)}, nil case qvalue.QValueKindDate: date := value.(time.Time) val = qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date} diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 56084a1a27..b0ae6e2096 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1119,3 +1119,56 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { e2e.RequireEnvCanceled(s.t, env) } + +func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { + tc := e2e.NewTemporalClient(s.t) + + srcTableName := s.attachSchemaSuffix("testintervalsf") + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "testintervalsf") + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" ( + id SERIAL PRIMARY KEY, + dur INTERVAL + ); + `, s.pgSuffix, "testintervalsf")) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_interval_sf"), + 
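A worked example of the decomposition added in qvalue_convert.go above, written as a standalone sketch. The sample interval and the local struct copy are illustrative only; the arithmetic mirrors the patch (Microseconds split into hours/minutes/seconds, Months split into years/months, Days passed through).

package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the PeerDBInterval shape from flow/interval/interval.go,
// kept here only so the sketch is self-contained.
type peerDBInterval struct {
	Hours   int     `json:"hours,omitempty"`
	Minutes int     `json:"minutes,omitempty"`
	Seconds float64 `json:"seconds,omitempty"`
	Days    int     `json:"days,omitempty"`
	Months  int     `json:"months,omitempty"`
	Years   int     `json:"years,omitempty"`
	Valid   bool    `json:"valid"`
}

func main() {
	// Sample value: INTERVAL '1 year 2 months 29 days 01:02:03' arrives from
	// pgtype.Interval as Months=14, Days=29, Microseconds=3_723_000_000.
	micros, days, months := int64(3_723_000_000), int32(29), int32(14)

	out := peerDBInterval{
		Hours:   int(micros / 3600000000),
		Minutes: int((micros % 3600000000) / 60000000),
		Seconds: float64(micros%60000000) / 1000000.0,
		Days:    int(days),
		Years:   int(months / 12),
		Months:  int(months % 12),
		Valid:   true,
	}
	b, _ := json.Marshal(out)
	fmt.Println(string(b))
	// {"hours":1,"minutes":2,"seconds":3,"days":29,"months":2,"years":1,"valid":true}
}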
TableNameMapping: map[string]string{srcTableName: dstTableName}, + Destination: s.sfHelper.Peer, + } + + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + flowConnConfig.MaxBatchSize = 100 + + // wait for PeerFlowStatusQuery to finish setup + // and then insert 20 rows into the source table + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + // insert 20 rows into the source table + for range 20 { + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO e2e_test_%s."%s"(dur) + SELECT + floor(random() * 100)::int || ' days ' || + floor(random() * 24)::int || ' hours ' || + floor(random() * 60)::int || ' minutes ' || + floor(random() * 60)::int || ' seconds ' || + floor(random() * 30)::int || ' months' AS random_interval; + `, s.pgSuffix, "testintervalsf")) + e2e.EnvNoError(s.t, env, err) + } + s.t.Log("Inserted 20 rows into the source table") + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize interval sf test", + "testintervalsf", + "\"testintervalsf\"", + "id,dur", + ) + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/interval/interval.go b/flow/interval/interval.go new file mode 100644 index 0000000000..79fbdc3ecf --- /dev/null +++ b/flow/interval/interval.go @@ -0,0 +1,11 @@ +package peerdb_interval + +type PeerDBInterval struct { + Hours int `json:"hours,omitempty"` + Minutes int `json:"minutes,omitempty"` + Seconds float64 `json:"seconds,omitempty"` + Days int `json:"days,omitempty"` + Months int `json:"months,omitempty"` + Years int `json:"years,omitempty"` + Valid bool `json:"valid"` +} diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 0a299cf82f..3df8738209 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -58,7 +58,11 @@ type AvroSchemaField struct { // will return an error. 
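Because every PeerDBInterval field except Valid is tagged omitempty, zero-valued components never appear in the JSON that reaches the destination. A minimal illustration; the sample value is an assumption, the type and import path come from flow/interval/interval.go above.

package main

import (
	"encoding/json"
	"fmt"

	peerdb_interval "github.com/PeerDB-io/peer-flow/interval"
)

func main() {
	// A pure "2 days" interval: all zero-valued components are dropped.
	d := peerdb_interval.PeerDBInterval{Days: 2, Valid: true}
	b, _ := json.Marshal(d)
	fmt.Println(string(b)) // {"days":2,"valid":true}
}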
func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) { switch kind { - case QValueKindString, QValueKindQChar, QValueKindCIDR, QValueKindINET: + case QValueKindString: + return "string", nil + case QValueKindQChar, QValueKindCIDR, QValueKindINET: + return "string", nil + case QValueKindInterval: return "string", nil case QValueKindUUID: return AvroSchemaLogical{ @@ -285,7 +289,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return t, nil case QValueKindQChar: return c.processNullableUnion("string", string(c.Value.(uint8))) - case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr: + case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr, QValueKindInterval: if c.TargetDWH == QDWHTypeSnowflake && c.Value != nil && (len(c.Value.(string)) > 15*1024*1024) { slog.Warn("Truncating TEXT value > 15MB for Snowflake!") diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 9def7821f4..445dec6f8b 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -24,6 +24,7 @@ const ( QValueKindDate QValueKind = "date" QValueKindTime QValueKind = "time" QValueKindTimeTZ QValueKind = "timetz" + QValueKindInterval QValueKind = "interval" QValueKindNumeric QValueKind = "numeric" QValueKindBytes QValueKind = "bytes" QValueKindUUID QValueKind = "uuid" diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index ae0a3945ab..33be135e3d 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -64,6 +64,8 @@ func (q QValue) Equals(other QValue) bool { case QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ: return compareGoTime(q.Value, other.Value) + case QValueKindInterval: + return compareString(q.Value, other.Value) case QValueKindTime, QValueKindTimeTZ: return compareGoCivilTime(q.Value, other.Value) case QValueKindNumeric: From 49a26c0dfc0b55e988df4d5f73daafd0ebbedf9a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 01:28:27 +0530 Subject: [PATCH 02/16] map interval to variant --- flow/model/qvalue/kind.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 445dec6f8b..9ed9ac0beb 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -70,6 +70,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindJSON: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", QValueKindTimestampTZ: "TIMESTAMP_TZ", + QValueKindInterval: "VARIANT", QValueKindTime: "TIME", QValueKindTimeTZ: "TIME", QValueKindDate: "DATE", @@ -118,6 +119,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindTimeTZ: "String", QValueKindInvalid: "String", QValueKindHStore: "String", + // array types will be mapped to VARIANT QValueKindArrayFloat32: "Array(Float32)", QValueKindArrayFloat64: "Array(Float64)", From 655e47f2ab942b0f6774273dbbd1fa444c57ada8 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 03:07:54 +0530 Subject: [PATCH 03/16] change test --- flow/e2e/snowflake/peer_flow_sf_test.go | 33 +++++++++++-------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index b0ae6e2096..b1e7e2de70 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1147,28 +1147,23 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { // and then insert 20 rows into the 
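Taken together, the pieces above give intervals the following path: the Postgres value becomes a JSON string, Avro carries it as a plain string, and the Snowflake destination column is created as VARIANT. A small sketch exercising the two mappings introduced by this patch:

package main

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

func main() {
	// Avro schema for the new kind: intervals travel as plain strings.
	schema, _ := qvalue.GetAvroSchemaFromQValueKind(
		qvalue.QValueKindInterval, qvalue.QDWHTypeSnowflake, 0, 0)
	fmt.Println(schema) // string

	// Destination DDL type on Snowflake.
	fmt.Println(qvalue.QValueKindToSnowflakeTypeMap[qvalue.QValueKindInterval]) // VARIANT
}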
source table env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - // insert 20 rows into the source table - for range 20 { - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO e2e_test_%s."%s"(dur) SELECT - floor(random() * 100)::int || ' days ' || - floor(random() * 24)::int || ' hours ' || - floor(random() * 60)::int || ' minutes ' || - floor(random() * 60)::int || ' seconds ' || - floor(random() * 30)::int || ' months' AS random_interval; + make_interval( + 20, + floor(random() * 24)::int, + floor(random() * 60)::int, + floor(random() * 60)::int, + floor(random() * 30)::int + ); `, s.pgSuffix, "testintervalsf")) - e2e.EnvNoError(s.t, env, err) - } - s.t.Log("Inserted 20 rows into the source table") - e2e.EnvWaitForEqualTablesWithNames( - env, - s, - "normalize interval sf test", - "testintervalsf", - "\"testintervalsf\"", - "id,dur", - ) + e2e.EnvNoError(s.t, env, err) + + s.t.Log("Inserted a row into the source table") + err = s.checkJSONValue(dstTableName, "dur", "days", "20") + e2e.EnvNoError(s.t, env, err) env.Cancel() e2e.RequireEnvCanceled(s.t, env) } From 5c716b051323b74fe2f5fb86631422b4047dc7bf Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 03:50:44 +0530 Subject: [PATCH 04/16] another test fix attempt --- flow/e2e/snowflake/peer_flow_sf_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index b1e7e2de70..1e596268be 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1144,7 +1144,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { flowConnConfig.MaxBatchSize = 100 // wait for PeerFlowStatusQuery to finish setup - // and then insert 20 rows into the source table env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) @@ -1162,8 +1161,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted a row into the source table") - err = s.checkJSONValue(dstTableName, "dur", "days", "20") - e2e.EnvNoError(s.t, env, err) env.Cancel() e2e.RequireEnvCanceled(s.t, env) + err = s.checkJSONValue(dstTableName, "dur", "days", "20") + require.NoError(s.t, err) } From 3e88efa8ba781b9060430ebafbfad933abc7afda Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 04:30:04 +0530 Subject: [PATCH 05/16] test fix attempt --- flow/e2e/snowflake/peer_flow_sf_test.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 1e596268be..418d7930d1 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1127,7 +1127,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "testintervalsf") _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" ( + CREATE TABLE IF NOT EXISTS e2e_test_%s.%s ( id SERIAL PRIMARY KEY, dur INTERVAL ); @@ -1141,14 +1141,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { } flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 100 + flowConnConfig.MaxBatchSize = 5 // wait for 
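One note on the generator above: make_interval's positional parameters are (years, months, weeks, days, hours, mins, secs), so passing 20 as the first argument builds a 20-year interval rather than a 20-day one, which would leave the days == "20" assertion in the same commit unsatisfiable. A hedged alternative, not part of the patch, that pins days explicitly via named arguments:

package sketch

// insertDurStmt is a hypothetical variant of the test INSERT: naming the
// parameter keeps the VARIANT's "days" key deterministic regardless of the
// other components.
const insertDurStmt = `
	INSERT INTO e2e_test_%s.%s(dur)
	SELECT make_interval(days => 20, hours => floor(random() * 24)::int);`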
PeerFlowStatusQuery to finish setup env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO e2e_test_%s."%s"(dur) + INSERT INTO e2e_test_%s.%s(dur) SELECT make_interval( 20, @@ -1161,6 +1161,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted a row into the source table") + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize interval type", + "testintervalsf", + "testintervalsf", + "id", + ) env.Cancel() e2e.RequireEnvCanceled(s.t, env) err = s.checkJSONValue(dstTableName, "dur", "days", "20") From b0a3b830086aa682bcf2670e88dc16a90108749e Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 04:48:16 +0530 Subject: [PATCH 06/16] fix test --- flow/e2e/snowflake/peer_flow_sf_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 418d7930d1..f61305d37a 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1150,13 +1150,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO e2e_test_%s.%s(dur) SELECT - make_interval( - 20, - floor(random() * 24)::int, - floor(random() * 60)::int, - floor(random() * 60)::int, - floor(random() * 30)::int - ); + '2 days' `, s.pgSuffix, "testintervalsf")) e2e.EnvNoError(s.t, env, err) @@ -1171,6 +1165,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { ) env.Cancel() e2e.RequireEnvCanceled(s.t, env) - err = s.checkJSONValue(dstTableName, "dur", "days", "20") + err = s.checkJSONValue(dstTableName, "dur", "days", "2") require.NoError(s.t, err) } From 1dde8c4267cdd2fe53fd12db72f0a86763e3389c Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 05:00:11 +0530 Subject: [PATCH 07/16] use an existing test --- flow/e2e/snowflake/peer_flow_sf_test.go | 54 +++---------------------- 1 file changed, 5 insertions(+), 49 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index f61305d37a..ede957ea2e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -440,6 +440,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { return false } + // check interval + if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil { + return false + } + // check if JSON on snowflake side is a good JSON if err := s.checkJSONValue(dstTableName, "c17", "sai", "1"); err != nil { return false @@ -1119,52 +1124,3 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { e2e.RequireEnvCanceled(s.t, env) } - -func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() { - tc := e2e.NewTemporalClient(s.t) - - srcTableName := s.attachSchemaSuffix("testintervalsf") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "testintervalsf") - - _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS e2e_test_%s.%s ( - id SERIAL PRIMARY KEY, - dur INTERVAL - ); - `, s.pgSuffix, "testintervalsf")) - require.NoError(s.t, err) - - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_interval_sf"), - TableNameMapping: map[string]string{srcTableName: dstTableName}, - Destination: s.sfHelper.Peer, - } - - flowConnConfig := 
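checkJSONValue is used above to assert on single keys of the destination VARIANT; its implementation is not included in this patch, but a hypothetical helper along these lines would do the job (Snowflake path syntax, placeholder names throughout):

package sketch

import (
	"database/sql"
	"fmt"
)

// checkJSONValueSketch is a hypothetical stand-in for the suite's
// checkJSONValue helper, whose body is not part of this patch: it reads one
// key out of a VARIANT column with Snowflake path syntax and compares it to
// the expected string.
func checkJSONValueSketch(db *sql.DB, table, col, key, want string) error {
	var got string
	query := fmt.Sprintf("SELECT %s:%s::string FROM %s LIMIT 1", col, key, table)
	if err := db.QueryRow(query).Scan(&got); err != nil {
		return err
	}
	if got != want {
		return fmt.Errorf("%s:%s = %q, want %q", col, key, got, want)
	}
	return nil
}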
connectionGen.GenerateFlowConnectionConfigs() - flowConnConfig.MaxBatchSize = 5 - - // wait for PeerFlowStatusQuery to finish setup - env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) - e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) - - _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` - INSERT INTO e2e_test_%s.%s(dur) - SELECT - '2 days' - `, s.pgSuffix, "testintervalsf")) - e2e.EnvNoError(s.t, env, err) - - s.t.Log("Inserted a row into the source table") - e2e.EnvWaitForEqualTablesWithNames( - env, - s, - "normalize interval type", - "testintervalsf", - "testintervalsf", - "id", - ) - env.Cancel() - e2e.RequireEnvCanceled(s.t, env) - err = s.checkJSONValue(dstTableName, "dur", "days", "2") - require.NoError(s.t, err) -} From b86a824fd43ce28202725065c9c8be2324ad8af6 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 05:35:54 +0530 Subject: [PATCH 08/16] increase timeout --- flow/e2e/snowflake/peer_flow_sf_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index ede957ea2e..257d9e013c 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -427,7 +427,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { `, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize types", func() bool { + e2e.EnvWaitFor(s.t, env, 5*time.Minute, "normalize types", func() bool { noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{ "c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", From f347105eca471e39aca6019bc94d85cffa365706 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 05:50:10 +0530 Subject: [PATCH 09/16] check with no test --- flow/e2e/snowflake/peer_flow_sf_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 257d9e013c..190cf82fbd 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -440,11 +440,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { return false } - // check interval - if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil { - return false - } - // check if JSON on snowflake side is a good JSON if err := s.checkJSONValue(dstTableName, "c17", "sai", "1"); err != nil { return false From 23b03007d2e8a0a689b80dfc3e3e4109f7731d89 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 06:16:39 +0530 Subject: [PATCH 10/16] add interval type test again --- flow/e2e/snowflake/peer_flow_sf_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 190cf82fbd..4b7f3b9963 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -440,6 +440,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { return false } + // interval checks + if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil { + return false + } + + if err := s.checkJSONValue(dstTableName, "c16", "months", "2"); err != nil { + return false + } + + if err := s.checkJSONValue(dstTableName, "c16", "days", "29"); err != nil { + return false + } + // check if JSON on snowflake side is a good JSON if err := s.checkJSONValue(dstTableName, "c17", "sai", 
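These three assertions imply that whatever interval the Test_Types_SF fixture writes into c16 decomposes to years 5, months 2 and days 29 (the exact fixture value is not visible in this patch). For an input such as INTERVAL '5 years 2 months 29 days', the converter above would store {"days":29,"months":2,"years":5,"valid":true} in the VARIANT column, with the time components omitted by omitempty.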
"1"); err != nil { return false From e1e391f5c625d5fd97388f28f40e9fbd4cd2846b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 06:43:38 +0530 Subject: [PATCH 11/16] float division for seconds --- flow/connectors/postgres/qvalue_convert.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index b42936cd5f..d0e3e8cc0b 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -233,7 +233,7 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( var interval peerdb_interval.PeerDBInterval interval.Hours = int(intervalObject.Microseconds / 3600000000) interval.Minutes = int((intervalObject.Microseconds % 3600000000) / 60000000) - interval.Seconds = float64((intervalObject.Microseconds % 60000000) / 1000000) + interval.Seconds = float64(intervalObject.Microseconds%60000000) / 1000000.0 interval.Days = int(intervalObject.Days) interval.Years = int(intervalObject.Months / 12) interval.Months = int(intervalObject.Months % 12) From 0fca9d94ffb04c164a9a4d7429782c93f7b63e2f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 07:17:54 +0530 Subject: [PATCH 12/16] increase timeout, add logs --- flow/e2e/snowflake/peer_flow_sf_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 4b7f3b9963..8d1ba58d9d 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -427,7 +427,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { `, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitFor(s.t, env, 5*time.Minute, "normalize types", func() bool { + e2e.EnvWaitFor(s.t, env, 10*time.Minute, "normalize types", func() bool { + s.t.Log("Checking for nulls in all types") noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{ "c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -440,6 +441,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { return false } + s.t.Log("Checking for intervals in all types") // interval checks if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil { return false @@ -453,6 +455,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { return false } + s.t.Log("Checking for json in all types") // check if JSON on snowflake side is a good JSON if err := s.checkJSONValue(dstTableName, "c17", "sai", "1"); err != nil { return false From 4642eb8ae3e42af375ae0d77aed2e86e36bbfc40 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 11:09:30 +0530 Subject: [PATCH 13/16] add error logs --- flow/e2e/snowflake/peer_flow_sf_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 8d1ba58d9d..12c0dda671 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -444,14 +444,17 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { s.t.Log("Checking for intervals in all types") // interval checks if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil { + s.t.Log(err) return false } if err := s.checkJSONValue(dstTableName, "c16", "months", "2"); err != nil { + s.t.Log(err) return false } if err := s.checkJSONValue(dstTableName, "c16", "days", "29"); err != nil { + 
s.t.Log(err) return false } From f15ed785f0ca4c009a7ef97a2ccb48a9eddd434e Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 12:16:11 +0530 Subject: [PATCH 14/16] fix merge --- flow/connectors/snowflake/merge_stmt_generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 19be0cfd94..b25d465a74 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -51,7 +51,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) { flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("TO_GEOMETRY(CAST(%s:\"%s\" AS STRING),true) AS %s", toVariantColumnName, column.Name, targetColumnName)) - case qvalue.QValueKindJSON, qvalue.QValueKindHStore: + case qvalue.QValueKindJSON, qvalue.QValueKindHStore, qvalue.QValueKindInterval: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("PARSE_JSON(CAST(%s:\"%s\" AS STRING)) AS %s", toVariantColumnName, column.Name, targetColumnName)) From fcba2141023ece075b86899f2f2173fe2732a8d8 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 12:27:02 +0530 Subject: [PATCH 15/16] remove logs --- flow/e2e/snowflake/peer_flow_sf_test.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 12c0dda671..be8377f522 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -427,8 +427,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { `, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitFor(s.t, env, 10*time.Minute, "normalize types", func() bool { - s.t.Log("Checking for nulls in all types") + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize types", func() bool { noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{ "c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -437,28 +436,22 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { "c50", "c51", "c52", "c53", "c54", }) if err != nil { - s.t.Log(err) return false } - s.t.Log("Checking for intervals in all types") // interval checks if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil { - s.t.Log(err) return false } if err := s.checkJSONValue(dstTableName, "c16", "months", "2"); err != nil { - s.t.Log(err) return false } if err := s.checkJSONValue(dstTableName, "c16", "days", "29"); err != nil { - s.t.Log(err) return false } - s.t.Log("Checking for json in all types") // check if JSON on snowflake side is a good JSON if err := s.checkJSONValue(dstTableName, "c17", "sai", "1"); err != nil { return false From f75d9fa5ce4dbbef3d1194ab555582c17a8eeb90 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 21 Mar 2024 13:28:10 +0530 Subject: [PATCH 16/16] revert qvalue.go --- flow/model/qvalue/qvalue.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 33be135e3d..ae0a3945ab 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -64,8 +64,6 @@ func (q QValue) Equals(other QValue) bool { case QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ: return compareGoTime(q.Value, other.Value) - case QValueKindInterval: - return compareString(q.Value, other.Value) case QValueKindTime, QValueKindTimeTZ: return 
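With this change, interval columns go through the same PARSE_JSON cast as JSON and HSTORE in the generated MERGE statement. An illustrative expansion of that Sprintf for an interval column; the raw-table variant column and the destination column casing are placeholders, not taken from the patch.

package main

import "fmt"

func main() {
	// Placeholder names for illustration only.
	toVariant, srcCol, dstCol := "_PEERDB_DATA", "dur", `"DUR"`
	frag := fmt.Sprintf("PARSE_JSON(CAST(%s:\"%s\" AS STRING)) AS %s",
		toVariant, srcCol, dstCol)
	fmt.Println(frag)
	// PARSE_JSON(CAST(_PEERDB_DATA:"dur" AS STRING)) AS "DUR"
}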
compareGoCivilTime(q.Value, other.Value) case QValueKindNumeric: