From 86c8180e41c0bec8153480bcf03acc7f3f93e0e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 13 Feb 2024 22:05:22 +0000 Subject: [PATCH 1/2] Add support for "char" Postgres offers a type "char" distinct from CHAR, represented by one byte Map this type in QValue, sqlserver also has char, & on clickhouse we can represent it with FixedString(1) --- flow/connectors/clickhouse/qvalue_convert.go | 1 + flow/connectors/postgres/qvalue_convert.go | 11 +++-- .../postgres/schema_delta_test_constants.go | 12 ++++-- .../snowflake/avro_file_writer_test.go | 3 ++ .../snowflake/merge_stmt_generator.go | 28 ++++++------ flow/connectors/sql/query_executor.go | 7 +++ flow/connectors/sqlserver/qvalue_convert.go | 3 +- flow/model/conversion_avro.go | 2 +- flow/model/model.go | 6 +++ flow/model/qrecord_batch.go | 13 +++--- flow/model/qvalue/avro_converter.go | 43 +++++++++++-------- flow/model/qvalue/kind.go | 3 ++ flow/model/qvalue/qvalue.go | 6 +++ 13 files changed, 91 insertions(+), 47 deletions(-) diff --git a/flow/connectors/clickhouse/qvalue_convert.go b/flow/connectors/clickhouse/qvalue_convert.go index f80ea7857e..8ec8a52f0f 100644 --- a/flow/connectors/clickhouse/qvalue_convert.go +++ b/flow/connectors/clickhouse/qvalue_convert.go @@ -16,6 +16,7 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{ "CHAR": qvalue.QValueKindString, "TEXT": qvalue.QValueKindString, "String": qvalue.QValueKindString, + "FixedString(1)": qvalue.QValueKindQChar, "Bool": qvalue.QValueKindBoolean, "DateTime": qvalue.QValueKindTimestamp, "TIMESTAMP": qvalue.QValueKindTimestamp, diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 4823aa2ecc..0cab79c942 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -31,6 +31,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { return qvalue.QValueKindFloat32 case pgtype.Float8OID: return qvalue.QValueKindFloat64 + case pgtype.QCharOID: + return qvalue.QValueKindQChar case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID: return qvalue.QValueKindString case pgtype.ByteaOID: @@ -121,6 +123,8 @@ func qValueKindToPostgresType(colTypeStr string) string { return "REAL" case qvalue.QValueKindFloat64: return "DOUBLE PRECISION" + case qvalue.QValueKindQChar: + return "\"char\"" case qvalue.QValueKindString: return "TEXT" case qvalue.QValueKindBytes: @@ -262,6 +266,8 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindFloat64: floatVal := value.(float64) val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal} + case qvalue.QValueKindQChar: + val = qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: uint8(value.(rune))} case qvalue.QValueKindString: // handling all unsupported types with strings as well for now. val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(value)} @@ -501,10 +507,9 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( } default: textVal, ok := value.(string) - if !ok { - return qvalue.QValue{}, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind) + if ok { + val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} } - val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} } // parsing into pgtype failed. diff --git a/flow/connectors/postgres/schema_delta_test_constants.go b/flow/connectors/postgres/schema_delta_test_constants.go index d86f8e98a0..6ded70625a 100644 --- a/flow/connectors/postgres/schema_delta_test_constants.go +++ b/flow/connectors/postgres/schema_delta_test_constants.go @@ -19,6 +19,7 @@ var AddAllColumnTypes = []string{ string(qvalue.QValueKindJSON), string(qvalue.QValueKindNumeric), string(qvalue.QValueKindString), + string(qvalue.QValueKindQChar), string(qvalue.QValueKindTime), string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindTimestampTZ), @@ -93,21 +94,26 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{ }, { Name: "c13", - Type: string(qvalue.QValueKindTime), + Type: string(qvalue.QValueKindQChar), TypeModifier: -1, }, { Name: "c14", - Type: string(qvalue.QValueKindTimestamp), + Type: string(qvalue.QValueKindTime), TypeModifier: -1, }, { Name: "c15", - Type: string(qvalue.QValueKindTimestampTZ), + Type: string(qvalue.QValueKindTimestamp), TypeModifier: -1, }, { Name: "c16", + Type: string(qvalue.QValueKindTimestampTZ), + TypeModifier: -1, + }, + { + Name: "c17", Type: string(qvalue.QValueKindUUID), TypeModifier: -1, }, diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index bd9b155dab..0252b53fc8 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -40,6 +40,8 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue. value = big.NewRat(int64(placeHolder), 1) case qvalue.QValueKindUUID: value = uuid.New() // assuming you have the github.com/google/uuid package + case qvalue.QValueKindQChar: + value = uint8(48) // case qvalue.QValueKindArray: // value = []int{1, 2, 3} // placeholder array, replace with actual logic // case qvalue.QValueKindStruct: @@ -85,6 +87,7 @@ func generateRecords( qvalue.QValueKindNumeric, qvalue.QValueKindBytes, qvalue.QValueKindUUID, + qvalue.QValueKindQChar, // qvalue.QValueKindJSON, qvalue.QValueKindBit, } diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 4d8ea7dc5a..1ab579069c 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -40,7 +40,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) { } targetColumnName := SnowflakeIdentifierNormalize(column.Name) - switch qvalue.QValueKind(genericColumnType) { + switch qvKind { case qvalue.QValueKindBytes, qvalue.QValueKindBit: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("BASE64_DECODE_BINARY(%s:\"%s\") "+ "AS %s", toVariantColumnName, column.Name, targetColumnName)) @@ -61,21 +61,19 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) { // flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("TIME_FROM_PARTS(0,0,0,%s:%s:"+ // "Microseconds*1000) "+ // "AS %s", toVariantColumnName, columnName, columnName)) - default: - if qvKind == qvalue.QValueKindNumeric { - precision, scale := numeric.ParseNumericTypmod(column.TypeModifier) - if column.TypeModifier == -1 || precision > 38 || scale > 37 { - precision = numeric.PeerDBNumericPrecision - scale = numeric.PeerDBNumericScale - } - numericType := fmt.Sprintf("NUMERIC(%d,%d)", precision, scale) - flattenedCastsSQLArray = append(flattenedCastsSQLArray, - fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s", - toVariantColumnName, column.Name, numericType, targetColumnName)) - } else { - flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s", - toVariantColumnName, column.Name, sfType, targetColumnName)) + case qvalue.QValueKindNumeric: + precision, scale := numeric.ParseNumericTypmod(column.TypeModifier) + if column.TypeModifier == -1 || precision > 38 || scale > 37 { + precision = numeric.PeerDBNumericPrecision + scale = numeric.PeerDBNumericScale } + numericType := fmt.Sprintf("NUMERIC(%d,%d)", precision, scale) + flattenedCastsSQLArray = append(flattenedCastsSQLArray, + fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s", + toVariantColumnName, column.Name, numericType, targetColumnName)) + default: + flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s", + toVariantColumnName, column.Name, sfType, targetColumnName)) } } flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",") diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 14bc6629b7..64ff4ecc54 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -308,6 +308,9 @@ func (g *GenericSQLQueryExecutor) CheckNull(ctx context.Context, schema string, } func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { + if val == nil { + return qvalue.QValue{Kind: kind, Value: nil}, nil + } switch kind { case qvalue.QValueKindInt32: if v, ok := val.(*sql.NullInt32); ok { @@ -341,6 +344,10 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: nil}, nil } } + case qvalue.QValueKindQChar: + if v, ok := val.(uint8); ok { + return qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: v}, nil + } case qvalue.QValueKindString: if v, ok := val.(*sql.NullString); ok { if v.Valid { diff --git a/flow/connectors/sqlserver/qvalue_convert.go b/flow/connectors/sqlserver/qvalue_convert.go index cff634139a..b4f73420e1 100644 --- a/flow/connectors/sqlserver/qvalue_convert.go +++ b/flow/connectors/sqlserver/qvalue_convert.go @@ -10,6 +10,7 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{ qvalue.QValueKindFloat32: "REAL", qvalue.QValueKindFloat64: "FLOAT", qvalue.QValueKindNumeric: "DECIMAL(38, 9)", + qvalue.QValueKindQChar: "CHAR", qvalue.QValueKindString: "NTEXT", qvalue.QValueKindJSON: "NTEXT", // SQL Server doesn't have a native JSON type qvalue.QValueKindTimestamp: "DATETIME2", @@ -51,7 +52,7 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{ "UNIQUEIDENTIFIER": qvalue.QValueKindUUID, "SMALLINT": qvalue.QValueKindInt32, "TINYINT": qvalue.QValueKindInt32, - "CHAR": qvalue.QValueKindString, + "CHAR": qvalue.QValueKindQChar, "VARCHAR": qvalue.QValueKindString, "NCHAR": qvalue.QValueKindString, "NVARCHAR": qvalue.QValueKindString, diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index b379abc09a..b26aeaf9d7 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -29,7 +29,7 @@ func NewQRecordAvroConverter( } func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) { - m := map[string]interface{}{} + m := make(map[string]interface{}, len(qac.QRecord)) for idx, val := range qac.QRecord { key := qac.ColNames[idx] diff --git a/flow/model/model.go b/flow/model/model.go index 14de42a44e..a5bb754f2a 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -158,7 +158,13 @@ func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) { } jsonStruct[col] = binStr + case qvalue.QValueKindQChar: + ch, ok := v.Value.(uint8) + if !ok { + return nil, fmt.Errorf("expected \"char\" value for column %s for %T", col, v.Value) + } + jsonStruct[col] = string(ch) case qvalue.QValueKindString, qvalue.QValueKindJSON: strVal, ok := v.Value.(string) if !ok { diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index d7f25c7f5d..4455101ad8 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -137,6 +137,14 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { } values[i] = v + case qvalue.QValueKindQChar: + v, ok := qValue.Value.(uint8) + if !ok { + src.err = fmt.Errorf("invalid \"char\" value") + return nil, src.err + } + values[i] = rune(v) + case qvalue.QValueKindString: v, ok := qValue.Value.(string) if !ok { @@ -173,11 +181,6 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { values[i] = timestampTZ case qvalue.QValueKindUUID: - if qValue.Value == nil { - values[i] = nil - break - } - v, ok := qValue.Value.([16]byte) // treat it as byte slice if !ok { src.err = fmt.Errorf("invalid UUID value %v", qValue.Value) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 7c902ffcb5..8397cb61d3 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -64,7 +64,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision } switch kind { - case QValueKindString: + case QValueKindString, QValueKindQChar: return "string", nil case QValueKindUUID: return AvroSchemaLogical{ @@ -169,6 +169,10 @@ func NewQValueAvroConverter(value QValue, targetDWH QDWHType, nullable bool) *QV } func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { + if c.Nullable && c.Value.Value == nil { + return nil, nil + } + switch c.Value.Kind { case QValueKindInvalid: // we will attempt to convert invalid to a string @@ -180,7 +184,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t.(string)) + return c.processNullableUnion("string", t) } else { return t.(string), nil } @@ -188,13 +192,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("string", t.(string)) + return c.processNullableUnion("string", t) } else { return t.(string), nil } } if c.Nullable { - return goavro.Union("long.time-micros", t.(int64)), nil + return goavro.Union("long.time-micros", t), nil } return t.(int64), nil case QValueKindTimeTZ: @@ -204,7 +208,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t.(string)) + return c.processNullableUnion("string", t) } else { return t.(string), nil } @@ -212,13 +216,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("long", t.(int64)) + return c.processNullableUnion("long", t) } else { return t.(int64), nil } } if c.Nullable { - return goavro.Union("long.time-micros", t.(int64)), nil + return goavro.Union("long.time-micros", t), nil } return t.(int64), nil case QValueKindTimestamp: @@ -228,7 +232,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t.(string)) + return c.processNullableUnion("string", t) } else { return t.(string), nil } @@ -236,13 +240,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("long", t.(int64)) + return c.processNullableUnion("long", t) } else { return t.(int64), nil } } if c.Nullable { - return goavro.Union("long.timestamp-micros", t.(int64)), nil + return goavro.Union("long.timestamp-micros", t), nil } return t.(int64), nil case QValueKindTimestampTZ: @@ -252,7 +256,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t.(string)) + return c.processNullableUnion("string", t) } else { return t.(string), nil } @@ -260,13 +264,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("long", t.(int64)) + return c.processNullableUnion("long", t) } else { return t.(int64), nil } } if c.Nullable { - return goavro.Union("long.timestamp-micros", t.(int64)), nil + return goavro.Union("long.timestamp-micros", t), nil } return t.(int64), nil case QValueKindDate: @@ -277,7 +281,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t.(string)) + return c.processNullableUnion("string", t) } else { return t.(string), nil } @@ -287,7 +291,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return goavro.Union("int.date", t), nil } return t, nil - + case QValueKindQChar: + return c.processNullableUnion("string", string(c.Value.Value.(uint8))) case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr: if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil && (len(c.Value.Value.(string)) > 15*1024*1024) { @@ -457,11 +462,11 @@ func (c *QValueAvroConverter) processNullableUnion( avroType string, value interface{}, ) (interface{}, error) { - if value == nil && c.Nullable { - return nil, nil - } - if c.Nullable { + if value == nil { + return nil, nil + } + return goavro.Union(avroType, value), nil } diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 6328897d3f..78c9ece45b 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -17,6 +17,7 @@ const ( QValueKindInt64 QValueKind = "int64" QValueKindBoolean QValueKind = "bool" QValueKindStruct QValueKind = "struct" + QValueKindQChar QValueKind = "qchar" QValueKindString QValueKind = "string" QValueKindTimestamp QValueKind = "timestamp" QValueKindTimestampTZ QValueKind = "timestamptz" @@ -63,6 +64,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindFloat32: "FLOAT", QValueKindFloat64: "FLOAT", QValueKindNumeric: "NUMBER(38, 9)", + QValueKindQChar: "CHAR", QValueKindString: "STRING", QValueKindJSON: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", @@ -101,6 +103,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindFloat32: "Float32", QValueKindFloat64: "Float64", QValueKindNumeric: "Decimal128(9)", + QValueKindQChar: "FixedString(1)", QValueKindString: "String", QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index b9e040e170..b0b556b3d7 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -42,6 +42,12 @@ func (q QValue) Equals(other QValue) bool { return compareBoolean(q.Value, other.Value) case QValueKindStruct: return compareStruct(q.Value, other.Value) + case QValueKindQChar: + if (q.Value == nil) == (other.Value == nil) { + return q.Value == nil || q.Value.(uint8) == other.Value.(uint8) + } else { + return false + } case QValueKindString: return compareString(q.Value, other.Value) // all internally represented as a Golang time.Time From c8b425cd8083d08aa63381e23dc96f3d22dfb641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 13:32:05 +0000 Subject: [PATCH 2/2] revert removing unnecessary casts --- flow/model/qvalue/avro_converter.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index f20dd2b778..e5e6046fb5 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -184,7 +184,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t) + return c.processNullableUnion("string", t.(string)) } else { return t.(string), nil } @@ -192,13 +192,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("string", t) + return c.processNullableUnion("string", t.(string)) } else { return t.(string), nil } } if c.Nullable { - return goavro.Union("long.time-micros", t), nil + return goavro.Union("long.time-micros", t.(int64)), nil } return t.(int64), nil case QValueKindTimeTZ: @@ -208,7 +208,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t) + return c.processNullableUnion("string", t.(string)) } else { return t.(string), nil } @@ -216,13 +216,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("long", t) + return c.processNullableUnion("long", t.(int64)) } else { return t.(int64), nil } } if c.Nullable { - return goavro.Union("long.time-micros", t), nil + return goavro.Union("long.time-micros", t.(int64)), nil } return t.(int64), nil case QValueKindTimestamp: @@ -232,7 +232,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t) + return c.processNullableUnion("string", t.(string)) } else { return t.(string), nil } @@ -240,13 +240,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("long", t) + return c.processNullableUnion("long", t.(int64)) } else { return t.(int64), nil } } if c.Nullable { - return goavro.Union("long.timestamp-micros", t), nil + return goavro.Union("long.timestamp-micros", t.(int64)), nil } return t.(int64), nil case QValueKindTimestampTZ: @@ -256,7 +256,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t) + return c.processNullableUnion("string", t.(string)) } else { return t.(string), nil } @@ -264,13 +264,13 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("long", t) + return c.processNullableUnion("long", t.(int64)) } else { return t.(int64), nil } } if c.Nullable { - return goavro.Union("long.timestamp-micros", t), nil + return goavro.Union("long.timestamp-micros", t.(int64)), nil } return t.(int64), nil case QValueKindDate: @@ -281,7 +281,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { - return c.processNullableUnion("string", t) + return c.processNullableUnion("string", t.(string)) } else { return t.(string), nil }