From bc2bdee768786e4a1382be6b04efe4a29d2be463 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 18 Dec 2023 07:25:59 +0000 Subject: [PATCH 1/2] git rm connectors/bigquery/qrecord_value_saver.go (#838) dead code also remove unused arg to BoundSelector & remove bad comment --- flow/concurrency/bound_selector.go | 2 +- .../bigquery/qrecord_value_saver.go | 203 ------------------ flow/workflows/snapshot_flow.go | 4 +- 3 files changed, 2 insertions(+), 207 deletions(-) delete mode 100644 flow/connectors/bigquery/qrecord_value_saver.go diff --git a/flow/concurrency/bound_selector.go b/flow/concurrency/bound_selector.go index 29735c09d6..75d6a5c983 100644 --- a/flow/concurrency/bound_selector.go +++ b/flow/concurrency/bound_selector.go @@ -13,7 +13,7 @@ type BoundSelector struct { ferrors []error } -func NewBoundSelector(limit int, total int, ctx workflow.Context) *BoundSelector { +func NewBoundSelector(limit int, ctx workflow.Context) *BoundSelector { return &BoundSelector{ ctx: ctx, limit: limit, diff --git a/flow/connectors/bigquery/qrecord_value_saver.go b/flow/connectors/bigquery/qrecord_value_saver.go deleted file mode 100644 index 202ac3df4d..0000000000 --- a/flow/connectors/bigquery/qrecord_value_saver.go +++ /dev/null @@ -1,203 +0,0 @@ -package connbigquery - -import ( - "fmt" - "math/big" - - "cloud.google.com/go/bigquery" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/google/uuid" -) - -type QRecordValueSaver struct { - ColumnNames []string - Record *model.QRecord - PartitionID string - RunID int64 -} - -// RatToBigQueryNumeric converts a *big.Rat to a decimal string compatible with -// BigQuery's NUMERIC type. -// -// BigQuery's NUMERIC type supports large-scale fixed-point numbers with up to -// 38 digits of precision and 9 digits of scale. This function converts a *big.Rat -// to a decimal string that respects these limits. -// -// The function uses *big.Rat's FloatString method with 9 as the argument, which -// converts the *big.Rat to a string that represents a floating-point number with -// 9 digits after the decimal point. The resulting string can be inserted into a -// NUMERIC field in BigQuery. -// -// Parameters: -// rat: The *big.Rat to convert. This should represent a decimal number with up to -// -// 38 digits of precision and 9 digits of scale. -// -// Returns: -// A string representing the *big.Rat as a decimal number with up to 38 digits -// of precision and 9 digits of scale. This string can be inserted into a NUMERIC -// field in BigQuery. -func RatToBigQueryNumeric(rat *big.Rat) string { - // Convert the *big.Rat to a decimal string with 9 digits of scale - return rat.FloatString(9) // 9 is the scale of the NUMERIC type -} - -func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) { - bqValues := make(map[string]bigquery.Value, q.Record.NumEntries) - - for i, v := range q.Record.Entries { - k := q.ColumnNames[i] - if v.Value == nil { - if v.Kind.IsArray() { - bqValues[k] = make([]interface{}, 0) - } else { - bqValues[k] = nil - } - continue - } - - switch v.Kind { - case qvalue.QValueKindFloat32: - val, ok := v.Value.(float32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindFloat64: - val, ok := v.Value.(float64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindInt16: - switch v.Value.(type) { - case int16: - bqValues[k] = v.Value - case int32: - bqValues[k] = int16(v.Value.(int32)) - case int64: - bqValues[k] = int16(v.Value.(int64)) - default: - return nil, "", fmt.Errorf("failed to convert %v to int16", v.Value) - } - - case qvalue.QValueKindInt32: - val, ok := v.Value.(int32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to int32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindInt64: - val, ok := v.Value.(int64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to int64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindBoolean: - val, ok := v.Value.(bool) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to bool", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindString: - val, ok := v.Value.(string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindTimestamp, qvalue.QValueKindDate, qvalue.QValueKindTime: - var err error - bqValues[k], err = v.GoTimeConvert() - if err != nil { - return nil, "", fmt.Errorf("failed to convert parse %v into time.Time", v) - } - - case qvalue.QValueKindNumeric: - val, ok := v.Value.(*big.Rat) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to *big.Rat", v.Value) - } - - bqValues[k] = RatToBigQueryNumeric(val) - - case qvalue.QValueKindBytes, qvalue.QValueKindBit: - val, ok := v.Value.([]byte) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []byte", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindUUID: - val, ok := v.Value.([16]byte) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - uuidVal := uuid.UUID(val) - bqValues[k] = uuidVal.String() - - case qvalue.QValueKindJSON: - val, ok := v.Value.(string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayFloat32: - val, ok := v.Value.([]float32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []float32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayFloat64: - val, ok := v.Value.([]float64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayInt32: - val, ok := v.Value.([]int32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []int32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayInt64: - val, ok := v.Value.([]int64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []int64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayString: - val, ok := v.Value.([]string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []string", v.Value) - } - bqValues[k] = val - - default: - // Skip invalid QValueKind, but log the type for debugging - fmt.Printf("[bigquery] Invalid QValueKind: %v\n", v.Kind) - } - } - - // add partition id to the map - bqValues["PartitionID"] = q.PartitionID - - // add run id to the map - bqValues["RunID"] = q.RunID - - // log the bigquery values - // fmt.Printf("BigQuery Values: %v\n", bqValues) - - return bqValues, "", nil -} diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index d514a16646..8765bf7f6c 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -187,7 +187,6 @@ func (s *SnapshotFlowExecution) cloneTable( return nil } -// startChildQrepWorkflow starts a child workflow for query based replication. func (s *SnapshotFlowExecution) cloneTables( ctx workflow.Context, slotInfo *protos.SetupReplicationOutput, @@ -196,8 +195,7 @@ func (s *SnapshotFlowExecution) cloneTables( slog.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s", slotInfo.SlotName, slotInfo.SnapshotName)) - numTables := len(s.config.TableMappings) - boundSelector := concurrency.NewBoundSelector(maxParallelClones, numTables, ctx) + boundSelector := concurrency.NewBoundSelector(maxParallelClones, ctx) for _, v := range s.config.TableMappings { source := v.SourceTableIdentifier From 49b152e85b3e6f46a129973b3993c51e67d30731 Mon Sep 17 00:00:00 2001 From: pankaj-peerdb <149565017+pankaj-peerdb@users.noreply.github.com> Date: Mon, 18 Dec 2023 18:46:09 +0530 Subject: [PATCH 2/2] Improve loading: peers page (#840) Convert /peers page to client components, so that we dont have to do a round trip on every page visit. Converting it to client side rendering, caches the page once it is loaded and we can load data via api. When data is loading we show the loader. We also cache the data(via swr) and refresh the data in background. --- ui/app/peers/page.tsx | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index c6baa73c0d..65182ae178 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -1,3 +1,4 @@ +'use client'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; @@ -5,18 +6,18 @@ import { LayoutMain } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import Link from 'next/link'; import { Header } from '../../lib/Header'; -import { getTruePeer } from '../api/peers/route'; -import prisma from '../utils/prisma'; import PeersTable from './peersTable'; export const dynamic = 'force-dynamic'; -async function fetchPeers() { - const peers = await prisma.peers.findMany({}); - return peers; -} +import { ProgressCircle } from '@/lib/ProgressCircle'; + +import useSWR from 'swr'; + +const fetcher = (...args: [any]) => fetch(...args).then((res) => res.json()); + +export default function Peers() { + const { data: peers, error, isLoading } = useSWR('/api/peers', fetcher); -export default async function Peers() { - let peers = await fetchPeers(); return ( @@ -41,10 +42,17 @@ export default async function Peers() { - getTruePeer(peer))} - /> + {isLoading && ( +
+ +
+ )} + {!isLoading && ( + peer)} + /> + )}
);