diff --git a/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts b/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts index 98c0aa3340a..164cf477d4a 100644 --- a/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts +++ b/packages/sanity/src/core/store/_legacy/document/document-pair/checkoutPair.ts @@ -2,7 +2,7 @@ import {type Action, type SanityClient} from '@sanity/client' import {type Mutation} from '@sanity/mutator' import {type SanityDocument} from '@sanity/types' import {omit} from 'lodash' -import {EMPTY, from, merge, type Observable, Subject} from 'rxjs' +import {EMPTY, from, merge, type Observable} from 'rxjs' import {filter, map, mergeMap, share, take, tap} from 'rxjs/operators' import { @@ -65,7 +65,7 @@ export interface Pair { transactionsPendingEvents$: Observable<PendingMutationsEvent> published: DocumentVersion draft: DocumentVersion - complete: () => void + _keepalive: Observable<never> } function setVersion<T>(version: 'draft' | 'published') { @@ -204,10 +204,7 @@ export function checkoutPair( ): Pair { const {publishedId, draftId} = idPair - const listenerEventsConnector = new Subject<ListenerEvent>() - const listenerEvents$ = getPairListener(client, idPair, pairListenerOptions).pipe( - share({connector: () => listenerEventsConnector}), - ) + const listenerEvents$ = getPairListener(client, idPair, pairListenerOptions).pipe(share()) const reconnect$ = listenerEvents$.pipe( filter((ev) => ev.type === 'reconnect'), @@ -255,6 +252,8 @@ export function checkoutPair( consistency$: published.consistency$, remoteSnapshot$: published.remoteSnapshot$.pipe(map(setVersion('published'))), }, - complete: () => listenerEventsConnector.complete(), + // Use this to keep the mutation pipeline active. + // It won't ever emit any events, but it will prevent the eventsource connection from completing for as long as it is subscribed to + _keepalive: commits$.pipe(mergeMap(() => EMPTY)), } } diff --git a/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts b/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts index cdce1271a74..f2d6dd1637e 100644 --- a/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts +++ b/packages/sanity/src/core/store/_legacy/document/document-pair/memoizedPair.ts @@ -1,6 +1,5 @@ import {type SanityClient} from '@sanity/client' -import {Observable} from 'rxjs' -import {publishReplay, refCount} from 'rxjs/operators' +import {merge, type Observable, of, ReplaySubject, share, timer} from 'rxjs' import {type PairListenerOptions} from '../getPairListener' import {type IdPair} from '../types' @@ -8,6 +7,9 @@ import {memoize} from '../utils/createMemoizer' import {checkoutPair, type Pair} from './checkoutPair' import {memoizeKeyGen} from './memoizeKeyGen' +// How long to keep listener connected for after last unsubscribe +const LISTENER_RESET_DELAY = 10_000 + export const memoizedPair: ( client: SanityClient, idPair: IdPair, @@ -22,12 +24,18 @@ export const memoizedPair: ( serverActionsEnabled: Observable<boolean>, pairListenerOptions?: PairListenerOptions, ): Observable<Pair> => { - return new Observable<Pair>((subscriber) => { - const pair = checkoutPair(client, idPair, serverActionsEnabled, pairListenerOptions) - subscriber.next(pair) - - return pair.complete - }).pipe(publishReplay(1), refCount()) + const pair = checkoutPair(client, idPair, serverActionsEnabled, pairListenerOptions) + return merge( + of(pair), + // makes sure the pair listener is kept alive for as long as there are subscribers + pair._keepalive, + ).pipe( + share({ + connector: () => new ReplaySubject(1), + resetOnComplete: true, + resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY), + }), + ) }, memoizeKeyGen, ) diff --git a/packages/sanity/src/core/store/_legacy/document/getPairListener.ts b/packages/sanity/src/core/store/_legacy/document/getPairListener.ts index e0f0fc9f173..c47b0038101 100644 --- a/packages/sanity/src/core/store/_legacy/document/getPairListener.ts +++ b/packages/sanity/src/core/store/_legacy/document/getPairListener.ts @@ -2,10 +2,9 @@ import {type SanityClient} from '@sanity/client' import {type SanityDocument} from '@sanity/types' import {groupBy} from 'lodash' -import {defer, merge, type Observable, of, throwError, timer} from 'rxjs' +import {defer, merge, type Observable, of, throwError} from 'rxjs' import {catchError, concatMap, filter, map, mergeMap, scan, share} from 'rxjs/operators' -import {LISTENER_RESET_DELAY} from '../../../preview/constants' import {shareReplayLatest} from '../../../preview/utils/shareReplayLatest' import {debug} from './debug' import { @@ -96,7 +95,6 @@ export function getPairListener( //filter((event) => Math.random() < 0.99 || event.type !== 'mutation'), shareReplayLatest({ predicate: (event) => event.type === 'welcome' || event.type === 'reconnect', - resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY), }), ), ) as Observable<WelcomeEvent | MutationEvent | ReconnectEvent>