diff --git a/flow/connectors/postgres/bench_test.sql b/flow/connectors/postgres/bench_test.sql deleted file mode 100644 index b7e4fcbbcf..0000000000 --- a/flow/connectors/postgres/bench_test.sql +++ /dev/null @@ -1,60 +0,0 @@ -CREATE SCHEMA IF NOT EXISTS bench; - -CREATE TABLE bench.large_table ( - id bigserial PRIMARY KEY, - text1 text, - text2 text, - text3 text, - text4 text, - uuid1 uuid, - uuid2 uuid, - uuid3 uuid, - float1 float8, - float2 float8, - float3 float8, - float4 float8, - int1 int, - int2 int, - int3 int, - int4 int -); - -DO -$do$ -DECLARE - counter bigint := 0; -BEGIN - WHILE counter < 5000000 LOOP -- adjust the number based on your needs - INSERT INTO bench.large_table( - text1, text2, text3, text4, - uuid1, uuid2, uuid3, - float1, float2, float3, float4, - int1, int2, int3, int4 - ) - VALUES ( - md5(random()::text), - md5(random()::text), - md5(random()::text), - md5(random()::text), - gen_random_uuid(), - gen_random_uuid(), - gen_random_uuid(), - random() * 1000000, - random() * 1000000, - random() * 1000000, - random() * 1000000, - floor(random() * 100000)::int, - floor(random() * 100000)::int, - floor(random() * 100000)::int, - floor(random() * 100000)::int - ); - counter := counter + 1; - - -- Print progress every 1000 rows - IF counter % 1000 = 0 THEN - RAISE NOTICE 'Inserted % rows', counter; - END IF; - END LOOP; -END -$do$ -; diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index a4e90a8544..9eead5ec34 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -10,7 +10,6 @@ import ( util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/types/known/timestamppb" ) type testCase struct { @@ -60,33 +59,6 @@ func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int } } -func (tc *testCase) appendPartition(start time.Time, end time.Time) *testCase { - tsRange := &protos.PartitionRange_TimestampRange{ - TimestampRange: &protos.TimestampPartitionRange{ - Start: timestamppb.New(start), - End: timestamppb.New(end), - }, - } - tc.want = append(tc.want, &protos.QRepPartition{ - PartitionId: "test_uuid", - Range: &protos.PartitionRange{ - Range: tsRange, - }, - }) - return tc -} - -func (tc *testCase) appendPartitions(start, end time.Time, numPartitions int) *testCase { - duration := end.Sub(start) - partitionDuration := duration / time.Duration(numPartitions) - for i := 0; i < numPartitions; i++ { - partitionStart := start.Add(time.Duration(i) * partitionDuration) - partitionEnd := start.Add(time.Duration(i+1) * partitionDuration) - tc.appendPartition(partitionStart, partitionEnd) - } - return tc -} - func TestGetQRepPartitions(t *testing.T) { // log.SetLevel(log.DebugLevel) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index eb9b42cc03..c42f64c9d1 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -280,12 +280,6 @@ func CDCFlowWorkflowWithConfig( state.Progress = append(state.Progress, "executed setup flow and snapshot flow") } - heartbeatCancelCtx, cancelHeartbeat := workflow.WithCancel(ctx) - walHeartbeatCtx := workflow.WithActivityOptions(heartbeatCancelCtx, workflow.ActivityOptions{ - StartToCloseTimeout: 7 * 24 * time.Hour, - }) - workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg) - syncFlowOptions := &protos.SyncFlowOptions{ BatchSize: int32(limits.MaxBatchSize), } @@ -443,9 +437,6 @@ func CDCFlowWorkflowWithConfig( selector.Select(ctx) } - // cancel the SendWalHeartbeat activity - defer cancelHeartbeat() - state.TruncateProgress() return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state) }