Skip to content

Commit

Permalink
remove/refactor functions from shared (#2307)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 3, 2024
1 parent 1f07e7b commit 928ee3c
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 82 deletions.
10 changes: 5 additions & 5 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
cmap "github.com/orcaman/concurrent-map/v2"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -186,10 +186,10 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE

func (m *EventHubManager) getEventHubMgmtClient(subID string) (*armeventhub.EventHubsClient, error) {
if subID == "" {
envSubID, err := utils.GetAzureSubscriptionID()
if err != nil {
slog.Error("failed to get azure subscription id", slog.Any("error", err))
return nil, err
envSubID := peerdbenv.GetEnvString("AZURE_SUBSCRIPTION_ID", "")
if envSubID == "" {
slog.Error("couldn't find AZURE_SUBSCRIPTION_ID in environment")
return nil, errors.New("couldn't find AZURE_SUBSCRIPTION_ID in environment")
}
subID = envSubID
}
Expand Down
8 changes: 3 additions & 5 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"math/rand/v2"
"testing"
"time"

Expand Down Expand Up @@ -84,11 +85,8 @@ func TestGetQRepPartitions(t *testing.T) {
}
defer conn.Close(context.Background())

// Generate a random schema name
rndUint, err := shared.RandomUInt64()
if err != nil {
t.Fatalf("Failed to generate random uint: %v", err)
}
//nolint:gosec // Generate a random schema name, number has no cryptographic significance
rndUint := rand.Uint64()
schemaName := fmt.Sprintf("test_%d", rndUint)

// Create the schema
Expand Down
14 changes: 5 additions & 9 deletions flow/connectors/postgres/sink_q.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"math/rand/v2"

"github.com/jackc/pgx/v5"

Expand Down Expand Up @@ -35,20 +36,15 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
}
}

randomUint, err := shared.RandomUInt64()
if err != nil {
qe.logger.Error("[pg_query_executor] failed to generate random uint", slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err)
stream.Close(err)
return 0, err
}
//nolint:gosec // number has no cryptographic significance
randomUint := rand.Uint64()

cursorName := fmt.Sprintf("peerdb_cursor_%d", randomUint)
fetchSize := shared.FetchAndChannelSize
cursorQuery := fmt.Sprintf("DECLARE %s CURSOR FOR %s", cursorName, query)
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args))
_, err = tx.Exec(ctx, cursorQuery, args...)
if err != nil {

if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil {
qe.logger.Info("[pg_query_executor] failed to declare cursor",
slog.String("cursorQuery", cursorQuery), slog.Any("error", err))
err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
Expand Down
12 changes: 5 additions & 7 deletions flow/connectors/snowflake/qrep_avro_consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"fmt"
"log/slog"
"math/rand/v2"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

type SnowflakeAvroConsolidateHandler struct {
Expand Down Expand Up @@ -214,10 +214,8 @@ func (s *SnowflakeAvroConsolidateHandler) generateUpsertMergeCommand(

// handleUpsertMode handles the upsert mode
func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context) error {
runID, err := shared.RandomUInt64()
if err != nil {
return fmt.Errorf("failed to generate run ID: %w", err)
}
//nolint:gosec // number has no cryptographic significance
runID := rand.Uint64()

tempTableName := fmt.Sprintf("%s_temp_%d", s.dstTableName, runID)

Expand All @@ -230,8 +228,8 @@ func (s *SnowflakeAvroConsolidateHandler) handleUpsertMode(ctx context.Context)
s.connector.logger.Info("created temp table " + tempTableName)

copyCmd := s.getCopyTransformation(tempTableName)
_, err = s.connector.database.ExecContext(ctx, copyCmd)
if err != nil {

if _, err := s.connector.database.ExecContext(ctx, copyCmd); err != nil {
return fmt.Errorf("failed to run COPY INTO command: %w", err)
}
s.connector.logger.Info("copied file from stage " + s.stage + " to temp table " + tempTableName)
Expand Down
15 changes: 0 additions & 15 deletions flow/connectors/utils/azure.go

This file was deleted.

8 changes: 3 additions & 5 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"math/rand/v2"
"os"
"strings"
"testing"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type BigQueryTestHelper struct {
Expand All @@ -37,10 +37,8 @@ type BigQueryTestHelper struct {
func NewBigQueryTestHelper(t *testing.T) (*BigQueryTestHelper, error) {
t.Helper()
// random 64 bit int to namespace stateful schemas.
runID, err := shared.RandomUInt64()
if err != nil {
return nil, fmt.Errorf("failed to generate random uint64: %w", err)
}
//nolint:gosec // number has no cryptographic significance
runID := rand.Uint64()

jsonPath := os.Getenv("TEST_BQ_CREDS")
if jsonPath == "" {
Expand Down
9 changes: 3 additions & 6 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand/v2"
"os"
"testing"

Expand All @@ -13,7 +14,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type SnowflakeTestHelper struct {
Expand Down Expand Up @@ -47,11 +47,8 @@ func NewSnowflakeTestHelper(t *testing.T) (*SnowflakeTestHelper, error) {
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
}

runID, err := shared.RandomUInt64()
if err != nil {
return nil, fmt.Errorf("failed to generate random uint64: %w", err)
}

//nolint:gosec // number has no cryptographic significance
runID := rand.Uint64()
testDatabaseName := fmt.Sprintf("e2e_test_%d", runID)

adminClient, err := connsnowflake.NewSnowflakeClient(context.Background(), config)
Expand Down
9 changes: 3 additions & 6 deletions flow/e2e/sqlserver/sqlserver_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package e2e_sqlserver
import (
"context"
"fmt"
"math/rand/v2"
"os"
"strconv"

peersql "github.com/PeerDB-io/peer-flow/connectors/sql"
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type SQLServerHelper struct {
Expand Down Expand Up @@ -45,11 +45,8 @@ func NewSQLServerHelper() (*SQLServerHelper, error) {
return nil, fmt.Errorf("invalid connection configs: %v", connErr)
}

rndNum, err := shared.RandomUInt64()
if err != nil {
return nil, err
}

//nolint:gosec // number has no cryptographic significance
rndNum := rand.Uint64()
testSchema := fmt.Sprintf("e2e_test_%d", rndNum)
if err := connector.CreateSchema(context.Background(), testSchema); err != nil {
return nil, err
Expand Down
24 changes: 0 additions & 24 deletions flow/shared/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,8 @@ package shared

import (
"crypto/rand"
"encoding/binary"
"errors"
)

// RandomInt64 returns a random 64 bit integer.
func RandomInt64() (int64, error) {
b := make([]byte, 8)
_, err := rand.Read(b)
if err != nil {
return 0, errors.New("could not generate random int64: " + err.Error())
}
// Convert bytes to int64
return int64(binary.LittleEndian.Uint64(b)), nil
}

// RandomUInt64 returns a random 64 bit unsigned integer.
func RandomUInt64() (uint64, error) {
b := make([]byte, 8)
_, err := rand.Read(b)
if err != nil {
return 0, errors.New("could not generate random uint64: " + err.Error())
}
// Convert bytes to uint64
return binary.LittleEndian.Uint64(b), nil
}

func RandomString(n int) string {
const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
bytes := make([]byte, n)
Expand Down
File renamed without changes.

0 comments on commit 928ee3c

Please sign in to comment.