From c72e772e29e92b8bac880342d0c92c31d35d0463 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rge=20N=C3=A6ss?= Date: Wed, 8 Jan 2025 11:16:29 +0100 Subject: [PATCH] fix(sanity): fix race condition regression introduced by #8120 --- .../_legacy/document/document-pair/checkoutPair.ts | 13 +++++++------ .../_legacy/document/document-pair/memoizedPair.ts | 14 +++++++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts b/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts index 34a42be66b21..98c0aa3340a7 100644 --- a/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts +++ b/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts @@ -2,7 +2,7 @@ import {type Action, type SanityClient} from '@sanity/client' import {type Mutation} from '@sanity/mutator' import {type SanityDocument} from '@sanity/types' import {omit} from 'lodash' -import {EMPTY, from, merge, type Observable} from 'rxjs' +import {EMPTY, from, merge, type Observable, Subject} from 'rxjs' import {filter, map, mergeMap, share, take, tap} from 'rxjs/operators' import { @@ -65,7 +65,7 @@ export interface Pair { transactionsPendingEvents$: Observable published: DocumentVersion draft: DocumentVersion - _keepalive: Observable + complete: () => void } function setVersion(version: 'draft' | 'published') { @@ -204,7 +204,10 @@ export function checkoutPair( ): Pair { const {publishedId, draftId} = idPair - const listenerEvents$ = getPairListener(client, idPair, pairListenerOptions).pipe(share()) + const listenerEventsConnector = new Subject() + const listenerEvents$ = getPairListener(client, idPair, pairListenerOptions).pipe( + share({connector: () => listenerEventsConnector}), + ) const reconnect$ = listenerEvents$.pipe( filter((ev) => ev.type === 'reconnect'), @@ -252,8 +255,6 @@ export function checkoutPair( consistency$: published.consistency$, remoteSnapshot$: published.remoteSnapshot$.pipe(map(setVersion('published'))), }, - // Use this to keep the mutation pipeline active. - // It won't ever emit any events, but it will prevent the eventsource connection from completing for as long as it is subscribed to - _keepalive: merge(listenerEvents$, commits$).pipe(mergeMap(() => EMPTY)), + complete: () => listenerEventsConnector.complete(), } } diff --git a/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts b/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts index f2d6dd1637e0..d383dda59002 100644 --- a/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts +++ b/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts @@ -1,5 +1,5 @@ import {type SanityClient} from '@sanity/client' -import {merge, type Observable, of, ReplaySubject, share, timer} from 'rxjs' +import {Observable, ReplaySubject, share, timer} from 'rxjs' import {type PairListenerOptions} from '../getPairListener' import {type IdPair} from '../types' @@ -24,12 +24,12 @@ export const memoizedPair: ( serverActionsEnabled: Observable, pairListenerOptions?: PairListenerOptions, ): Observable => { - const pair = checkoutPair(client, idPair, serverActionsEnabled, pairListenerOptions) - return merge( - of(pair), - // makes sure the pair listener is kept alive for as long as there are subscribers - pair._keepalive, - ).pipe( + return new Observable((subscriber) => { + const pair = checkoutPair(client, idPair, serverActionsEnabled, pairListenerOptions) + subscriber.next(pair) + + return pair.complete + }).pipe( share({ connector: () => new ReplaySubject(1), resetOnComplete: true,