diff --git a/flow/concurrency/bound_selector.go b/flow/concurrency/bound_selector.go index 29735c09d6..75d6a5c983 100644 --- a/flow/concurrency/bound_selector.go +++ b/flow/concurrency/bound_selector.go @@ -13,7 +13,7 @@ type BoundSelector struct { ferrors []error } -func NewBoundSelector(limit int, total int, ctx workflow.Context) *BoundSelector { +func NewBoundSelector(limit int, ctx workflow.Context) *BoundSelector { return &BoundSelector{ ctx: ctx, limit: limit, diff --git a/flow/connectors/bigquery/qrecord_value_saver.go b/flow/connectors/bigquery/qrecord_value_saver.go deleted file mode 100644 index 202ac3df4d..0000000000 --- a/flow/connectors/bigquery/qrecord_value_saver.go +++ /dev/null @@ -1,203 +0,0 @@ -package connbigquery - -import ( - "fmt" - "math/big" - - "cloud.google.com/go/bigquery" - "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/google/uuid" -) - -type QRecordValueSaver struct { - ColumnNames []string - Record *model.QRecord - PartitionID string - RunID int64 -} - -// RatToBigQueryNumeric converts a *big.Rat to a decimal string compatible with -// BigQuery's NUMERIC type. -// -// BigQuery's NUMERIC type supports large-scale fixed-point numbers with up to -// 38 digits of precision and 9 digits of scale. This function converts a *big.Rat -// to a decimal string that respects these limits. -// -// The function uses *big.Rat's FloatString method with 9 as the argument, which -// converts the *big.Rat to a string that represents a floating-point number with -// 9 digits after the decimal point. The resulting string can be inserted into a -// NUMERIC field in BigQuery. -// -// Parameters: -// rat: The *big.Rat to convert. This should represent a decimal number with up to -// -// 38 digits of precision and 9 digits of scale. -// -// Returns: -// A string representing the *big.Rat as a decimal number with up to 38 digits -// of precision and 9 digits of scale. This string can be inserted into a NUMERIC -// field in BigQuery. -func RatToBigQueryNumeric(rat *big.Rat) string { - // Convert the *big.Rat to a decimal string with 9 digits of scale - return rat.FloatString(9) // 9 is the scale of the NUMERIC type -} - -func (q QRecordValueSaver) Save() (map[string]bigquery.Value, string, error) { - bqValues := make(map[string]bigquery.Value, q.Record.NumEntries) - - for i, v := range q.Record.Entries { - k := q.ColumnNames[i] - if v.Value == nil { - if v.Kind.IsArray() { - bqValues[k] = make([]interface{}, 0) - } else { - bqValues[k] = nil - } - continue - } - - switch v.Kind { - case qvalue.QValueKindFloat32: - val, ok := v.Value.(float32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindFloat64: - val, ok := v.Value.(float64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindInt16: - switch v.Value.(type) { - case int16: - bqValues[k] = v.Value - case int32: - bqValues[k] = int16(v.Value.(int32)) - case int64: - bqValues[k] = int16(v.Value.(int64)) - default: - return nil, "", fmt.Errorf("failed to convert %v to int16", v.Value) - } - - case qvalue.QValueKindInt32: - val, ok := v.Value.(int32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to int32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindInt64: - val, ok := v.Value.(int64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to int64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindBoolean: - val, ok := v.Value.(bool) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to bool", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindString: - val, ok := v.Value.(string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindTimestamp, qvalue.QValueKindDate, qvalue.QValueKindTime: - var err error - bqValues[k], err = v.GoTimeConvert() - if err != nil { - return nil, "", fmt.Errorf("failed to convert parse %v into time.Time", v) - } - - case qvalue.QValueKindNumeric: - val, ok := v.Value.(*big.Rat) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to *big.Rat", v.Value) - } - - bqValues[k] = RatToBigQueryNumeric(val) - - case qvalue.QValueKindBytes, qvalue.QValueKindBit: - val, ok := v.Value.([]byte) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []byte", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindUUID: - val, ok := v.Value.([16]byte) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - uuidVal := uuid.UUID(val) - bqValues[k] = uuidVal.String() - - case qvalue.QValueKindJSON: - val, ok := v.Value.(string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to string", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayFloat32: - val, ok := v.Value.([]float32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []float32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayFloat64: - val, ok := v.Value.([]float64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []float64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayInt32: - val, ok := v.Value.([]int32) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []int32", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayInt64: - val, ok := v.Value.([]int64) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []int64", v.Value) - } - bqValues[k] = val - - case qvalue.QValueKindArrayString: - val, ok := v.Value.([]string) - if !ok { - return nil, "", fmt.Errorf("failed to convert %v to []string", v.Value) - } - bqValues[k] = val - - default: - // Skip invalid QValueKind, but log the type for debugging - fmt.Printf("[bigquery] Invalid QValueKind: %v\n", v.Kind) - } - } - - // add partition id to the map - bqValues["PartitionID"] = q.PartitionID - - // add run id to the map - bqValues["RunID"] = q.RunID - - // log the bigquery values - // fmt.Printf("BigQuery Values: %v\n", bqValues) - - return bqValues, "", nil -} diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index d514a16646..8765bf7f6c 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -187,7 +187,6 @@ func (s *SnapshotFlowExecution) cloneTable( return nil } -// startChildQrepWorkflow starts a child workflow for query based replication. func (s *SnapshotFlowExecution) cloneTables( ctx workflow.Context, slotInfo *protos.SetupReplicationOutput, @@ -196,8 +195,7 @@ func (s *SnapshotFlowExecution) cloneTables( slog.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s", slotInfo.SlotName, slotInfo.SnapshotName)) - numTables := len(s.config.TableMappings) - boundSelector := concurrency.NewBoundSelector(maxParallelClones, numTables, ctx) + boundSelector := concurrency.NewBoundSelector(maxParallelClones, ctx) for _, v := range s.config.TableMappings { source := v.SourceTableIdentifier diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index c6baa73c0d..65182ae178 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -1,3 +1,4 @@ +'use client'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; @@ -5,18 +6,18 @@ import { LayoutMain } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import Link from 'next/link'; import { Header } from '../../lib/Header'; -import { getTruePeer } from '../api/peers/route'; -import prisma from '../utils/prisma'; import PeersTable from './peersTable'; export const dynamic = 'force-dynamic'; -async function fetchPeers() { - const peers = await prisma.peers.findMany({}); - return peers; -} +import { ProgressCircle } from '@/lib/ProgressCircle'; + +import useSWR from 'swr'; + +const fetcher = (...args: [any]) => fetch(...args).then((res) => res.json()); + +export default function Peers() { + const { data: peers, error, isLoading } = useSWR('/api/peers', fetcher); -export default async function Peers() { - let peers = await fetchPeers(); return ( @@ -41,10 +42,17 @@ export default async function Peers() { - getTruePeer(peer))} - /> + {isLoading && ( +
+ +
+ )} + {!isLoading && ( + peer)} + /> + )}
);