Skip to content

Commit

Permalink
HStore For Snowflake (#1047)
Browse files Browse the repository at this point in the history
Maps HStore to a JSON-compatible Variant in Snowflake - for CDC and QRep
  • Loading branch information
Amogh-Bharadwaj authored Jan 11, 2024
1 parent 884c187 commit 3beed6a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("TO_GEOMETRY(CAST(%s:\"%s\" AS STRING),true) AS %s,",
toVariantColumnName, columnName, targetColumnName))
case qvalue.QValueKindJSON:
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("PARSE_JSON(CAST(%s:\"%s\" AS STRING)) AS %s,",
toVariantColumnName, columnName, targetColumnName))
Expand Down
14 changes: 11 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT),
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON), c48 mood);
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON),
c48 mood, c49 HSTORE);
`, srcTableName))
require.NoError(s.t, err)

Expand Down Expand Up @@ -722,7 +723,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
'POINT(1 2)','POINT(40.7128 -74.0060)','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)',
'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))', 'happy';
'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))', 'happy','"a"=>"a\"quote\"", "b"=>NULL';
`, srcTableName))
e2e.EnvNoError(s.t, env, err)
}()
Expand All @@ -740,7 +741,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48",
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48", "c49",
})
if err != nil {
s.t.Log(err)
Expand All @@ -750,6 +751,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
err = s.checkJSONValue(dstTableName, "c17", "sai", "1")
require.NoError(s.t, err)

// check if HSTORE on snowflake is a good JSON
err = s.checkJSONValue(dstTableName, "c49", "a", `"a\"quote\""`)
require.NoError(s.t, err)

err = s.checkJSONValue(dstTableName, "c49", "b", "null")
require.NoError(s.t, err)

// Make sure that there are no nulls
require.Equal(s.t, noNulls, true)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindUUID: "STRING",
QValueKindTimeTZ: "STRING",
QValueKindInvalid: "STRING",
QValueKindHStore: "STRING",
QValueKindHStore: "VARIANT",
QValueKindGeography: "GEOGRAPHY",
QValueKindGeometry: "GEOMETRY",
QValueKindPoint: "GEOMETRY",
Expand Down

0 comments on commit 3beed6a

Please sign in to comment.