diff --git a/flow/connectors/clickhouse/qvalue_convert.go b/flow/connectors/clickhouse/qvalue_convert.go index f80ea7857e..8ec8a52f0f 100644 --- a/flow/connectors/clickhouse/qvalue_convert.go +++ b/flow/connectors/clickhouse/qvalue_convert.go @@ -16,6 +16,7 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{ "CHAR": qvalue.QValueKindString, "TEXT": qvalue.QValueKindString, "String": qvalue.QValueKindString, + "FixedString(1)": qvalue.QValueKindQChar, "Bool": qvalue.QValueKindBoolean, "DateTime": qvalue.QValueKindTimestamp, "TIMESTAMP": qvalue.QValueKindTimestamp, diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index bd39fd1048..81f19e1ec9 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -32,6 +32,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind { return qvalue.QValueKindFloat32 case pgtype.Float8OID: return qvalue.QValueKindFloat64 + case pgtype.QCharOID: + return qvalue.QValueKindQChar case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID: return qvalue.QValueKindString case pgtype.ByteaOID: @@ -122,6 +124,8 @@ func qValueKindToPostgresType(colTypeStr string) string { return "REAL" case qvalue.QValueKindFloat64: return "DOUBLE PRECISION" + case qvalue.QValueKindQChar: + return "\"char\"" case qvalue.QValueKindString: return "TEXT" case qvalue.QValueKindBytes: @@ -277,6 +281,8 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindFloat64: floatVal := value.(float64) val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal} + case qvalue.QValueKindQChar: + val = qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: uint8(value.(rune))} case qvalue.QValueKindString: // handling all unsupported types with strings as well for now. val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(value)} diff --git a/flow/connectors/postgres/schema_delta_test_constants.go b/flow/connectors/postgres/schema_delta_test_constants.go index d86f8e98a0..6ded70625a 100644 --- a/flow/connectors/postgres/schema_delta_test_constants.go +++ b/flow/connectors/postgres/schema_delta_test_constants.go @@ -19,6 +19,7 @@ var AddAllColumnTypes = []string{ string(qvalue.QValueKindJSON), string(qvalue.QValueKindNumeric), string(qvalue.QValueKindString), + string(qvalue.QValueKindQChar), string(qvalue.QValueKindTime), string(qvalue.QValueKindTimestamp), string(qvalue.QValueKindTimestampTZ), @@ -93,21 +94,26 @@ var AddAllColumnTypesFields = []*protos.FieldDescription{ }, { Name: "c13", - Type: string(qvalue.QValueKindTime), + Type: string(qvalue.QValueKindQChar), TypeModifier: -1, }, { Name: "c14", - Type: string(qvalue.QValueKindTimestamp), + Type: string(qvalue.QValueKindTime), TypeModifier: -1, }, { Name: "c15", - Type: string(qvalue.QValueKindTimestampTZ), + Type: string(qvalue.QValueKindTimestamp), TypeModifier: -1, }, { Name: "c16", + Type: string(qvalue.QValueKindTimestampTZ), + TypeModifier: -1, + }, + { + Name: "c17", Type: string(qvalue.QValueKindUUID), TypeModifier: -1, }, diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index bd9b155dab..0252b53fc8 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -40,6 +40,8 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue. value = big.NewRat(int64(placeHolder), 1) case qvalue.QValueKindUUID: value = uuid.New() // assuming you have the github.com/google/uuid package + case qvalue.QValueKindQChar: + value = uint8(48) // case qvalue.QValueKindArray: // value = []int{1, 2, 3} // placeholder array, replace with actual logic // case qvalue.QValueKindStruct: @@ -85,6 +87,7 @@ func generateRecords( qvalue.QValueKindNumeric, qvalue.QValueKindBytes, qvalue.QValueKindUUID, + qvalue.QValueKindQChar, // qvalue.QValueKindJSON, qvalue.QValueKindBit, } diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 14bc6629b7..64ff4ecc54 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -308,6 +308,9 @@ func (g *GenericSQLQueryExecutor) CheckNull(ctx context.Context, schema string, } func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { + if val == nil { + return qvalue.QValue{Kind: kind, Value: nil}, nil + } switch kind { case qvalue.QValueKindInt32: if v, ok := val.(*sql.NullInt32); ok { @@ -341,6 +344,10 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: nil}, nil } } + case qvalue.QValueKindQChar: + if v, ok := val.(uint8); ok { + return qvalue.QValue{Kind: qvalue.QValueKindQChar, Value: v}, nil + } case qvalue.QValueKindString: if v, ok := val.(*sql.NullString); ok { if v.Valid { diff --git a/flow/connectors/sqlserver/qvalue_convert.go b/flow/connectors/sqlserver/qvalue_convert.go index cff634139a..b4f73420e1 100644 --- a/flow/connectors/sqlserver/qvalue_convert.go +++ b/flow/connectors/sqlserver/qvalue_convert.go @@ -10,6 +10,7 @@ var qValueKindToSQLServerTypeMap = map[qvalue.QValueKind]string{ qvalue.QValueKindFloat32: "REAL", qvalue.QValueKindFloat64: "FLOAT", qvalue.QValueKindNumeric: "DECIMAL(38, 9)", + qvalue.QValueKindQChar: "CHAR", qvalue.QValueKindString: "NTEXT", qvalue.QValueKindJSON: "NTEXT", // SQL Server doesn't have a native JSON type qvalue.QValueKindTimestamp: "DATETIME2", @@ -51,7 +52,7 @@ var sqlServerTypeToQValueKindMap = map[string]qvalue.QValueKind{ "UNIQUEIDENTIFIER": qvalue.QValueKindUUID, "SMALLINT": qvalue.QValueKindInt32, "TINYINT": qvalue.QValueKindInt32, - "CHAR": qvalue.QValueKindString, + "CHAR": qvalue.QValueKindQChar, "VARCHAR": qvalue.QValueKindString, "NCHAR": qvalue.QValueKindString, "NVARCHAR": qvalue.QValueKindString, diff --git a/flow/model/model.go b/flow/model/model.go index 14de42a44e..a5bb754f2a 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -158,7 +158,13 @@ func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) { } jsonStruct[col] = binStr + case qvalue.QValueKindQChar: + ch, ok := v.Value.(uint8) + if !ok { + return nil, fmt.Errorf("expected \"char\" value for column %s for %T", col, v.Value) + } + jsonStruct[col] = string(ch) case qvalue.QValueKindString, qvalue.QValueKindJSON: strVal, ok := v.Value.(string) if !ok { diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 572c788630..4455101ad8 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -137,6 +137,14 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { } values[i] = v + case qvalue.QValueKindQChar: + v, ok := qValue.Value.(uint8) + if !ok { + src.err = fmt.Errorf("invalid \"char\" value") + return nil, src.err + } + values[i] = rune(v) + case qvalue.QValueKindString: v, ok := qValue.Value.(string) if !ok { diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 4f5488cbff..e5e6046fb5 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -64,7 +64,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision } switch kind { - case QValueKindString: + case QValueKindString, QValueKindQChar: return "string", nil case QValueKindUUID: return AvroSchemaLogical{ @@ -291,7 +291,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return goavro.Union("int.date", t), nil } return t, nil - + case QValueKindQChar: + return c.processNullableUnion("string", string(c.Value.Value.(uint8))) case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr: if c.TargetDWH == QDWHTypeSnowflake && c.Value.Value != nil && (len(c.Value.Value.(string)) > 15*1024*1024) { diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 6328897d3f..78c9ece45b 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -17,6 +17,7 @@ const ( QValueKindInt64 QValueKind = "int64" QValueKindBoolean QValueKind = "bool" QValueKindStruct QValueKind = "struct" + QValueKindQChar QValueKind = "qchar" QValueKindString QValueKind = "string" QValueKindTimestamp QValueKind = "timestamp" QValueKindTimestampTZ QValueKind = "timestamptz" @@ -63,6 +64,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindFloat32: "FLOAT", QValueKindFloat64: "FLOAT", QValueKindNumeric: "NUMBER(38, 9)", + QValueKindQChar: "CHAR", QValueKindString: "STRING", QValueKindJSON: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", @@ -101,6 +103,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindFloat32: "Float32", QValueKindFloat64: "Float64", QValueKindNumeric: "Decimal128(9)", + QValueKindQChar: "FixedString(1)", QValueKindString: "String", QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index b9e040e170..b0b556b3d7 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -42,6 +42,12 @@ func (q QValue) Equals(other QValue) bool { return compareBoolean(q.Value, other.Value) case QValueKindStruct: return compareStruct(q.Value, other.Value) + case QValueKindQChar: + if (q.Value == nil) == (other.Value == nil) { + return q.Value == nil || q.Value.(uint8) == other.Value.(uint8) + } else { + return false + } case QValueKindString: return compareString(q.Value, other.Value) // all internally represented as a Golang time.Time