diff --git a/src/App.native.tsx b/src/App.native.tsx
index 9fa82e9cdb..08ef35bcf5 100644
--- a/src/App.native.tsx
+++ b/src/App.native.tsx
@@ -16,6 +16,7 @@ import {useQueryClient} from '@tanstack/react-query'
import {Provider as StatsigProvider} from '#/lib/statsig/statsig'
import {logger} from '#/logger'
+import {MessagesEventBusProvider} from '#/state/messages/events'
import {init as initPersistedState} from '#/state/persisted'
import {Provider as LabelDefsProvider} from '#/state/preferences/label-defs'
import {Provider as ModerationOptsProvider} from '#/state/preferences/moderation-opts'
@@ -95,25 +96,27 @@ function InnerApp() {
// Resets the entire tree below when it changes:
key={currentAccount?.did}>
-
-
- {/* LabelDefsProvider MUST come before ModerationOptsProvider */}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+ {/* LabelDefsProvider MUST come before ModerationOptsProvider */}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/App.web.tsx b/src/App.web.tsx
index 9c2b34a788..9dfbfbe528 100644
--- a/src/App.web.tsx
+++ b/src/App.web.tsx
@@ -9,6 +9,7 @@ import {useLingui} from '@lingui/react'
import {Provider as StatsigProvider} from '#/lib/statsig/statsig'
import {logger} from '#/logger'
+import {MessagesEventBusProvider} from '#/state/messages/events'
import {init as initPersistedState} from '#/state/persisted'
import {Provider as LabelDefsProvider} from '#/state/preferences/label-defs'
import {Provider as ModerationOptsProvider} from '#/state/preferences/moderation-opts'
@@ -83,22 +84,24 @@ function InnerApp() {
// Resets the entire tree below when it changes:
key={currentAccount?.did}>
-
- {/* LabelDefsProvider MUST come before ModerationOptsProvider */}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+ {/* LabelDefsProvider MUST come before ModerationOptsProvider */}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -112,12 +115,7 @@ function App() {
const [isReady, setReady] = useState(false)
React.useEffect(() => {
- initPersistedState().then(() => {
- setReady(true)
-
- const preloadElement = document.getElementById('preload')
- preloadElement?.remove()
- })
+ initPersistedState().then(() => setReady(true))
}, [])
if (!isReady) {
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts
new file mode 100644
index 0000000000..c75f42ba0c
--- /dev/null
+++ b/src/state/messages/events/agent.ts
@@ -0,0 +1,466 @@
+import {BskyAgent, ChatBskyConvoGetLog} from '@atproto-labs/api'
+import EventEmitter from 'eventemitter3'
+import {nanoid} from 'nanoid/non-secure'
+
+import {logger} from '#/logger'
+import {
+ MessagesEventBusDispatch,
+ MessagesEventBusDispatchEvent,
+ MessagesEventBusError,
+ MessagesEventBusErrorCode,
+ MessagesEventBusParams,
+ MessagesEventBusState,
+ MessagesEventBusStatus,
+} from '#/state/messages/events/types'
+
+const LOGGER_CONTEXT = 'MessagesEventBus'
+
+const ACTIVE_POLL_INTERVAL = 60e3
+const BACKGROUND_POLL_INTERVAL = 60e3
+
+export class MessagesEventBus {
+ private id: string
+
+ private agent: BskyAgent
+ private __tempFromUserDid: string
+ private emitter = new EventEmitter()
+
+ private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized
+ private pollInterval = ACTIVE_POLL_INTERVAL
+ private error: MessagesEventBusError | undefined
+ private latestRev: string | undefined = undefined
+
+ snapshot: MessagesEventBusState | undefined
+
+ constructor(params: MessagesEventBusParams) {
+ this.id = nanoid(3)
+ this.agent = params.agent
+ this.__tempFromUserDid = params.__tempFromUserDid
+
+ this.subscribe = this.subscribe.bind(this)
+ this.getSnapshot = this.getSnapshot.bind(this)
+ this.init = this.init.bind(this)
+ this.suspend = this.suspend.bind(this)
+ this.resume = this.resume.bind(this)
+ this.setPollInterval = this.setPollInterval.bind(this)
+ this.trail = this.trail.bind(this)
+ this.trailConvo = this.trailConvo.bind(this)
+ }
+
+ private commit() {
+ this.snapshot = undefined
+ this.subscribers.forEach(subscriber => subscriber())
+ }
+
+ private subscribers: (() => void)[] = []
+
+ subscribe(subscriber: () => void) {
+ if (this.subscribers.length === 0) this.init()
+
+ this.subscribers.push(subscriber)
+
+ return () => {
+ this.subscribers = this.subscribers.filter(s => s !== subscriber)
+ if (this.subscribers.length === 0) this.suspend()
+ }
+ }
+
+ getSnapshot(): MessagesEventBusState {
+ if (!this.snapshot) this.snapshot = this.generateSnapshot()
+ // logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo)
+ return this.snapshot
+ }
+
+ private generateSnapshot(): MessagesEventBusState {
+ switch (this.status) {
+ case MessagesEventBusStatus.Initializing: {
+ return {
+ status: MessagesEventBusStatus.Initializing,
+ rev: undefined,
+ error: undefined,
+ setPollInterval: this.setPollInterval,
+ trail: this.trail,
+ trailConvo: this.trailConvo,
+ }
+ }
+ case MessagesEventBusStatus.Ready: {
+ return {
+ status: this.status,
+ rev: this.latestRev!,
+ error: undefined,
+ setPollInterval: this.setPollInterval,
+ trail: this.trail,
+ trailConvo: this.trailConvo,
+ }
+ }
+ case MessagesEventBusStatus.Suspended: {
+ return {
+ status: this.status,
+ rev: this.latestRev,
+ error: undefined,
+ setPollInterval: this.setPollInterval,
+ trail: this.trail,
+ trailConvo: this.trailConvo,
+ }
+ }
+ case MessagesEventBusStatus.Error: {
+ return {
+ status: MessagesEventBusStatus.Error,
+ rev: this.latestRev,
+ error: this.error || {
+ code: MessagesEventBusErrorCode.Unknown,
+ retry: () => {
+ this.init()
+ },
+ },
+ setPollInterval: this.setPollInterval,
+ trail: this.trail,
+ trailConvo: this.trailConvo,
+ }
+ }
+ default: {
+ return {
+ status: MessagesEventBusStatus.Uninitialized,
+ rev: undefined,
+ error: undefined,
+ setPollInterval: this.setPollInterval,
+ trail: this.trail,
+ trailConvo: this.trailConvo,
+ }
+ }
+ }
+ }
+
+ dispatch(action: MessagesEventBusDispatch) {
+ const prevStatus = this.status
+
+ switch (this.status) {
+ case MessagesEventBusStatus.Uninitialized: {
+ switch (action.event) {
+ case MessagesEventBusDispatchEvent.Init: {
+ this.status = MessagesEventBusStatus.Initializing
+ this.setup()
+ break
+ }
+ }
+ break
+ }
+ case MessagesEventBusStatus.Initializing: {
+ switch (action.event) {
+ case MessagesEventBusDispatchEvent.Ready: {
+ this.status = MessagesEventBusStatus.Ready
+ this.setPollInterval(ACTIVE_POLL_INTERVAL)
+ break
+ }
+ case MessagesEventBusDispatchEvent.Background: {
+ this.status = MessagesEventBusStatus.Backgrounded
+ this.setPollInterval(BACKGROUND_POLL_INTERVAL)
+ break
+ }
+ case MessagesEventBusDispatchEvent.Suspend: {
+ this.status = MessagesEventBusStatus.Suspended
+ break
+ }
+ case MessagesEventBusDispatchEvent.Error: {
+ this.status = MessagesEventBusStatus.Error
+ this.error = action.payload
+ break
+ }
+ }
+ break
+ }
+ case MessagesEventBusStatus.Ready: {
+ switch (action.event) {
+ case MessagesEventBusDispatchEvent.Background: {
+ this.status = MessagesEventBusStatus.Backgrounded
+ this.setPollInterval(BACKGROUND_POLL_INTERVAL)
+ break
+ }
+ case MessagesEventBusDispatchEvent.Suspend: {
+ this.status = MessagesEventBusStatus.Suspended
+ this.stopPoll()
+ break
+ }
+ case MessagesEventBusDispatchEvent.Error: {
+ this.status = MessagesEventBusStatus.Error
+ this.error = action.payload
+ this.stopPoll()
+ break
+ }
+ }
+ break
+ }
+ case MessagesEventBusStatus.Backgrounded: {
+ switch (action.event) {
+ case MessagesEventBusDispatchEvent.Resume: {
+ this.status = MessagesEventBusStatus.Ready
+ this.setPollInterval(ACTIVE_POLL_INTERVAL)
+ break
+ }
+ case MessagesEventBusDispatchEvent.Suspend: {
+ this.status = MessagesEventBusStatus.Suspended
+ this.stopPoll()
+ break
+ }
+ case MessagesEventBusDispatchEvent.Error: {
+ this.status = MessagesEventBusStatus.Error
+ this.error = action.payload
+ this.stopPoll()
+ break
+ }
+ }
+ break
+ }
+ case MessagesEventBusStatus.Suspended: {
+ switch (action.event) {
+ case MessagesEventBusDispatchEvent.Resume: {
+ this.status = MessagesEventBusStatus.Ready
+ this.setPollInterval(ACTIVE_POLL_INTERVAL)
+ break
+ }
+ case MessagesEventBusDispatchEvent.Background: {
+ this.status = MessagesEventBusStatus.Backgrounded
+ this.setPollInterval(BACKGROUND_POLL_INTERVAL)
+ break
+ }
+ case MessagesEventBusDispatchEvent.Error: {
+ this.status = MessagesEventBusStatus.Error
+ this.error = action.payload
+ this.stopPoll()
+ break
+ }
+ }
+ break
+ }
+ case MessagesEventBusStatus.Error: {
+ switch (action.event) {
+ case MessagesEventBusDispatchEvent.Resume:
+ case MessagesEventBusDispatchEvent.Init: {
+ this.status = MessagesEventBusStatus.Initializing
+ this.error = undefined
+ this.latestRev = undefined
+ this.setup()
+ break
+ }
+ }
+ break
+ }
+ default:
+ break
+ }
+
+ logger.debug(
+ `${LOGGER_CONTEXT}: dispatch '${action.event}'`,
+ {
+ id: this.id,
+ prev: prevStatus,
+ next: this.status,
+ },
+ logger.DebugContext.convo,
+ )
+
+ this.commit()
+ }
+
+ private async setup() {
+ logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo)
+
+ try {
+ await this.initializeLatestRev()
+ this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
+ } catch (e: any) {
+ logger.error(e, {
+ context: `${LOGGER_CONTEXT}: setup failed`,
+ })
+
+ this.dispatch({
+ event: MessagesEventBusDispatchEvent.Error,
+ payload: {
+ exception: e,
+ code: MessagesEventBusErrorCode.InitFailed,
+ retry: () => {
+ this.init()
+ },
+ },
+ })
+ }
+ }
+
+ init() {
+ logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
+ this.dispatch({event: MessagesEventBusDispatchEvent.Init})
+ }
+
+ background() {
+ logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
+ this.dispatch({event: MessagesEventBusDispatchEvent.Background})
+ }
+
+ suspend() {
+ logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
+ this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
+ }
+
+ resume() {
+ logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
+ this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
+ }
+
+ setPollInterval(interval: number) {
+ this.pollInterval = interval
+ this.resetPoll()
+ }
+
+ trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
+ this.emitter.on('events', handler)
+ return () => {
+ this.emitter.off('events', handler)
+ }
+ }
+
+ trailConvo(
+ convoId: string,
+ handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
+ ) {
+ const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
+ const convoEvents = events.filter(ev => {
+ if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
+ return ev.convoId === convoId
+ }
+ return false
+ })
+
+ if (convoEvents.length > 0) {
+ handler(convoEvents)
+ }
+ }
+
+ this.emitter.on('events', handle)
+ return () => {
+ this.emitter.off('events', handle)
+ }
+ }
+
+ private async initializeLatestRev() {
+ logger.debug(
+ `${LOGGER_CONTEXT}: initialize latest rev`,
+ {},
+ logger.DebugContext.convo,
+ )
+
+ const response = await this.agent.api.chat.bsky.convo.listConvos(
+ {
+ limit: 1,
+ },
+ {
+ headers: {
+ Authorization: this.__tempFromUserDid,
+ },
+ },
+ )
+
+ const {convos} = response.data
+
+ for (const convo of convos) {
+ if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
+ this.latestRev = convo.rev
+ }
+ }
+ }
+
+ /*
+ * Polling
+ */
+
+ private isPolling = false
+ private pollIntervalRef: NodeJS.Timeout | undefined
+
+ private resetPoll() {
+ this.stopPoll()
+ this.startPoll()
+ }
+
+ private startPoll() {
+ if (!this.isPolling) this.poll()
+
+ this.pollIntervalRef = setInterval(() => {
+ if (this.isPolling) return
+ this.poll()
+ }, this.pollInterval)
+ }
+
+ private stopPoll() {
+ if (this.pollIntervalRef) clearInterval(this.pollIntervalRef)
+ }
+
+ private async poll() {
+ if (this.isPolling) return
+
+ this.isPolling = true
+
+ logger.debug(`${LOGGER_CONTEXT}: poll`, {}, logger.DebugContext.convo)
+
+ try {
+ const response = await this.agent.api.chat.bsky.convo.getLog(
+ {
+ cursor: this.latestRev,
+ },
+ {
+ headers: {
+ Authorization: this.__tempFromUserDid,
+ },
+ },
+ )
+
+ const {logs: events} = response.data
+
+ let needsEmit = false
+ let batch: ChatBskyConvoGetLog.OutputSchema['logs'] = []
+
+ for (const ev of events) {
+ /*
+ * If there's a rev, we should handle it. If there's not a rev, we don't
+ * know what it is.
+ */
+ if (typeof ev.rev === 'string') {
+ /*
+ * We only care about new events
+ */
+ if (ev.rev > (this.latestRev = this.latestRev || ev.rev)) {
+ /*
+ * Update rev regardless of if it's a ev type we care about or not
+ */
+ this.latestRev = ev.rev
+ needsEmit = true
+ batch.push(ev)
+ }
+ }
+ }
+
+ if (needsEmit) {
+ try {
+ this.emitter.emit('events', batch)
+ } catch (e: any) {
+ logger.error(e, {
+ context: `${LOGGER_CONTEXT}: process latest events`,
+ })
+ }
+ }
+ } catch (e: any) {
+ logger.error(e, {context: `${LOGGER_CONTEXT}: poll events failed`})
+
+ this.dispatch({
+ event: MessagesEventBusDispatchEvent.Error,
+ payload: {
+ exception: e,
+ code: MessagesEventBusErrorCode.PollFailed,
+ retry: () => {
+ this.init()
+ },
+ },
+ })
+ } finally {
+ this.isPolling = false
+ }
+ }
+}
diff --git a/src/state/messages/events/index.tsx b/src/state/messages/events/index.tsx
new file mode 100644
index 0000000000..f37d0abea2
--- /dev/null
+++ b/src/state/messages/events/index.tsx
@@ -0,0 +1,67 @@
+import React from 'react'
+import {AppState} from 'react-native'
+import {BskyAgent} from '@atproto-labs/api'
+
+import {isWeb} from '#/platform/detection'
+import {MessagesEventBus} from '#/state/messages/events/agent'
+import {MessagesEventBusState} from '#/state/messages/events/types'
+import {useAgent} from '#/state/session'
+import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
+import {IS_DEV} from '#/env'
+
+const MessagesEventBusContext =
+ React.createContext(null)
+
+export function useMessagesEventBus() {
+ const ctx = React.useContext(MessagesEventBusContext)
+ if (!ctx) {
+ throw new Error('useChat must be used within a ChatProvider')
+ }
+ return ctx
+}
+
+export function MessagesEventBusProvider({
+ children,
+}: {
+ children: React.ReactNode
+}) {
+ const {serviceUrl} = useDmServiceUrlStorage()
+ const {getAgent} = useAgent()
+ const [bus] = React.useState(
+ () =>
+ new MessagesEventBus({
+ agent: new BskyAgent({
+ service: serviceUrl,
+ }),
+ __tempFromUserDid: getAgent().session?.did!,
+ }),
+ )
+ const service = React.useSyncExternalStore(bus.subscribe, bus.getSnapshot)
+
+ if (isWeb && IS_DEV) {
+ // @ts-ignore
+ window.messagesEventBus = service
+ }
+
+ React.useEffect(() => {
+ const handleAppStateChange = (nextAppState: string) => {
+ if (nextAppState === 'active') {
+ bus.resume()
+ } else {
+ bus.background()
+ }
+ }
+
+ const sub = AppState.addEventListener('change', handleAppStateChange)
+
+ return () => {
+ sub.remove()
+ }
+ }, [bus])
+
+ return (
+
+ {children}
+
+ )
+}
diff --git a/src/state/messages/events/types.ts b/src/state/messages/events/types.ts
new file mode 100644
index 0000000000..52083b2c69
--- /dev/null
+++ b/src/state/messages/events/types.ts
@@ -0,0 +1,111 @@
+import {BskyAgent, ChatBskyConvoGetLog} from '@atproto-labs/api'
+
+export type MessagesEventBusParams = {
+ agent: BskyAgent
+ __tempFromUserDid: string
+}
+
+export enum MessagesEventBusStatus {
+ Uninitialized = 'uninitialized',
+ Initializing = 'initializing',
+ Ready = 'ready',
+ Error = 'error',
+ Backgrounded = 'backgrounded',
+ Suspended = 'suspended',
+}
+
+export enum MessagesEventBusDispatchEvent {
+ Init = 'init',
+ Ready = 'ready',
+ Error = 'error',
+ Background = 'background',
+ Suspend = 'suspend',
+ Resume = 'resume',
+}
+
+export enum MessagesEventBusErrorCode {
+ Unknown = 'unknown',
+ InitFailed = 'initFailed',
+ PollFailed = 'pollFailed',
+}
+
+export type MessagesEventBusError = {
+ code: MessagesEventBusErrorCode
+ exception?: Error
+ retry: () => void
+}
+
+export type MessagesEventBusDispatch =
+ | {
+ event: MessagesEventBusDispatchEvent.Init
+ }
+ | {
+ event: MessagesEventBusDispatchEvent.Ready
+ }
+ | {
+ event: MessagesEventBusDispatchEvent.Background
+ }
+ | {
+ event: MessagesEventBusDispatchEvent.Suspend
+ }
+ | {
+ event: MessagesEventBusDispatchEvent.Resume
+ }
+ | {
+ event: MessagesEventBusDispatchEvent.Error
+ payload: MessagesEventBusError
+ }
+
+export type TrailHandler = (
+ events: ChatBskyConvoGetLog.OutputSchema['logs'],
+) => void
+
+export type MessagesEventBusState =
+ | {
+ status: MessagesEventBusStatus.Uninitialized
+ rev: undefined
+ error: undefined
+ setPollInterval: (interval: number) => void
+ trail: (handler: TrailHandler) => () => void
+ trailConvo: (convoId: string, handler: TrailHandler) => () => void
+ }
+ | {
+ status: MessagesEventBusStatus.Initializing
+ rev: undefined
+ error: undefined
+ setPollInterval: (interval: number) => void
+ trail: (handler: TrailHandler) => () => void
+ trailConvo: (convoId: string, handler: TrailHandler) => () => void
+ }
+ | {
+ status: MessagesEventBusStatus.Ready
+ rev: string
+ error: undefined
+ setPollInterval: (interval: number) => void
+ trail: (handler: TrailHandler) => () => void
+ trailConvo: (convoId: string, handler: TrailHandler) => () => void
+ }
+ | {
+ status: MessagesEventBusStatus.Backgrounded
+ rev: string | undefined
+ error: undefined
+ setPollInterval: (interval: number) => void
+ trail: (handler: TrailHandler) => () => void
+ trailConvo: (convoId: string, handler: TrailHandler) => () => void
+ }
+ | {
+ status: MessagesEventBusStatus.Suspended
+ rev: string | undefined
+ error: undefined
+ setPollInterval: (interval: number) => void
+ trail: (handler: TrailHandler) => () => void
+ trailConvo: (convoId: string, handler: TrailHandler) => () => void
+ }
+ | {
+ status: MessagesEventBusStatus.Error
+ rev: string | undefined
+ error: MessagesEventBusError
+ setPollInterval: (interval: number) => void
+ trail: (handler: TrailHandler) => () => void
+ trailConvo: (convoId: string, handler: TrailHandler) => () => void
+ }