Skip to content

Commit

Permalink
Add support for "char" (#1285)
Browse files Browse the repository at this point in the history
Postgres offers a type "char" distinct from CHAR,
represented by one byte

Map this type in QValue, sqlserver also has char,
& on clickhouse we can represent it with FixedString(1)
  • Loading branch information
serprex authored Feb 14, 2024
1 parent 0fb50d9 commit b2ed20a
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 6 deletions.
1 change: 1 addition & 0 deletions flow/connectors/clickhouse/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{
"CHAR": qvalue.QValueKindString,
"TEXT": qvalue.QValueKindString,
"String": qvalue.QValueKindString,
"FixedString(1)": qvalue.QValueKindQChar,
"Bool": qvalue.QValueKindBoolean,
"DateTime": qvalue.QValueKindTimestamp,
"TIMESTAMP": qvalue.QValueKindTimestamp,
Expand Down
6 changes: 6 additions & 0 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
return qvalue.QValueKindFloat32
case pgtype.Float8OID:
return qvalue.QValueKindFloat64
case pgtype.QCharOID:
return qvalue.QValueKindQChar
case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID:
return qvalue.QValueKindString
case pgtype.ByteaOID:
Expand Down Expand Up @@ -122,6 +124,8 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "REAL"
case qvalue.QValueKindFloat64:
return "DOUBLE PRECISION"
case qvalue.QValueKindQChar:
return "\"char\""
case qvalue.QValueKindString:
return "TEXT"
case qvalue.QValueKindBytes:
Expand Down Expand Up @@ -277,6 +281,8 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
case qvalue.QValueKindFloat64:
floatVal := value.(float64)
val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal}
case qvalue.QValueKindQChar:
val = qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: uint8(value.(rune))}
case qvalue.QValueKindString:
// handling all unsupported types with strings as well for now.
val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(value)}
Expand Down
12 changes: 9 additions & 3 deletions flow/connectors/postgres/schema_delta_test_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var AddAllColumnTypes = []string{
string(qvalue.QValueKindJSON),
string(qvalue.QValueKindNumeric),
string(qvalue.QValueKindString),
string(qvalue.QValueKindQChar),
string(qvalue.QValueKindTime),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindTimestampTZ),
Expand Down Expand Up @@ -93,21 +94,26 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{
},
{
Name: "c13",
Type: string(qvalue.QValueKindTime),
Type: string(qvalue.QValueKindQChar),
TypeModifier: -1,
},
{
Name: "c14",
Type: string(qvalue.QValueKindTimestamp),
Type: string(qvalue.QValueKindTime),
TypeModifier: -1,
},
{
Name: "c15",
Type: string(qvalue.QValueKindTimestampTZ),
Type: string(qvalue.QValueKindTimestamp),
TypeModifier: -1,
},
{
Name: "c16",
Type: string(qvalue.QValueKindTimestampTZ),
TypeModifier: -1,
},
{
Name: "c17",
Type: string(qvalue.QValueKindUUID),
TypeModifier: -1,
},
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue.
value = big.NewRat(int64(placeHolder), 1)
case qvalue.QValueKindUUID:
value = uuid.New() // assuming you have the github.com/google/uuid package
case qvalue.QValueKindQChar:
value = uint8(48)
// case qvalue.QValueKindArray:
// value = []int{1, 2, 3} // placeholder array, replace with actual logic
// case qvalue.QValueKindStruct:
Expand Down Expand Up @@ -85,6 +87,7 @@ func generateRecords(
qvalue.QValueKindNumeric,
qvalue.QValueKindBytes,
qvalue.QValueKindUUID,
qvalue.QValueKindQChar,
// qvalue.QValueKindJSON,
qvalue.QValueKindBit,
}
Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ func (g *GenericSQLQueryExecutor) CheckNull(ctx context.Context, schema string,
}

