From 6331f15a6c5ba8f0cb835722e8a93e386d914d11 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 8 Jan 2024 20:38:58 +0530 Subject: [PATCH] registers a couple of times in sync --- flow/connectors/postgres/postgres.go | 2 +- flow/connectors/postgres/qrep_sync_method.go | 4 +++- flow/connectors/postgres/qvalue_convert.go | 4 ++++ flow/connectors/utils/postgres.go | 19 ++++++------------- flow/e2e/postgres/peer_flow_pg_test.go | 3 ++- flow/e2e/test_utils.go | 3 ++- 6 files changed, 18 insertions(+), 17 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 43a11a2e72..605af24cd3 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -58,7 +58,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) replConfig.ConnConfig.RuntimeParams["replication"] = "database" replConfig.ConnConfig.RuntimeParams["bytea_output"] = "hex" replConfig.MaxConns = 1 - + connConfig.AfterConnect = utils.RegisterHStore pool, err := NewSSHWrappedPostgresPool(ctx, connConfig, pgConfig.SshConfig) if err != nil { return nil, fmt.Errorf("failed to create connection pool: %w", err) diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go index 0577fd6d3f..137461d68b 100644 --- a/flow/connectors/postgres/qrep_sync_method.go +++ b/flow/connectors/postgres/qrep_sync_method.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "google.golang.org/protobuf/encoding/protojson" @@ -52,7 +53,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( } txConfig := s.connector.pool.poolConfig.Copy() - txConfig.AfterConnect = utils.RegisterCustomTypesForConnection + txConfig.AfterConnect = utils.RegisterHStore txPool, err := pgxpool.NewWithConfig(s.connector.pool.ctx, txConfig) if err != nil { return 0, fmt.Errorf("failed to create tx pool: %v", err) @@ -60,6 +61,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( // Second transaction - to handle rest of the processing tx, err := txPool.Begin(context.Background()) + tx.Conn().TypeMap().RegisterType(&pgtype.Type{Name: "hstore", OID: 19698, Codec: pgtype.HstoreCodec{}}) if err != nil { return 0, fmt.Errorf("failed to begin transaction: %v", err) } diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 0037a43fa1..02a519ceb5 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -111,6 +111,8 @@ func qValueKindToPostgresType(qvalueKind string) string { return "BYTEA" case qvalue.QValueKindJSON: return "JSONB" + case qvalue.QValueKindHStore: + return "HSTORE" case qvalue.QValueKindUUID: return "UUID" case qvalue.QValueKindTime: @@ -399,6 +401,8 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { qValueKind = qvalue.QValueKindGeometry case "geography": qValueKind = qvalue.QValueKindGeography + case "hstore": + qValueKind = qvalue.QValueKindHStore default: qValueKind = qvalue.QValueKindString } diff --git a/flow/connectors/utils/postgres.go b/flow/connectors/utils/postgres.go index 9a9742fef3..476cabfcf0 100644 --- a/flow/connectors/utils/postgres.go +++ b/flow/connectors/utils/postgres.go @@ -58,21 +58,14 @@ func GetCustomDataTypes(ctx context.Context, pool *pgxpool.Pool) (map[uint32]str return customTypeMap, nil } -func RegisterCustomTypesForConnection(ctx context.Context, conn *pgx.Conn) error { - typeNames := []string{"hstore", "geometry", "geography"} - typeOIDs := make(map[string]uint32) - - for _, typeName := range typeNames { - err := conn.QueryRow(ctx, `SELECT oid FROM pg_type WHERE typname = $1`, typeName).Scan(typeOIDs[typeName]) - if err != nil { - return err - } +func RegisterHStore(ctx context.Context, conn *pgx.Conn) error { + var hstoreOID uint32 + err := conn.QueryRow(context.Background(), `select oid from pg_type where typname = 'hstore'`).Scan(&hstoreOID) + if err != nil { + return err } - typeMap := conn.TypeMap() - for typeName, typeOID := range typeOIDs { - typeMap.RegisterType(&pgtype.Type{Name: typeName, OID: typeOID}) - } + conn.TypeMap().RegisterType(&pgtype.Type{Name: "hstore", OID: hstoreOID, Codec: pgtype.HstoreCodec{}}) return nil } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 90d557d863..cbfa627c85 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -54,6 +54,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { id SERIAL PRIMARY KEY, key TEXT NOT NULL, value TEXT NOT NULL + myh HSTORE NOT NULL ); `, srcTableName)) require.NoError(s.t, err) @@ -82,7 +83,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(key, value) VALUES ($1, $2) + INSERT INTO %s(key, value, myh) VALUES ($1, $2, '"a"=>"b"') `, srcTableName), testKey, testValue) e2e.EnvNoError(s.t, env, err) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 88f4fb3486..b0bbafb66e 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -223,6 +223,7 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err "f7 jsonb", "f8 smallint", "my_date DATE", + "myh HSTORE", } if strings.Contains(tableName, "sf") || strings.Contains(tableName, "bq") { tblFields = append(tblFields, `"geometryPoint" geometry(point)`, @@ -283,7 +284,7 @@ func PopulateSourceTable(pool *pgxpool.Pool, suffix string, tableName string, ro CURRENT_TIMESTAMP, 1, ARRAY['text1', 'text2'], ARRAY[123, 456], ARRAY[789, 012], ARRAY['varchar1', 'varchar2'], '{"key": -8.02139037433155}', '[{"key1": "value1", "key2": "value2", "key3": "value3"}]', - '{"key": "value"}', 15, CURRENT_DATE %s + '{"key": "value"}', 15, CURRENT_DATE, '"a"=>"b"' %s )`, id, uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), uuid.New().String(), geoValues)