Skip to content

Commit

Permalink
HStore and Geospatial for Postgres (#1091)
Browse files Browse the repository at this point in the history
Adds support for hstore and geospatial types for postgres as destination
for qrep and cdc.

Tests added
  • Loading branch information
Amogh-Bharadwaj authored Jan 17, 2024
1 parent c92825e commit 9440390
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 75 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *EventHubConnector) processBatch(
) (uint32, error) {
ctx := context.Background()
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false)

eventHubFlushTimeout := peerdbenv.PeerDBEventhubFlushTimeoutSeconds()

Expand Down
21 changes: 16 additions & 5 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
replConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConfig.MaxConns = 1

pool, err := NewSSHWrappedPostgresPool(ctx, connConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create connection pool: %w", err)
Expand Down Expand Up @@ -284,7 +283,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
for record := range req.Records.GetRecords() {
switch typedRecord := record.(type) {
case *model.InsertRecord:
itemsJSON, err := typedRecord.Items.ToJSON()
itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize insert record items to JSON: %w", err)
}
Expand All @@ -301,11 +303,17 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
case *model.UpdateRecord:
newItemsJSON, err := typedRecord.NewItems.ToJSON()
newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
}
oldItemsJSON, err := typedRecord.OldItems.ToJSON()
oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize update record old items to JSON: %w", err)
}
Expand All @@ -322,7 +330,10 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
})
tableNameRowsMapping[typedRecord.DestinationTableName] += 1
case *model.DeleteRecord:
itemsJSON, err := typedRecord.Items.ToJSON()
itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
UnnestColumns: map[string]struct{}{},
HStoreAsJSON: false,
})
if err != nil {
return nil, fmt.Errorf("failed to serialize delete record items to JSON: %w", err)
}
Expand Down
12 changes: 9 additions & 3 deletions flow/connectors/postgres/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"google.golang.org/protobuf/encoding/protojson"
)
Expand Down Expand Up @@ -44,16 +45,21 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
)
partitionID := partition.PartitionId
startTime := time.Now()

pool := s.connector.pool
schema, err := stream.Schema()
if err != nil {
slog.Error("failed to get schema from stream", slog.Any("error", err), syncLog)
return 0, fmt.Errorf("failed to get schema from stream: %w", err)
}

txConfig := s.connector.pool.poolConfig.Copy()
txConfig.AfterConnect = utils.RegisterHStore
txPool, err := pgxpool.NewWithConfig(s.connector.pool.ctx, txConfig)
if err != nil {
return 0, fmt.Errorf("failed to create tx pool: %v", err)
}

// Second transaction - to handle rest of the processing
tx, err := pool.Begin(context.Background())
tx, err := txPool.Begin(context.Background())
if err != nil {
return 0, fmt.Errorf("failed to begin transaction: %v", err)
}
Expand Down
6 changes: 6 additions & 0 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func qValueKindToPostgresType(qvalueKind string) string {
return "DOUBLE PRECISION[]"
case qvalue.QValueKindArrayString:
return "TEXT[]"
case qvalue.QValueKindGeography:
return "GEOGRAPHY"
case qvalue.QValueKindGeometry:
return "GEOMETRY"
case qvalue.QValueKindPoint:
return "POINT"
default:
return "TEXT"
}
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -56,3 +57,15 @@ func GetCustomDataTypes(ctx context.Context, pool *pgxpool.Pool) (map[uint32]str
}
return customTypeMap, nil
}

