Skip to content

Commit

Permalink
registers a couple of times in sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 8, 2024
1 parent 4271ea0 commit 6331f15
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
replConfig.ConnConfig.RuntimeParams["replication"] = "database"
replConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex"
replConfig.MaxConns = 1

connConfig.AfterConnect = utils.RegisterHStore
pool, err := NewSSHWrappedPostgresPool(ctx, connConfig, pgConfig.SshConfig)
if err != nil {
return nil, fmt.Errorf("failed to create connection pool: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -52,14 +53,15 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}

txConfig := s.connector.pool.poolConfig.Copy()
txConfig.AfterConnect = utils.RegisterCustomTypesForConnection
txConfig.AfterConnect = utils.RegisterHStore
txPool, err := pgxpool.NewWithConfig(s.connector.pool.ctx, txConfig)
if err != nil {
return 0, fmt.Errorf("failed to create tx pool: %v", err)
}

// Second transaction - to handle rest of the processing
tx, err := txPool.Begin(context.Background())
tx.Conn().TypeMap().RegisterType(&pgtype.Type{Name: "hstore", OID: 19698, Codec: pgtype.HstoreCodec{}})
if err != nil {
return 0, fmt.Errorf("failed to begin transaction: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func qValueKindToPostgresType(qvalueKind string) string {
return "BYTEA"
case qvalue.QValueKindJSON:
return "JSONB"
case qvalue.QValueKindHStore:
return "HSTORE"
case qvalue.QValueKindUUID:
return "UUID"
case qvalue.QValueKindTime:
Expand Down Expand Up @@ -399,6 +401,8 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
qValueKind = qvalue.QValueKindGeometry
case "geography":
qValueKind = qvalue.QValueKindGeography
case "hstore":
qValueKind = qvalue.QValueKindHStore
default:
qValueKind = qvalue.QValueKindString
}
Expand Down
19 changes: 6 additions & 13 deletions flow/connectors/utils/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,14 @@ func GetCustomDataTypes(ctx context.Context, pool *pgxpool.Pool) (map[uint32]str
return customTypeMap, nil
}

func RegisterCustomTypesForConnection(ctx context.Context, conn *pgx.Conn) error {
typeNames := []string{"hstore", "geometry", "geography"}
typeOIDs := make(map[string]uint32)

for _, typeName := range typeNames {
err := conn.QueryRow(ctx, `SELECT oid FROM pg_type WHERE typname = $1`, typeName).Scan(typeOIDs[typeName])
if err != nil {
return err
}
func RegisterHStore(ctx context.Context, conn *pgx.Conn) error {
var hstoreOID uint32
err := conn.QueryRow(context.Background(), `select oid from pg_type where typname = 'hstore'`).Scan(&hstoreOID)
if err != nil {
return err
}

typeMap := conn.TypeMap()
for typeName, typeOID := range typeOIDs {
typeMap.RegisterType(&pgtype.Type{Name: typeName, OID: typeOID})
}
conn.TypeMap().RegisterType(&pgtype.Type{Name: "hstore", OID: hstoreOID, Codec: pgtype.HstoreCodec{}})

return nil
}
3 changes: 2 additions & 1 deletion flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
myh HSTORE NOT NULL
);
`, srcTableName))
require.NoError(s.t, err)
Expand Down Expand Up @@ -82,7 +83,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"')
`, srcTableName), testKey, testValue)
e2e.EnvNoError(s.t, env, err)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err
"f7 jsonb",
"f8 smallint",
"my_date DATE",
"myh HSTORE",
}
if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") {
tblFields = append(tblFields, `"geometryPoint" geometry(point)`,
Expand Down Expand Up @@ -283,7 +284,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro
CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012],
ARRAY['varchar1', 'varchar2'], '{"key": -8.02139037433155}',
'[{"key1": "value1", "key2": "value2", "key3": "value3"}]',
'{"key": "value"}', 15, CURRENT_DATE %s
'{"key": "value"}', 15, CURRENT_DATE, '"a"=>"b"' %s
)`,
id, uuid.New().String(), uuid.New().String(),
uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), geoValues)
Expand Down

0 comments on commit 6331f15

Please sign in to comment.