Skip to content

Commit

Permalink
Fetching latest live spec right before updating (#1362)
Browse files Browse the repository at this point in the history
* Fetching latest live spec right before updating
No longer listening to changes

* removing the expected pub id for right now and will have a follow up with retry logic
  • Loading branch information
travjenkins authored Nov 18, 2024
1 parent 46875a6 commit 981880d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 45 deletions.
35 changes: 29 additions & 6 deletions src/api/liveSpecsExt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ export interface LiveSpecsExtQuery_ByCatalogName {
last_pub_id: string;
}

// TODO (live spec) - remove this and combine with getLatestLiveSpecByName
// as the cata log name is a unique index on this
const getLiveSpecsByCatalogName = async (
catalogName: string,
specType: Entity
Expand Down Expand Up @@ -399,6 +401,26 @@ const getLiveSpecSpec = (liveSpecId: string) => {
.single();
};

export interface LiveSpecsExtQuery_Latest {
spec: any;
id: string;
last_pub_id: string;
}

const getLatestLiveSpecByName = async (catalogName: string) => {
const data = await supabaseRetry(
() =>
supabaseClient
.from(TABLES.LIVE_SPECS_EXT)
.select(`spec, id, last_pub_id`)
.eq('catalog_name', catalogName)
.single(),
'getNotificationSubscriptionForUser'
).then(handleSuccess<LiveSpecsExtQuery_Latest>, handleFailure);

return data;
};

const getLiveSpecShards = (tenant: string, entityType: Entity) => {
return supabaseClient
.from(TABLES.LIVE_SPECS_EXT)
Expand Down Expand Up @@ -446,17 +468,18 @@ export interface LiveSpecsExt_Related {
// };

export {
getLatestLiveSpecByName,
getLiveSpecShards,
getLiveSpecSpec,
getLiveSpecsByCatalogName,
getLiveSpecsByCatalogNames,
getLiveSpecsByConnectorId,
getLiveSpecsByLiveSpecId,
getLiveSpecs_captures,
getLiveSpecs_collections,
getLiveSpecs_dataPlaneAuthReq,
getLiveSpecs_detailsForm,
getLiveSpecs_entitySelector,
getLiveSpecs_existingTasks,
getLiveSpecs_materializations,
getLiveSpecsByCatalogName,
getLiveSpecsByCatalogNames,
getLiveSpecsByConnectorId,
getLiveSpecsByLiveSpecId,
getLiveSpecShards,
getLiveSpecSpec,
};
72 changes: 33 additions & 39 deletions src/components/tables/RowActions/Shared/UpdateEntity.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ import {
createDraftSpec,
draftCollectionsEligibleForDeletion,
} from 'api/draftSpecs';
import { CaptureQuery } from 'api/liveSpecsExt';
import { CaptureQuery, getLatestLiveSpecByName } from 'api/liveSpecsExt';
import { createPublication } from 'api/publications';
import AlertBox from 'components/shared/AlertBox';
import DraftErrors from 'components/shared/Entity/Error/DraftErrors';
import Error from 'components/shared/Error';
import { useZustandStore } from 'context/Zustand/provider';
import {
LiveSpecsExtQueryWithSpec,
useLiveSpecsExtWithSpec,
} from 'hooks/useLiveSpecsExt';
import { LiveSpecsExtQueryWithSpec } from 'hooks/useLiveSpecsExt';
import usePublications from 'hooks/usePublications';
import { useEffect, useRef, useState } from 'react';
import { jobSucceeded } from 'services/supabase';
Expand Down Expand Up @@ -54,7 +51,9 @@ function UpdateEntity({
selectableStoreName,
validateNewSpec,
}: UpdateEntityProps) {
const updateStarted = useRef(false);
const publishCompleted = useRef(false);

const [state, setState] = useState<ProgressStates>(ProgressStates.RUNNING);
const [error, setError] = useState<any | null>(null);
const [draftId, setDraftId] = useState<string | null>(null);
Expand All @@ -74,22 +73,18 @@ function UpdateEntity({
SelectableTableStore['actionSettings']
>(selectableStoreName, selectableTableStoreSelectors.actionSettings.get);

const liveSpecsResponse = useLiveSpecsExtWithSpec(
entity.id,
entity.spec_type
);
const liveSpecs = liveSpecsResponse.liveSpecs;
const liveSpecsError = liveSpecsResponse.error;
const liveSpecsValidating = liveSpecsResponse.isValidating;

const deleteCollections =
selectableStoreName === SelectTableStoreNames.CAPTURE;

useEffect(() => {
if (publishCompleted.current) {
// If we published or started already exit
if (publishCompleted.current || updateStarted.current) {
return;
}

// Mark that we have started updating
updateStarted.current = true;

const done = (progressState: ProgressStates, response: any) => {
setState(progressState);
onFinish(response);
Expand All @@ -99,29 +94,28 @@ function UpdateEntity({
done(ProgressStates.FAILED, response);
};

if (liveSpecsValidating) {
return;
}
const updateEntity = async (targetEntity: CaptureQuery) => {
// Fetch this as late as possible so we have the latest as possible
const liveSpecResponse = await getLatestLiveSpecByName(
targetEntity.catalog_name
);

if (liveSpecsError) {
return failed({ error: liveSpecsError });
}
// Make sure we're good to continue
if (liveSpecResponse.error) {
return failed({ error: liveSpecResponse.error });
}

if (liveSpecs.length <= 0) {
return failed({
error: {
message: 'updateEntity.noLiveSpecs',
},
});
}
if (!liveSpecResponse.data?.spec) {
return failed({
error: {
message: 'updateEntity.noLiveSpecs',
},
});
}

const updateEntity = async (
targetEntity: CaptureQuery,
spec: LiveSpecsExtQueryWithSpec['spec']
) => {
// We want to make sure there is a new spec to update before
// calling anything on
const newSpec = generateNewSpec(spec);
// calling anything on it
const newSpec = generateNewSpec(liveSpecResponse.data.spec);
if (validateNewSpec && !newSpec) {
// If we have a skipped message ID set it to the error
if (skippedMessageID) {
Expand All @@ -148,6 +142,10 @@ function UpdateEntity({
entityName,
newSpec,
generateNewSpecType(targetEntity)
// TODO (update entity) - add last pub id when we add retry for pub failure
// Should use a regex like this
// export const PUBLICATION_MISMATCH_ERROR = RegExp( `expected publication ID \d was not matched`, 'gi');
// liveSpecResponse.data.last_pub_id
);
if (draftSpecsResponse.error) {
return failed(draftSpecsResponse);
Expand Down Expand Up @@ -176,12 +174,8 @@ function UpdateEntity({
setPubID(publishResponse.data[0].id);
};

void updateEntity(entity, liveSpecs[0].spec);

// We only want to run the useEffect after the data is fetched
// OR if there was an error
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [liveSpecs, liveSpecsError, liveSpecsValidating]);
void updateEntity(entity);
});

// Start fetching publication status.
// If update is running keep checking
Expand Down

0 comments on commit 981880d

Please sign in to comment.