func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
if val == nil {
return qvalue.QValue{Kind: kind, Value: nil}, nil
}
switch kind {
case qvalue.QValueKindInt32:
if v, ok := val.(*sql.NullInt32); ok {
Expand Down Expand Up @@ -341,6 +344,10 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
return qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: nil}, nil
}
}
case qvalue.QValueKindQChar:
if v, ok := val.(uint8); ok {
return qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: v}, nil
}
case qvalue.QValueKindString:
if v, ok := val.(*sql.NullString); ok {
if v.Valid {
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/sqlserver/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{
qvalue.QValueKindFloat32: "REAL",
qvalue.QValueKindFloat64: "FLOAT",
qvalue.QValueKindNumeric: "DECIMAL(38, 9)",
qvalue.QValueKindQChar: "CHAR",
qvalue.QValueKindString: "NTEXT",
qvalue.QValueKindJSON: "NTEXT", // SQL Server doesn't have a native JSON type
qvalue.QValueKindTimestamp: "DATETIME2",
Expand Down Expand Up @@ -51,7 +52,7 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{
"UNIQUEIDENTIFIER": qvalue.QValueKindUUID,
"SMALLINT": qvalue.QValueKindInt32,
"TINYINT": qvalue.QValueKindInt32,
"CHAR": qvalue.QValueKindString,
"CHAR": qvalue.QValueKindQChar,
"VARCHAR": qvalue.QValueKindString,
"NCHAR": qvalue.QValueKindString,
"NVARCHAR": qvalue.QValueKindString,
Expand Down
6 changes: 6 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) {
}

jsonStruct[col] = binStr
case qvalue.QValueKindQChar:
ch, ok := v.Value.(uint8)
if !ok {
return nil, fmt.Errorf("expected \"char\" value for column %s for %T", col, v.Value)
}

jsonStruct[col] = string(ch)
case qvalue.QValueKindString, qvalue.QValueKindJSON:
strVal, ok := v.Value.(string)
if !ok {
Expand Down
8 changes: 8 additions & 0 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {
}
values[i] = v

case qvalue.QValueKindQChar:
v, ok := qValue.Value.(uint8)
if !ok {
src.err = fmt.Errorf("invalid \"char\" value")
return nil, src.err
}
values[i] = rune(v)

case qvalue.QValueKindString:
v, ok := qValue.Value.(string)
if !ok {
Expand Down
5 changes: 3 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
}

switch kind {
case QValueKindString:
case QValueKindString, QValueKindQChar:
return "string", nil
case QValueKindUUID:
return AvroSchemaLogical{
Expand Down Expand Up @@ -291,7 +291,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return goavro.Union("int.date", t), nil
}
return t, nil

case QValueKindQChar:
return c.processNullableUnion("string", string(c.Value.Value.(uint8)))
case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr:
if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil &&
(len(c.Value.Value.(string)) > 15*1024*1024) {
Expand Down
3 changes: 3 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
QValueKindInt64 QValueKind = "int64"
QValueKindBoolean QValueKind = "bool"
QValueKindStruct QValueKind = "struct"
QValueKindQChar QValueKind = "qchar"
QValueKindString QValueKind = "string"
QValueKindTimestamp QValueKind = "timestamp"
QValueKindTimestampTZ QValueKind = "timestamptz"
Expand Down Expand Up @@ -63,6 +64,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindFloat32: "FLOAT",
QValueKindFloat64: "FLOAT",
QValueKindNumeric: "NUMBER(38, 9)",
QValueKindQChar: "CHAR",
QValueKindString: "STRING",
QValueKindJSON: "VARIANT",
QValueKindTimestamp: "TIMESTAMP_NTZ",
Expand Down Expand Up @@ -101,6 +103,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindFloat32: "Float32",
QValueKindFloat64: "Float64",
QValueKindNumeric: "Decimal128(9)",
QValueKindQChar: "FixedString(1)",
QValueKindString: "String",
QValueKindJSON: "String",
QValueKindTimestamp: "DateTime64(6)",
Expand Down
6 changes: 6 additions & 0 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func (q QValue) Equals(other QValue) bool {
return compareBoolean(q.Value, other.Value)
case QValueKindStruct:
return compareStruct(q.Value, other.Value)
case QValueKindQChar:
if (q.Value == nil) == (other.Value == nil) {
return q.Value == nil || q.Value.(uint8) == other.Value.(uint8)
} else {
return false
}
case QValueKindString:
return compareString(q.Value, other.Value)
// all internally represented as a Golang time.Time
Expand Down

0 comments on commit b2ed20a

Please sign in to comment.