From 981880d7355b56392fcc103c25862b5db53643d9 Mon Sep 17 00:00:00 2001 From: Travis Jenkins Date: Mon, 18 Nov 2024 10:54:29 -0500 Subject: [PATCH] Fetching latest live spec right before updating (#1362) * Fetching latest live spec right before updating No longer listening to changes * removing the expected pub id for right now and will have a follow up with retry logic --- src/api/liveSpecsExt.ts | 35 +++++++-- .../tables/RowActions/Shared/UpdateEntity.tsx | 72 +++++++++---------- 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/src/api/liveSpecsExt.ts b/src/api/liveSpecsExt.ts index 4e2ac0191..e5decc08d 100644 --- a/src/api/liveSpecsExt.ts +++ b/src/api/liveSpecsExt.ts @@ -267,6 +267,8 @@ export interface LiveSpecsExtQuery_ByCatalogName { last_pub_id: string; } +// TODO (live spec) - remove this and combine with getLatestLiveSpecByName +// as the cata log name is a unique index on this const getLiveSpecsByCatalogName = async ( catalogName: string, specType: Entity @@ -399,6 +401,26 @@ const getLiveSpecSpec = (liveSpecId: string) => { .single(); }; +export interface LiveSpecsExtQuery_Latest { + spec: any; + id: string; + last_pub_id: string; +} + +const getLatestLiveSpecByName = async (catalogName: string) => { + const data = await supabaseRetry( + () => + supabaseClient + .from(TABLES.LIVE_SPECS_EXT) + .select(`spec, id, last_pub_id`) + .eq('catalog_name', catalogName) + .single(), + 'getNotificationSubscriptionForUser' + ).then(handleSuccess, handleFailure); + + return data; +}; + const getLiveSpecShards = (tenant: string, entityType: Entity) => { return supabaseClient .from(TABLES.LIVE_SPECS_EXT) @@ -446,6 +468,13 @@ export interface LiveSpecsExt_Related { // }; export { + getLatestLiveSpecByName, + getLiveSpecShards, + getLiveSpecSpec, + getLiveSpecsByCatalogName, + getLiveSpecsByCatalogNames, + getLiveSpecsByConnectorId, + getLiveSpecsByLiveSpecId, getLiveSpecs_captures, getLiveSpecs_collections, getLiveSpecs_dataPlaneAuthReq, @@ -453,10 +482,4 @@ export { getLiveSpecs_entitySelector, getLiveSpecs_existingTasks, getLiveSpecs_materializations, - getLiveSpecsByCatalogName, - getLiveSpecsByCatalogNames, - getLiveSpecsByConnectorId, - getLiveSpecsByLiveSpecId, - getLiveSpecShards, - getLiveSpecSpec, }; diff --git a/src/components/tables/RowActions/Shared/UpdateEntity.tsx b/src/components/tables/RowActions/Shared/UpdateEntity.tsx index 7e3e2080c..b773896a0 100644 --- a/src/components/tables/RowActions/Shared/UpdateEntity.tsx +++ b/src/components/tables/RowActions/Shared/UpdateEntity.tsx @@ -3,16 +3,13 @@ import { createDraftSpec, draftCollectionsEligibleForDeletion, } from 'api/draftSpecs'; -import { CaptureQuery } from 'api/liveSpecsExt'; +import { CaptureQuery, getLatestLiveSpecByName } from 'api/liveSpecsExt'; import { createPublication } from 'api/publications'; import AlertBox from 'components/shared/AlertBox'; import DraftErrors from 'components/shared/Entity/Error/DraftErrors'; import Error from 'components/shared/Error'; import { useZustandStore } from 'context/Zustand/provider'; -import { - LiveSpecsExtQueryWithSpec, - useLiveSpecsExtWithSpec, -} from 'hooks/useLiveSpecsExt'; +import { LiveSpecsExtQueryWithSpec } from 'hooks/useLiveSpecsExt'; import usePublications from 'hooks/usePublications'; import { useEffect, useRef, useState } from 'react'; import { jobSucceeded } from 'services/supabase'; @@ -54,7 +51,9 @@ function UpdateEntity({ selectableStoreName, validateNewSpec, }: UpdateEntityProps) { + const updateStarted = useRef(false); const publishCompleted = useRef(false); + const [state, setState] = useState(ProgressStates.RUNNING); const [error, setError] = useState(null); const [draftId, setDraftId] = useState(null); @@ -74,22 +73,18 @@ function UpdateEntity({ SelectableTableStore['actionSettings'] >(selectableStoreName, selectableTableStoreSelectors.actionSettings.get); - const liveSpecsResponse = useLiveSpecsExtWithSpec( - entity.id, - entity.spec_type - ); - const liveSpecs = liveSpecsResponse.liveSpecs; - const liveSpecsError = liveSpecsResponse.error; - const liveSpecsValidating = liveSpecsResponse.isValidating; - const deleteCollections = selectableStoreName === SelectTableStoreNames.CAPTURE; useEffect(() => { - if (publishCompleted.current) { + // If we published or started already exit + if (publishCompleted.current || updateStarted.current) { return; } + // Mark that we have started updating + updateStarted.current = true; + const done = (progressState: ProgressStates, response: any) => { setState(progressState); onFinish(response); @@ -99,29 +94,28 @@ function UpdateEntity({ done(ProgressStates.FAILED, response); }; - if (liveSpecsValidating) { - return; - } + const updateEntity = async (targetEntity: CaptureQuery) => { + // Fetch this as late as possible so we have the latest as possible + const liveSpecResponse = await getLatestLiveSpecByName( + targetEntity.catalog_name + ); - if (liveSpecsError) { - return failed({ error: liveSpecsError }); - } + // Make sure we're good to continue + if (liveSpecResponse.error) { + return failed({ error: liveSpecResponse.error }); + } - if (liveSpecs.length <= 0) { - return failed({ - error: { - message: 'updateEntity.noLiveSpecs', - }, - }); - } + if (!liveSpecResponse.data?.spec) { + return failed({ + error: { + message: 'updateEntity.noLiveSpecs', + }, + }); + } - const updateEntity = async ( - targetEntity: CaptureQuery, - spec: LiveSpecsExtQueryWithSpec['spec'] - ) => { // We want to make sure there is a new spec to update before - // calling anything on - const newSpec = generateNewSpec(spec); + // calling anything on it + const newSpec = generateNewSpec(liveSpecResponse.data.spec); if (validateNewSpec && !newSpec) { // If we have a skipped message ID set it to the error if (skippedMessageID) { @@ -148,6 +142,10 @@ function UpdateEntity({ entityName, newSpec, generateNewSpecType(targetEntity) + // TODO (update entity) - add last pub id when we add retry for pub failure + // Should use a regex like this + // export const PUBLICATION_MISMATCH_ERROR = RegExp( `expected publication ID \d was not matched`, 'gi'); + // liveSpecResponse.data.last_pub_id ); if (draftSpecsResponse.error) { return failed(draftSpecsResponse); @@ -176,12 +174,8 @@ function UpdateEntity({ setPubID(publishResponse.data[0].id); }; - void updateEntity(entity, liveSpecs[0].spec); - - // We only want to run the useEffect after the data is fetched - // OR if there was an error - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [liveSpecs, liveSpecsError, liveSpecsValidating]); + void updateEntity(entity); + }); // Start fetching publication status. // If update is running keep checking