Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move utils.Array into shared #1423

Merged
merged 1 commit into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
Expand All @@ -26,7 +27,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

// CheckConnectionResult is the result of a CheckConnection call.
Expand Down
2 changes: 1 addition & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (

"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

type SnapshotActivity struct {
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/alerting"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor
// doesn't exceed the limit. We should make this configurable.
const batchSize = 8
stmtNum := 0
err = utils.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error {
stmtNum += 1
mergeStmt := mergeGen.generateMergeStmt(chunk)
c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..",
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

"cloud.google.com/go/bigquery"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type mergeStmtGenerator struct {
Expand Down Expand Up @@ -233,7 +233,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchange

for _, cols := range unchangedToastColumns {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
otherCols := shared.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, m.shortColumn[colName]))
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/jackc/pgx/v5/pgxpool"

"github.com/PeerDB-io/peer-flow/alerting"
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared/alerting"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/normalize_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type normalizeStmtGenerator struct {
Expand Down Expand Up @@ -201,7 +202,7 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
for i, unchangedToastCol := range unchangedColsArray {
unchangedColsArray[i] = QuoteIdentifier(unchangedToastCol)
}
otherCols := utils.ArrayMinus(quotedCols, unchangedColsArray)
otherCols := shared.ArrayMinus(quotedCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName))
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared/alerting"
"github.com/PeerDB-io/peer-flow/shared"
)

type PostgresConnector struct {
Expand Down Expand Up @@ -1124,7 +1125,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}
notPresentTables := utils.ArrayMinus(additionalSrcTables, tableNames)
notPresentTables := shared.ArrayMinus(additionalSrcTables, tableNames)
if len(notPresentTables) > 0 {
return fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", "))
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/lib/pq/oid"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

var big10 = big.NewInt(10)
Expand Down Expand Up @@ -208,7 +208,7 @@ func convertToArray[T any](kind qvalue.QValueKind, value interface{}) (qvalue.QV
case []T:
return qvalue.QValue{Kind: kind, Value: v}, nil
case []interface{}:
return qvalue.QValue{Kind: kind, Value: utils.ArrayCastElements[T](v)}, nil
return qvalue.QValue{Kind: kind, Value: shared.ArrayCastElements[T](v)}, nil
}
return qvalue.QValue{}, fmt.Errorf("failed to parse array %s from %T: %v", kind, value, value)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/snowflake/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type mergeStmtGenerator struct {
Expand Down Expand Up @@ -178,7 +179,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string

for _, cols := range m.unchangedToastColumns {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
otherCols := shared.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols)+2)
for _, colName := range otherCols {
normalizedColName := SnowflakeIdentifierNormalize(colName)
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

Expand Down
5 changes: 2 additions & 3 deletions flow/shared/additional_tables.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shared

import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

Expand All @@ -22,6 +21,6 @@ func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
additionalDstTables = append(additionalDstTables, additionalTableMapping.DestinationTableIdentifier)
}

return utils.ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||
utils.ArraysHaveOverlap(currentDstTables, additionalDstTables)
return ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||
ArraysHaveOverlap(currentDstTables, additionalDstTables)
}
2 changes: 1 addition & 1 deletion flow/connectors/utils/array.go → flow/shared/array.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package shared

// first - second
func ArrayMinus[T comparable](first, second []T) []T {
Expand Down
Loading