From 756679ebdbca68e26b1dbbb3918307134e5a9bf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 7 May 2024 18:19:38 +0000 Subject: [PATCH 1/7] PopulateCountMap: use atomics (#1682) Race existed with parallel queue processing --- flow/connectors/utils/monitoring/monitoring.go | 8 +++++--- flow/connectors/utils/stream.go | 6 +----- flow/model/qrecord_stream.go | 8 +++++--- flow/model/record.go | 6 +++--- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index 7ecf248c0a..16f65655c5 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -123,14 +123,16 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool *pgxpool.Pool, flowJobNa }() for destinationTableName, rowCounts := range tableNameRowsMapping { - numRows := rowCounts.InsertCount + rowCounts.UpdateCount + rowCounts.DeleteCount + inserts := rowCounts.InsertCount.Load() + updates := rowCounts.UpdateCount.Load() + deletes := rowCounts.DeleteCount.Load() _, err = insertBatchTablesTx.Exec(ctx, `INSERT INTO peerdb_stats.cdc_batch_table (flow_name,batch_id,destination_table_name,num_rows, insert_count,update_count,delete_count) VALUES($1,$2,$3,$4,$5,$6,$7) ON CONFLICT DO NOTHING`, - flowJobName, batchID, destinationTableName, numRows, - rowCounts.InsertCount, rowCounts.UpdateCount, rowCounts.DeleteCount) + flowJobName, batchID, destinationTableName, + inserts+updates+deletes, inserts, updates, deletes) if err != nil { return fmt.Errorf("error while inserting statistics into cdc_batch_table: %w", err) } diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 483d0f58ee..00688cc2ce 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -130,11 +130,7 @@ func recordToQRecordOrError[Items model.Items](batchID int64, record model.Recor func 
InitialiseTableRowsMap(tableMaps []*protos.TableMapping) map[string]*model.RecordTypeCounts { tableNameRowsMapping := make(map[string]*model.RecordTypeCounts, len(tableMaps)) for _, mapping := range tableMaps { - tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{ - InsertCount: 0, - UpdateCount: 0, - DeleteCount: 0, - } + tableNameRowsMapping[mapping.DestinationTableIdentifier] = &model.RecordTypeCounts{} } return tableNameRowsMapping diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 0a0c26d2e4..576074db3b 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -1,13 +1,15 @@ package model import ( + "sync/atomic" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) type RecordTypeCounts struct { - InsertCount int - UpdateCount int - DeleteCount int + InsertCount atomic.Int32 + UpdateCount atomic.Int32 + DeleteCount atomic.Int32 } type QRecordStream struct { diff --git a/flow/model/record.go b/flow/model/record.go index 30f3493cf4..9b728ff705 100644 --- a/flow/model/record.go +++ b/flow/model/record.go @@ -58,7 +58,7 @@ func (r *InsertRecord[T]) GetItems() T { func (r *InsertRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) { recordCount, ok := mapOfCounts[r.DestinationTableName] if ok { - recordCount.InsertCount++ + recordCount.InsertCount.Add(1) } } @@ -91,7 +91,7 @@ func (r *UpdateRecord[T]) GetItems() T { func (r *UpdateRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) { recordCount, ok := mapOfCounts[r.DestinationTableName] if ok { - recordCount.UpdateCount++ + recordCount.UpdateCount.Add(1) } } @@ -122,7 +122,7 @@ func (r *DeleteRecord[T]) GetItems() T { func (r *DeleteRecord[T]) PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) { recordCount, ok := mapOfCounts[r.DestinationTableName] if ok { - recordCount.DeleteCount++ + recordCount.DeleteCount.Add(1) } } From a11c3d4bf627d9949708204db97c9241aefcde01 Mon Sep 17 00:00:00 2001 
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 8 May 2024 01:37:05 +0530 Subject: [PATCH 2/7] Docker + run scripts minor fixes (#1686) 1. `run-peerdb.sh` should now support Podman too (needs to be tested) 2. implicit network creation, check not needed 3. misc things to script --- dev-peerdb.sh | 36 +++++++++++++++--------------------- docker-compose-dev.yml | 1 - docker-compose.yml | 1 - run-peerdb.sh | 25 ++++++++++++++++--------- 4 files changed, 31 insertions(+), 32 deletions(-) diff --git a/dev-peerdb.sh b/dev-peerdb.sh index 7c32151845..fb5da35f1b 100755 --- a/dev-peerdb.sh +++ b/dev-peerdb.sh @@ -1,33 +1,27 @@ #!/bin/sh -if test -z "$USE_PODMAN" +set -Eeu + +DOCKER="docker" +EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui" + +if test -n "${USE_PODMAN:=}" then - if ! command -v docker &> /dev/null - then - if command -v podman-compose - then - echo "docker could not be found on PATH, using podman-compose" + # 0 is found, checking for not found so we check for podman then + if $(docker compose &>/dev/null) && [ $? -ne 0 ]; then + if $(podman compose &>/dev/null) && [ $? -eq 0 ]; then + echo "docker could not be found on PATH, using podman compose" USE_PODMAN=1 else - echo "docker could not be found on PATH" + echo "docker compose could not be found on PATH" exit 1 fi fi fi -if test -z "$USE_PODMAN" -then - DOCKER="docker compose" - EXTRA_ARGS="--no-attach temporal --no-attach pyroscope --no-attach temporal-ui" -else - DOCKER="podman-compose --podman-run-args=--replace" - EXTRA_ARGS="" -fi - -# check if peerdb_network exists if not create it -if ! 
$DOCKER network inspect peerdb_network &> /dev/null -then - $DOCKER network create peerdb_network +if test -n "$USE_PODMAN"; then + DOCKER="podman" + EXTRA_ARGS="--podman-run-args=--replace" fi export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD) -exec $DOCKER -f docker-compose-dev.yml up --build $EXTRA_ARGS +exec $DOCKER compose -f docker-compose-dev.yml up --build $EXTRA_ARGS diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index d7d93f6de9..1868c755bf 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -221,5 +221,4 @@ volumes: networks: default: - external: true name: peerdb_network diff --git a/docker-compose.yml b/docker-compose.yml index 42fa2f26fc..fb144173aa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -192,5 +192,4 @@ volumes: networks: default: - external: true name: peerdb_network diff --git a/run-peerdb.sh b/run-peerdb.sh index 002bf93679..91bce75a02 100755 --- a/run-peerdb.sh +++ b/run-peerdb.sh @@ -1,17 +1,24 @@ #!/bin/sh set -Eeu -if ! command -v docker &> /dev/null +DOCKER="docker" + +if test -n "${USE_PODMAN:=}" then - echo "docker could not be found on PATH" - exit 1 + if ! (command -v docker &> /dev/null); then + if (command -v podman &> /dev/null); then + echo "docker could not be found on PATH, using podman" + USE_PODMAN=1 + else + echo "docker could not be found on PATH" + exit 1 + fi + fi fi -# check if peerdb_network exists if not create it -if ! 
docker network inspect peerdb_network &> /dev/null -then - docker network create peerdb_network +if test -n "$USE_PODMAN"; then + DOCKER="podman" fi -docker compose pull -docker compose -f docker-compose.yml up --no-attach catalog --no-attach temporal --no-attach temporal-ui --no-attach temporal-admin-tools +$DOCKER compose pull +exec $DOCKER compose -f docker-compose.yml up --no-attach catalog --no-attach temporal --no-attach temporal-ui --no-attach temporal-admin-tools From 829e7538c79c9e8aeb81dc8c7a07130cddeca0fd Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 8 May 2024 17:50:23 +0530 Subject: [PATCH 3/7] UI: Use redpanda logo for peer component's kafka (#1690) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Uses redpanda logo for peer component Screenshot 2024-05-08 at 2 16 56 PM Screenshot 2024-05-08 at 2 17 09 PM --- ui/components/PeerComponent.tsx | 2 +- ui/public/svgs/redpanda.svg | 31 +++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 ui/public/svgs/redpanda.svg diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index 6527a20247..bedbf41531 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -37,7 +37,7 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => { return '/svgs/ms.svg'; case DBType.KAFKA: case 'KAFKA': - return '/svgs/kafka.svg'; + return '/svgs/redpanda.svg'; case DBType.PUBSUB: case 'PUBSUB': return '/svgs/pubsub.svg'; diff --git a/ui/public/svgs/redpanda.svg b/ui/public/svgs/redpanda.svg new file mode 100644 index 0000000000..ba9d666066 --- /dev/null +++ b/ui/public/svgs/redpanda.svg @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From d9bdb107a7d13da1474e7d05c63495ed4af1d1f3 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 8 May 2024 19:04:51 +0530 Subject: [PATCH 4/7] CH UI: Help for user bucket setup (#1691) MIME-Version: 
1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adds placeholder for s3 path field - Link to doc for setting up s3 bucket Screenshot 2024-05-08 at 6 20 46 PM --- ui/app/peers/create/[peerType]/helpers/ch.ts | 1 + ui/components/PeerForms/ClickhouseConfig.tsx | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/ui/app/peers/create/[peerType]/helpers/ch.ts b/ui/app/peers/create/[peerType]/helpers/ch.ts index d45029a4e3..09626d73c0 100644 --- a/ui/app/peers/create/[peerType]/helpers/ch.ts +++ b/ui/app/peers/create/[peerType]/helpers/ch.ts @@ -52,6 +52,7 @@ export const clickhouseSetting: PeerSetting[] = [ stateHandler: (value, setter) => setter((curr) => ({ ...curr, s3Path: value as string })), tips: `This is an S3 bucket/object URL field. This bucket will be used as our intermediate stage for CDC`, + placeholder: 's3://', }, { label: 'Access Key ID', diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx index f2ea31eb0c..405dd9af77 100644 --- a/ui/components/PeerForms/ClickhouseConfig.tsx +++ b/ui/components/PeerForms/ClickhouseConfig.tsx @@ -8,6 +8,7 @@ import { RowWithSwitch, RowWithTextField } from '@/lib/Layout'; import { Switch } from '@/lib/Switch'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; +import Link from 'next/link'; import { useState } from 'react'; import { InfoPopover } from '../InfoPopover'; interface ConfigProps { @@ -122,6 +123,14 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) {

If you want this stage to belong to you, you can configure a bucket below. +

+ + Setting up an S3 bucket +