Skip to content

Commit

Permalink
BigQuery: Better disallow timestamp function usage (#1395)
Browse files Browse the repository at this point in the history
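The lower-bound check (1 AD) applied in processTimestamp/TZ when writing
Avro for BigQuery was incorrect: it tested for negative Unix microseconds,
which rejects every timestamp before 1970 rather than before 1 AD.
The check now compares the year directly, which is also more readable.
The same range check is now applied to Date values in Avro writing.
The test has been updated to cover these cases.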
  • Loading branch information
Amogh-Bharadwaj authored Feb 28, 2024
1 parent efdffa2 commit d3060ff
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
40 changes: 29 additions & 11 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
"watermark_ts timestamp",
"mytimestamp timestamp",
"mytztimestamp timestamptz",
"medieval timestamptz",
"mybaddate date",
"mydate date",
}
tblFieldStr := strings.Join(tblFields, ",")
_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -32,14 +35,22 @@ func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
require.NoError(s.t, err)

var rows []string
row := `(CURRENT_TIMESTAMP,'10001-03-14 23:05:52','50001-03-14 23:05:52.216809+00')`
row := `(CURRENT_TIMESTAMP,
'10001-03-14 23:05:52',
'50001-03-14 23:05:52.216809+00',
'1534-03-14 23:05:52.216809+00',
'10000-03-14',
CURRENT_TIMESTAMP)`
rows = append(rows, row)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test_%s.%s (
watermark_ts,
mytimestamp,
mytztimestamp
mytztimestamp,
medieval,
mybaddate,
mydate
) VALUES %s;
`, s.bqSuffix, tableName, strings.Join(rows, ",")))
require.NoError(s.t, err)
Expand Down Expand Up @@ -76,16 +87,16 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
e2e.RequireEqualTables(s, tblName, "*")
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() {
func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() {
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

tblName := "test_qrep_flow_avro_bq"
tblName := "test_invalid_time_bq"
s.setupTimeTable(tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}",
s.bqSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro",
qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_invalid_time_bq",
fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName),
tblName,
query,
Expand All @@ -103,13 +114,20 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() {
err = env.GetWorkflowError()
require.NoError(s.t, err)

ok, err := s.bqHelper.CheckNull(tblName, []string{"mytimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)
goodValues := []string{"watermark_ts", "mydate", "medieval"}
badValues := []string{"mytimestamp", "mytztimestamp", "mybaddate"}

ok, err = s.bqHelper.CheckNull(tblName, []string{"mytztimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)
for _, col := range goodValues {
ok, err := s.bqHelper.CheckNull(tblName, []string{col})
require.NoError(s.t, err)
require.True(s.t, ok)
}

for _, col := range badValues {
ok, err := s.bqHelper.CheckNull(tblName, []string{col})
require.NoError(s.t, err)
require.False(s.t, ok)
}
}

func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
Expand Down
6 changes: 6 additions & 0 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,12 @@ func (c *QValueAvroConverter) processGoDate() (interface{}, error) {
return nil, errors.New("invalid Time value for Date")
}

// BigQuery does not allow a Date earlier than 1 AD or later than 9999 AD,
// so such Dates are made null.
if DisallowedTimestamp(c.TargetDWH, t, c.logger) {
return nil, nil
}

// Snowflake has issues with avro timestamp types, returning as string form
// See: https://stackoverflow.com/questions/66104762/snowflake-date-column-have-incorrect-date-from-avro-file
if c.TargetDWH == QDWHTypeSnowflake {
Expand Down
4 changes: 2 additions & 2 deletions flow/model/qvalue/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
// BigQuery does not allow a timestamp earlier than 1 AD or later than 9999 AD.
func DisallowedTimestamp(dwh QDWHType, t time.Time, logger log.Logger) bool {
if dwh == QDWHTypeBigQuery {
tMicro := t.UnixMicro()
if tMicro < 0 || tMicro > 253402300799999999 { // 9999-12-31 23:59:59.999999
year := t.Year()
if year < 1 || year > 9999 {
logger.Warn("Nulling Timestamp value for BigQuery as it exceeds allowed range",
"timestamp", t.String())
return true
Expand Down

0 comments on commit d3060ff

Please sign in to comment.