diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1c5f713416..e96df083ac 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -191,8 +191,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, for _, col := range v.Exclude { exclude[col] = struct{}{} } - tblNameMapping[v.SourceTableIdentifier] = model.NameAndExclude { - Name: v.DestinationTableIdentifier, + tblNameMapping[v.SourceTableIdentifier] = model.NameAndExclude{ + Name: v.DestinationTableIdentifier, Exclude: exclude, } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 658af02824..fe177e8ffa 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -782,10 +782,10 @@ func (c *PostgresConnector) SetupReplication(signal *SlotSignal, req *protos.Set return fmt.Errorf("error checking for replication slot and publication: %w", err) } - tableNameMapping := make(map[string]model.NameAndExclude); + tableNameMapping := make(map[string]model.NameAndExclude) for k, v := range req.TableNameMapping { - tableNameMapping[k] = model.NameAndExclude { - Name: v, + tableNameMapping[k] = model.NameAndExclude{ + Name: v, Exclude: make(map[string]struct{}, 0), } } diff --git a/flow/model/model.go b/flow/model/model.go index 9ac58f7608..ffc2c296bd 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -13,7 +13,7 @@ import ( ) type NameAndExclude struct { - Name string + Name string Exclude map[string]struct{} } diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 8c5b03db01..cae676146b 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -201,9 +201,9 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables( if mapping.SourceTableIdentifier == srcTableName { if len(mapping.Exclude) != 0 { tableSchema = &protos.TableSchema{ - TableIdentifier: tableSchema.TableIdentifier, - Columns: maps.Clone(tableSchema.Columns), - PrimaryKeyColumns: tableSchema.PrimaryKeyColumns, + TableIdentifier: tableSchema.TableIdentifier, + Columns: maps.Clone(tableSchema.Columns), + PrimaryKeyColumns: tableSchema.PrimaryKeyColumns, IsReplicaIdentityFull: tableSchema.IsReplicaIdentityFull, } for _, exclude := range mapping.Exclude { diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 2e4622cdb8..26f8475e12 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -12,10 +12,10 @@ import ( "github.com/PeerDB-io/peer-flow/shared" "github.com/google/uuid" logrus "github.com/sirupsen/logrus" - "golang.org/x/exp/maps" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" + "golang.org/x/exp/maps" ) type SnapshotFlowExecution struct { diff --git a/ui/app/dto/MirrorsDTO.ts b/ui/app/dto/MirrorsDTO.ts index 5d5e20c5ca..33fa094c59 100644 --- a/ui/app/dto/MirrorsDTO.ts +++ b/ui/app/dto/MirrorsDTO.ts @@ -1,17 +1,18 @@ -import {FlowConnectionConfigs, QRepConfig} from '@/grpc_generated/flow'; -import {Dispatch, SetStateAction} from 'react'; +import { FlowConnectionConfigs, QRepConfig } from '@/grpc_generated/flow'; +import { Dispatch, SetStateAction } from 'react'; export type UCreateMirrorResponse = { created: boolean; }; export type UDropMirrorResponse = { - dropped: boolean; errorMessage : string; + dropped: boolean; + errorMessage: string; }; export type CDCConfig = FlowConnectionConfigs; -export type MirrorConfig = CDCConfig|QRepConfig; -export type MirrorSetter = Dispatch>; +export type MirrorConfig = CDCConfig | QRepConfig; +export type MirrorSetter = Dispatch>; export type TableMapRow = { source: string; destination: string; diff --git a/ui/app/mirrors/create/tablemapping.tsx b/ui/app/mirrors/create/tablemapping.tsx index 9e67795cd7..14a393cc5d 100644 --- a/ui/app/mirrors/create/tablemapping.tsx +++ b/ui/app/mirrors/create/tablemapping.tsx @@ -103,7 +103,7 @@ const TableMapping = ({ source: `${schemaName}.${tableName}`, destination: dstName, partitionKey: '', - exclude: [], + exclude: [], selected: false, }); }