diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index e881662c7b..4fdfb52e95 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -22,6 +22,9 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) { "watermark_ts timestamp", "mytimestamp timestamp", "mytztimestamp timestamptz", + "medieval timestamptz", + "mybaddate date", + "mydate date", } tblFieldStr := strings.Join(tblFields, ",") _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -32,14 +35,22 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) { require.NoError(s.t, err) var rows []string - row := `(CURRENT_TIMESTAMP,'10001-03-14 23:05:52','50001-03-14 23:05:52.216809+00')` + row := `(CURRENT_TIMESTAMP, + '10001-03-14 23:05:52', + '50001-03-14 23:05:52.216809+00', + '1534-03-14 23:05:52.216809+00', + '10000-03-14', + CURRENT_TIMESTAMP)` rows = append(rows, row) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO e2e_test_%s.%s ( watermark_ts, mytimestamp, - mytztimestamp + mytztimestamp, + medieval, + mybaddate, + mydate ) VALUES %s; `, s.bqSuffix, tableName, strings.Join(rows, ","))) require.NoError(s.t, err) @@ -76,16 +87,16 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { e2e.RequireEqualTables(s, tblName, "*") } -func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() { +func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - tblName := "test_qrep_flow_avro_bq" + tblName := "test_invalid_time_bq" s.setupTimeTable(tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}", s.bqSuffix, tblName) - qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro", + qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_invalid_time_bq", fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), tblName, query, @@ 
-103,13 +114,20 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() { err = env.GetWorkflowError() require.NoError(s.t, err) - ok, err := s.bqHelper.CheckNull(tblName, []string{"mytimestamp"}) - require.NoError(s.t, err) - require.False(s.t, ok) + goodValues := []string{"watermark_ts", "mydate", "medieval"} + badValues := []string{"mytimestamp", "mytztimestamp", "mybaddate"} - ok, err = s.bqHelper.CheckNull(tblName, []string{"mytztimestamp"}) - require.NoError(s.t, err) - require.False(s.t, ok) + for _, col := range goodValues { + ok, err := s.bqHelper.CheckNull(tblName, []string{col}) + require.NoError(s.t, err) + require.True(s.t, ok) + } + + for _, col := range badValues { + ok, err := s.bqHelper.CheckNull(tblName, []string{col}) + require.NoError(s.t, err) + require.False(s.t, ok) + } } func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 017e16962c..d5aef18113 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -456,6 +456,12 @@ func (c *QValueAvroConverter) processGoDate() (interface{}, error) { return nil, errors.New("invalid Time value for Date") } + // BigQuery does not allow a Date earlier than 1 AD or later than 9999 AD, + // so such Dates are written as null + if DisallowedTimestamp(c.TargetDWH, t, c.logger) { + return nil, nil + } + // Snowflake has issues with avro timestamp types, returning as string form // See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file if c.TargetDWH == QDWHTypeSnowflake { diff --git a/flow/model/qvalue/timestamp.go b/flow/model/qvalue/timestamp.go index 20d564b927..d7d7e6ad4a 100644 --- a/flow/model/qvalue/timestamp.go +++ b/flow/model/qvalue/timestamp.go @@ -9,8 +9,8 @@ import ( // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD func DisallowedTimestamp(dwh QDWHType, t time.Time, logger 
log.Logger) bool { if dwh == QDWHTypeBigQuery { - tMicro := t.UnixMicro() - if tMicro < 0 || tMicro > 253402300799999999 { // 9999-12-31 23:59:59.999999 + year := t.Year() + if year < 1 || year > 9999 { logger.Warn("Nulling Timestamp value for BigQuery as it exceeds allowed range", "timestamp", t.String()) return true