From 1cab754b91c48ad9c1e5039806bbc1ceceb74b74 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 14 May 2024 11:58:14 +0530 Subject: [PATCH 1/5] UI: Better table picker data (#1713) - Let's not list partitioned tables as what matters is the root table - Fix duplicate columns showing in the table picker functionally tested --- flow/cmd/peer_data.go | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index c88cb0c868..5838157799 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -12,7 +12,6 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" ) func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) { @@ -88,17 +87,6 @@ func (h *FlowRequestHandler) GetTablesInSchema( defer tunnel.Close() defer peerConn.Close(ctx) - pgVersion, err := shared.GetMajorVersion(ctx, peerConn) - if err != nil { - slog.Error("unable to get pgversion for schema tables", slog.Any("error", err)) - return &protos.SchemaTablesResponse{Tables: nil}, err - } - - relKindFilterClause := "t.relkind IN ('r', 'p')" - if pgVersion <= shared.POSTGRES_12 { - relKindFilterClause = "t.relkind = 'r'" - } - rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname) t.relname, (con.contype = 'p' OR t.relreplident in ('i', 'f')) AS can_mirror, @@ -106,9 +94,7 @@ func (h *FlowRequestHandler) GetTablesInSchema( FROM pg_class t LEFT JOIN pg_namespace n ON t.relnamespace = n.oid LEFT JOIN pg_constraint con ON con.conrelid = t.oid - WHERE n.nspname = $1 AND `+ - relKindFilterClause+ - ` ORDER BY t.relname, can_mirror DESC; + WHERE n.nspname = $1 AND t.relkind = 'r' ORDER BY t.relname, can_mirror DESC; `, req.SchemaName) if err != nil { slog.Info("failed to fetch publications", slog.Any("error", err)) @@ -195,7 +181,7 @@ func (h *FlowRequestHandler) GetColumns( rows, err := peerConn.Query(ctx, ` SELECT - attname AS column_name, + distinct attname AS column_name, format_type(atttypid, atttypmod) AS data_type, CASE WHEN attnum = ANY(conkey) THEN true @@ -217,7 +203,7 @@ func (h *FlowRequestHandler) GetColumns( AND pg_attribute.attnum > 0 AND NOT attisdropped ORDER BY - attnum; + column_name; `, req.SchemaName, req.TableName) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err From 593e13307cb2449e579805611546b431daf578f2 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 14 May 2024 22:26:51 +0530 Subject: [PATCH 2/5] UI: Fix replication slot field for queues (#1716) This PR fixes a bug where replication slot name field was not respected for PG to queues. Functionally tested --- ui/app/mirrors/create/cdc/cdc.tsx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index 495a406cc5..c820046c7d 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -79,8 +79,7 @@ export default function CDCConfigForm({ if ( (label.includes('snapshot') && mirrorConfig.doInitialSnapshot !== true) || (label === 'replication slot name' && - mirrorConfig.doInitialSnapshot === true && - !isQueue) || + mirrorConfig.doInitialSnapshot === true) || (label.includes('staging path') && defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO') || (isQueue && label.includes('soft delete')) || From d4eb4227e3f26a1e8fb9e904d770ac484913002b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 14 May 2024 23:27:05 +0000 Subject: [PATCH 3/5] replace infinite time values from postgres with NULL (#1720) --- flow/connectors/postgres/qvalue_convert.go | 24 ++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index ef12fd61c4..246da6b49a 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -247,11 +247,19 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( switch qvalueKind { case qvalue.QValueKindTimestamp: - timestamp := value.(time.Time) - return qvalue.QValueTimestamp{Val: timestamp}, nil + switch val := value.(type) { + case time.Time: + return qvalue.QValueTimestamp{Val: val}, nil + case pgtype.InfinityModifier: + return qvalue.QValueNull(qvalueKind), nil + } case qvalue.QValueKindTimestampTZ: - timestamp := value.(time.Time) - return qvalue.QValueTimestampTZ{Val: timestamp}, nil + switch val := value.(type) { + case time.Time: + return qvalue.QValueTimestampTZ{Val: val}, nil + case pgtype.InfinityModifier: + return qvalue.QValueNull(qvalueKind), nil + } case qvalue.QValueKindInterval: intervalObject := value.(pgtype.Interval) var interval datatypes.PeerDBInterval @@ -274,8 +282,12 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( return qvalue.QValueString{Val: string(intervalJSON)}, nil case qvalue.QValueKindDate: - date := value.(time.Time) - return qvalue.QValueDate{Val: date}, nil + switch val := value.(type) { + case time.Time: + return qvalue.QValueDate{Val: val}, nil + case pgtype.InfinityModifier: + return qvalue.QValueNull(qvalueKind), nil + } case qvalue.QValueKindTime: timeVal := value.(pgtype.Time) if timeVal.Valid { From 09dd75dce310f7f236963c1fcc822c73cd1cebf6 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 15 May 2024 17:38:38 +0530 Subject: [PATCH 4/5] Numeric: fix scale check for bqnumeric (#1723) For bigquery we were checking for numeric scale being < precision, also need to make sure it is less than 20 as that is the scale we set for bignumeric in bq --- flow/datatypes/numeric.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 0a072ae42f..356ef06605 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -61,7 +61,7 @@ func (BigQueryNumericCompatibility) MaxPrecision() int16 { } func (BigQueryNumericCompatibility) MaxScale() int16 { - return 37 + return 20 } func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { @@ -69,7 +69,7 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { } func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { - return precision > 0 && precision <= 38 && scale < precision + return precision > 0 && precision <= 38 && scale <= 20 && scale < precision } type DefaultNumericCompatibility struct{} From 1d45f6a5c020e6a5139cc44e79187e4f316938f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 15 May 2024 14:13:22 +0000 Subject: [PATCH 5/5] flow/nexus docker: nonroot (#1724) ui container already nonroot --- stacks/flow.Dockerfile | 8 +++++--- stacks/peerdb-server.Dockerfile | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index a8ac567d86..276cf13f18 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -19,9 +19,11 @@ ENV CGO_ENABLED=1 RUN go build -ldflags="-s -w" -o /root/peer-flow FROM alpine:3.19 AS flow-base -RUN apk add --no-cache ca-certificates geos -WORKDIR /root -COPY --from=builder /root/peer-flow . +RUN apk add --no-cache ca-certificates geos && \ + adduser -s /bin/sh -D peerdb +USER peerdb +WORKDIR /home/peerdb +COPY --from=builder --chown=peerdb /root/peer-flow . FROM flow-base AS flow-api diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index 63c78624e9..ac85f63dce 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -23,7 +23,9 @@ RUN CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse cargo build --release --bin peerd FROM alpine:3.19 RUN apk add --no-cache ca-certificates postgresql-client curl iputils && \ - mkdir -p /var/log/peerdb -WORKDIR /root -COPY --from=builder /root/nexus/target/release/peerdb-server . + adduser -s /bin/sh -D peerdb && \ + install -d -m 0755 -o peerdb /var/log/peerdb +USER peerdb +WORKDIR /home/peerdb +COPY --from=builder --chown=peerdb /root/nexus/target/release/peerdb-server . CMD ["./peerdb-server"]