Skip to content

Commit

Permalink
courier push retries
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Jan 23, 2024
1 parent 300b25e commit a1ecddb
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
10 changes: 6 additions & 4 deletions packages/bsky/src/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { NotificationPushToken as PushToken } from './db/tables/notification-pus
import logger from './indexer/logger'
import { notSoftDeletedClause, valuesList } from './db/util'
import { ids } from './lexicon/lexicons'
import { retryHttp } from './util/retry'
import { retryConnect, retryHttp } from './util/retry'
import { Notification as CourierNotification } from './proto/courier_pb'
import { CourierClient } from './courier'

Expand Down Expand Up @@ -339,8 +339,8 @@ export class CourierNotificationServer extends NotificationServer<CourierNotific
alwaysDeliver: !n.rateLimit,
timestamp: Timestamp.fromDate(new Date(n.notif.sortAt)),
additional: Struct.fromJson({
uri: n.notif.recordUri || '',
reason: n.notif.reason || '',
uri: n.notif.recordUri,
reason: n.notif.reason,
subject: n.notif.reasonSubject || '',
}),
})
Expand All @@ -350,7 +350,9 @@ export class CourierNotificationServer extends NotificationServer<CourierNotific

async processNotifications(prepared: CourierNotification[]): Promise<void> {
try {
await this.courierClient.pushNotifications({ notifications: prepared })
await retryConnect(() =>
this.courierClient.pushNotifications({ notifications: prepared }),
)
} catch (err) {
logger.error({ err }, 'notification push to courier failed')
}
Expand Down
12 changes: 12 additions & 0 deletions packages/bsky/src/util/retry.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AxiosError } from 'axios'
import { XRPCError, ResponseType } from '@atproto/xrpc'
import { RetryOptions, retry } from '@atproto/common'
import { Code, ConnectError } from '@connectrpc/connect'

export async function retryHttp<T>(
fn: () => Promise<T>,
Expand All @@ -24,3 +25,14 @@ export function retryableHttp(err: unknown) {
const retryableHttpStatusCodes = new Set([
408, 425, 429, 500, 502, 503, 504, 522, 524,
])

export async function retryConnect<T>(
fn: () => Promise<T>,
opts: RetryOptions = {},
): Promise<T> {
return retry(fn, { retryable: retryableConnect, ...opts })
}

export function retryableConnect(err: unknown) {
return err instanceof ConnectError && err.code === Code.Unavailable
}

0 comments on commit a1ecddb

Please sign in to comment.