diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 14878a5c6f..f13579d92e 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -377,7 +377,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor }, nil } -// returns whether the function errors or there are nulls +// returns whether the function errors or there are no nulls func (b *BigQueryTestHelper) CheckNull(tableName string, colName []string) (bool, error) { if len(colName) == 0 { return true, nil diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index d8c3385510..bf1e6ef9db 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -1,7 +1,9 @@ package e2e_bigquery import ( + "context" "fmt" + "strings" "github.com/stretchr/testify/require" @@ -15,6 +17,34 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) require.NoError(s.t, err) } +func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) { + tblFields := []string{ + "watermark_ts timestamp", + "mytimestamp timestamp", + "mytztimestamp timestamptz", + } + tblFieldStr := strings.Join(tblFields, ",") + _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE e2e_test_%s.%s ( + %s + );`, s.bqSuffix, tableName, tblFieldStr)) + + require.NoError(s.t, err) + + var rows []string + row := `(CURRENT_TIMESTAMP,'10001-03-14 23:05:52','50001-03-14 23:05:52.216809+00')` + rows = append(rows, row) + + _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO e2e_test_%s.%s ( + watermark_ts, + mytimestamp, + mytztimestamp + ) VALUES %s; + `, s.bqSuffix, tableName, strings.Join(rows, ","))) + require.NoError(s.t, err) +} + func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) @@ -47,6 +77,43 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { e2e.RequireEqualTables(s, tblName, "*") } +func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + tblName := "test_qrep_flow_avro_bq" + s.setupTimeTable(tblName) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}", + s.bqSuffix, tblName) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro", + fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), + tblName, + query, + s.bqHelper.Peer, + "", + true, + "") + qrepConfig.WatermarkColumn = "watermark_ts" + require.NoError(s.t, err) + e2e.RunQrepFlowWorkflow(env, qrepConfig) + + // Verify workflow completes without error + require.True(s.t, env.IsWorkflowCompleted()) + + err = env.GetWorkflowError() + require.NoError(s.t, err) + + ok, err := s.bqHelper.CheckNull(tblName, []string{"mytimestamp"}) + require.NoError(s.t, err) + require.False(s.t, ok) + + ok, err = s.bqHelper.CheckNull(tblName, []string{"mytztimestamp"}) + require.NoError(s.t, err) + require.False(s.t, ok) +} + func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 3bd44e6cd6..a90e8335c2 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -218,6 +218,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return t.(int64), nil } } + if c.Nullable { return goavro.Union("long.timestamp-micros", t.(int64)), nil } @@ -242,6 +243,17 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return t.(int64), nil } } + + // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD + // So, we make such timestamps as null + if c.TargetDWH == QDWHTypeBigQuery { + if t.(int64) < 0 || t.(int64) > 253402300799999999 { + slog.Info("Unlimited TimestampTZ value for BigQuery", + slog.Int64("timestamptz unix micro", t.(int64))) + return nil, nil + } + } + if c.Nullable { return goavro.Union("long.timestamp-micros", t.(int64)), nil } @@ -386,7 +398,20 @@ func (c *QValueAvroConverter) processGoTimestampTZ() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { return t.Format("2006-01-02 15:04:05.999999-0700"), nil } - return t.UnixMicro(), nil + + tMicro := t.UnixMicro() + // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD + // So, we make such timestamps as null + if c.TargetDWH == QDWHTypeBigQuery { + if tMicro < 0 || tMicro > 253402300799999999 { + slog.Info("Unlimited TimestampTZ value for BigQuery", + slog.String("timestamptz string", t.String()), + slog.Int64("timestamptz unix micro", tMicro)) + return nil, nil + } + } + + return tMicro, nil } func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) { @@ -404,7 +429,19 @@ func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { return t.Format("2006-01-02 15:04:05.999999"), nil } - return t.UnixMicro(), nil + + tMicro := t.UnixMicro() + // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD + // So, we make such timestamps as null + if c.TargetDWH == QDWHTypeBigQuery { + if tMicro < 0 || tMicro > 253402300799999999 { + slog.Info("Unlimited Timestamp value for BigQuery", + slog.String("timestamp string", t.String()), + slog.Int64("timestamp unix micro", tMicro)) + return nil, nil + } + } + return tMicro, nil } func (c *QValueAvroConverter) processGoDate() (interface{}, error) {