Skip to content

Commit

Permalink
Merge branch 'main' into maps-caps
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 13, 2023
2 parents a571b8b + db9a1ca commit aaed601
Show file tree
Hide file tree
Showing 15 changed files with 24 additions and 29 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
)

type mergeStmtGenerator struct {
Expand All @@ -33,7 +33,7 @@ func (m *mergeStmtGenerator) generateMergeStmts() []string {
// return an empty array for now
flattenedCTE := m.generateFlattenedCTE()
deDupedCTE := m.generateDeDupedCTE()
tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", util.RandomString(5))
tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", shared.RandomString(5))
// create temp table stmt
createTempTableStmt := fmt.Sprintf(
"CREATE TEMP TABLE %s AS (%s, %s);",
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -78,7 +77,7 @@ func TestGetQRepPartitions(t *testing.T) {
}

// Generate a random schema name
rndUint, err := util.RandomUInt64()
rndUint, err := shared.RandomUInt64()
if err != nil {
t.Fatalf("Failed to generate random uint: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
Expand Down Expand Up @@ -364,7 +363,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
}
}

randomUint, err := util.RandomUInt64()
randomUint, err := shared.RandomUInt64()
if err != nil {
stream.Records <- &model.QRecordOrError{
Err: fmt.Errorf("failed to generate random uint: %w", err),
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5"

"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -84,7 +83,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
} else {
// Step 2.1: Create a temp staging table
stagingTableName := fmt.Sprintf("_peerdb_staging_%s", util.RandomString(8))
stagingTableName := fmt.Sprintf("_peerdb_staging_%s", shared.RandomString(8))
stagingTableIdentifier := pgx.Identifier{s.connector.metadataSchema, stagingTableName}
dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
peersql "github.com/PeerDB-io/peer-flow/connectors/sql"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5/pgtype"
)

Expand All @@ -24,7 +24,7 @@ type SnowflakeClient struct {
}

func NewSnowflakeClient(ctx context.Context, config *protos.SnowflakeConfig) (*SnowflakeClient, error) {
privateKey, err := util.DecodePKCS8PrivateKey([]byte(config.PrivateKey), config.Password)
privateKey, err := shared.DecodePKCS8PrivateKey([]byte(config.PrivateKey), config.Password)
if err != nil {
return nil, fmt.Errorf("failed to read private key: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
_ "github.com/snowflakedb/gosnowflake"
"go.temporal.io/sdk/activity"
)
Expand Down Expand Up @@ -58,7 +57,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords(
return 0, err
}

partitionID := util.RandomString(16)
partitionID := shared.RandomString(16)
avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
if err != nil {
return 0, err
Expand Down Expand Up @@ -502,7 +501,7 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode(
flowJobName string,
copyInfo *CopyInfo,
) error {
runID, err := util.RandomUInt64()
runID, err := shared.RandomUInt64()
if err != nil {
return fmt.Errorf("failed to generate run ID: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5/pgtype"
"github.com/snowflakedb/gosnowflake"
"go.temporal.io/sdk/activity"
Expand Down Expand Up @@ -115,7 +114,7 @@ type UnchangedToastColumnResult struct {

func NewSnowflakeConnector(ctx context.Context,
snowflakeProtoConfig *protos.SnowflakeConfig) (*SnowflakeConnector, error) {
PrivateKeyRSA, err := util.DecodePKCS8PrivateKey([]byte(snowflakeProtoConfig.PrivateKey),
PrivateKeyRSA, err := shared.DecodePKCS8PrivateKey([]byte(snowflakeProtoConfig.PrivateKey),
snowflakeProtoConfig.Password)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/utils/cdc_records/cdc_records_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/cockroachdb/pebble"
)

Expand Down Expand Up @@ -42,7 +42,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore {
pebbleDB: nil,
numRecords: 0,
flowJobName: flowJobName,
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, util.RandomString(8)),
dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)),
numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
"google.golang.org/api/iterator"
)

Expand All @@ -36,7 +36,7 @@ type BigQueryTestHelper struct {
// NewBigQueryTestHelper creates a new BigQueryTestHelper.
func NewBigQueryTestHelper() (*BigQueryTestHelper, error) {
// random 64 bit int to namespace stateful schemas.
runID, err := util.RandomUInt64()
runID, err := shared.RandomUInt64()
if err != nil {
return nil, fmt.Errorf("failed to generate random uint64: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/e2e"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv"
Expand Down Expand Up @@ -77,7 +77,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ {
slog.Info("Unable to load .env file, using default values from env")
}

suffix := util.RandomString(8)
suffix := shared.RandomString(8)
tsSuffix := time.Now().Format("20060102150405")
bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix)
pool, err := e2e.SetupPostgres(bqSuffix)
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv"
Expand Down Expand Up @@ -65,7 +65,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
slog.Info("Unable to load .env file, using default values from env")
}

suffix := util.RandomString(8)
suffix := shared.RandomString(8)
tsSuffix := time.Now().Format("20060102150405")
pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix)

Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
)

type SnowflakeTestHelper struct {
Expand Down Expand Up @@ -48,7 +48,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) {
}

peer := generateSFPeer(&config)
runID, err := util.RandomUInt64()
runID, err := shared.RandomUInt64()
if err != nil {
return nil, fmt.Errorf("failed to generate random uint64: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/sqlserver/sqlserver_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/PeerDB-io/peer-flow/shared"
)

type SQLServerHelper struct {
Expand Down Expand Up @@ -46,7 +46,7 @@ func NewSQLServerHelper(name string) (*SQLServerHelper, error) {
return nil, fmt.Errorf("invalid connection configs: %v", connErr)
}

rndNum, err := util.RandomUInt64()
rndNum, err := shared.RandomUInt64()
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion flow/utils/crypto.go → flow/shared/crypto.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package shared

import (
"crypto/rsa"
Expand Down
2 changes: 1 addition & 1 deletion flow/utils/random.go → flow/shared/random.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package shared

import (
"crypto/rand"
Expand Down

0 comments on commit aaed601

Please sign in to comment.