Skip to content

Commit

Permalink
Merge branch 'main' into update-dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Dec 25, 2023
2 parents ca877ce + b86f106 commit 8ffe69c
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 148 deletions.
17 changes: 11 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
}
defer connectors.CloseConnector(dstConn)

flowName, _ := ctx.Value(shared.FlowNameKey).(string)
if err := dstConn.SetupMetadataTables(); err != nil {
a.Alerter.LogFlowError(ctx, config.Name, err)
a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}

Expand Down Expand Up @@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable(

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
flowName, _ := ctx.Value(shared.FlowNameKey).(string)
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

Expand Down Expand Up @@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
slog.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
goroutineErr = err
Expand Down Expand Up @@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
},
}
}
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
updateErr := monitoring.InitializeQRepRun(
ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}
Expand All @@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
return err
Expand Down
63 changes: 5 additions & 58 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,25 +781,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
return nil, fmt.Errorf("couldn't get tablename to unchanged cols mapping: %w", err)
}

stmts := []string{}
stmts := make([]string, 0, len(distinctTableNames)+1)
// append all the statements to one list
c.logger.Info(fmt.Sprintf("merge raw records to corresponding tables: %s %s %v",
c.datasetID, rawTableName, distinctTableNames))

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab lock: %v", err)
}

defer func() {
err := release()
if err != nil {
c.logger.Error("failed to release lock", slog.Any("error", err))
}
}()

stmts = append(stmts, "BEGIN TRANSACTION;")

for _, tableName := range distinctTableNames {
mergeGen := &mergeStmtGenerator{
Dataset: c.datasetID,
Expand All @@ -824,11 +810,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
"UPDATE %s.%s SET normalize_batch_id=%d WHERE mirror_job_name='%s';",
c.datasetID, MirrorJobsTable, syncBatchID, req.FlowJobName)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")

_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
query := strings.Join(stmts, "\n")
_, err = c.client.Query(query).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements %s in a transaction: %v", strings.Join(stmts, "\n"), err)
return nil, fmt.Errorf("failed to execute statements %s: %v", query, err)
}

return &model.NormalizeResponse{
Expand Down Expand Up @@ -1023,21 +1009,9 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

func (c *BigQueryConnector) SyncFlowCleanup(jobName string) error {
release, err := c.grabJobsUpdateLock()
if err != nil {
return fmt.Errorf("failed to grab lock: %w", err)
}

defer func() {
err := release()
if err != nil {
c.logger.Error("failed to release lock", slog.Any("error", err))
}
}()

dataset := c.client.Dataset(c.datasetID)
// deleting PeerDB specific tables
err = dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
err := dataset.Table(c.getRawTableName(jobName)).Delete(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete raw table: %w", err)
}
Expand Down Expand Up @@ -1069,33 +1043,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
return fmt.Sprintf("_peerdb_staging_%s", flowJobName)
}

// Bigquery doesn't allow concurrent updates to the same table.
// we grab a lock on catalog to ensure that only one job is updating
// bigquery tables at a time.
// returns a function to release the lock.
func (c *BigQueryConnector) grabJobsUpdateLock() (func() error, error) {
tx, err := c.catalogPool.Begin(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %w", err)
}

// grab an advisory lock based on the mirror jobs table hash
mjTbl := fmt.Sprintf("%s.%s", c.datasetID, MirrorJobsTable)
_, err = tx.Exec(c.ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", mjTbl)
if err != nil {
err = tx.Rollback(c.ctx)
return nil, fmt.Errorf("failed to grab lock on %s: %w", mjTbl, err)
}

return func() error {
err = tx.Commit(c.ctx)
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}, nil
}

func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
Expand Down
26 changes: 13 additions & 13 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)

// execute the statements in a transaction
stmts := []string{}
stmts = append(stmts, "BEGIN TRANSACTION;")
stmts = append(stmts, insertStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
stmts := []string{
"BEGIN TRANSACTION;",
insertStmt,
updateMetadataStmt,
"COMMIT TRANSACTION;",
}
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
Expand Down Expand Up @@ -136,8 +136,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client
datasetID := s.connector.datasetID
// Start a transaction
stmts := []string{"BEGIN TRANSACTION;"}

selector := "*"
if softDeleteCol != "" { // PeerDB column
Expand All @@ -150,16 +148,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
datasetID, dstTableName, selector, datasetID, stagingTable)

stmts = append(stmts, insertStmt)

insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime)
if err != nil {
return -1, fmt.Errorf("failed to create metadata insert statement: %v", err)
}
slog.Info("Performing transaction inside QRep sync function", flowLog)
stmts = append(stmts, insertMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
// Execute the statements in a transaction

stmts := []string{
"BEGIN TRANSACTION;",
insertStmt,
insertMetadataStmt,
"COMMIT TRANSACTION;",
}
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
Expand Down
19 changes: 19 additions & 0 deletions ui/app/api/mirrors/alerts/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,22 @@ export async function POST(request: Request) {
}
return new Response(JSON.stringify(mirrorStatus));
}

// We accept a list here in preparation for a Select All feature in UI
export async function PUT(request: Request) {
const { mirrorIDStringList } = await request.json();
const mirrorIDList: bigint[] = mirrorIDStringList.map((id: string) =>
BigInt(id)
);
const success = await prisma.flow_errors.updateMany({
where: {
id: {
in: mirrorIDList,
},
},
data: {
ack: true,
},
});
return new Response(JSON.stringify(success.count));
}
7 changes: 3 additions & 4 deletions ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
</Label>
</div>
<div>
<MirrorError
detailed
flowName={mirrorConfig?.flowJobName || ''}
/>
<Label>
<MirrorError flowName={mirrorConfig?.flowJobName || ''} />
</Label>
</div>
</div>
<div className='basis-1/4 md:basis-1/3'>
Expand Down
57 changes: 57 additions & 0 deletions ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use client';
import { Button } from '@/lib/Button';
import { Label } from '@/lib/Label';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { useState } from 'react';
import { toast } from 'react-toastify';

const notifyErr = (errMsg: string) => {
toast.error(errMsg, {
position: toast.POSITION.BOTTOM_CENTER,
});
};

const AckButton = ({ ack, id }: { ack: boolean; id: number | bigint }) => {
const [loading, setLoading] = useState(false);
const [updated, setUpdated] = useState(false);
// handleAck updates ack to true for the given mirrorID
const handleAck = async (mirrorID: bigint | number) => {
setLoading(true);
const updateResResult = await fetch('/api/mirrors/alerts', {
method: 'PUT',
body: JSON.stringify({
mirrorIDStringList: [mirrorID.toString()],
}),
});
const updateRes = await updateResResult.json();
setLoading(false);
if (!updateRes) {
notifyErr('Something went wrong when trying to acknowledge');
return;
}
setUpdated(true);
};
return (
<>
{ack !== true && updated !== true ? (
<Button variant='normalSolid' onClick={() => handleAck(id)}>
<Label as='label' style={{ fontSize: 13 }}>
{loading ? (
<ProgressCircle variant='intermediate_progress_circle' />
) : (
'Acknowledge'
)}
</Label>
</Button>
) : (
<Button variant='normal' disabled={true}>
<Label as='label' style={{ fontSize: 13 }}>
Acknowledged
</Label>
</Button>
)}
</>
);
};

export default AckButton;
Loading

0 comments on commit 8ffe69c

Please sign in to comment.