From fdc7a69863dac467fee042c24a2ec93888e5380b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 13 Feb 2024 22:05:22 +0000 Subject: [PATCH] Add support for "char" Postgres offers a type "char" distinct from CHAR, represented by one byte Map this type in QValue, sqlserver also has char, & on clickhouse we can represent it with FixedString(1) --- flow/connectors/clickhouse/qvalue_convert.go | 1 + flow/connectors/postgres/qvalue_convert.go | 11 ++++++++--- .../postgres/schema_delta_test_constants.go | 12 +++++++++--- flow/connectors/snowflake/avro_file_writer_test.go | 3 +++ flow/connectors/sql/query_executor.go | 7 +++++++ flow/connectors/sqlserver/qvalue_convert.go | 3 ++- flow/model/qvalue/avro_converter.go | 5 +++-- flow/model/qvalue/kind.go | 3 +++ 8 files changed, 36 insertions(+), 9 deletions(-) diff --git a/flow/connectors/clickhouse/qvalue_convert.go b/flow/connectors/clickhouse/qvalue_convert.go index f80ea7857e..8ec8a52f0f 100644 --- a/flow/connectors/clickhouse/qvalue_convert.go +++ b/flow/connectors/clickhouse/qvalue_convert.go @@ -16,6 +16,7 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{ "CHAR": qvalue.QValueKindString, "TEXT": qvalue.QValueKindString, "String": qvalue.QValueKindString, + "FixedString(1)": qvalue.QValueKindQChar, "Bool": qvalue.QValueKindBoolean, "DateTime": qvalue.QValueKindTimestamp, "TIMESTAMP": qvalue.QValueKindTimestamp, diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 4823aa2ecc..0cab79c942 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -31,6 +31,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { return qvalue.QValueKindFloat32 case pgtype.Float8OID: return qvalue.QValueKindFloat64 + case pgtype.QCharOID: + return qvalue.QValueKindQChar case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID: return qvalue.QValueKindString case pgtype.ByteaOID: @@ -121,6 +123,8 @@ func qValueKindToPostgresType(colTypeStr string) string { return "REAL" case qvalue.QValueKindFloat64: return "DOUBLE PRECISION" + case qvalue.QValueKindQChar: + return "\"char\"" case qvalue.QValueKindString: return "TEXT" case qvalue.QValueKindBytes: @@ -262,6 +266,8 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindFloat64: floatVal := value.(float64) val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal} + case qvalue.QValueKindQChar: + val = qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: uint8(value.(rune))} case qvalue.QValueKindString: // handling all unsupported types with strings as well for now. val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(value)} @@ -501,10 +507,9 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( } default: textVal, ok := value.(string) - if !ok { - return qvalue.QValue{}, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind) + if ok { + val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} } - val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} } // parsing into pgtype failed. diff --git a/flow/connectors/postgres/schema_delta_test_constants.go b/flow/connectors/postgres/schema_delta_test_constants.go index d86f8e98a0..6ded70625a 100644 --- a/flow/connectors/postgres/schema_delta_test_constants.go +++ b/flow/connectors/postgres/schema_delta_test_constants.go @@ -19,6 +19,7 @@ var AddAllColumnTypes = []string{ string(qvalue.QValueKindJSON), string(qvalue.QValueKindNumeric), string(qvalue.QValueKindString), + string(qvalue.QValueKindQChar), string(qvalue.QValueKindTime), string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindTimestampTZ), @@ -93,21 +94,26 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{ }, { Name: "c13", - Type: string(qvalue.QValueKindTime), + Type: string(qvalue.QValueKindQChar), TypeModifier: -1, }, { Name: "c14", - Type: string(qvalue.QValueKindTimestamp), + Type: string(qvalue.QValueKindTime), TypeModifier: -1, }, { Name: "c15", - Type: string(qvalue.QValueKindTimestampTZ), + Type: string(qvalue.QValueKindTimestamp), TypeModifier: -1, }, { Name: "c16", + Type: string(qvalue.QValueKindTimestampTZ), + TypeModifier: -1, + }, + { + Name: "c17", Type: string(qvalue.QValueKindUUID), TypeModifier: -1, }, diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index bd9b155dab..0252b53fc8 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -40,6 +40,8 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue. value = big.NewRat(int64(placeHolder), 1) case qvalue.QValueKindUUID: value = uuid.New() // assuming you have the github.com/google/uuid package + case qvalue.QValueKindQChar: + value = uint8(48) // case qvalue.QValueKindArray: // value = []int{1, 2, 3} // placeholder array, replace with actual logic // case qvalue.QValueKindStruct: @@ -85,6 +87,7 @@ func generateRecords( qvalue.QValueKindNumeric, qvalue.QValueKindBytes, qvalue.QValueKindUUID, + qvalue.QValueKindQChar, // qvalue.QValueKindJSON, qvalue.QValueKindBit, } diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 14bc6629b7..64ff4ecc54 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -308,6 +308,9 @@ func (g *GenericSQLQueryExecutor) CheckNull(ctx context.Context, schema string, } func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { + if val == nil { + return qvalue.QValue{Kind: kind, Value: nil}, nil + } switch kind { case qvalue.QValueKindInt32: if v, ok := val.(*sql.NullInt32); ok { @@ -341,6 +344,10 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: nil}, nil } } + case qvalue.QValueKindQChar: + if v, ok := val.(uint8); ok { + return qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: v}, nil + } case qvalue.QValueKindString: if v, ok := val.(*sql.NullString); ok { if v.Valid { diff --git a/flow/connectors/sqlserver/qvalue_convert.go b/flow/connectors/sqlserver/qvalue_convert.go index cff634139a..b4f73420e1 100644 --- a/flow/connectors/sqlserver/qvalue_convert.go +++ b/flow/connectors/sqlserver/qvalue_convert.go @@ -10,6 +10,7 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{ qvalue.QValueKindFloat32: "REAL", qvalue.QValueKindFloat64: "FLOAT", qvalue.QValueKindNumeric: "DECIMAL(38, 9)", + qvalue.QValueKindQChar: "CHAR", qvalue.QValueKindString: "NTEXT", qvalue.QValueKindJSON: "NTEXT", // SQL Server doesn't have a native JSON type qvalue.QValueKindTimestamp: "DATETIME2", @@ -51,7 +52,7 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{ "UNIQUEIDENTIFIER": qvalue.QValueKindUUID, "SMALLINT": qvalue.QValueKindInt32, "TINYINT": qvalue.QValueKindInt32, - "CHAR": qvalue.QValueKindString, + "CHAR": qvalue.QValueKindQChar, "VARCHAR": qvalue.QValueKindString, "NCHAR": qvalue.QValueKindString, "NVARCHAR": qvalue.QValueKindString, diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 7c902ffcb5..347909bcec 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -64,7 +64,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision } switch kind { - case QValueKindString: + case QValueKindString, QValueKindQChar: return "string", nil case QValueKindUUID: return AvroSchemaLogical{ @@ -287,7 +287,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return goavro.Union("int.date", t), nil } return t, nil - + case QValueKindQChar: + return c.processNullableUnion("string", string(c.Value.Value.(uint8))) case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr: if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil && (len(c.Value.Value.(string)) > 15*1024*1024) { diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 6328897d3f..78c9ece45b 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -17,6 +17,7 @@ const ( QValueKindInt64 QValueKind = "int64" QValueKindBoolean QValueKind = "bool" QValueKindStruct QValueKind = "struct" + QValueKindQChar QValueKind = "qchar" QValueKindString QValueKind = "string" QValueKindTimestamp QValueKind = "timestamp" QValueKindTimestampTZ QValueKind = "timestamptz" @@ -63,6 +64,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindFloat32: "FLOAT", QValueKindFloat64: "FLOAT", QValueKindNumeric: "NUMBER(38, 9)", + QValueKindQChar: "CHAR", QValueKindString: "STRING", QValueKindJSON: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", @@ -101,6 +103,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindFloat32: "Float32", QValueKindFloat64: "Float64", QValueKindNumeric: "Decimal128(9)", + QValueKindQChar: "FixedString(1)", QValueKindString: "String", QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)",