From bb8608ef78cef62829a63a44c9da3721c91782f7 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sat, 9 Dec 2023 13:38:31 -0500 Subject: [PATCH 1/3] Fix button sizing issue on resizes (#785) --- ui/app/mirrors/page.tsx | 12 +----------- ui/app/peers/page.tsx | 16 ++-------------- 2 files changed, 3 insertions(+), 25 deletions(-) diff --git a/ui/app/mirrors/page.tsx b/ui/app/mirrors/page.tsx index 0f380bd1e4..717af7b956 100644 --- a/ui/app/mirrors/page.tsx +++ b/ui/app/mirrors/page.tsx @@ -68,17 +68,7 @@ export default async function Mirrors() {
+ } From da25258b4988d698925eb41dcfc5d53bc297a9df Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sat, 9 Dec 2023 13:38:59 -0500 Subject: [PATCH 2/3] Re-add versions.go (#784) I missed adding this in a previous commit --- flow/cmd/version.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 flow/cmd/version.go diff --git a/flow/cmd/version.go b/flow/cmd/version.go new file mode 100644 index 0000000000..58074fa75c --- /dev/null +++ b/flow/cmd/version.go @@ -0,0 +1,16 @@ +package main + +import ( + "context" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" +) + +func (h *FlowRequestHandler) GetVersion( + ctx context.Context, + req *protos.PeerDBVersionRequest, +) (*protos.PeerDBVersionResponse, error) { + version := utils.GetEnvString("PEERDB_VERSION_SHA_SHORT", "unknown") + return &protos.PeerDBVersionResponse{Version: version}, nil +} From 77cd516d8a34a268736ea0f1c6c459154f8053f7 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Sun, 10 Dec 2023 00:15:50 +0530 Subject: [PATCH 3/3] Remove unused stuff (#780) https://github.com/PeerDB-io/peerdb/pull/708 makes this obsolete --- flow/connectors/postgres/bench_test.sql | 60 ------------------- .../postgres/qrep_partition_test.go | 28 --------- flow/workflows/cdc_flow.go | 9 --- 3 files changed, 97 deletions(-) delete mode 100644 flow/connectors/postgres/bench_test.sql diff --git a/flow/connectors/postgres/bench_test.sql b/flow/connectors/postgres/bench_test.sql deleted file mode 100644 index b7e4fcbbcf..0000000000 --- a/flow/connectors/postgres/bench_test.sql +++ /dev/null @@ -1,60 +0,0 @@ -CREATE SCHEMA IF NOT EXISTS bench; - -CREATE TABLE bench.large_table ( - id bigserial PRIMARY KEY, - text1 text, - text2 text, - text3 text, - text4 text, - uuid1 uuid, - uuid2 uuid, - uuid3 uuid, - float1 float8, - float2 float8, - float3 float8, - float4 float8, - int1 int, - int2 int, - int3 int, - int4 int -); - -DO -$do$ -DECLARE - counter bigint := 0; -BEGIN - WHILE counter < 5000000 LOOP -- adjust the number based on your needs - INSERT INTO bench.large_table( - text1, text2, text3, text4, - uuid1, uuid2, uuid3, - float1, float2, float3, float4, - int1, int2, int3, int4 - ) - VALUES ( - md5(random()::text), - md5(random()::text), - md5(random()::text), - md5(random()::text), - gen_random_uuid(), - gen_random_uuid(), - gen_random_uuid(), - random() * 1000000, - random() * 1000000, - random() * 1000000, - random() * 1000000, - floor(random() * 100000)::int, - floor(random() * 100000)::int, - floor(random() * 100000)::int, - floor(random() * 100000)::int - ); - counter := counter + 1; - - -- Print progress every 1000 rows - IF counter % 1000 = 0 THEN - RAISE NOTICE 'Inserted % rows', counter; - END IF; - END LOOP; -END -$do$ -; diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index a4e90a8544..9eead5ec34 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -10,7 +10,6 @@ import ( util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/types/known/timestamppb" ) type testCase struct { @@ -60,33 +59,6 @@ func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int } } -func (tc *testCase) appendPartition(start time.Time, end time.Time) *testCase { - tsRange := &protos.PartitionRange_TimestampRange{ - TimestampRange: &protos.TimestampPartitionRange{ - Start: timestamppb.New(start), - End: timestamppb.New(end), - }, - } - tc.want = append(tc.want, &protos.QRepPartition{ - PartitionId: "test_uuid", - Range: &protos.PartitionRange{ - Range: tsRange, - }, - }) - return tc -} - -func (tc *testCase) appendPartitions(start, end time.Time, numPartitions int) *testCase { - duration := end.Sub(start) - partitionDuration := duration / time.Duration(numPartitions) - for i := 0; i < numPartitions; i++ { - partitionStart := start.Add(time.Duration(i) * partitionDuration) - partitionEnd := start.Add(time.Duration(i+1) * partitionDuration) - tc.appendPartition(partitionStart, partitionEnd) - } - return tc -} - func TestGetQRepPartitions(t *testing.T) { // log.SetLevel(log.DebugLevel) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index eb9b42cc03..c42f64c9d1 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -280,12 +280,6 @@ func CDCFlowWorkflowWithConfig( state.Progress = append(state.Progress, "executed setup flow and snapshot flow") } - heartbeatCancelCtx, cancelHeartbeat := workflow.WithCancel(ctx) - walHeartbeatCtx := workflow.WithActivityOptions(heartbeatCancelCtx, workflow.ActivityOptions{ - StartToCloseTimeout: 7 * 24 * time.Hour, - }) - workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg) - syncFlowOptions := &protos.SyncFlowOptions{ BatchSize: int32(limits.MaxBatchSize), } @@ -443,9 +437,6 @@ func CDCFlowWorkflowWithConfig( selector.Select(ctx) } - // cancel the SendWalHeartbeat activity - defer cancelHeartbeat() - state.TruncateProgress() return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) }