Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for "char" #1285

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flow/connectors/clickhouse/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{
"CHAR": qvalue.QValueKindString,
"TEXT": qvalue.QValueKindString,
"String": qvalue.QValueKindString,
"FixedString(1)": qvalue.QValueKindQChar,
"Bool": qvalue.QValueKindBoolean,
"DateTime": qvalue.QValueKindTimestamp,
"TIMESTAMP": qvalue.QValueKindTimestamp,
Expand Down
6 changes: 6 additions & 0 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
return qvalue.QValueKindFloat32
case pgtype.Float8OID:
return qvalue.QValueKindFloat64
case pgtype.QCharOID:
return qvalue.QValueKindQChar
case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID:
return qvalue.QValueKindString
case pgtype.ByteaOID:
Expand Down Expand Up @@ -122,6 +124,8 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "REAL"
case qvalue.QValueKindFloat64:
return "DOUBLE PRECISION"
case qvalue.QValueKindQChar:
return "\"char\""
case qvalue.QValueKindString:
return "TEXT"
case qvalue.QValueKindBytes:
Expand Down Expand Up @@ -277,6 +281,8 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
case qvalue.QValueKindFloat64:
floatVal := value.(float64)
val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal}
case qvalue.QValueKindQChar:
val = qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: uint8(value.(rune))}
case qvalue.QValueKindString:
// handling all unsupported types with strings as well for now.
val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(value)}
Expand Down
12 changes: 9 additions & 3 deletions flow/connectors/postgres/schema_delta_test_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var AddAllColumnTypes = []string{
string(qvalue.QValueKindJSON),
string(qvalue.QValueKindNumeric),
string(qvalue.QValueKindString),
string(qvalue.QValueKindQChar),
string(qvalue.QValueKindTime),
string(qvalue.QValueKindTimestamp),
string(qvalue.QValueKindTimestampTZ),
Expand Down Expand Up @@ -93,21 +94,26 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{
},
{
Name: "c13",
Type: string(qvalue.QValueKindTime),
Type: string(qvalue.QValueKindQChar),
TypeModifier: -1,
},
{
Name: "c14",
Type: string(qvalue.QValueKindTimestamp),
Type: string(qvalue.QValueKindTime),
TypeModifier: -1,
},
{
Name: "c15",
Type: string(qvalue.QValueKindTimestampTZ),
Type: string(qvalue.QValueKindTimestamp),
TypeModifier: -1,
},
{
Name: "c16",
Type: string(qvalue.QValueKindTimestampTZ),
TypeModifier: -1,
},
{
Name: "c17",
Type: string(qvalue.QValueKindUUID),
TypeModifier: -1,
},
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue.
value = big.NewRat(int64(placeHolder), 1)
case qvalue.QValueKindUUID:
value = uuid.New() // assuming you have the github.com/google/uuid package
case qvalue.QValueKindQChar:
value = uint8(48)
// case qvalue.QValueKindArray:
// value = []int{1, 2, 3} // placeholder array, replace with actual logic
// case qvalue.QValueKindStruct:
Expand Down Expand Up @@ -85,6 +87,7 @@ func generateRecords(
qvalue.QValueKindNumeric,
qvalue.QValueKindBytes,
qvalue.QValueKindUUID,
qvalue.QValueKindQChar,
// qvalue.QValueKindJSON,
qvalue.QValueKindBit,
}
Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ func (g *GenericSQLQueryExecutor) CheckNull(ctx context.Context, schema string,
}

func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
if val == nil {
return qvalue.QValue{Kind: kind, Value: nil}, nil
}
switch kind {
case qvalue.QValueKindInt32:
if v, ok := val.(*sql.NullInt32); ok {
Expand Down Expand Up @@ -341,6 +344,10 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) {
return qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: nil}, nil
}
}
case qvalue.QValueKindQChar:
if v, ok := val.(uint8); ok {
return qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: v}, nil
}
case qvalue.QValueKindString:
if v, ok := val.(*sql.NullString); ok {
if v.Valid {
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/sqlserver/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{
qvalue.QValueKindFloat32: "REAL",
qvalue.QValueKindFloat64: "FLOAT",
qvalue.QValueKindNumeric: "DECIMAL(38, 9)",
qvalue.QValueKindQChar: "CHAR",
qvalue.QValueKindString: "NTEXT",
qvalue.QValueKindJSON: "NTEXT", // SQL Server doesn't have a native JSON type
qvalue.QValueKindTimestamp: "DATETIME2",
Expand Down Expand Up @@ -51,7 +52,7 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{
"UNIQUEIDENTIFIER": qvalue.QValueKindUUID,
"SMALLINT": qvalue.QValueKindInt32,
"TINYINT": qvalue.QValueKindInt32,
"CHAR": qvalue.QValueKindString,
"CHAR": qvalue.QValueKindQChar,
"VARCHAR": qvalue.QValueKindString,
"NCHAR": qvalue.QValueKindString,
"NVARCHAR": qvalue.QValueKindString,
Expand Down
6 changes: 6 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) {
}

jsonStruct[col] = binStr
case qvalue.QValueKindQChar:
ch, ok := v.Value.(uint8)
if !ok {
return nil, fmt.Errorf("expected \"char\" value for column %s for %T", col, v.Value)
}

jsonStruct[col] = string(ch)
case qvalue.QValueKindString, qvalue.QValueKindJSON:
strVal, ok := v.Value.(string)
if !ok {
Expand Down
8 changes: 8 additions & 0 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {
}
values[i] = v

case qvalue.QValueKindQChar:
v, ok := qValue.Value.(uint8)
if !ok {
src.err = fmt.Errorf("invalid \"char\" value")
return nil, src.err
}
values[i] = rune(v)

case qvalue.QValueKindString:
v, ok := qValue.Value.(string)
if !ok {
Expand Down
5 changes: 3 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
}

switch kind {
case QValueKindString:
case QValueKindString, QValueKindQChar:
return "string", nil
case QValueKindUUID:
return AvroSchemaLogical{
Expand Down Expand Up @@ -291,7 +291,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return goavro.Union("int.date", t), nil
}
return t, nil

case QValueKindQChar:
return c.processNullableUnion("string", string(c.Value.Value.(uint8)))
case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr:
if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil &&
(len(c.Value.Value.(string)) > 15*1024*1024) {
Expand Down
3 changes: 3 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
QValueKindInt64 QValueKind = "int64"
QValueKindBoolean QValueKind = "bool"
QValueKindStruct QValueKind = "struct"
QValueKindQChar QValueKind = "qchar"
QValueKindString QValueKind = "string"
QValueKindTimestamp QValueKind = "timestamp"
QValueKindTimestampTZ QValueKind = "timestamptz"
Expand Down Expand Up @@ -63,6 +64,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindFloat32: "FLOAT",
QValueKindFloat64: "FLOAT",
QValueKindNumeric: "NUMBER(38, 9)",
QValueKindQChar: "CHAR",
QValueKindString: "STRING",
QValueKindJSON: "VARIANT",
QValueKindTimestamp: "TIMESTAMP_NTZ",
Expand Down Expand Up @@ -101,6 +103,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindFloat32: "Float32",
QValueKindFloat64: "Float64",
QValueKindNumeric: "Decimal128(9)",
QValueKindQChar: "FixedString(1)",
QValueKindString: "String",
QValueKindJSON: "String",
QValueKindTimestamp: "DateTime64(6)",
Expand Down
6 changes: 6 additions & 0 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func (q QValue) Equals(other QValue) bool {
return compareBoolean(q.Value, other.Value)
case QValueKindStruct:
return compareStruct(q.Value, other.Value)
case QValueKindQChar:
if (q.Value == nil) == (other.Value == nil) {
return q.Value == nil || q.Value.(uint8) == other.Value.(uint8)
} else {
return false
}
case QValueKindString:
return compareString(q.Value, other.Value)
// all internally represented as a Golang time.Time
Expand Down
Loading