Skip to content

Commit

Permalink
Merge branch 'main' into soft-delete-tests-bq
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 25, 2023
2 parents 3ca22f3 + 93d754a commit a22be94
Show file tree
Hide file tree
Showing 19 changed files with 225 additions and 99 deletions.
8 changes: 6 additions & 2 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ run:
linters:
enable:
- dogsled
- dupl
- gofumpt
- gosec
- gosimple
Expand All @@ -18,9 +17,14 @@ linters:
- prealloc
- staticcheck
- ineffassign
- unparam
- unused
- lll
linters-settings:
stylecheck:
checks:
- all
- '-ST1003'
lll:
line-length: 120
line-length: 144
tab-width: 4
17 changes: 11 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
}
defer connectors.CloseConnector(dstConn)

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, config.Name, err)
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable(

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
flowName, _ := ctx.Value(shared.FlowNameKey).(string)
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

Expand Down Expand Up @@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
slog.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
goroutineErr = err
Expand Down Expand Up @@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
},
}
}
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
updateErr := monitoring.InitializeQRepRun(
ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}
Expand All @@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
return err
Expand Down
2 changes: 1 addition & 1 deletion flow/activities/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (a *FlowableActivity) handleSlotInfo(
return err
}

if slotInfo == nil || len(slotInfo) == 0 {
if len(slotInfo) == 0 {
slog.WarnContext(ctx, "warning: unable to get slot info", slog.Any("slotName", slotName))
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func APIMain(args *APIServerParams) error {
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
4 changes: 3 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ func (h *FlowRequestHandler) CreateQRepFlow(
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("invalid xmin txid for xmin rep: %w", err)
}
state.LastPartition.Range = &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}}}
state.LastPartition.Range = &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: txid}},
}
}

workflowFn = peerflow.XminFlowWorkflow
Expand Down
16 changes: 8 additions & 8 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,24 @@ func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName strin
return &pgPeerConfig, nil
}

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) {
pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName)
if err != nil {
return nil, "", err
return nil, err
}
connStr := utils.GetPGConnectionString(pgPeerConfig)
peerPool, err := pgxpool.New(ctx, connStr)
if err != nil {
return nil, "", err
return nil, err
}
return peerPool, pgPeerConfig.User, nil
return peerPool, nil
}

func (h *FlowRequestHandler) GetSchemas(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSchemasResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSchemasResponse{Schemas: nil}, err
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (h *FlowRequestHandler) GetTablesInSchema(
ctx context.Context,
req *protos.SchemaTablesRequest,
) (*protos.SchemaTablesResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (h *FlowRequestHandler) GetAllTables(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.AllTablesResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.AllTablesResponse{Tables: nil}, err
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (h *FlowRequestHandler) GetColumns(
ctx context.Context,
req *protos.TableColumnsRequest,
) (*protos.TableColumnsResponse, error) {
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func WorkerMain(opts *WorkerOptions) error {
return fmt.Errorf("unable to process certificate and key: %w", err)
}
connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: certs},
TLS: &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS13,
},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ func (p *PostgresCDCSource) consumeStream(
p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(records, xld, clientXLogPos)

if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
Expand Down Expand Up @@ -470,7 +469,8 @@ func (p *PostgresCDCSource) consumeStream(
}

func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pglogrepl.XLogData,
currentClientXlogPos pglogrepl.LSN) (model.Record, error) {
currentClientXlogPos pglogrepl.LSN,
) (model.Record, error) {
logicalMsg, err := pglogrepl.Parse(xld.WALData)
if err != nil {
return nil, fmt.Errorf("error parsing logical message: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ type ReplicaIdentityType rune

const (
ReplicaIdentityDefault ReplicaIdentityType = 'd'
ReplicaIdentityFull = 'f'
ReplicaIdentityIndex = 'i'
ReplicaIdentityNothing = 'n'
ReplicaIdentityFull ReplicaIdentityType = 'f'
ReplicaIdentityIndex ReplicaIdentityType = 'i'
ReplicaIdentityNothing ReplicaIdentityType = 'n'
)

// getRelIDForTable returns the relation ID for a table.
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue.
}
}

// nolint:unparam
func generateRecords(
t *testing.T,
nullable bool,
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/utils/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ func GetSSHClientConfig(user, password, privateKeyString string) (*ssh.ClientCon
}

return &ssh.ClientConfig{
User: user,
Auth: authMethods,
User: user,
Auth: authMethods,
//nolint:gosec
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}, nil
}
1 change: 1 addition & 0 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
)

// nolint:unparam
func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) {
err := e2e.CreateTableForQRep(s.pool, s.pgSuffix, tableName)
require.NoError(s.t, err)
Expand Down
19 changes: 19 additions & 0 deletions ui/app/api/mirrors/alerts/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,22 @@ export async function POST(request: Request) {
}
return new Response(JSON.stringify(mirrorStatus));
}

// We accept a list here in preparation for a Select All feature in UI
export async function PUT(request: Request) {
const { mirrorIDStringList } = await request.json();
const mirrorIDList: bigint[] = mirrorIDStringList.map((id: string) =>
BigInt(id)
);
const success = await prisma.flow_errors.updateMany({
where: {
id: {
in: mirrorIDList,
},
},
data: {
ack: true,
},
});
return new Response(JSON.stringify(success.count));
}
7 changes: 3 additions & 4 deletions ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
</Label>
</div>
<div>
<MirrorError
detailed
flowName={mirrorConfig?.flowJobName || ''}
/>
<Label>
<MirrorError flowName={mirrorConfig?.flowJobName || ''} />
</Label>
</div>
</div>
<div className='basis-1/4 md:basis-1/3'>
Expand Down
57 changes: 57 additions & 0 deletions ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use client';
import { Button } from '@/lib/Button';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useState } from 'react';
import { toast } from 'react-toastify';

const notifyErr = (errMsg: string) => {
toast.error(errMsg, {
position: toast.POSITION.BOTTOM_CENTER,
});
};

const AckButton = ({ ack, id }: { ack: boolean; id: number | bigint }) => {
const [loading, setLoading] = useState(false);
const [updated, setUpdated] = useState(false);
// handleAck updates ack to true for the given mirrorID
const handleAck = async (mirrorID: bigint | number) => {
setLoading(true);
const updateResResult = await fetch('/api/mirrors/alerts', {
method: 'PUT',
body: JSON.stringify({
mirrorIDStringList: [mirrorID.toString()],
}),
});
const updateRes = await updateResResult.json();
setLoading(false);
if (!updateRes) {
notifyErr('Something went wrong when trying to acknowledge');
return;
}
setUpdated(true);
};
return (
<>
{ack !== true && updated !== true ? (
<Button variant='normalSolid' onClick={() => handleAck(id)}>
<Label as='label' style={{ fontSize: 13 }}>
{loading ? (
<ProgressCircle variant='intermediate_progress_circle' />
) : (
'Acknowledge'
)}
</Label>
</Button>
) : (
<Button variant='normal' disabled={true}>
<Label as='label' style={{ fontSize: 13 }}>
Acknowledged
</Label>
</Button>
)}
</>
);
};

export default AckButton;
Loading

0 comments on commit a22be94

Please sign in to comment.