Skip to content

Commit

Permalink
fix subscribe repos tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Oct 31, 2023
1 parent 37807a8 commit 0cb274f
Showing 1 changed file with 104 additions and 90 deletions.
194 changes: 104 additions & 90 deletions packages/pds/tests/sync/subscribe-repos.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { TestNetworkNoAppView, SeedClient } from '@atproto/dev-env'
import AtpAgent from '@atproto/api'
import { cborDecode, HOUR, readFromGenerator, wait } from '@atproto/common'
import {
cborDecode,
HOUR,
MINUTE,
readFromGenerator,
wait,
} from '@atproto/common'
import { randomStr } from '@atproto/crypto'
import * as repo from '@atproto/repo'
import { readCar } from '@atproto/repo'
Expand Down Expand Up @@ -67,6 +73,25 @@ describe('repo subscribe repos', () => {
return evts
}

const getAllEvents = (userDid: string, frames: Frame[]) => {
const types: unknown[] = []
for (const frame of frames) {
if (frame instanceof MessageFrame) {
if (
(frame.header.t === '#commit' &&
(frame.body as CommitEvt).repo === userDid) ||
(frame.header.t === '#handle' &&
(frame.body as HandleEvt).did === userDid) ||
(frame.header.t === '#tombstone' &&
(frame.body as TombstoneEvt).did === userDid)
) {
types.push(frame.body)
}
}
}
return types
}

const getTombstoneEvts = (frames: Frame[]): TombstoneEvt[] => {
const evts: TombstoneEvt[] = []
for (const frame of frames) {
Expand Down Expand Up @@ -330,93 +355,82 @@ describe('repo subscribe repos', () => {
verifyTombstoneEvent(tombstoneEvts[1], baddie2)
})

// it('account deletions invalidate all seq ops', async () => {
// const baddie3 = (
// await sc.createAccount('baddie3.test', {
// email: '[email protected]',
// handle: 'baddie3.test',
// password: 'baddie3-pass',
// })
// ).did

// await randomPost(baddie3)
// await sc.updateHandle(baddie3, 'baddie3-update.test')

// await ctx.services.record(db).deleteForActor(baddie3)
// await ctx.services.repo(db).deleteRepo(baddie3)
// await ctx.services.account(db).deleteAccount(baddie3)

// const ws = new WebSocket(
// `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
// )

// const gen = byFrame(ws)
// const evts = await readTillCaughtUp(gen)
// ws.terminate()

// const didEvts = getAllEvents(baddie3, evts)
// expect(didEvts.length).toBe(1)
// verifyTombstoneEvent(didEvts[0], baddie3)
// })

// it('does not return invalidated events', async () => {
// await sc.updateHandle(alice, 'alice3.test')
// await sc.updateHandle(alice, 'alice4.test')
// await sc.updateHandle(alice, 'alice5.test')
// await sc.updateHandle(bob, 'bob3.test')
// await sc.updateHandle(bob, 'bob4.test')
// await sc.updateHandle(bob, 'bob5.test')

// const ws = new WebSocket(
// `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
// )

// const gen = byFrame(ws)
// const evts = await readTillCaughtUp(gen)
// ws.terminate()

// const handleEvts = getHandleEvts(evts)
// expect(handleEvts.length).toBe(2)
// verifyHandleEvent(handleEvts[0], alice, 'alice5.test')
// verifyHandleEvent(handleEvts[1], bob, 'bob5.test')
// })

// it('sends info frame on out of date cursor', async () => {
// // we rewrite the sequenceAt time for existing seqs to be past the backfill cutoff
// // then we create some new posts
// const overAnHourAgo = new Date(Date.now() - HOUR - MINUTE).toISOString()
// await db.db
// .updateTable('repo_seq')
// .set({ sequencedAt: overAnHourAgo })
// .execute()

// await makePosts()

// const ws = new WebSocket(
// `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
// )
// const [info, ...evts] = await readTillCaughtUp(byFrame(ws))
// ws.terminate()

// if (!(info instanceof MessageFrame)) {
// throw new Error('Expected first frame to be a MessageFrame')
// }
// expect(info.header.t).toBe('#info')
// const body = info.body as Record<string, unknown>
// expect(body.name).toEqual('OutdatedCursor')
// expect(evts.length).toBe(40)
// })

// it('errors on future cursor', async () => {
// const ws = new WebSocket(
// `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${100000}`,
// )
// const frames = await readTillCaughtUp(byFrame(ws))
// ws.terminate()
// expect(frames.length).toBe(1)
// if (!(frames[0] instanceof ErrorFrame)) {
// throw new Error('Expected ErrorFrame')
// }
// expect(frames[0].body.error).toBe('FutureCursor')
// })
it('account deletions invalidate all seq ops', async () => {
const baddie3 = (
await sc.createAccount('baddie3', {
email: '[email protected]',
handle: 'baddie3.test',
password: 'baddie3-pass',
})
).did

await randomPost(baddie3)
await sc.updateHandle(baddie3, 'baddie3-update.test')

await agent.api.com.atproto.server.requestAccountDelete(undefined, {
headers: sc.getHeaders(baddie3),
})
const { token } = await network.pds.ctx.db.db
.selectFrom('email_token')
.selectAll()
.where('purpose', '=', 'delete_account')
.where('did', '=', baddie3)
.executeTakeFirstOrThrow()
await agent.api.com.atproto.server.deleteAccount({
token,
did: baddie3,
password: sc.accounts[baddie3].password,
})

const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
)

const gen = byFrame(ws)
const evts = await readTillCaughtUp(gen)
ws.terminate()

const didEvts = getAllEvents(baddie3, evts)
expect(didEvts.length).toBe(1)
verifyTombstoneEvent(didEvts[0], baddie3)
})

it('sends info frame on out of date cursor', async () => {
// we rewrite the sequenceAt time for existing seqs to be past the backfill cutoff
// then we create some new posts
const overAnHourAgo = new Date(Date.now() - HOUR - MINUTE).toISOString()
await ctx.sequencer.db.db
.updateTable('repo_seq')
.set({ sequencedAt: overAnHourAgo })
.execute()

await makePosts()

const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
)
const [info, ...evts] = await readTillCaughtUp(byFrame(ws))
ws.terminate()

if (!(info instanceof MessageFrame)) {
throw new Error('Expected first frame to be a MessageFrame')
}
expect(info.header.t).toBe('#info')
const body = info.body as Record<string, unknown>
expect(body.name).toEqual('OutdatedCursor')
expect(evts.length).toBe(40)
})

it('errors on future cursor', async () => {
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${100000}`,
)
const frames = await readTillCaughtUp(byFrame(ws))
ws.terminate()
expect(frames.length).toBe(1)
if (!(frames[0] instanceof ErrorFrame)) {
throw new Error('Expected ErrorFrame')
}
expect(frames[0].body.error).toBe('FutureCursor')
})
})

0 comments on commit 0cb274f

Please sign in to comment.