diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 9591cab251..66a4a2033f 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -72,8 +72,9 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot
}
defer connectors.CloseConnector(dstConn)
+ flowName, _ := ctx.Value(shared.FlowNameKey).(string)
if err := dstConn.SetupMetadataTables(); err != nil {
- a.Alerter.LogFlowError(ctx, config.Name, err)
+ a.Alerter.LogFlowError(ctx, flowName, err)
return fmt.Errorf("failed to setup metadata tables: %w", err)
}
@@ -112,7 +113,7 @@ func (a *FlowableActivity) EnsurePullability(
output, err := srcConn.EnsurePullability(config)
if err != nil {
- a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
+ a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}
@@ -169,7 +170,8 @@ func (a *FlowableActivity) CreateNormalizedTable(
setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
- a.Alerter.LogFlowError(ctx, config.PeerConnectionConfig.Name, err)
+ flowName, _ := ctx.Value(shared.FlowNameKey).(string)
+ a.Alerter.LogFlowError(ctx, flowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}
@@ -580,7 +582,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
slog.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
} else {
- err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
+ err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx,
+ a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
goroutineErr = err
@@ -935,7 +938,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
},
}
}
- updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
+ updateErr := monitoring.InitializeQRepRun(
+ ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}
@@ -945,7 +949,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}
- err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
+ err = monitoring.UpdatePullEndTimeAndRowsForPartition(
+ errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
slog.Error(fmt.Sprintf("%v", err))
return err
diff --git a/ui/app/api/mirrors/alerts/route.ts b/ui/app/api/mirrors/alerts/route.ts
index ecb9891cbd..13cc612503 100644
--- a/ui/app/api/mirrors/alerts/route.ts
+++ b/ui/app/api/mirrors/alerts/route.ts
@@ -19,3 +19,22 @@ export async function POST(request: Request) {
}
return new Response(JSON.stringify(mirrorStatus));
}
+
+// We accept a list here in preparation for a Select All feature in UI
+export async function PUT(request: Request) {
+ const { mirrorIDStringList } = await request.json();
+ const mirrorIDList: bigint[] = mirrorIDStringList.map((id: string) =>
+ BigInt(id)
+ );
+ const success = await prisma.flow_errors.updateMany({
+ where: {
+ id: {
+ in: mirrorIDList,
+ },
+ },
+ data: {
+ ack: true,
+ },
+ });
+ return new Response(JSON.stringify(success.count));
+}
diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
index e7729c487d..738218c64f 100644
--- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
+++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
@@ -32,10 +32,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
-
+
diff --git a/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx b/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx
new file mode 100644
index 0000000000..b6d14eed89
--- /dev/null
+++ b/ui/app/mirrors/errors/[mirrorName]/ackbutton.tsx
@@ -0,0 +1,57 @@
+'use client';
+import { Button } from '@/lib/Button';
+import { Label } from '@/lib/Label';
+import { ProgressCircle } from '@/lib/ProgressCircle';
+import { useState } from 'react';
+import { toast } from 'react-toastify';
+
+const notifyErr = (errMsg: string) => {
+ toast.error(errMsg, {
+ position: toast.POSITION.BOTTOM_CENTER,
+ });
+};
+
+const AckButton = ({ ack, id }: { ack: boolean; id: number | bigint }) => {
+ const [loading, setLoading] = useState(false);
+ const [updated, setUpdated] = useState(false);
+ // handleAck updates ack to true for the given mirrorID
+ const handleAck = async (mirrorID: bigint | number) => {
+ setLoading(true);
+ const updateResResult = await fetch('/api/mirrors/alerts', {
+ method: 'PUT',
+ body: JSON.stringify({
+ mirrorIDStringList: [mirrorID.toString()],
+ }),
+ });
+ const updateRes = await updateResResult.json();
+ setLoading(false);
+ if (!updateRes) {
+ notifyErr('Something went wrong when trying to acknowledge');
+ return;
+ }
+ setUpdated(true);
+ };
+ return (
+ <>
+ {ack !== true && updated !== true ? (
+
+ ) : (
+
+ )}
+ >
+ );
+};
+
+export default AckButton;
diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx
index 97efca644a..899b25a496 100644
--- a/ui/app/mirrors/errors/[mirrorName]/page.tsx
+++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx
@@ -3,6 +3,9 @@ import prisma from '@/app/utils/prisma';
import TimeLabel from '@/components/TimeComponent';
import { Label } from '@/lib/Label';
import { Table, TableCell, TableRow } from '@/lib/Table';
+import { ToastContainer } from 'react-toastify';
+import 'react-toastify/dist/ReactToastify.css';
+import AckButton from './ackbutton';
type MirrorErrorProps = {
params: { mirrorName: string };
@@ -14,62 +17,84 @@ const MirrorError = async ({ params: { mirrorName } }: MirrorErrorProps) => {
flow_name: mirrorName,
error_type: 'error',
},
- distinct: ['error_message'],
orderBy: {
error_timestamp: 'desc',
},
});
return (
-
-
-
-
-
-
-
-
- Type
-
- Message
-
- }
+ <>
+
+
+
+
+
+
+
+
+
+
+
+
- {mirrorErrors.map((mirrorError) => (
-
-
- {mirrorError.error_type.toUpperCase()}
-
-
-
-
-
- {mirrorError.error_message}
-
-
- ))}
-
+
+ Type
+
+
+
+ Message
+
+
+ }
+ >
+ {mirrorErrors.map((mirrorError) => (
+
+
+ {mirrorError.error_type.toUpperCase()}
+
+
+
+
+
+ {mirrorError.error_message}
+
+
+
+
+
+ ))}
+
+
-
+
+ >
);
};
diff --git a/ui/app/mirrors/mirror-status.tsx b/ui/app/mirrors/mirror-status.tsx
index 27d797e389..b64d87ff23 100644
--- a/ui/app/mirrors/mirror-status.tsx
+++ b/ui/app/mirrors/mirror-status.tsx
@@ -13,26 +13,21 @@ export const ErrorModal = ({ flowName }: { flowName: string }) => {
);
};
-export const MirrorError = ({
- flowName,
- detailed,
-}: {
- flowName: string;
- detailed: boolean;
-}) => {
+export const MirrorError = ({ flowName }: { flowName: string }) => {
const [flowStatus, setFlowStatus] = useState();
const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState(null);
@@ -81,15 +76,23 @@ export const MirrorError = ({
);
}
- if (flowStatus == 'healthy') {
- if (detailed)
- return (
-
+ if (flowStatus === 'healthy') {
+ return (
+
+
- );
- return ;
+
+
+
+ );
}
return ;
diff --git a/ui/app/mirrors/tables.tsx b/ui/app/mirrors/tables.tsx
index 106c7cd22d..6c1289befc 100644
--- a/ui/app/mirrors/tables.tsx
+++ b/ui/app/mirrors/tables.tsx
@@ -90,7 +90,7 @@ export function CDCFlows({ cdcFlows }: { cdcFlows: any }) {
-
+