func RegisterHStore(ctx context.Context, conn *pgx.Conn) error {
var hstoreOID uint32
err := conn.QueryRow(context.Background(), `select oid from pg_type where typname = 'hstore'`).Scan(&hstoreOID)
if err != nil {
return err
}

conn.TypeMap().RegisterType(&pgtype.Type{Name: "hstore", OID: hstoreOID, Codec: pgtype.HstoreCodec{}})

return nil
}
59 changes: 57 additions & 2 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
value TEXT NOT NULL,
myh HSTORE NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)
Expand Down Expand Up @@ -112,7 +113,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"')
`, srcTableName), testKey, testValue)
e2e.EnvNoError(s.t, env, err)
}
Expand All @@ -132,6 +133,60 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_geospatial_pg")
dstTableName := s.attachSchemaSuffix("test_geospatial_pg_dst")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
gg geography NOT NULL,
gm geometry NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_geo_flow_pg"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.peer,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
MaxBatchSize: 100,
}

go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 1 row into the source table
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(gg, gm) VALUES ('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','LINESTRING(0 0, 1 1, 2 2)')
`, srcTableName))
e2e.EnvNoError(s.t, env, err)

s.t.Log("Inserted 1 row into the source table")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

err = s.comparePGTables(srcTableName, dstTableName, "id,gg,gm")
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down
59 changes: 24 additions & 35 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,13 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err
"f8 smallint",
"my_date DATE",
"my_mood mood",
}
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
tblFields = append(tblFields, `"geometryPoint" geometry(point)`,
"geography_point geography(point)",
"geometry_linestring geometry(linestring)",
"geography_linestring geography(linestring)",
"geometry_polygon geometry(polygon)",
"geography_polygon geography(polygon)")
"myh HSTORE",
`"geometryPoint" geometry(point)`,
"geography_point geography(point)",
"geometry_linestring geometry(linestring)",
"geography_linestring geography(linestring)",
"geometry_polygon geometry(polygon)",
"geography_polygon geography(polygon)",
}
tblFieldStr := strings.Join(tblFields, ",")
var pgErr *pgconn.PgError
Expand Down Expand Up @@ -289,37 +288,27 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
for i := 0; i < rowCount-1; i++ {
id := uuid.New().String()
ids = append(ids, id)
geoValues := ""
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
geoValues = `,'POINT(1 2)','POINT(40.7128 -74.0060)',
'LINESTRING(0 0, 1 1, 2 2)',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'`
}
row := fmt.Sprintf(`
(
'%s', '%s', CURRENT_TIMESTAMP, 3.86487206688919, CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP, E'\\\\xDEADBEEF', 'type1', '%s',
1, 0, 1, 'dealType1',
'%s', '%s', false, 1.2345,
1.2345, false, 12345, '%s',
12345, 1, '%s', CURRENT_TIMESTAMP, 'refID',
CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012],
ARRAY['varchar1', 'varchar2'], '{"key": -8.02139037433155}',
'[{"key1": "value1", "key2": "value2", "key3": "value3"}]',
'{"key": "value"}', 15, CURRENT_DATE, 'happy' %s
'%s', '%s', CURRENT_TIMESTAMP, 3.86487206688919, CURRENT_TIMESTAMP,
CURRENT_TIMESTAMP, E'\\\\xDEADBEEF', 'type1', '%s',
1, 0, 1, 'dealType1',
'%s', '%s', false, 1.2345,
1.2345, false, 12345, '%s',
12345, 1, '%s', CURRENT_TIMESTAMP, 'refID',
CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012],
ARRAY['varchar1', 'varchar2'], '{"key": -8.02139037433155}',
'[{"key1": "value1", "key2": "value2", "key3": "value3"}]',
'{"key": "value"}', 15, CURRENT_DATE, 'happy', '"a"=>"b"','POINT(1 2)','POINT(40.7128 -74.0060)',
'LINESTRING(0 0, 1 1, 2 2)',
'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)',
'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))','POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'
)`,
id, uuid.New().String(), uuid.New().String(),
uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), geoValues)
uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String())
rows = append(rows, row)
}

