From b2ed20ad5a5bef697b08a35016b4737cab503f2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 14:22:12 +0000 Subject: [PATCH 1/3] Add support for "char" (#1285) Postgres offers a type "char" distinct from CHAR, represented by one byte Map this type in QValue, sqlserver also has char, & on clickhouse we can represent it with FixedString(1) --- flow/connectors/clickhouse/qvalue_convert.go | 1 + flow/connectors/postgres/qvalue_convert.go | 6 ++++++ .../postgres/schema_delta_test_constants.go | 12 +++++++++--- flow/connectors/snowflake/avro_file_writer_test.go | 3 +++ flow/connectors/sql/query_executor.go | 7 +++++++ flow/connectors/sqlserver/qvalue_convert.go | 3 ++- flow/model/model.go | 6 ++++++ flow/model/qrecord_batch.go | 8 ++++++++ flow/model/qvalue/avro_converter.go | 5 +++-- flow/model/qvalue/kind.go | 3 +++ flow/model/qvalue/qvalue.go | 6 ++++++ 11 files changed, 54 insertions(+), 6 deletions(-) diff --git a/flow/connectors/clickhouse/qvalue_convert.go b/flow/connectors/clickhouse/qvalue_convert.go index f80ea7857e..8ec8a52f0f 100644 --- a/flow/connectors/clickhouse/qvalue_convert.go +++ b/flow/connectors/clickhouse/qvalue_convert.go @@ -16,6 +16,7 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{ "CHAR": qvalue.QValueKindString, "TEXT": qvalue.QValueKindString, "String": qvalue.QValueKindString, + "FixedString(1)": qvalue.QValueKindQChar, "Bool": qvalue.QValueKindBoolean, "DateTime": qvalue.QValueKindTimestamp, "TIMESTAMP": qvalue.QValueKindTimestamp, diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index bd39fd1048..81f19e1ec9 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -32,6 +32,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { return qvalue.QValueKindFloat32 case pgtype.Float8OID: return qvalue.QValueKindFloat64 + case pgtype.QCharOID: + return qvalue.QValueKindQChar 
case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID: return qvalue.QValueKindString case pgtype.ByteaOID: @@ -122,6 +124,8 @@ func qValueKindToPostgresType(colTypeStr string) string { return "REAL" case qvalue.QValueKindFloat64: return "DOUBLE PRECISION" + case qvalue.QValueKindQChar: + return "\"char\"" case qvalue.QValueKindString: return "TEXT" case qvalue.QValueKindBytes: @@ -277,6 +281,8 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindFloat64: floatVal := value.(float64) val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal} + case qvalue.QValueKindQChar: + val = qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: uint8(value.(rune))} case qvalue.QValueKindString: // handling all unsupported types with strings as well for now. val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(value)} diff --git a/flow/connectors/postgres/schema_delta_test_constants.go b/flow/connectors/postgres/schema_delta_test_constants.go index d86f8e98a0..6ded70625a 100644 --- a/flow/connectors/postgres/schema_delta_test_constants.go +++ b/flow/connectors/postgres/schema_delta_test_constants.go @@ -19,6 +19,7 @@ var AddAllColumnTypes = []string{ string(qvalue.QValueKindJSON), string(qvalue.QValueKindNumeric), string(qvalue.QValueKindString), + string(qvalue.QValueKindQChar), string(qvalue.QValueKindTime), string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindTimestampTZ), @@ -93,21 +94,26 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{ }, { Name: "c13", - Type: string(qvalue.QValueKindTime), + Type: string(qvalue.QValueKindQChar), TypeModifier: -1, }, { Name: "c14", - Type: string(qvalue.QValueKindTimestamp), + Type: string(qvalue.QValueKindTime), TypeModifier: -1, }, { Name: "c15", - Type: string(qvalue.QValueKindTimestampTZ), + Type: string(qvalue.QValueKindTimestamp), TypeModifier: -1, }, { Name: "c16", + Type: string(qvalue.QValueKindTimestampTZ), + TypeModifier: -1, + }, + 
{ + Name: "c17", Type: string(qvalue.QValueKindUUID), TypeModifier: -1, }, diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index bd9b155dab..0252b53fc8 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -40,6 +40,8 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue. value = big.NewRat(int64(placeHolder), 1) case qvalue.QValueKindUUID: value = uuid.New() // assuming you have the github.com/google/uuid package + case qvalue.QValueKindQChar: + value = uint8(48) // case qvalue.QValueKindArray: // value = []int{1, 2, 3} // placeholder array, replace with actual logic // case qvalue.QValueKindStruct: @@ -85,6 +87,7 @@ func generateRecords( qvalue.QValueKindNumeric, qvalue.QValueKindBytes, qvalue.QValueKindUUID, + qvalue.QValueKindQChar, // qvalue.QValueKindJSON, qvalue.QValueKindBit, } diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 14bc6629b7..64ff4ecc54 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -308,6 +308,9 @@ func (g *GenericSQLQueryExecutor) CheckNull(ctx context.Context, schema string, } func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { + if val == nil { + return qvalue.QValue{Kind: kind, Value: nil}, nil + } switch kind { case qvalue.QValueKindInt32: if v, ok := val.(*sql.NullInt32); ok { @@ -341,6 +344,10 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: nil}, nil } } + case qvalue.QValueKindQChar: + if v, ok := val.(uint8); ok { + return qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: v}, nil + } case qvalue.QValueKindString: if v, ok := val.(*sql.NullString); ok { if v.Valid { diff --git a/flow/connectors/sqlserver/qvalue_convert.go 
b/flow/connectors/sqlserver/qvalue_convert.go index cff634139a..b4f73420e1 100644 --- a/flow/connectors/sqlserver/qvalue_convert.go +++ b/flow/connectors/sqlserver/qvalue_convert.go @@ -10,6 +10,7 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{ qvalue.QValueKindFloat32: "REAL", qvalue.QValueKindFloat64: "FLOAT", qvalue.QValueKindNumeric: "DECIMAL(38, 9)", + qvalue.QValueKindQChar: "CHAR", qvalue.QValueKindString: "NTEXT", qvalue.QValueKindJSON: "NTEXT", // SQL Server doesn't have a native JSON type qvalue.QValueKindTimestamp: "DATETIME2", @@ -51,7 +52,7 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{ "UNIQUEIDENTIFIER": qvalue.QValueKindUUID, "SMALLINT": qvalue.QValueKindInt32, "TINYINT": qvalue.QValueKindInt32, - "CHAR": qvalue.QValueKindString, + "CHAR": qvalue.QValueKindQChar, "VARCHAR": qvalue.QValueKindString, "NCHAR": qvalue.QValueKindString, "NVARCHAR": qvalue.QValueKindString, diff --git a/flow/model/model.go b/flow/model/model.go index 14de42a44e..a5bb754f2a 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -158,7 +158,13 @@ func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) { } jsonStruct[col] = binStr + case qvalue.QValueKindQChar: + ch, ok := v.Value.(uint8) + if !ok { + return nil, fmt.Errorf("expected \"char\" value for column %s for %T", col, v.Value) + } + jsonStruct[col] = string(ch) case qvalue.QValueKindString, qvalue.QValueKindJSON: strVal, ok := v.Value.(string) if !ok { diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 572c788630..4455101ad8 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -137,6 +137,14 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { } values[i] = v + case qvalue.QValueKindQChar: + v, ok := qValue.Value.(uint8) + if !ok { + src.err = fmt.Errorf("invalid \"char\" value") + return nil, src.err + } + values[i] = rune(v) + case qvalue.QValueKindString: v, ok := 
qValue.Value.(string) if !ok { diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 4f5488cbff..e5e6046fb5 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -64,7 +64,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision } switch kind { - case QValueKindString: + case QValueKindString, QValueKindQChar: return "string", nil case QValueKindUUID: return AvroSchemaLogical{ @@ -291,7 +291,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return goavro.Union("int.date", t), nil } return t, nil - + case QValueKindQChar: + return c.processNullableUnion("string", string(c.Value.Value.(uint8))) case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr: if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil && (len(c.Value.Value.(string)) > 15*1024*1024) { diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 6328897d3f..78c9ece45b 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -17,6 +17,7 @@ const ( QValueKindInt64 QValueKind = "int64" QValueKindBoolean QValueKind = "bool" QValueKindStruct QValueKind = "struct" + QValueKindQChar QValueKind = "qchar" QValueKindString QValueKind = "string" QValueKindTimestamp QValueKind = "timestamp" QValueKindTimestampTZ QValueKind = "timestamptz" @@ -63,6 +64,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindFloat32: "FLOAT", QValueKindFloat64: "FLOAT", QValueKindNumeric: "NUMBER(38, 9)", + QValueKindQChar: "CHAR", QValueKindString: "STRING", QValueKindJSON: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", @@ -101,6 +103,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindFloat32: "Float32", QValueKindFloat64: "Float64", QValueKindNumeric: "Decimal128(9)", + QValueKindQChar: "FixedString(1)", QValueKindString: "String", QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", diff --git 
a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index b9e040e170..b0b556b3d7 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -42,6 +42,12 @@ func (q QValue) Equals(other QValue) bool { return compareBoolean(q.Value, other.Value) case QValueKindStruct: return compareStruct(q.Value, other.Value) + case QValueKindQChar: + if (q.Value == nil) == (other.Value == nil) { + return q.Value == nil || q.Value.(uint8) == other.Value.(uint8) + } else { + return false + } case QValueKindString: return compareString(q.Value, other.Value) // all internally represented as a Golang time.Time From db7cdfe51325e2163569dd3484237d4236ef0345 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 14 Feb 2024 20:36:55 +0530 Subject: [PATCH 2/3] fixes to handling of custom publications for AddTablesToPublication (#1289) 1. need to fetch tables schema-qualified for comparison 2. order of ArrayMinus was wrong 3. also add validation for custom publication existing --- flow/connectors/postgres/client.go | 17 +++++++++++++---- flow/connectors/postgres/postgres.go | 4 ++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 72a5691fe5..563e0c6824 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -629,7 +629,7 @@ func (c *PostgresConnector) getDefaultPublicationName(jobName string) string { func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []string, pubName string) error { if c.conn == nil { - return fmt.Errorf("check tables: conn is nil") + return errors.New("check tables: conn is nil") } // Check that we can select from all tables @@ -649,11 +649,20 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames [] } } - // Check if tables belong to publication tableStr := strings.Join(tableArr, ",") if pubName != "" { + // Check if 
publication exists + err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) + if err != nil { + if err == pgx.ErrNoRows { + return fmt.Errorf("publication does not exist: %s", pubName) + } + return fmt.Errorf("error while checking for publication existence: %w", err) + } + + // Check if tables belong to publication var pubTableCount int - err := c.conn.QueryRow(ctx, fmt.Sprintf(` + err = c.conn.QueryRow(ctx, fmt.Sprintf(` with source_table_components (sname, tname) as (values %s) select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables INNER JOIN source_table_components stc @@ -663,7 +672,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames [] } if pubTableCount != len(tableNames) { - return fmt.Errorf("not all tables belong to publication") + return errors.New("not all tables belong to publication") } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f41e015672..d81401511b 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -973,7 +973,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro // just check if we have all the tables already in the publication for custom publications if req.PublicationName != "" { rows, err := c.conn.Query(ctx, - "SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName) + "SELECT schemaname || '.' 
|| tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName) if err != nil { return fmt.Errorf("failed to check tables in publication: %w", err) } @@ -982,7 +982,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro if err != nil { return fmt.Errorf("failed to check tables in publication: %w", err) } - notPresentTables := utils.ArrayMinus(tableNames, additionalSrcTables) + notPresentTables := utils.ArrayMinus(additionalSrcTables, tableNames) if len(notPresentTables) > 0 { return fmt.Errorf("some additional tables not present in custom publication: %s", strings.Join(notPresentTables, ", ")) From cc61dfb45d896548eb5a8c57d2d7187ca1149a9d Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 14 Feb 2024 21:14:44 +0530 Subject: [PATCH 3/3] Filter out invalid BigQuery timestamps (#1256) Fixes #1254 Adds a test for it in QRep Also we are nulling out and logging values of time/timestamps which cannot be scanned by pgx (time.Time). An example is a [year with more than 4 digits](https://pkg.go.dev/time#Parse) --- flow/connectors/postgres/cdc.go | 9 ++++ flow/connectors/utils/avro/avro_writer.go | 1 + flow/e2e/bigquery/bigquery_helper.go | 2 +- flow/e2e/bigquery/qrep_flow_bq_test.go | 66 +++++++++++++++++++++++ flow/model/conversion_avro.go | 7 +++ flow/model/qvalue/avro_converter.go | 54 +++++++++++++------ flow/model/qvalue/timestamp.go | 20 +++++++ 7 files changed, 141 insertions(+), 18 deletions(-) create mode 100644 flow/model/qvalue/timestamp.go diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 44e81c8227..8992a6562f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -720,6 +720,15 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma parsedData, err = dt.Codec.DecodeValue(p.typeMap, dataType, formatCode, data) } if err != nil { + if dt.Name == "time" || dt.Name == "timetz" || + dt.Name == "timestamp" || dt.Name == 
"timestamptz" { + // indicates year is more than 4 digits or something similar, + // which you can insert into postgres, + // but not representable by time.Time + p.logger.Warn(fmt.Sprintf("Invalidated and hence nulled %s data: %s", + dt.Name, string(data))) + return qvalue.QValue{}, nil + } return qvalue.QValue{}, err } retVal, err := parseFieldFromPostgresOID(dataType, parsedData) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 00855bf769..806d8a0b7d 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -150,6 +150,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, ocfWriter p.targetDWH, p.avroSchema.NullableFields, colNames, + logger, ) avroMap, err := avroConverter.Convert() diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index bab2be062f..6f70046137 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -377,7 +377,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor }, nil } -// returns whether the function errors or there are nulls +// returns whether the function errors or there are no nulls func (b *BigQueryTestHelper) CheckNull(tableName string, colName []string) (bool, error) { if len(colName) == 0 { return true, nil diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index d0048b948d..395a2c5ea2 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -1,7 +1,9 @@ package e2e_bigquery import ( + "context" "fmt" + "strings" "github.com/stretchr/testify/require" @@ -15,6 +17,34 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) require.NoError(s.t, err) } +func (s PeerFlowE2ETestSuiteBQ) setupTimeTable(tableName string) { + tblFields := []string{ + "watermark_ts timestamp", + "mytimestamp 
timestamp", + "mytztimestamp timestamptz", + } + tblFieldStr := strings.Join(tblFields, ",") + _, err := s.conn.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE e2e_test_%s.%s ( + %s + );`, s.bqSuffix, tableName, tblFieldStr)) + + require.NoError(s.t, err) + + var rows []string + row := `(CURRENT_TIMESTAMP,'10001-03-14 23:05:52','50001-03-14 23:05:52.216809+00')` + rows = append(rows, row) + + _, err = s.conn.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO e2e_test_%s.%s ( + watermark_ts, + mytimestamp, + mytztimestamp + ) VALUES %s; + `, s.bqSuffix, tableName, strings.Join(rows, ","))) + require.NoError(s.t, err) +} + func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) @@ -46,6 +76,42 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { e2e.RequireEqualTables(s, tblName, "*") } +func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_QRep() { + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + + tblName := "test_qrep_flow_avro_bq" + s.setupTimeTable(tblName) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE watermark_ts BETWEEN {{.start}} AND {{.end}}", + s.bqSuffix, tblName) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro", + fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), + tblName, + query, + s.bqHelper.Peer, + "", + true, + "") + qrepConfig.WatermarkColumn = "watermark_ts" + require.NoError(s.t, err) + e2e.RunQrepFlowWorkflow(env, qrepConfig) + + // Verify workflow completes without error + require.True(s.t, env.IsWorkflowCompleted()) + + err = env.GetWorkflowError() + require.NoError(s.t, err) + + ok, err := s.bqHelper.CheckNull(tblName, []string{"mytimestamp"}) + require.NoError(s.t, err) + require.False(s.t, ok) + + ok, err = s.bqHelper.CheckNull(tblName, []string{"mytztimestamp"}) + require.NoError(s.t, err) + require.False(s.t, ok) +} + func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { env := 
e2e.NewTemporalTestWorkflowEnvironment(s.t) diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index b26aeaf9d7..39e3579f8f 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" + "go.temporal.io/sdk/log" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -12,6 +14,7 @@ type QRecordAvroConverter struct { TargetDWH qvalue.QDWHType NullableFields map[string]struct{} ColNames []string + logger log.Logger } func NewQRecordAvroConverter( @@ -19,12 +22,14 @@ func NewQRecordAvroConverter( targetDWH qvalue.QDWHType, nullableFields map[string]struct{}, colNames []string, + logger log.Logger, ) *QRecordAvroConverter { return &QRecordAvroConverter{ QRecord: q, TargetDWH: targetDWH, NullableFields: nullableFields, ColNames: colNames, + logger: logger, } } @@ -39,7 +44,9 @@ func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) { val, qac.TargetDWH, nullable, + qac.logger, ) + avroVal, err := avroConverter.ToAvroValue() if err != nil { return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index e5e6046fb5..24aeb73e20 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -1,6 +1,7 @@ package qvalue import ( + "errors" "fmt" "log/slog" "math/big" @@ -8,6 +9,7 @@ import ( "github.com/google/uuid" "github.com/linkedin/goavro/v2" + "go.temporal.io/sdk/log" hstore_util "github.com/PeerDB-io/peer-flow/hstore" "github.com/PeerDB-io/peer-flow/model/numeric" @@ -158,13 +160,15 @@ type QValueAvroConverter struct { Value QValue TargetDWH QDWHType Nullable bool + logger log.Logger } -func NewQValueAvroConverter(value QValue, targetDWH QDWHType, nullable bool) *QValueAvroConverter { +func NewQValueAvroConverter(value QValue, targetDWH QDWHType, nullable bool, logger log.Logger) *QValueAvroConverter { return 
&QValueAvroConverter{ Value: value, TargetDWH: targetDWH, Nullable: nullable, + logger: logger, } } @@ -245,6 +249,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return t.(int64), nil } } + if c.Nullable { return goavro.Union("long.timestamp-micros", t.(int64)), nil } @@ -269,6 +274,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return t.(int64), nil } } + if c.Nullable { return goavro.Union("long.timestamp-micros", t.(int64)), nil } @@ -318,7 +324,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { case QValueKindBoolean: return c.processNullableUnion("boolean", c.Value.Value) case QValueKindStruct: - return nil, fmt.Errorf("QValueKindStruct not supported") + return nil, errors.New("QValueKindStruct not supported") case QValueKindNumeric: return c.processNumeric() case QValueKindBytes, QValueKindBit: @@ -371,7 +377,7 @@ func (c *QValueAvroConverter) processGoTimeTZ() (interface{}, error) { t, ok := c.Value.Value.(time.Time) if !ok { - return nil, fmt.Errorf("invalid TimeTZ value") + return nil, errors.New("invalid TimeTZ value") } // Snowflake has issues with avro timestamp types, returning as string form @@ -389,7 +395,7 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) { t, ok := c.Value.Value.(time.Time) if !ok { - return nil, fmt.Errorf("invalid Time value") + return nil, errors.New("invalid Time value") } // Snowflake has issues with avro timestamp types, returning as string form @@ -411,7 +417,7 @@ func (c *QValueAvroConverter) processGoTimestampTZ() (interface{}, error) { t, ok := c.Value.Value.(time.Time) if !ok { - return nil, fmt.Errorf("invalid TimestampTZ value") + return nil, errors.New("invalid TimestampTZ value") } // Snowflake has issues with avro timestamp types, returning as string form @@ -419,6 +425,13 @@ func (c *QValueAvroConverter) processGoTimestampTZ() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { return t.Format("2006-01-02 
15:04:05.999999-0700"), nil } + + // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD + // So make such timestamps null + if DisallowedTimestamp(c.TargetDWH, t, c.logger) { + return nil, nil + } + return t.UnixMicro(), nil } @@ -429,7 +442,7 @@ func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) { t, ok := c.Value.Value.(time.Time) if !ok { - return nil, fmt.Errorf("invalid Timestamp value") + return nil, errors.New("invalid Timestamp value") } // Snowflake has issues with avro timestamp types, returning as string form @@ -437,6 +450,13 @@ func (c *QValueAvroConverter) processGoTimestamp() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { return t.Format("2006-01-02 15:04:05.999999"), nil } + + // Bigquery will not allow timestamp if it is less than 1AD and more than 9999AD + // So make such timestamps null + if DisallowedTimestamp(c.TargetDWH, t, c.logger) { + return nil, nil + } + return t.UnixMicro(), nil } @@ -447,7 +467,7 @@ func (c *QValueAvroConverter) processGoDate() (interface{}, error) { t, ok := c.Value.Value.(time.Time) if !ok { - return nil, fmt.Errorf("invalid Time value for Date") + return nil, errors.New("invalid Time value for Date") } // Snowflake has issues with avro timestamp types, returning as string form @@ -513,7 +533,7 @@ func (c *QValueAvroConverter) processBytes() (interface{}, error) { byteData, ok := c.Value.Value.([]byte) if !ok { - return nil, fmt.Errorf("invalid Bytes value") + return nil, errors.New("invalid Bytes value") } if c.Nullable { @@ -557,7 +577,7 @@ func (c *QValueAvroConverter) processArrayBoolean() (interface{}, error) { arrayData, ok := c.Value.Value.([]bool) if !ok { - return nil, fmt.Errorf("invalid Boolean array value") + return nil, errors.New("invalid Boolean array value") } if c.Nullable { @@ -574,7 +594,7 @@ func (c *QValueAvroConverter) processArrayTime() (interface{}, error) { arrayTime, ok := c.Value.Value.([]time.Time) if !ok { - return nil, 
fmt.Errorf("invalid Timestamp array value") + return nil, errors.New("invalid Timestamp array value") } transformedTimeArr := make([]interface{}, 0, len(arrayTime)) @@ -602,7 +622,7 @@ func (c *QValueAvroConverter) processArrayDate() (interface{}, error) { arrayDate, ok := c.Value.Value.([]time.Time) if !ok { - return nil, fmt.Errorf("invalid Date array value") + return nil, errors.New("invalid Date array value") } transformedTimeArr := make([]interface{}, 0, len(arrayDate)) @@ -704,7 +724,7 @@ func (c *QValueAvroConverter) processArrayInt16() (interface{}, error) { arrayData, ok := c.Value.Value.([]int16) if !ok { - return nil, fmt.Errorf("invalid Int16 array value") + return nil, errors.New("invalid Int16 array value") } // cast to int32 @@ -727,7 +747,7 @@ func (c *QValueAvroConverter) processArrayInt32() (interface{}, error) { arrayData, ok := c.Value.Value.([]int32) if !ok { - return nil, fmt.Errorf("invalid Int32 array value") + return nil, errors.New("invalid Int32 array value") } if c.Nullable { @@ -744,7 +764,7 @@ func (c *QValueAvroConverter) processArrayInt64() (interface{}, error) { arrayData, ok := c.Value.Value.([]int64) if !ok { - return nil, fmt.Errorf("invalid Int64 array value") + return nil, errors.New("invalid Int64 array value") } if c.Nullable { @@ -761,7 +781,7 @@ func (c *QValueAvroConverter) processArrayFloat32() (interface{}, error) { arrayData, ok := c.Value.Value.([]float32) if !ok { - return nil, fmt.Errorf("invalid Float32 array value") + return nil, errors.New("invalid Float32 array value") } if c.Nullable { @@ -778,7 +798,7 @@ func (c *QValueAvroConverter) processArrayFloat64() (interface{}, error) { arrayData, ok := c.Value.Value.([]float64) if !ok { - return nil, fmt.Errorf("invalid Float64 array value") + return nil, errors.New("invalid Float64 array value") } if c.Nullable { @@ -795,7 +815,7 @@ func (c *QValueAvroConverter) processArrayString() (interface{}, error) { arrayData, ok := c.Value.Value.([]string) if !ok { - return 
nil, fmt.Errorf("invalid String array value") + return nil, errors.New("invalid String array value") } if c.Nullable { diff --git a/flow/model/qvalue/timestamp.go b/flow/model/qvalue/timestamp.go new file mode 100644 index 0000000000..20d564b927 --- /dev/null +++ b/flow/model/qvalue/timestamp.go @@ -0,0 +1,20 @@ +package qvalue + +import ( + "time" + + "go.temporal.io/sdk/log" +) + +// DisallowedTimestamp reports whether t is outside BigQuery's TIMESTAMP range (years 1-9999 UTC); it logs when so. +func DisallowedTimestamp(dwh QDWHType, t time.Time, logger log.Logger) bool { + if dwh == QDWHTypeBigQuery { + year := t.UTC().Year() // not UnixMicro: an epoch-based check would also reject every pre-1970 value + if year < 1 || year > 9999 { // BigQuery accepts 0001-01-01 00:00:00 through 9999-12-31 23:59:59.999999 UTC + logger.Warn("Nulling Timestamp value for BigQuery as it exceeds allowed range", + "timestamp", t.String()) + return true + } + } + return false +}