diff --git a/packages/common-web/src/retry.ts b/packages/common-web/src/retry.ts index 90e42263ba8..357e765e873 100644 --- a/packages/common-web/src/retry.ts +++ b/packages/common-web/src/retry.ts @@ -2,8 +2,7 @@ import { wait } from './util' export type RetryOptions = { maxRetries?: number - backoffMultiplier?: number - backoffMax?: number + getWaitMs?: (n: number) => number | null retryable?: (err: unknown) => boolean } @@ -11,28 +10,34 @@ export async function retry( fn: () => Promise, opts: RetryOptions = {}, ): Promise { - const { maxRetries = 3, retryable = () => true } = opts + const { maxRetries = 3, retryable = () => true, getWaitMs = backoffMs } = opts let retries = 0 let doneError: unknown while (!doneError) { try { - if (retries) - await backoff(retries, opts.backoffMultiplier, opts.backoffMax) return await fn() } catch (err) { - const willRetry = retries < maxRetries && retryable(err) - if (!willRetry) doneError = err - retries += 1 + const waitMs = getWaitMs(retries) + const willRetry = + retries < maxRetries && waitMs !== null && retryable(err) + if (willRetry) { + retries += 1 + if (waitMs !== 0) { + await wait(waitMs) + } + } else { + doneError = err + } } } throw doneError } -// Waits exponential backoff with max and jitter: ~50, ~100, ~200, ~400, ~800, ~1000, ~1000, ... -async function backoff(n: number, multiplier = 50, max = 1000) { +// Waits exponential backoff with max and jitter: ~100, ~200, ~400, ~800, ~1000, ~1000, ... +export function backoffMs(n: number, multiplier = 100, max = 1000) { const exponentialMs = Math.pow(2, n) * multiplier const ms = Math.min(exponentialMs, max) - await wait(jitter(ms)) + return jitter(ms) } // Adds randomness +/-15% of value diff --git a/packages/common-web/tests/retry.test.ts b/packages/common-web/tests/retry.test.ts new file mode 100644 index 00000000000..641bb133b5f --- /dev/null +++ b/packages/common-web/tests/retry.test.ts @@ -0,0 +1,93 @@ +import { retry } from '../src/index' + +describe('retry', () => { + describe('retry()', () => { + it('retries until max retries', async () => { + let fnCalls = 0 + let waitMsCalls = 0 + const fn = async () => { + fnCalls++ + throw new Error(`Oops ${fnCalls}!`) + } + const getWaitMs = (retries) => { + waitMsCalls++ + expect(retries).toEqual(waitMsCalls - 1) + return 0 + } + await expect(retry(fn, { maxRetries: 13, getWaitMs })).rejects.toThrow( + 'Oops 14!', + ) + expect(fnCalls).toEqual(14) + expect(waitMsCalls).toEqual(14) + }) + + it('retries until max wait', async () => { + let fnCalls = 0 + let waitMsCalls = 0 + const fn = async () => { + fnCalls++ + throw new Error(`Oops ${fnCalls}!`) + } + const getWaitMs = (retries) => { + waitMsCalls++ + expect(retries).toEqual(waitMsCalls - 1) + if (retries === 13) { + return null + } + return 0 + } + await expect( + retry(fn, { maxRetries: Infinity, getWaitMs }), + ).rejects.toThrow('Oops 14!') + expect(fnCalls).toEqual(14) + expect(waitMsCalls).toEqual(14) + }) + + it('retries until non-retryable error', async () => { + let fnCalls = 0 + let waitMsCalls = 0 + const fn = async () => { + fnCalls++ + throw new Error(`Oops ${fnCalls}!`) + } + const getWaitMs = (retries) => { + waitMsCalls++ + expect(retries).toEqual(waitMsCalls - 1) + return 0 + } + const retryable = (err: unknown) => err?.['message'] !== 'Oops 14!' + await expect( + retry(fn, { maxRetries: Infinity, getWaitMs, retryable }), + ).rejects.toThrow('Oops 14!') + expect(fnCalls).toEqual(14) + expect(waitMsCalls).toEqual(14) + }) + + it('returns latest result after retries', async () => { + let fnCalls = 0 + const fn = async () => { + fnCalls++ + if (fnCalls < 14) { + throw new Error(`Oops ${fnCalls}!`) + } + return 'ok' + } + const getWaitMs = () => 0 + const result = await retry(fn, { maxRetries: Infinity, getWaitMs }) + expect(result).toBe('ok') + expect(fnCalls).toBe(14) + }) + + it('returns result immediately on success', async () => { + let fnCalls = 0 + const fn = async () => { + fnCalls++ + return 'ok' + } + const getWaitMs = () => 0 + const result = await retry(fn, { maxRetries: Infinity, getWaitMs }) + expect(result).toBe('ok') + expect(fnCalls).toBe(1) + }) + }) +}) diff --git a/packages/pds/src/account-manager/helpers/account.ts b/packages/pds/src/account-manager/helpers/account.ts index 8c36a45f349..6ccf98d7ea6 100644 --- a/packages/pds/src/account-manager/helpers/account.ts +++ b/packages/pds/src/account-manager/helpers/account.ts @@ -63,16 +63,17 @@ export const registerActor = async ( }, ) => { const { did, handle } = opts - const registered = await db.db - .insertInto('actor') - .values({ - did, - handle, - createdAt: new Date().toISOString(), - }) - .onConflict((oc) => oc.doNothing()) - .returning('did') - .executeTakeFirst() + const [registered] = await db.executeWithRetry( + db.db + .insertInto('actor') + .values({ + did, + handle, + createdAt: new Date().toISOString(), + }) + .onConflict((oc) => oc.doNothing()) + .returning('did'), + ) if (!registered) { throw new Error('actor already exists') } @@ -87,16 +88,17 @@ export const registerAccount = async ( }, ) => { const { did, email, passwordScrypt } = opts - const registered = await db.db - .insertInto('account') - .values({ - did, - email: email.toLowerCase(), - passwordScrypt, - }) - .onConflict((oc) => oc.doNothing()) - .returning('did') - .executeTakeFirst() + const [registered] = await db.executeWithRetry( + db.db + .insertInto('account') + .values({ + did, + email: email.toLowerCase(), + passwordScrypt, + }) + .onConflict((oc) => oc.doNothing()) + .returning('did'), + ) if (!registered) { throw new Error('account already exists') } @@ -108,11 +110,21 @@ export const deleteAccount = async ( ): Promise => { // Not done in transaction because it would be too long, prone to contention. // Also, this can safely be run multiple times if it fails. - await db.db.deleteFrom('repo_root').where('did', '=', did).execute() - await db.db.deleteFrom('email_token').where('did', '=', did).execute() - await db.db.deleteFrom('refresh_token').where('did', '=', did).execute() - await db.db.deleteFrom('account').where('account.did', '=', did).execute() - await db.db.deleteFrom('actor').where('actor.did', '=', did).execute() + await db.executeWithRetry( + db.db.deleteFrom('repo_root').where('did', '=', did), + ) + await db.executeWithRetry( + db.db.deleteFrom('email_token').where('did', '=', did), + ) + await db.executeWithRetry( + db.db.deleteFrom('refresh_token').where('did', '=', did), + ) + await db.executeWithRetry( + db.db.deleteFrom('account').where('account.did', '=', did), + ) + await db.executeWithRetry( + db.db.deleteFrom('actor').where('actor.did', '=', did), + ) } export const updateHandle = async ( @@ -120,14 +132,15 @@ export const updateHandle = async ( did: string, handle: string, ) => { - const res = await db.db - .updateTable('actor') - .set({ handle }) - .where('did', '=', did) - .whereNotExists( - db.db.selectFrom('actor').where('handle', '=', handle).selectAll(), - ) - .executeTakeFirst() + const [res] = await db.executeWithRetry( + db.db + .updateTable('actor') + .set({ handle }) + .where('did', '=', did) + .whereNotExists( + db.db.selectFrom('actor').where('handle', '=', handle).selectAll(), + ), + ) if (res.numUpdatedRows < 1) { throw new Error('user already exists') } @@ -138,11 +151,12 @@ export const updateEmail = async ( did: string, email: string, ) => { - await db.db - .updateTable('account') - .set({ email: email.toLowerCase(), emailConfirmedAt: null }) - .where('did', '=', did) - .executeTakeFirst() + await db.executeWithRetry( + db.db + .updateTable('account') + .set({ email: email.toLowerCase(), emailConfirmedAt: null }) + .where('did', '=', did), + ) } export const setEmailConfirmedAt = async ( @@ -150,11 +164,12 @@ export const setEmailConfirmedAt = async ( did: string, emailConfirmedAt: string, ) => { - await db.db - .updateTable('account') - .set({ emailConfirmedAt }) - .where('did', '=', did) - .execute() + await db.executeWithRetry( + db.db + .updateTable('account') + .set({ emailConfirmedAt }) + .where('did', '=', did), + ) } export const getAccountTakedownStatus = async ( @@ -180,9 +195,7 @@ export const updateAccountTakedownStatus = async ( const takedownRef = takedown.applied ? takedown.ref ?? new Date().toISOString() : null - await db.db - .updateTable('actor') - .set({ takedownRef }) - .where('did', '=', did) - .executeTakeFirst() + await db.executeWithRetry( + db.db.updateTable('actor').set({ takedownRef }).where('did', '=', did), + ) } diff --git a/packages/pds/src/account-manager/helpers/auth.ts b/packages/pds/src/account-manager/helpers/auth.ts index 4cbfff8cb6d..083d5cfd39b 100644 --- a/packages/pds/src/account-manager/helpers/auth.ts +++ b/packages/pds/src/account-manager/helpers/auth.ts @@ -89,16 +89,18 @@ export const storeRefreshToken = async ( payload: RefreshToken, appPasswordName: string | null, ) => { - return db.db - .insertInto('refresh_token') - .values({ - id: payload.jti, - did: payload.sub, - appPasswordName, - expiresAt: new Date(payload.exp * 1000).toISOString(), - }) - .onConflict((oc) => oc.doNothing()) // E.g. when re-granting during a refresh grace period - .executeTakeFirst() + const [result] = await db.executeWithRetry( + db.db + .insertInto('refresh_token') + .values({ + id: payload.jti, + did: payload.sub, + appPasswordName, + expiresAt: new Date(payload.exp * 1000).toISOString(), + }) + .onConflict((oc) => oc.doNothing()), // E.g. when re-granting during a refresh grace period + ) + return result } export const getRefreshToken = async (db: AccountDb, id: string) => { @@ -114,12 +116,12 @@ export const deleteExpiredRefreshTokens = async ( did: string, now: string, ) => { - await db.db - .deleteFrom('refresh_token') - .where('did', '=', did) - .where('expiresAt', '<=', now) - .returningAll() - .executeTakeFirst() + await db.executeWithRetry( + db.db + .deleteFrom('refresh_token') + .where('did', '=', did) + .where('expiresAt', '<=', now), + ) } export const addRefreshGracePeriod = async ( @@ -131,33 +133,32 @@ export const addRefreshGracePeriod = async ( }, ) => { const { id, expiresAt, nextId } = opts - const res = await db.db - .updateTable('refresh_token') - .where('id', '=', id) - .where((inner) => - inner.where('nextId', 'is', null).orWhere('nextId', '=', nextId), - ) - .set({ expiresAt, nextId }) - .returningAll() - .executeTakeFirst() + const [res] = await db.executeWithRetry( + db.db + .updateTable('refresh_token') + .where('id', '=', id) + .where((inner) => + inner.where('nextId', 'is', null).orWhere('nextId', '=', nextId), + ) + .set({ expiresAt, nextId }) + .returningAll(), + ) if (!res) { throw new ConcurrentRefreshError() } } export const revokeRefreshToken = async (db: AccountDb, id: string) => { - const { numDeletedRows } = await db.db - .deleteFrom('refresh_token') - .where('id', '=', id) - .executeTakeFirst() + const [{ numDeletedRows }] = await db.executeWithRetry( + db.db.deleteFrom('refresh_token').where('id', '=', id), + ) return numDeletedRows > 0 } export const revokeRefreshTokensByDid = async (db: AccountDb, did: string) => { - const { numDeletedRows } = await db.db - .deleteFrom('refresh_token') - .where('did', '=', did) - .executeTakeFirst() + const [{ numDeletedRows }] = await db.executeWithRetry( + db.db.deleteFrom('refresh_token').where('did', '=', did), + ) return numDeletedRows > 0 } @@ -166,11 +167,13 @@ export const revokeAppPasswordRefreshToken = async ( did: string, appPassName: string, ) => { - const { numDeletedRows } = await db.db - .deleteFrom('refresh_token') - .where('did', '=', did) - .where('appPasswordName', '=', appPassName) - .executeTakeFirst() + const [{ numDeletedRows }] = await db.executeWithRetry( + db.db + .deleteFrom('refresh_token') + .where('did', '=', did) + .where('appPasswordName', '=', appPassName), + ) + return numDeletedRows > 0 } diff --git a/packages/pds/src/account-manager/helpers/email-token.ts b/packages/pds/src/account-manager/helpers/email-token.ts index c2e383fdc96..85be5d4b6d0 100644 --- a/packages/pds/src/account-manager/helpers/email-token.ts +++ b/packages/pds/src/account-manager/helpers/email-token.ts @@ -10,13 +10,14 @@ export const createEmailToken = async ( ): Promise => { const token = getRandomToken().toUpperCase() const now = new Date().toISOString() - await db.db - .insertInto('email_token') - .values({ purpose, did, token, requestedAt: now }) - .onConflict((oc) => - oc.columns(['purpose', 'did']).doUpdateSet({ token, requestedAt: now }), - ) - .execute() + await db.executeWithRetry( + db.db + .insertInto('email_token') + .values({ purpose, did, token, requestedAt: now }) + .onConflict((oc) => + oc.columns(['purpose', 'did']).doUpdateSet({ token, requestedAt: now }), + ), + ) return token } @@ -25,11 +26,12 @@ export const deleteEmailToken = async ( did: string, purpose: EmailTokenPurpose, ) => { - await db.db - .deleteFrom('email_token') - .where('did', '=', did) - .where('purpose', '=', purpose) - .executeTakeFirst() + await db.executeWithRetry( + db.db + .deleteFrom('email_token') + .where('did', '=', did) + .where('purpose', '=', purpose), + ) } export const assertValidToken = async ( diff --git a/packages/pds/src/account-manager/helpers/invite.ts b/packages/pds/src/account-manager/helpers/invite.ts index 611a503e743..81034b7f5d9 100644 --- a/packages/pds/src/account-manager/helpers/invite.ts +++ b/packages/pds/src/account-manager/helpers/invite.ts @@ -21,7 +21,7 @@ export const createInviteCodes = async ( ) await Promise.all( chunkArray(rows, 50).map((chunk) => - db.db.insertInto('invite_code').values(chunk).execute(), + db.executeWithRetry(db.db.insertInto('invite_code').values(chunk)), ), ) } @@ -45,7 +45,7 @@ export const createAccountInviteCodes = async ( createdAt: now, } as InviteCode), ) - await db.db.insertInto('invite_code').values(rows).execute() + await db.executeWithRetry(db.db.insertInto('invite_code').values(rows)) const finalRoutineInviteCodes = await db.db .selectFrom('invite_code') @@ -77,14 +77,13 @@ export const recordInviteUse = async ( }, ) => { if (!opts.inviteCode) return - await db.db - .insertInto('invite_code_use') - .values({ + await db.executeWithRetry( + db.db.insertInto('invite_code_use').values({ code: opts.inviteCode, usedBy: opts.did, usedAt: opts.now, - }) - .execute() + }), + ) } export const ensureInviteIsAvailable = async ( @@ -214,18 +213,20 @@ export const disableInviteCodes = async ( ) => { const { codes, accounts } = opts if (codes.length > 0) { - await db.db - .updateTable('invite_code') - .set({ disabled: 1 }) - .where('code', 'in', codes) - .execute() + await db.executeWithRetry( + db.db + .updateTable('invite_code') + .set({ disabled: 1 }) + .where('code', 'in', codes), + ) } if (accounts.length > 0) { - await db.db - .updateTable('invite_code') - .set({ disabled: 1 }) - .where('forAccount', 'in', accounts) - .execute() + await db.executeWithRetry( + db.db + .updateTable('invite_code') + .set({ disabled: 1 }) + .where('forAccount', 'in', accounts), + ) } } @@ -234,11 +235,12 @@ export const setAccountInvitesDisabled = async ( did: string, disabled: boolean, ) => { - await db.db - .updateTable('account') - .where('did', '=', did) - .set({ invitesDisabled: disabled ? 1 : 0 }) - .execute() + await db.executeWithRetry( + db.db + .updateTable('account') + .where('did', '=', did) + .set({ invitesDisabled: disabled ? 1 : 0 }), + ) } export type CodeDetail = { diff --git a/packages/pds/src/account-manager/helpers/password.ts b/packages/pds/src/account-manager/helpers/password.ts index 01d252b388a..d7dc7ae1e62 100644 --- a/packages/pds/src/account-manager/helpers/password.ts +++ b/packages/pds/src/account-manager/helpers/password.ts @@ -39,11 +39,12 @@ export const updateUserPassword = async ( passwordScrypt: string }, ) => { - await db.db - .updateTable('account') - .set({ passwordScrypt: opts.passwordScrypt }) - .where('did', '=', opts.did) - .execute() + await db.executeWithRetry( + db.db + .updateTable('account') + .set({ passwordScrypt: opts.passwordScrypt }) + .where('did', '=', opts.did), + ) } export const createAppPassword = async ( @@ -62,16 +63,17 @@ export const createAppPassword = async ( ] const password = chunks.join('-') const passwordScrypt = await scrypt.hashAppPassword(did, password) - const got = await db.db - .insertInto('app_password') - .values({ - did, - name, - passwordScrypt, - createdAt: new Date().toISOString(), - }) - .returningAll() - .executeTakeFirst() + const [got] = await db.executeWithRetry( + db.db + .insertInto('app_password') + .values({ + did, + name, + passwordScrypt, + createdAt: new Date().toISOString(), + }) + .returningAll(), + ) if (!got) { throw new InvalidRequestError('could not create app-specific password') } @@ -98,9 +100,10 @@ export const deleteAppPassword = async ( did: string, name: string, ) => { - await db.db - .deleteFrom('app_password') - .where('did', '=', did) - .where('name', '=', name) - .execute() + await db.executeWithRetry( + db.db + .deleteFrom('app_password') + .where('did', '=', did) + .where('name', '=', name), + ) } diff --git a/packages/pds/src/account-manager/helpers/repo.ts b/packages/pds/src/account-manager/helpers/repo.ts index 8a3efd08058..30988efe040 100644 --- a/packages/pds/src/account-manager/helpers/repo.ts +++ b/packages/pds/src/account-manager/helpers/repo.ts @@ -7,16 +7,18 @@ export const updateRoot = async ( cid: CID, rev: string, ) => { - await db.db - .insertInto('repo_root') - .values({ - did, - cid: cid.toString(), - rev, - indexedAt: new Date().toISOString(), - }) - .onConflict((oc) => - oc.column('did').doUpdateSet({ cid: cid.toString(), rev }), - ) - .execute() + // @TODO balance risk of a race in the case of a long retry + await db.executeWithRetry( + db.db + .insertInto('repo_root') + .values({ + did, + cid: cid.toString(), + rev, + indexedAt: new Date().toISOString(), + }) + .onConflict((oc) => + oc.column('did').doUpdateSet({ cid: cid.toString(), rev }), + ), + ) } diff --git a/packages/pds/src/account-manager/index.ts b/packages/pds/src/account-manager/index.ts index 1868223e469..1a6a2493fb9 100644 --- a/packages/pds/src/account-manager/index.ts +++ b/packages/pds/src/account-manager/index.ts @@ -26,6 +26,7 @@ export class AccountManager { } async migrateOrThrow() { + await this.db.ensureWal() await getMigrator(this.db).migrateToLatestOrThrow() } diff --git a/packages/pds/src/actor-store/index.ts b/packages/pds/src/actor-store/index.ts index b15c12fc272..59ee12e3274 100644 --- a/packages/pds/src/actor-store/index.ts +++ b/packages/pds/src/actor-store/index.ts @@ -106,6 +106,7 @@ export class ActorStore { const db: ActorDb = getDb(dbLocation, this.cfg.disableWalAutoCheckpoint) try { + await db.ensureWal() const migrator = getMigrator(db) await migrator.migrateToLatestOrThrow() } finally { diff --git a/packages/pds/src/actor-store/record/transactor.ts b/packages/pds/src/actor-store/record/transactor.ts index 562546d9738..7e67caec752 100644 --- a/packages/pds/src/actor-store/record/transactor.ts +++ b/packages/pds/src/actor-store/record/transactor.ts @@ -77,12 +77,6 @@ export class RecordTransactor extends RecordReader { log.info({ uri }, 'deleted indexed record') } - async deleteForActor(_did: string) { - // Not done in transaction because it would be too long, prone to contention. - // Also, this can safely be run multiple times if it fails. - await this.db.db.deleteFrom('record').execute() - } - async removeBacklinksByUri(uri: AtUri) { await this.db.db .deleteFrom('backlink') diff --git a/packages/pds/src/db/db.ts b/packages/pds/src/db/db.ts index 7e5417472b8..a0e5dac8e77 100644 --- a/packages/pds/src/db/db.ts +++ b/packages/pds/src/db/db.ts @@ -1,5 +1,6 @@ import assert from 'assert' import { + sql, Kysely, SqliteDialect, KyselyPlugin, @@ -10,17 +11,13 @@ import { UnknownRow, } from 'kysely' import SqliteDB from 'better-sqlite3' -import { retry } from '@atproto/common' import { dbLogger } from '../logger' +import { retrySqlite } from './util' const DEFAULT_PRAGMAS = { - journal_mode: 'WAL', - busy_timeout: '5000', - strict: 'ON', + strict: 'ON', // @TODO strictness should live on table defs instead } -const RETRY_ERRORS = new Set(['SQLITE_BUSY', 'SQLITE_BUSY_SNAPSHOT']) - export class Database { destroyed = false commitHooks: CommitHook[] = [] @@ -31,7 +28,9 @@ export class Database { location: string, opts?: { pragmas?: Record }, ): Database { - const sqliteDb = new SqliteDB(location) + const sqliteDb = new SqliteDB(location, { + timeout: 0, // handled by application + }) const pragmas = { ...DEFAULT_PRAGMAS, ...(opts?.pragmas ?? {}), @@ -47,6 +46,10 @@ export class Database { return new Database(db) } + async ensureWal() { + await sql`PRAGMA journal_mode = WAL`.execute(this.db) + } + async transactionNoRetry( fn: (db: Database) => Promise, ): Promise { @@ -73,13 +76,15 @@ export class Database { } async transaction(fn: (db: Database) => Promise): Promise { - return retry(() => this.transactionNoRetry(fn), { - retryable: (err) => - typeof err?.['code'] === 'string' && RETRY_ERRORS.has(err['code']), - maxRetries: 5, - backoffMultiplier: 50, - backoffMax: 2000, - }) + return retrySqlite(() => this.transactionNoRetry(fn)) + } + + async executeWithRetry(query: { execute: () => Promise }) { + if (this.isTransaction) { + // transaction() ensures retry on entire transaction, no need to retry individual statements. + return query.execute() + } + return retrySqlite(() => query.execute()) } onCommit(fn: () => void) { diff --git a/packages/pds/src/db/util.ts b/packages/pds/src/db/util.ts index f593ec87601..36d581eba98 100644 --- a/packages/pds/src/db/util.ts +++ b/packages/pds/src/db/util.ts @@ -1,3 +1,4 @@ +import { retry } from '@atproto/common' import { DummyDriver, DynamicModule, @@ -47,6 +48,42 @@ export const dummyDialect = { }, } +export const retrySqlite = (fn: () => Promise): Promise => { + return retry(fn, { + retryable: retryableSqlite, + getWaitMs: getWaitMsSqlite, + maxRetries: 60, // a safety measure: getWaitMsSqlite() times out before this after 5000ms of waiting. + }) +} + +const retryableSqlite = (err: unknown) => { + return typeof err?.['code'] === 'string' && RETRY_ERRORS.has(err['code']) +} + +// based on sqlite's backoff strategy https://github.com/sqlite/sqlite/blob/91c8e65dd4bf17d21fbf8f7073565fe1a71c8948/src/main.c#L1704-L1713 +const getWaitMsSqlite = (n: number, timeout = 5000) => { + if (n < 0) return null + let delay: number + let prior: number + if (n < DELAYS.length) { + delay = DELAYS[n] + prior = TOTALS[n] + } else { + delay = last(DELAYS) + prior = last(TOTALS) + delay * (n - (DELAYS.length - 1)) + } + if (prior + delay > timeout) { + delay = timeout - prior + if (delay <= 0) return null + } + return delay +} + +const last = (arr: T[]) => arr[arr.length - 1] +const DELAYS = [1, 2, 5, 10, 15, 20, 25, 25, 25, 50, 50, 100] +const TOTALS = [0, 1, 3, 8, 18, 33, 53, 78, 103, 128, 178, 228] +const RETRY_ERRORS = new Set(['SQLITE_BUSY', 'SQLITE_BUSY_SNAPSHOT']) + export type Ref = ReferenceExpression export type DbRef = RawBuilder | ReturnType diff --git a/packages/pds/src/did-cache/index.ts b/packages/pds/src/did-cache/index.ts index faca59f35cb..526283090c5 100644 --- a/packages/pds/src/did-cache/index.ts +++ b/packages/pds/src/did-cache/index.ts @@ -25,23 +25,25 @@ export class DidSqliteCache implements DidCache { ): Promise { try { if (prevResult) { - await this.db.db - .updateTable('did_doc') - .set({ doc: JSON.stringify(doc), updatedAt: Date.now() }) - .where('did', '=', did) - .where('updatedAt', '=', prevResult.updatedAt) - .execute() + await this.db.executeWithRetry( + this.db.db + .updateTable('did_doc') + .set({ doc: JSON.stringify(doc), updatedAt: Date.now() }) + .where('did', '=', did) + .where('updatedAt', '=', prevResult.updatedAt), + ) } else { - await this.db.db - .insertInto('did_doc') - .values({ did, doc: JSON.stringify(doc), updatedAt: Date.now() }) - .onConflict((oc) => - oc.column('did').doUpdateSet({ - doc: excluded(this.db.db, 'doc'), - updatedAt: excluded(this.db.db, 'updatedAt'), - }), - ) - .executeTakeFirst() + await this.db.executeWithRetry( + this.db.db + .insertInto('did_doc') + .values({ did, doc: JSON.stringify(doc), updatedAt: Date.now() }) + .onConflict((oc) => + oc.column('did').doUpdateSet({ + doc: excluded(this.db.db, 'doc'), + updatedAt: excluded(this.db.db, 'updatedAt'), + }), + ), + ) } } catch (err) { didCacheLogger.error({ did, doc, err }, 'failed to cache did') @@ -98,10 +100,9 @@ export class DidSqliteCache implements DidCache { async clearEntry(did: string): Promise { try { - await this.db.db - .deleteFrom('did_doc') - .where('did', '=', did) - .executeTakeFirst() + await this.db.executeWithRetry( + this.db.db.deleteFrom('did_doc').where('did', '=', did), + ) } catch (err) { didCacheLogger.error({ did, err }, 'clearing did cache entry failed') } @@ -116,6 +117,7 @@ export class DidSqliteCache implements DidCache { } async migrateOrThrow() { + await this.db.ensureWal() await getMigrator(this.db).migrateToLatestOrThrow() } diff --git a/packages/pds/src/sequencer/sequencer.ts b/packages/pds/src/sequencer/sequencer.ts index b2ab77d2faf..7ea23db13f0 100644 --- a/packages/pds/src/sequencer/sequencer.ts +++ b/packages/pds/src/sequencer/sequencer.ts @@ -43,6 +43,7 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { } async start() { + await this.db.ensureWal() const migrator = getMigrator(this.db) await migrator.migrateToLatestOrThrow() const curr = await this.curr() @@ -188,7 +189,9 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { } async sequenceEvt(evt: RepoSeqInsert) { - await this.db.db.insertInto('repo_seq').values(evt).execute() + await this.db.executeWithRetry( + this.db.db.insertInto('repo_seq').values(evt), + ) this.crawlers.notifyOfUpdate() } @@ -212,11 +215,12 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { } async deleteAllForUser(did: string) { - await this.db.db - .deleteFrom('repo_seq') - .where('did', '=', did) - .where('eventType', '!=', 'tombstone') - .execute() + await this.db.executeWithRetry( + this.db.db + .deleteFrom('repo_seq') + .where('did', '=', did) + .where('eventType', '!=', 'tombstone'), + ) } }