diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9fa859b5b4..1d6433c8f4 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -16,6 +16,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" + "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/connectors" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -26,7 +27,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" - "github.com/PeerDB-io/peer-flow/shared/alerting" ) // CheckConnectionResult is the result of a CheckConnection call. diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 0e75ea2e06..6ee2bb5a8b 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -9,11 +9,11 @@ import ( "go.temporal.io/sdk/activity" + "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" - "github.com/PeerDB-io/peer-flow/shared/alerting" ) type SnapshotActivity struct { diff --git a/flow/shared/alerting/alerting.go b/flow/alerting/alerting.go similarity index 100% rename from flow/shared/alerting/alerting.go rename to flow/alerting/alerting.go diff --git a/flow/shared/alerting/slack_alert_sender.go b/flow/alerting/slack_alert_sender.go similarity index 100% rename from flow/shared/alerting/slack_alert_sender.go rename to flow/alerting/slack_alert_sender.go diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 89680f51d9..07cf5a19dc 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -11,10 +11,10 @@ import ( "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" + "github.com/PeerDB-io/peer-flow/alerting" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" - "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index c43515dc98..476a274de6 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -16,11 +16,11 @@ import ( "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" + "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/connectors" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" - "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 5d3bf1e679..5963cedc10 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -537,7 +537,7 @@ func (c *BigQueryConnector) NormalizeRecords(ctx context.Context, req *model.Nor // doesn't exceed the limit. We should make this configurable. const batchSize = 8 stmtNum := 0 - err = utils.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error { + err = shared.ArrayIterChunks(unchangedToastColumns, batchSize, func(chunk []string) error { stmtNum += 1 mergeStmt := mergeGen.generateMergeStmt(chunk) c.logger.Info(fmt.Sprintf("running merge statement %d for table %s..", diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index 59a269a092..e7810f6a8f 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -6,9 +6,9 @@ import ( "cloud.google.com/go/bigquery" - "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" ) type mergeStmtGenerator struct { @@ -233,7 +233,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchange for _, cols := range unchangedToastColumns { unchangedColsArray := strings.Split(cols, ",") - otherCols := utils.ArrayMinus(allCols, unchangedColsArray) + otherCols := shared.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)) for _, colName := range otherCols { tmpArray = append(tmpArray, fmt.Sprintf("`%s`=_d.%s", colName, m.shortColumn[colName])) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 47621ea2d0..ed306f73bb 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -7,6 +7,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" + "github.com/PeerDB-io/peer-flow/alerting" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" @@ -17,7 +18,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/shared/alerting" ) var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") diff --git a/flow/connectors/postgres/normalize_stmt_generator.go b/flow/connectors/postgres/normalize_stmt_generator.go index 01fe11273d..f9ff1d5f07 100644 --- a/flow/connectors/postgres/normalize_stmt_generator.go +++ b/flow/connectors/postgres/normalize_stmt_generator.go @@ -11,6 +11,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" ) type normalizeStmtGenerator struct { @@ -201,7 +202,7 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [ for i, unchangedToastCol := range unchangedColsArray { unchangedColsArray[i] = QuoteIdentifier(unchangedToastCol) } - otherCols := utils.ArrayMinus(quotedCols, unchangedColsArray) + otherCols := shared.ArrayMinus(quotedCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)) for _, colName := range otherCols { tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName)) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index dd3f24b5a9..49afa99002 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -19,13 +19,14 @@ import ( "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" + "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/PeerDB-io/peer-flow/shared/alerting" + "github.com/PeerDB-io/peer-flow/shared" ) type PostgresConnector struct { @@ -1124,7 +1125,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro if err != nil { return fmt.Errorf("failed to check tables in publication: %w", err) } - notPresentTables := utils.ArrayMinus(additionalSrcTables, tableNames) + notPresentTables := shared.ArrayMinus(additionalSrcTables, tableNames) if len(notPresentTables) > 0 { return fmt.Errorf("some additional tables not present in custom publication: %s", strings.Join(notPresentTables, ", ")) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index defc2f492f..830d9b8450 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -12,8 +12,8 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" - "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" ) var big10 = big.NewInt(10) @@ -208,7 +208,7 @@ func convertToArray[T any](kind qvalue.QValueKind, value interface{}) (qvalue.QV case []T: return qvalue.QValue{Kind: kind, Value: v}, nil case []interface{}: - return qvalue.QValue{Kind: kind, Value: utils.ArrayCastElements[T](v)}, nil + return qvalue.QValue{Kind: kind, Value: shared.ArrayCastElements[T](v)}, nil } return qvalue.QValue{}, fmt.Errorf("failed to parse array %s from %T: %v", kind, value, value) } diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index dd166e708f..98b45f3911 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/numeric" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" ) type mergeStmtGenerator struct { @@ -178,7 +179,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string) []string for _, cols := range m.unchangedToastColumns { unchangedColsArray := strings.Split(cols, ",") - otherCols := utils.ArrayMinus(allCols, unchangedColsArray) + otherCols := shared.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)+2) for _, colName := range otherCols { normalizedColName := SnowflakeIdentifierNormalize(colName) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 598297106b..6239f7d148 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -23,6 +23,7 @@ import ( "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" + "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" @@ -34,7 +35,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" - "github.com/PeerDB-io/peer-flow/shared/alerting" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) diff --git a/flow/shared/additional_tables.go b/flow/shared/additional_tables.go index 00dc8efa42..0eb0b79f35 100644 --- a/flow/shared/additional_tables.go +++ b/flow/shared/additional_tables.go @@ -1,7 +1,6 @@ package shared import ( - "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" ) @@ -22,6 +21,6 @@ func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping, additionalDstTables = append(additionalDstTables, additionalTableMapping.DestinationTableIdentifier) } - return utils.ArraysHaveOverlap(currentSrcTables, additionalSrcTables) || - utils.ArraysHaveOverlap(currentDstTables, additionalDstTables) + return ArraysHaveOverlap(currentSrcTables, additionalSrcTables) || + ArraysHaveOverlap(currentDstTables, additionalDstTables) } diff --git a/flow/connectors/utils/array.go b/flow/shared/array.go similarity index 98% rename from flow/connectors/utils/array.go rename to flow/shared/array.go index 2633153ae6..4acca12d4d 100644 --- a/flow/connectors/utils/array.go +++ b/flow/shared/array.go @@ -1,4 +1,4 @@ -package utils +package shared // first - second func ArrayMinus[T comparable](first, second []T) []T {