From db9a1caebee8a3e1c35d45bd62d3fb1cc38ddcac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 13 Dec 2023 22:20:51 +0000 Subject: [PATCH] Merge flow/utils into flow/shared (#820) Resolves util/utils ambiguity with flow/connectors/utils --- flow/connectors/bigquery/merge_statement_generator.go | 4 ++-- flow/connectors/postgres/qrep_partition_test.go | 3 +-- flow/connectors/postgres/qrep_query_executor.go | 3 +-- flow/connectors/postgres/qrep_sync_method.go | 3 +-- flow/connectors/snowflake/client.go | 4 ++-- flow/connectors/snowflake/qrep_avro_sync.go | 5 ++--- flow/connectors/snowflake/snowflake.go | 3 +-- flow/connectors/utils/cdc_records/cdc_records_storage.go | 4 ++-- flow/e2e/bigquery/bigquery_helper.go | 4 ++-- flow/e2e/bigquery/peer_flow_bq_test.go | 4 ++-- flow/e2e/snowflake/peer_flow_sf_test.go | 4 ++-- flow/e2e/snowflake/snowflake_helper.go | 4 ++-- flow/e2e/sqlserver/sqlserver_helper.go | 4 ++-- flow/{utils => shared}/crypto.go | 2 +- flow/{utils => shared}/random.go | 2 +- 15 files changed, 24 insertions(+), 29 deletions(-) rename flow/{utils => shared}/crypto.go (97%) rename flow/{utils => shared}/random.go (98%) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 427fcd788..f150e63ac 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -8,7 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" ) type mergeStmtGenerator struct { @@ -33,7 +33,7 @@ func (m *mergeStmtGenerator) generateMergeStmts() []string { // return an empty array for now flattenedCTE := m.generateFlattenedCTE() deDupedCTE := m.generateDeDupedCTE() - tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", util.RandomString(5)) + tempTable := fmt.Sprintf("_peerdb_de_duplicated_data_%s", shared.RandomString(5)) // create temp table stmt createTempTableStmt := fmt.Sprintf( "CREATE TEMP TABLE %s AS (%s, %s);", diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index aabe28aab..164312010 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -9,7 +9,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" ) @@ -78,7 +77,7 @@ func TestGetQRepPartitions(t *testing.T) { } // Generate a random schema name - rndUint, err := util.RandomUInt64() + rndUint, err := shared.RandomUInt64() if err != nil { t.Fatalf("Failed to generate random uint: %v", err) } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index ef2b6fec5..08f9f9348 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -10,7 +10,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" @@ -364,7 +363,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( } } - randomUint, err := util.RandomUInt64() + randomUint, err := shared.RandomUInt64() if err != nil { stream.Records <- &model.QRecordOrError{ Err: fmt.Errorf("failed to generate random uint: %w", err), diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go index 4590fd7b0..a54769e3d 100644 --- a/flow/connectors/postgres/qrep_sync_method.go +++ b/flow/connectors/postgres/qrep_sync_method.go @@ -11,7 +11,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5" "google.golang.org/protobuf/encoding/protojson" @@ -84,7 +83,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( } } else { // Step 2.1: Create a temp staging table - stagingTableName := fmt.Sprintf("_peerdb_staging_%s", util.RandomString(8)) + stagingTableName := fmt.Sprintf("_peerdb_staging_%s", shared.RandomString(8)) stagingTableIdentifier := pgx.Identifier{s.connector.metadataSchema, stagingTableName} dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table} diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 79aa2a7b9..ae2da1a89 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -11,7 +11,7 @@ import ( peersql "github.com/PeerDB-io/peer-flow/connectors/sql" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" ) @@ -24,7 +24,7 @@ type SnowflakeClient struct { } func NewSnowflakeClient(ctx context.Context, config *protos.SnowflakeConfig) (*SnowflakeClient, error) { - privateKey, err := util.DecodePKCS8PrivateKey([]byte(config.PrivateKey), config.Password) + privateKey, err := shared.DecodePKCS8PrivateKey([]byte(config.PrivateKey), config.Password) if err != nil { return nil, fmt.Errorf("failed to read private key: %w", err) } diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 20852782a..a765ac9e6 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -14,7 +14,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" _ "github.com/snowflakedb/gosnowflake" "go.temporal.io/sdk/activity" ) @@ -58,7 +57,7 @@ func (s *SnowflakeAvroSyncMethod) SyncRecords( return 0, err } - partitionID := util.RandomString(16) + partitionID := shared.RandomString(16) avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName) if err != nil { return 0, err @@ -502,7 +501,7 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode( flowJobName string, copyInfo *CopyInfo, ) error { - runID, err := util.RandomUInt64() + runID, err := shared.RandomUInt64() if err != nil { return fmt.Errorf("failed to generate run ID: %w", err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index f854647e9..1d7dcba31 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -18,7 +18,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5/pgtype" "github.com/snowflakedb/gosnowflake" "go.temporal.io/sdk/activity" @@ -115,7 +114,7 @@ type UnchangedToastColumnResult struct { func NewSnowflakeConnector(ctx context.Context, snowflakeProtoConfig *protos.SnowflakeConfig) (*SnowflakeConnector, error) { - PrivateKeyRSA, err := util.DecodePKCS8PrivateKey([]byte(snowflakeProtoConfig.PrivateKey), + PrivateKeyRSA, err := shared.DecodePKCS8PrivateKey([]byte(snowflakeProtoConfig.PrivateKey), snowflakeProtoConfig.Password) if err != nil { return nil, err diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 5c6fba4d3..3147045a7 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -13,7 +13,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" "github.com/cockroachdb/pebble" ) @@ -42,7 +42,7 @@ func NewCDCRecordsStore(flowJobName string) *cdcRecordsStore { pebbleDB: nil, numRecords: 0, flowJobName: flowJobName, - dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, util.RandomString(8)), + dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), numRecordsSwitchThreshold: peerdbenv.GetPeerDBCDCDiskSpillThreshold(), } } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index d17fe7a66..f3508d033 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -16,7 +16,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" "google.golang.org/api/iterator" ) @@ -36,7 +36,7 @@ type BigQueryTestHelper struct { // NewBigQueryTestHelper creates a new BigQueryTestHelper. func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { // random 64 bit int to namespace stateful schemas. - runID, err := util.RandomUInt64() + runID, err := shared.RandomUInt64() if err != nil { return nil, fmt.Errorf("failed to generate random uint64: %w", err) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 37e32ce34..70a27b52d 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -77,7 +77,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { slog.Info("Unable to load .env file, using default values from env") } - suffix := util.RandomString(8) + suffix := shared.RandomString(8) tsSuffix := time.Now().Format("20060102150405") bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) pool, err := e2e.SetupPostgres(bqSuffix) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9ea391b5c..037bd10ed 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -15,7 +15,7 @@ import ( "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -65,7 +65,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { slog.Info("Unable to load .env file, using default values from env") } - suffix := util.RandomString(8) + suffix := shared.RandomString(8) tsSuffix := time.Now().Format("20060102150405") pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index b4405725a..38fefeddc 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -12,7 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" ) type SnowflakeTestHelper struct { @@ -48,7 +48,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { } peer := generateSFPeer(&config) - runID, err := util.RandomUInt64() + runID, err := shared.RandomUInt64() if err != nil { return nil, fmt.Errorf("failed to generate random uint64: %w", err) } diff --git a/flow/e2e/sqlserver/sqlserver_helper.go b/flow/e2e/sqlserver/sqlserver_helper.go index 775b7f83d..749367d99 100644 --- a/flow/e2e/sqlserver/sqlserver_helper.go +++ b/flow/e2e/sqlserver/sqlserver_helper.go @@ -10,7 +10,7 @@ import ( connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - util "github.com/PeerDB-io/peer-flow/utils" + "github.com/PeerDB-io/peer-flow/shared" ) type SQLServerHelper struct { @@ -46,7 +46,7 @@ func NewSQLServerHelper(name string) (*SQLServerHelper, error) { return nil, fmt.Errorf("invalid connection configs: %v", connErr) } - rndNum, err := util.RandomUInt64() + rndNum, err := shared.RandomUInt64() if err != nil { return nil, err } diff --git a/flow/utils/crypto.go b/flow/shared/crypto.go similarity index 97% rename from flow/utils/crypto.go rename to flow/shared/crypto.go index 61acc8884..7aeb58c2f 100644 --- a/flow/utils/crypto.go +++ b/flow/shared/crypto.go @@ -1,4 +1,4 @@ -package util +package shared import ( "crypto/rsa" diff --git a/flow/utils/random.go b/flow/shared/random.go similarity index 98% rename from flow/utils/random.go rename to flow/shared/random.go index 7222a23f9..9565c34c4 100644 --- a/flow/utils/random.go +++ b/flow/shared/random.go @@ -1,4 +1,4 @@ -package util +package shared import ( "crypto/rand"