Skip to content

Commit

Permalink
fix(app, app-shell): block HTTP refetching until successful MQTT subs…
Browse files Browse the repository at this point in the history
…cription

Currently, the desktop app/ODD attempt to address the "missed update problem" in the following way:
while we subscribe to a topic, we simultaneously GET whatever equivalent HTTP resource we just
subscribed to. However, there's definitely a world (albeit a very small one) in which we receive the
HTTP response, a server update occurs, the server publishes, and then we successfully subscribe to a
topic. In this world, we've missed the update event. Solve for this by simply blocking the initial
HTTP GET until we subscribe. While the subscribe handshake could theoretically take a maximum of 2
seconds (at which point we forcefully timeout the subscribe action and fallback to polling), in
practice it's more like 250ms. We already handle failed connections and don't go through this
handshake if we can't connect to the client to begin with, so the "wait 2 second until sub failure"
scenario shouldn't realistically happen. This bug was a product of initially using retained messages
during prototyping, however we removed retained messaging by MQTT launch.
  • Loading branch information
mjhuff committed May 3, 2024
1 parent 72f3200 commit 30fddab
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 19 deletions.
4 changes: 4 additions & 0 deletions app-shell-odd/src/notifications/deserialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ export function sendDeserialized(
} catch {} // Prevents shell erroring during app shutdown event.
}

export function sendDeserializedRefetch(topic: NotifyTopic): void {
sendDeserialized(topic, { refetch: true })
}

export function sendDeserializedGenericError(topic: NotifyTopic): void {
sendDeserialized(topic, FAILURE_STATUSES.ECONNFAILED)
}
Expand Down
24 changes: 16 additions & 8 deletions app-shell-odd/src/notifications/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import mqtt from 'mqtt'

import { connectionStore } from './store'
import { sendDeserialized, sendDeserializedGenericError } from './deserialize'
import {
sendDeserialized,
sendDeserializedGenericError,
sendDeserializedRefetch,
} from './deserialize'
import { notifyLog } from './notifyLog'

import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types'
Expand Down Expand Up @@ -30,8 +34,8 @@ export function subscribe(topic: NotifyTopic): Promise<void> {
if (client == null) {
return Promise.reject(new Error('Expected hostData, received null.'))
}

if (
// The first time the client wants to subscribe on a robot to a particular topic.
else if (
!connectionStore.isActiveSub(topic) &&
!connectionStore.isPendingSub(topic)
) {
Expand All @@ -44,13 +48,15 @@ export function subscribe(topic: NotifyTopic): Promise<void> {
})
)
.catch((error: Error) => notifyLog.debug(error.message))
} else {
void waitUntilActiveOrErrored('subscription', topic).catch(
(error: Error) => {
}
// The client is either already subscribed or the subscription is currently pending.
else {
void waitUntilActiveOrErrored('subscription', topic)
.then(() => sendDeserializedRefetch(topic))
.catch((error: Error) => {
notifyLog.debug(error.message)
sendDeserializedGenericError(topic)
}
)
})
}
})
.catch((error: Error) => {
Expand All @@ -74,6 +80,8 @@ export function subscribe(topic: NotifyTopic): Promise<void> {
connectionStore
.setSubStatus(topic, 'subscribed')
.catch((error: Error) => notifyLog.debug(error.message))

sendDeserializedRefetch(topic)
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions app-shell/src/notifications/deserialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export function sendDeserialized({
} catch {} // Prevents shell erroring during app shutdown event.
}

export function sendDeserializedRefetch(ip: string, topic: NotifyTopic): void {
sendDeserialized({
ip,
topic,
message: { refetch: true },
})
}

export function sendDeserializedGenericError(
ip: string,
topic: NotifyTopic
Expand Down
24 changes: 17 additions & 7 deletions app-shell/src/notifications/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import mqtt from 'mqtt'

import { connectionStore } from './store'
import { sendDeserialized, sendDeserializedGenericError } from './deserialize'
import {
sendDeserialized,
sendDeserializedGenericError,
sendDeserializedRefetch,
} from './deserialize'
import { notifyLog } from './notifyLog'

import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types'
Expand Down Expand Up @@ -36,8 +40,8 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise<void> {
if (client == null) {
return Promise.reject(new Error('Expected hostData, received null.'))
}

if (
// The first time the client wants to subscribe on a robot to a particular topic.
else if (
!connectionStore.isActiveSub(robotName, topic) &&
!connectionStore.isPendingSub(robotName, topic)
) {
Expand All @@ -50,16 +54,20 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise<void> {
})
)
.catch((error: Error) => notifyLog.debug(error.message))
} else {
}
// The client is either already subscribed or the subscription is currently pending.
else {
void waitUntilActiveOrErrored({
connection: 'subscription',
ip,
robotName,
topic,
}).catch((error: Error) => {
notifyLog.debug(error.message)
sendDeserializedGenericError(ip, topic)
})
.then(() => sendDeserializedRefetch(ip, topic))
.catch((error: Error) => {
notifyLog.debug(error.message)
sendDeserializedGenericError(ip, topic)
})
}
})
.catch((error: Error) => {
Expand All @@ -81,6 +89,8 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise<void> {
connectionStore
.setSubStatus(ip, topic, 'subscribed')
.catch((error: Error) => notifyLog.debug(error.message))

sendDeserializedRefetch(ip, topic)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions app/src/resources/__tests__/useNotifyService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,14 @@ describe('useNotifyService', () => {
vi.clearAllMocks()
})

it('should trigger an HTTP refetch and subscribe action on a successful initial mount', () => {
it('should trigger a subscribe action on a successful initial mount', () => {
renderHook(() =>
useNotifyService({
topic: MOCK_TOPIC,
setRefetch: mockHTTPRefetch,
options: MOCK_OPTIONS,
} as any)
)
expect(mockHTTPRefetch).toHaveBeenCalledWith('once')
expect(mockDispatch).toHaveBeenCalledWith(
notifySubscribeAction(MOCK_HOST_CONFIG.hostname, MOCK_TOPIC)
)
Expand Down
2 changes: 0 additions & 2 deletions app/src/resources/useNotifyService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ export function useNotifyService<TData, TError = Error>({

React.useEffect(() => {
if (shouldUseNotifications) {
// Always fetch on initial mount.
setRefetch('once')
appShellListener({
hostname,
topic,
Expand Down

0 comments on commit 30fddab

Please sign in to comment.