geoColumns := ""
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
geoColumns = `,"geometryPoint", geography_point,` +
"geometry_linestring, geography_linestring," +
"geometry_polygon, geography_polygon"
}
_, err := pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO e2e_test_%s.%s (
id, card_id, "from", price, created_at,
Expand All @@ -328,10 +317,10 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
deal_id, ethereum_transaction_id, ignore_price, card_eth_value,
paid_eth_price, card_bought_notified, address, account_id,
asset_id, status, transaction_id, settled_at, reference_id,
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, my_date, my_mood
%s
settle_at, settlement_delay_reason, f1, f2, f3, f4, f5, f6, f7, f8, my_date, my_mood, myh,
"geometryPoint", geography_point,geometry_linestring, geography_linestring,geometry_polygon, geography_polygon
) VALUES %s;
`, suffix, tableName, geoColumns, strings.Join(rows, ",")))
`, suffix, tableName, strings.Join(rows, ",")))
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions flow/geo/geo.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ func GeoValidate(hexWkb string) (string, error) {
return wkt, nil
}

func GeoToWKB(wkt string) ([]byte, error) {
// UnmarshalWKB performs geometry validation along with WKB parsing
geometryObject, geoErr := geom.NewGeomFromWKT(wkt)
if geoErr != nil {
return []byte{}, geoErr
}

return geometryObject.ToWKB(), nil
}

// compares WKTs
func GeoCompare(wkt1, wkt2 string) bool {
geom1, geoErr := geom.NewGeomFromWKT(wkt1)
Expand Down
36 changes: 24 additions & 12 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (r *RecordItems) Len() int {
return len(r.Values)
}

func (r *RecordItems) toMap() (map[string]interface{}, error) {
func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) {
if r.ColToValIdx == nil {
return nil, errors.New("colToValIdx is nil")
}
Expand Down Expand Up @@ -163,15 +163,19 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) {
return nil, fmt.Errorf("expected string value for hstore column %s for value %T", col, v.Value)
}

jsonVal, err := hstore_util.ParseHstore(hstoreVal)
if err != nil {
return nil, fmt.Errorf("unable to convert hstore column %s to json for value %T", col, v.Value)
}

if len(jsonVal) > 15*1024*1024 {
jsonStruct[col] = ""
if !hstoreAsJSON {
jsonStruct[col] = hstoreVal
} else {
jsonStruct[col] = jsonVal
jsonVal, err := hstore_util.ParseHstore(hstoreVal)
if err != nil {
return nil, fmt.Errorf("unable to convert hstore column %s to json for value %T", col, v.Value)
}

if len(jsonVal) > 15*1024*1024 {
jsonStruct[col] = ""
} else {
jsonStruct[col] = jsonVal
}
}

case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ, qvalue.QValueKindDate,
Expand Down Expand Up @@ -246,20 +250,22 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) {

type ToJSONOptions struct {
UnnestColumns map[string]struct{}
HStoreAsJSON bool
}

func NewToJSONOptions(unnestCols []string) *ToJSONOptions {
func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) *ToJSONOptions {
unnestColumns := make(map[string]struct{}, len(unnestCols))
for _, col := range unnestCols {
unnestColumns[col] = struct{}{}
}
return &ToJSONOptions{
UnnestColumns: unnestColumns,
HStoreAsJSON: hstoreAsJSON,
}
}

func (r *RecordItems) ToJSONWithOpts(opts *ToJSONOptions) (string, error) {
jsonStruct, err := r.toMap()
jsonStruct, err := r.toMap(opts.HStoreAsJSON)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -290,9 +296,15 @@ func (r *RecordItems) ToJSONWithOpts(opts *ToJSONOptions) (string, error) {
return string(jsonBytes), nil
}

// a separate method like gives flexibility
// for us to handle some data types differently
func (r *RecordItems) ToJSONWithOptions(options *ToJSONOptions) (string, error) {
return r.ToJSONWithOpts(options)
}

func (r *RecordItems) ToJSON() (string, error) {
unnestCols := make([]string, 0)
return r.ToJSONWithOpts(NewToJSONOptions(unnestCols))
return r.ToJSONWithOpts(NewToJSONOptions(unnestCols, true))
}

type InsertRecord struct {
Expand Down
Loading

0 comments on commit 9440390

Please sign in to comment.