Skip to content

Commit

Permalink
Remove unused stuff (#780)
Browse files Browse the repository at this point in the history
#708 makes this obsolete
  • Loading branch information
Amogh-Bharadwaj authored Dec 9, 2023
1 parent da25258 commit 77cd516
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 97 deletions.
60 changes: 0 additions & 60 deletions flow/connectors/postgres/bench_test.sql

This file was deleted.

28 changes: 0 additions & 28 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/timestamppb"
)

type testCase struct {
Expand Down Expand Up @@ -60,33 +59,6 @@ func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int
}
}

func (tc *testCase) appendPartition(start time.Time, end time.Time) *testCase {
tsRange := &protos.PartitionRange_TimestampRange{
TimestampRange: &protos.TimestampPartitionRange{
Start: timestamppb.New(start),
End: timestamppb.New(end),
},
}
tc.want = append(tc.want, &protos.QRepPartition{
PartitionId: "test_uuid",
Range: &protos.PartitionRange{
Range: tsRange,
},
})
return tc
}

func (tc *testCase) appendPartitions(start, end time.Time, numPartitions int) *testCase {
duration := end.Sub(start)
partitionDuration := duration / time.Duration(numPartitions)
for i := 0; i < numPartitions; i++ {
partitionStart := start.Add(time.Duration(i) * partitionDuration)
partitionEnd := start.Add(time.Duration(i+1) * partitionDuration)
tc.appendPartition(partitionStart, partitionEnd)
}
return tc
}

func TestGetQRepPartitions(t *testing.T) {
// log.SetLevel(log.DebugLevel)

Expand Down
9 changes: 0 additions & 9 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,6 @@ func CDCFlowWorkflowWithConfig(
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")
}

heartbeatCancelCtx, cancelHeartbeat := workflow.WithCancel(ctx)
walHeartbeatCtx := workflow.WithActivityOptions(heartbeatCancelCtx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
})
workflow.ExecuteActivity(walHeartbeatCtx, flowable.SendWALHeartbeat, cfg)

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: int32(limits.MaxBatchSize),
}
Expand Down Expand Up @@ -443,9 +437,6 @@ func CDCFlowWorkflowWithConfig(
selector.Select(ctx)
}

// cancel the SendWalHeartbeat activity
defer cancelHeartbeat()

state.TruncateProgress()
return nil, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, limits, state)
}

0 comments on commit 77cd516

Please sign in to comment.