Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 16, 2024
2 parents b6df941 + 1d45f6a commit 2322003
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 33 deletions.
20 changes: 3 additions & 17 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)

func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) {
Expand Down Expand Up @@ -88,27 +87,14 @@ func (h *FlowRequestHandler) GetTablesInSchema(
defer tunnel.Close()
defer peerConn.Close(ctx)

pgVersion, err := shared.GetMajorVersion(ctx, peerConn)
if err != nil {
slog.Error("unable to get pgversion for schema tables", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}

relKindFilterClause := "t.relkind IN ('r', 'p')"
if pgVersion <= shared.POSTGRES_12 {
relKindFilterClause = "t.relkind = 'r'"
}

rows, err := peerConn.Query(ctx, `SELECT DISTINCT ON (t.relname)
t.relname,
(con.contype = 'p' OR t.relreplident in ('i', 'f')) AS can_mirror,
pg_size_pretty(pg_total_relation_size(t.oid))::text AS table_size
FROM pg_class t
LEFT JOIN pg_namespace n ON t.relnamespace = n.oid
LEFT JOIN pg_constraint con ON con.conrelid = t.oid
WHERE n.nspname = $1 AND `+
relKindFilterClause+
` ORDER BY t.relname, can_mirror DESC;
WHERE n.nspname = $1 AND t.relkind = 'r' ORDER BY t.relname, can_mirror DESC;
`, req.SchemaName)
if err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
Expand Down Expand Up @@ -195,7 +181,7 @@ func (h *FlowRequestHandler) GetColumns(

rows, err := peerConn.Query(ctx, `
SELECT
attname AS column_name,
distinct attname AS column_name,
format_type(atttypid, atttypmod) AS data_type,
CASE
WHEN attnum = ANY(conkey) THEN true
Expand All @@ -217,7 +203,7 @@ func (h *FlowRequestHandler) GetColumns(
AND pg_attribute.attnum > 0
AND NOT attisdropped
ORDER BY
attnum;
column_name;
`, req.SchemaName, req.TableName)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
Expand Down
24 changes: 18 additions & 6 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,19 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (

switch qvalueKind {
case qvalue.QValueKindTimestamp:
timestamp := value.(time.Time)
return qvalue.QValueTimestamp{Val: timestamp}, nil
switch val := value.(type) {
case time.Time:
return qvalue.QValueTimestamp{Val: val}, nil
case pgtype.InfinityModifier:
return qvalue.QValueNull(qvalueKind), nil
}
case qvalue.QValueKindTimestampTZ:
timestamp := value.(time.Time)
return qvalue.QValueTimestampTZ{Val: timestamp}, nil
switch val := value.(type) {
case time.Time:
return qvalue.QValueTimestampTZ{Val: val}, nil
case pgtype.InfinityModifier:
return qvalue.QValueNull(qvalueKind), nil
}
case qvalue.QValueKindInterval:
intervalObject := value.(pgtype.Interval)
var interval datatypes.PeerDBInterval
Expand All @@ -274,8 +282,12 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (

return qvalue.QValueString{Val: string(intervalJSON)}, nil
case qvalue.QValueKindDate:
date := value.(time.Time)
return qvalue.QValueDate{Val: date}, nil
switch val := value.(type) {
case time.Time:
return qvalue.QValueDate{Val: val}, nil
case pgtype.InfinityModifier:
return qvalue.QValueNull(qvalueKind), nil
}
case qvalue.QValueKindTime:
timeVal := value.(pgtype.Time)
if timeVal.Valid {
Expand Down
4 changes: 2 additions & 2 deletions flow/datatypes/numeric.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ func (BigQueryNumericCompatibility) MaxPrecision() int16 {
}

func (BigQueryNumericCompatibility) MaxScale() int16 {
return 37
return 20
}

func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBBigQueryPrecision, PeerDBBigQueryScale
}

func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale < precision
return precision > 0 && precision <= 38 && scale <= 20 && scale < precision
}

type DefaultNumericCompatibility struct{}
Expand Down
8 changes: 5 additions & 3 deletions stacks/flow.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ ENV CGO_ENABLED=1
RUN go build -ldflags="-s -w" -o /root/peer-flow

FROM alpine:3.19 AS flow-base
RUN apk add --no-cache ca-certificates geos
WORKDIR /root
COPY --from=builder /root/peer-flow .
RUN apk add --no-cache ca-certificates geos && \
adduser -s /bin/sh -D peerdb
USER peerdb
WORKDIR /home/peerdb
COPY --from=builder --chown=peerdb /root/peer-flow .

FROM flow-base AS flow-api

Expand Down
8 changes: 5 additions & 3 deletions stacks/peerdb-server.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ RUN CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse cargo build --release --bin peerd

FROM alpine:3.19
RUN apk add --no-cache ca-certificates postgresql-client curl iputils && \
mkdir -p /var/log/peerdb
WORKDIR /root
COPY --from=builder /root/nexus/target/release/peerdb-server .
adduser -s /bin/sh -D peerdb && \
install -d -m 0755 -o peerdb /var/log/peerdb
USER peerdb
WORKDIR /home/peerdb
COPY --from=builder --chown=peerdb /root/nexus/target/release/peerdb-server .
CMD ["./peerdb-server"]
3 changes: 1 addition & 2 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ export default function CDCConfigForm({
if (
(label.includes('snapshot') && mirrorConfig.doInitialSnapshot !== true) ||
(label === 'replication slot name' &&
mirrorConfig.doInitialSnapshot === true &&
!isQueue) ||
mirrorConfig.doInitialSnapshot === true) ||
(label.includes('staging path') &&
defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO') ||
(isQueue && label.includes('soft delete')) ||
Expand Down

0 comments on commit 2322003

Please sign in to comment.