diff --git a/src/state/messages/convo/agent.ts b/src/state/messages/convo/agent.ts index 38a3f5e620..a852934941 100644 --- a/src/state/messages/convo/agent.ts +++ b/src/state/messages/convo/agent.ts @@ -9,6 +9,10 @@ import {nanoid} from 'nanoid/non-secure' import {logger} from '#/logger' import {isNative} from '#/platform/detection' +import { + ACTIVE_POLL_INTERVAL, + BACKGROUND_POLL_INTERVAL, +} from '#/state/messages/convo/const' import { ConvoDispatch, ConvoDispatchEvent, @@ -19,9 +23,8 @@ import { ConvoState, ConvoStatus, } from '#/state/messages/convo/types' - -const ACTIVE_POLL_INTERVAL = 1e3 -const BACKGROUND_POLL_INTERVAL = 10e3 +import {MessagesEventBus} from '#/state/messages/events/agent' +import {MessagesEventBusError} from '#/state/messages/events/types' // TODO temporary let DEBUG_ACTIVE_CHAT: string | undefined @@ -41,10 +44,10 @@ export class Convo { private id: string private agent: BskyAgent + private events: MessagesEventBus private __tempFromUserDid: string private status: ConvoStatus = ConvoStatus.Uninitialized - private pollInterval = ACTIVE_POLL_INTERVAL private error: | { code: ConvoErrorCode @@ -52,9 +55,9 @@ export class Convo { retry: () => void } | undefined - private historyCursor: string | undefined | null = undefined + private oldestRev: string | undefined | null = undefined private isFetchingHistory = false - private eventsCursor: string | undefined = undefined + private latestRev: string | undefined = undefined private pastMessages: Map< string, @@ -73,7 +76,6 @@ export class Convo { private headerItems: Map = new Map() private isProcessingPendingMessages = false - private nextPoll: NodeJS.Timeout | undefined convoId: string convo: ChatBskyConvoDefs.ConvoView | undefined @@ -85,6 +87,7 @@ export class Convo { this.id = nanoid(3) this.convoId = params.convoId this.agent = params.agent + this.events = params.events this.__tempFromUserDid = params.__tempFromUserDid this.subscribe = this.subscribe.bind(this) @@ -92,6 +95,9 @@ export class Convo { this.sendMessage = this.sendMessage.bind(this) this.deleteMessage = this.deleteMessage.bind(this) this.fetchMessageHistory = this.fetchMessageHistory.bind(this) + this.ingestFirehose = this.ingestFirehose.bind(this) + this.onFirehoseConnect = this.onFirehoseConnect.bind(this) + this.onFirehoseError = this.onFirehoseError.bind(this) if (DEBUG_ACTIVE_CHAT) { logger.error(`Convo: another chat was already active`, { @@ -100,6 +106,12 @@ export class Convo { } else { DEBUG_ACTIVE_CHAT = this.convoId } + + this.events.trailConvo(this.convoId, events => { + this.ingestFirehose(events) + }) + this.events.onConnect(this.onFirehoseConnect) + this.events.onError(this.onFirehoseError) } private commit() { @@ -198,6 +210,7 @@ export class Convo { case ConvoDispatchEvent.Init: { this.status = ConvoStatus.Initializing this.setup() + this.requestPollInterval(ACTIVE_POLL_INTERVAL) break } } @@ -207,27 +220,24 @@ export class Convo { switch (action.event) { case ConvoDispatchEvent.Ready: { this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.fetchMessageHistory().then(() => { - this.restartPoll() - }) + this.fetchMessageHistory() break } case ConvoDispatchEvent.Background: { this.status = ConvoStatus.Backgrounded - this.pollInterval = BACKGROUND_POLL_INTERVAL - this.fetchMessageHistory().then(() => { - this.restartPoll() - }) + this.fetchMessageHistory() + this.requestPollInterval(BACKGROUND_POLL_INTERVAL) break } case ConvoDispatchEvent.Suspend: { this.status = ConvoStatus.Suspended + this.withdrawRequestedPollInterval() break } case ConvoDispatchEvent.Error: { this.status = ConvoStatus.Error this.error = action.payload + this.withdrawRequestedPollInterval() break } } @@ -237,24 +247,23 @@ export class Convo { switch (action.event) { case ConvoDispatchEvent.Resume: { this.refreshConvo() - this.restartPoll() + this.requestPollInterval(ACTIVE_POLL_INTERVAL) break } case ConvoDispatchEvent.Background: { this.status = ConvoStatus.Backgrounded - this.pollInterval = BACKGROUND_POLL_INTERVAL - this.restartPoll() + this.requestPollInterval(BACKGROUND_POLL_INTERVAL) break } case ConvoDispatchEvent.Suspend: { this.status = ConvoStatus.Suspended - this.cancelNextPoll() + this.withdrawRequestedPollInterval() break } case ConvoDispatchEvent.Error: { this.status = ConvoStatus.Error this.error = action.payload - this.cancelNextPoll() + this.withdrawRequestedPollInterval() break } } @@ -262,23 +271,27 @@ export class Convo { } case ConvoStatus.Backgrounded: { switch (action.event) { + // TODO truncate history if needed case ConvoDispatchEvent.Resume: { - this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.refreshConvo() - // TODO truncate history if needed - this.restartPoll() + if (this.convo) { + this.status = ConvoStatus.Ready + this.refreshConvo() + } else { + this.status = ConvoStatus.Initializing + this.setup() + } + this.requestPollInterval(ACTIVE_POLL_INTERVAL) break } case ConvoDispatchEvent.Suspend: { this.status = ConvoStatus.Suspended - this.cancelNextPoll() + this.withdrawRequestedPollInterval() break } case ConvoDispatchEvent.Error: { this.status = ConvoStatus.Error this.error = action.payload - this.cancelNextPoll() + this.withdrawRequestedPollInterval() break } } @@ -287,18 +300,11 @@ export class Convo { case ConvoStatus.Suspended: { switch (action.event) { case ConvoDispatchEvent.Init: { - this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.refreshConvo() - // TODO truncate history if needed - this.restartPoll() + this.reset() break } case ConvoDispatchEvent.Resume: { - this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.refreshConvo() - this.restartPoll() + this.reset() break } case ConvoDispatchEvent.Error: { @@ -356,8 +362,8 @@ export class Convo { this.status = ConvoStatus.Uninitialized this.error = undefined - this.historyCursor = undefined - this.eventsCursor = undefined + this.oldestRev = undefined + this.latestRev = undefined this.pastMessages = new Map() this.newMessages = new Map() @@ -426,6 +432,17 @@ export class Convo { DEBUG_ACTIVE_CHAT = undefined } + private requestedPollInterval: (() => void) | undefined + private requestPollInterval(interval: number) { + this.withdrawRequestedPollInterval() + this.requestedPollInterval = this.events.requestPollInterval(interval) + } + private withdrawRequestedPollInterval() { + if (this.requestedPollInterval) { + this.requestedPollInterval() + } + } + private pendingFetchConvo: | Promise<{ convo: ChatBskyConvoDefs.ConvoView @@ -499,9 +516,9 @@ export class Convo { logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo) /* - * If historyCursor is null, we've fetched all history. + * If oldestRev is null, we've fetched all history. */ - if (this.historyCursor === null) return + if (this.oldestRev === null) return /* * Don't fetch again if a fetch is already in progress @@ -529,7 +546,7 @@ export class Convo { const response = await this.agent.api.chat.bsky.convo.getMessages( { - cursor: this.historyCursor, + cursor: this.oldestRev, convoId: this.convoId, limit: isNative ? 25 : 50, }, @@ -541,21 +558,22 @@ export class Convo { ) const {cursor, messages} = response.data - this.historyCursor = cursor ?? null + this.oldestRev = cursor ?? null for (const message of messages) { if ( ChatBskyConvoDefs.isMessageView(message) || ChatBskyConvoDefs.isDeletedMessageView(message) ) { - this.pastMessages.set(message.id, message) - - // set to latest rev - if ( - message.rev > (this.eventsCursor = this.eventsCursor || message.rev) - ) { - this.eventsCursor = message.rev + /* + * If this message is already in new messages, it was added by the + * firehose ingestion, and we can safely overwrite it. This trusts + * the server on ordering, and keeps it in sync. + */ + if (this.newMessages.has(message.id)) { + this.newMessages.delete(message.id) } + this.pastMessages.set(message.id, message) } } } catch (e: any) { @@ -576,84 +594,26 @@ export class Convo { } } - private restartPoll() { - this.cancelNextPoll() - this.pollLatestEvents() - } - - private cancelNextPoll() { - if (this.nextPoll) clearTimeout(this.nextPoll) - } - - private pollLatestEvents() { - /* - * Uncomment to view poll events - */ - logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo) - - try { - this.fetchLatestEvents().then(({events}) => { - this.applyLatestEvents(events) - }) - this.nextPoll = setTimeout(() => { - this.pollLatestEvents() - }, this.pollInterval) - } catch (e: any) { - logger.error('Convo: poll events failed') - - this.cancelNextPoll() - - this.footerItems.set(ConvoItemError.PollFailed, { - type: 'error-recoverable', - key: ConvoItemError.PollFailed, - code: ConvoItemError.PollFailed, - retry: () => { - this.footerItems.delete(ConvoItemError.PollFailed) - this.commit() - this.pollLatestEvents() - }, - }) - - this.commit() - } + onFirehoseConnect() { + this.footerItems.delete(ConvoItemError.PollFailed) + this.commit() } - private pendingFetchLatestEvents: - | Promise<{ - events: ChatBskyConvoGetLog.OutputSchema['logs'] - }> - | undefined - async fetchLatestEvents() { - if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents - - this.pendingFetchLatestEvents = new Promise<{ - events: ChatBskyConvoGetLog.OutputSchema['logs'] - }>(async (resolve, reject) => { - try { - // throw new Error('UNCOMMENT TO TEST POLL FAILURE') - const response = await this.agent.api.chat.bsky.convo.getLog( - { - cursor: this.eventsCursor, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const {logs} = response.data - resolve({events: logs}) - } catch (e) { - reject(e) - } finally { - this.pendingFetchLatestEvents = undefined - } + onFirehoseError(error?: MessagesEventBusError) { + this.footerItems.set(ConvoItemError.PollFailed, { + type: 'error-recoverable', + key: ConvoItemError.PollFailed, + code: ConvoItemError.PollFailed, + retry: () => { + this.footerItems.delete(ConvoItemError.PollFailed) + this.commit() + error?.retry() + }, }) - - return this.pendingFetchLatestEvents + this.commit() } - private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) { + ingestFirehose(events: ChatBskyConvoGetLog.OutputSchema['logs']) { let needsCommit = false for (const ev of events) { @@ -662,14 +622,25 @@ export class Convo { * know what it is. */ if (typeof ev.rev === 'string') { + const isUninitialized = !this.latestRev + const isNewEvent = this.latestRev && ev.rev > this.latestRev + + /* + * We received an event prior to fetching any history, so we can safely + * use this as the initial history cursor + */ + if (this.oldestRev === undefined && isUninitialized) { + this.oldestRev = ev.rev + } + /* * We only care about new events */ - if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) { + if (isNewEvent || isUninitialized) { /* * Update rev regardless of if it's a ev type we care about or not */ - this.eventsCursor = ev.rev + this.latestRev = ev.rev /* * This is VERY important. We don't want to insert any messages from @@ -681,8 +652,14 @@ export class Convo { ChatBskyConvoDefs.isLogCreateMessage(ev) && ChatBskyConvoDefs.isMessageView(ev.message) ) { + /** + * If this message is already in new messages, it was added by our + * sending logic, and is based on client-ordering. When we receive + * the "commited" event from the log, we should replace this + * reference and re-insert in order to respect the order we receied + * from the log. + */ if (this.newMessages.has(ev.message.id)) { - // Trust the ev as the source of truth on ordering this.newMessages.delete(ev.message.id) } this.newMessages.set(ev.message.id, ev.message) @@ -694,6 +671,7 @@ export class Convo { /* * Update if we have this in state. If we don't, don't worry about it. */ + // TODO check for other storage spots if (this.pastMessages.has(ev.message.id)) { /* * For now, we remove deleted messages from the thread, if we receive one. diff --git a/src/state/messages/convo/const.ts b/src/state/messages/convo/const.ts new file mode 100644 index 0000000000..0b88733417 --- /dev/null +++ b/src/state/messages/convo/const.ts @@ -0,0 +1,2 @@ +export const ACTIVE_POLL_INTERVAL = 1e3 +export const BACKGROUND_POLL_INTERVAL = 5e3 diff --git a/src/state/messages/convo/index.tsx b/src/state/messages/convo/index.tsx index c4fe71d30d..311e8ce05e 100644 --- a/src/state/messages/convo/index.tsx +++ b/src/state/messages/convo/index.tsx @@ -5,6 +5,7 @@ import {useFocusEffect, useIsFocused} from '@react-navigation/native' import {Convo} from '#/state/messages/convo/agent' import {ConvoParams, ConvoState} from '#/state/messages/convo/types' +import {useMessagesEventBus} from '#/state/messages/events' import {useMarkAsReadMutation} from '#/state/queries/messages/conversation' import {useAgent} from '#/state/session' import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' @@ -26,6 +27,7 @@ export function ConvoProvider({ const isScreenFocused = useIsFocused() const {serviceUrl} = useDmServiceUrlStorage() const {getAgent} = useAgent() + const events = useMessagesEventBus() const [convo] = useState( () => new Convo({ @@ -33,6 +35,7 @@ export function ConvoProvider({ agent: new BskyAgent({ service: serviceUrl, }), + events, __tempFromUserDid: getAgent().session?.did!, }), ) diff --git a/src/state/messages/convo/types.ts b/src/state/messages/convo/types.ts index cfbde6d7e2..2ed2eeaff2 100644 --- a/src/state/messages/convo/types.ts +++ b/src/state/messages/convo/types.ts @@ -5,9 +5,12 @@ import { ChatBskyConvoSendMessage, } from '@atproto-labs/api' +import {MessagesEventBus} from '#/state/messages/events/agent' + export type ConvoParams = { convoId: string agent: BskyAgent + events: MessagesEventBus __tempFromUserDid: string } diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts index f22cff9d96..eea61a61b7 100644 --- a/src/state/messages/events/agent.ts +++ b/src/state/messages/events/agent.ts @@ -347,7 +347,15 @@ export class MessagesEventBus { this.isPolling = true - logger.debug(`${LOGGER_CONTEXT}: poll`, {}, logger.DebugContext.convo) + // logger.debug( + // `${LOGGER_CONTEXT}: poll`, + // { + // requestedPollIntervals: Array.from( + // this.requestedPollIntervals.values(), + // ), + // }, + // logger.DebugContext.convo, + // ) try { const response = await this.agent.api.chat.bsky.convo.getLog(