Skip to content

Commit

Permalink
null out invalid timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 12, 2024
1 parent 95d1901 commit aba0653
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 3 deletions.
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
}, nil
}

// returns whether the function errors or there are nulls
// returns whether the function errors or there are no nulls
func (b *BigQueryTestHelper) CheckNull(tableName string, colName []string) (bool, error) {
if len(colName) == 0 {
return true, nil
Expand Down
67 changes: 67 additions & 0 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package e2e_bigquery

import (
"context"
"fmt"
"strings"

"github.com/stretchr/testify/require"

Expand All @@ -15,6 +17,34 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) {
tblFields := []string{
"watermark_ts timestamp",
"mytimestamp timestamp",
"mytztimestamp timestamptz",
}
tblFieldStr := strings.Join(tblFields, ",")
_, err := s.conn.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE e2e_test_%s.%s (
%s
);`, s.bqSuffix, tableName, tblFieldStr))

require.NoError(s.t, err)

var rows []string
row := `(CURRENT_TIMESTAMP,'10001-03-14 23:05:52','50001-03-14 23:05:52.216809+00')`
rows = append(rows, row)

_, err = s.conn.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test_%s.%s (
watermark_ts,
mytimestamp,
mytztimestamp
) VALUES %s;
`, s.bqSuffix, tableName, strings.Join(rows, ",")))
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down Expand Up @@ -47,6 +77,43 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
e2e.RequireEqualTables(s, tblName, "*")
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

tblName := "test_qrep_flow_avro_bq"
s.setupTimeTable(tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}",
s.bqSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro",
fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName),
tblName,
query,
s.bqHelper.Peer,
"",
true,
"")
qrepConfig.WatermarkColumn = "watermark_ts"
require.NoError(s.t, err)
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())

err = env.GetWorkflowError()
require.NoError(s.t, err)

ok, err := s.bqHelper.CheckNull(tblName, []string{"mytimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)

ok, err = s.bqHelper.CheckNull(tblName, []string{"mytztimestamp"})
require.NoError(s.t, err)
require.False(s.t, ok)
}

func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down
41 changes: 39 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return t.(int64), nil
}
}

if c.Nullable {
return goavro.Union("long.timestamp-micros", t.(int64)), nil
}
Expand All @@ -242,6 +243,17 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return t.(int64), nil
}
}

// Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD
// So, we make such timestamps as null
if c.TargetDWH == QDWHTypeBigQuery {
if t.(int64) < 0 || t.(int64) > 253402300799999999 {
slog.Info("Unlimited TimestampTZ value for BigQuery",
slog.Int64("timestamptz unix micro", t.(int64)))
return nil, nil
}
}

if c.Nullable {
return goavro.Union("long.timestamp-micros", t.(int64)), nil
}
Expand Down Expand Up @@ -386,7 +398,20 @@ func (c *QValueAvroConverter) processGoTimestampTZ() (interface{}, error) {
if c.TargetDWH == QDWHTypeSnowflake {
return t.Format("2006-01-02 15:04:05.999999-0700"), nil
}
return t.UnixMicro(), nil

tMicro := t.UnixMicro()
// Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD
// So, we make such timestamps as null
if c.TargetDWH == QDWHTypeBigQuery {
if tMicro < 0 || tMicro > 253402300799999999 {
slog.Info("Unlimited TimestampTZ value for BigQuery",
slog.String("timestamptz string", t.String()),
slog.Int64("timestamptz unix micro", tMicro))
return nil, nil
}
}

return tMicro, nil
}

func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) {
Expand All @@ -404,7 +429,19 @@ func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) {
if c.TargetDWH == QDWHTypeSnowflake {
return t.Format("2006-01-02 15:04:05.999999"), nil
}
return t.UnixMicro(), nil

tMicro := t.UnixMicro()
// Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD
// So, we make such timestamps as null
if c.TargetDWH == QDWHTypeBigQuery {
if tMicro < 0 || tMicro > 253402300799999999 {
slog.Info("Unlimited Timestamp value for BigQuery",
slog.String("timestamp string", t.String()),
slog.Int64("timestamp unix micro", tMicro))
return nil, nil
}
}
return tMicro, nil
}

func (c *QValueAvroConverter) processGoDate() (interface{}, error) {
Expand Down

0 comments on commit aba0653

Please sign in to comment.