Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into remove-temporal-tests…
Browse files Browse the repository at this point in the history
…uite
  • Loading branch information
serprex committed Feb 29, 2024
2 parents 2c84c68 + 449b86f commit f3fe29f
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
go-version: "1.22"
cache: false
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@v4
with:
version: v1.56
working-directory: ./flow
Expand Down
6 changes: 3 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,16 +660,16 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
return nil
return context.Canceled
})

err = errGroup.Wait()
if err != nil {
if err != nil && err != context.Canceled {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/bigquery/avro_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestAvroTransform(t *testing.T) {
"ST_GEOGFROMTEXT(`col1`) AS `col1`",
"PARSE_JSON(`col2`,wide_number_mode=>'round') AS `col2`",
"`camelCol4`",
"CURRENT_TIMESTAMP AS `sync_col`",
}
transformedCols := getTransformedColumns(dstSchema, "sync_col", "del_col")
if !reflect.DeepEqual(transformedCols, expectedTransformCols) {
Expand Down
24 changes: 15 additions & 9 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,15 @@ func (s *QRepAvroSyncMethod) SyncRecords(
func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softDeleteCol string) []string {
transformedColumns := make([]string, 0, len(*dstSchema))
for _, col := range *dstSchema {
if col.Name == syncedAtCol || col.Name == softDeleteCol {
if col.Name == syncedAtCol { // PeerDB column
transformedColumns = append(transformedColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
continue
}
if col.Name == softDeleteCol { // PeerDB column
transformedColumns = append(transformedColumns, "FALSE AS `"+col.Name+"`")
continue
}

switch col.Type {
case bigquery.GeographyFieldType:
transformedColumns = append(transformedColumns,
Expand Down Expand Up @@ -184,18 +190,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client

insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
for _, col := range dstTableMetadata.Schema {
insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name))
}

insertColumnSQL := strings.Join(insertColumns, ", ")
transformedColumns := getTransformedColumns(&dstTableMetadata.Schema, syncedAtCol, softDeleteCol)
selector := strings.Join(transformedColumns, ", ")

if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
if syncedAtCol != "" { // PeerDB column
selector += ", CURRENT_TIMESTAMP"
}
// Insert the records from the staging table into the destination table
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
dstTableName, selector, stagingDatasetTable.string())
insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
dstTableName, insertColumnSQL, selector, stagingDatasetTable.string())

s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)

Expand Down
52 changes: 34 additions & 18 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type CDCFlowWorkflowState struct {
RelationMessageMapping model.RelationMessageMapping
CurrentFlowStatus protos.FlowStatus
// flow config update request, set to nil after processed
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
FlowConfigUpdate *protos.CDCFlowConfigUpdate
// options passed to all SyncFlows
SyncFlowOptions *protos.SyncFlowOptions
}
Expand All @@ -59,7 +59,7 @@ func NewCDCFlowWorkflowState(cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflow
SyncFlowErrors: nil,
NormalizeFlowErrors: nil,
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
FlowConfigUpdates: nil,
FlowConfigUpdate: nil,
SyncFlowOptions: &protos.SyncFlowOptions{
BatchSize: cfg.MaxBatchSize,
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
Expand Down Expand Up @@ -141,21 +141,23 @@ func GetChildWorkflowID(
type CDCFlowWorkflowResult = CDCFlowWorkflowState

const (
maxSyncsPerCdcFlow = 60
maxSyncsPerCdcFlow = 32
)

func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Context,
func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Context,
cfg *protos.FlowConnectionConfigs, state *CDCFlowWorkflowState,
mirrorNameSearch map[string]interface{},
) error {
for _, flowConfigUpdate := range state.FlowConfigUpdates {
flowConfigUpdate := state.FlowConfigUpdate
if flowConfigUpdate != nil {
if len(flowConfigUpdate.AdditionalTables) == 0 {
continue
return nil
}
if shared.AdditionalTablesHasOverlap(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables) {
w.logger.Warn("duplicate source/destination tables found in additionalTables")
continue
return nil
}
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT

alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
Expand Down Expand Up @@ -200,12 +202,16 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont

maps.Copy(state.SyncFlowOptions.SrcTableIdNameMapping, res.SyncFlowOptions.SrcTableIdNameMapping)
maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, res.SyncFlowOptions.TableNameSchemaMapping)
maps.Copy(state.SyncFlowOptions.RelationMessageMapping, res.SyncFlowOptions.RelationMessageMapping)

state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
}

// finished processing, wipe it
state.FlowConfigUpdate = nil
}
// finished processing, wipe it
state.FlowConfigUpdates = nil
return nil
}

Expand All @@ -223,9 +229,8 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener(
if cdcConfigUpdate.IdleTimeout > 0 {
state.SyncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}
// do this irrespective of additional tables being present, for auto unpausing
state.FlowConfigUpdate = cdcConfigUpdate

if w.syncFlowFuture != nil {
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
Expand Down Expand Up @@ -299,17 +304,20 @@ func CDCFlowWorkflow(

for state.ActiveSignal == model.PauseSignal {
// only place we block on receive, so signal processing is immediate
for state.ActiveSignal == model.PauseSignal && ctx.Err() == nil {
for state.ActiveSignal == model.PauseSignal && state.FlowConfigUpdate == nil && ctx.Err() == nil {
w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
selector.Select(ctx)
}
if err := ctx.Err(); err != nil {
return state, err
}

err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
if state.FlowConfigUpdate != nil {
err = w.processCDCFlowConfigUpdate(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
}
state.ActiveSignal = model.NoopSignal
}
}

Expand Down Expand Up @@ -591,13 +599,21 @@ func CDCFlowWorkflow(
}

if restart {
if state.ActiveSignal == model.PauseSignal {
finished = true
}

for ctx.Err() == nil && (!finished || mainLoopSelector.HasPending()) {
mainLoopSelector.Select(ctx)
}

if err := ctx.Err(); err != nil {
w.logger.Info("mirror canceled", slog.Any("error", err))
return state, err
return nil, err
}

// important to control the size of inputs.
state.TruncateProgress(w.logger)
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

const (
maxSyncsPerSyncFlow = 72
maxSyncsPerSyncFlow = 64
)

func SyncFlowWorkflow(
Expand Down
16 changes: 10 additions & 6 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
};

const isNotPaused =
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>
Expand Down Expand Up @@ -178,6 +182,11 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
)}
<div style={{ display: 'flex' }}>
<Button
style={{
Expand All @@ -187,12 +196,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
height: '2.5rem',
}}
variant='normalSolid'
disabled={
loading ||
(additionalTables.length > 0 &&
mirrorState.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED])
}
disabled={loading || isNotPaused}
onClick={sendFlowStateChangeRequest}
>
{loading ? (
Expand Down
10 changes: 8 additions & 2 deletions ui/app/mirrors/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import prisma from '@/app/utils/prisma';
import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Header } from '@/lib/Header';
Expand Down Expand Up @@ -104,9 +104,15 @@ export default async function ViewMirror({
syncStatusChild = (
<SyncStatus rowsSynced={rowsSynced} rows={rows} flowJobName={mirrorId} />
);
const isNotPaused =
mirrorStatus.currentFlowState.toString() !==
FlowStatus[FlowStatus.STATUS_PAUSED];
editButtonHTML = (
<div style={{ display: 'flex', alignItems: 'center' }}>
<EditButton toLink={`/mirrors/${mirrorId}/edit`} />
<EditButton
toLink={`/mirrors/${mirrorId}/edit`}
disabled={isNotPaused}
/>
</div>
);
} else {
Expand Down
14 changes: 11 additions & 3 deletions ui/components/EditButton.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
'use client';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useRouter } from 'next/navigation';
import { useState } from 'react';

const EditButton = ({ toLink }: { toLink: string }) => {
const EditButton = ({
toLink,
disabled,
}: {
toLink: string;
disabled: boolean;
}) => {
const [loading, setLoading] = useState(false);
const router = useRouter();

Expand All @@ -14,7 +21,7 @@ const EditButton = ({ toLink }: { toLink: string }) => {
router.push(toLink);
};
return (
<button
<Button
className='IconButton'
onClick={handleEdit}
aria-label='sort up'
Expand All @@ -26,14 +33,15 @@ const EditButton = ({ toLink }: { toLink: string }) => {
border: '1px solid rgba(0,0,0,0.1)',
borderRadius: '0.5rem',
}}
disabled={disabled}
>
<Label>Edit Mirror</Label>
{loading ? (
<ProgressCircle variant='determinate_progress_circle' />
) : (
<Icon name='edit' />
)}
</button>
</Button>
);
};

Expand Down

0 comments on commit f3fe29f

Please sign in to comment.