Skip to content

Commit

Permalink
integrate read after write
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Feb 7, 2024
1 parent cc31a38 commit a61718e
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 92 deletions.
10 changes: 6 additions & 4 deletions packages/pds/src/api/app/bsky/actor/getProfile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import { OutputSchema } from '../../../../lexicon/types/app/bsky/actor/getProfil
import {
LocalViewer,
LocalRecords,
handleReadAfterWritePipeThrough,
handleReadAfterWrite,
} from '../../../../read-after-write'
import { pipethrough } from '../../../../pipethrough'

const METHOD_NSID = 'app.bsky.actor.getProfile'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.actor.getProfile({
auth: ctx.authVerifier.accessOrRole,
Expand All @@ -17,16 +19,16 @@ export default function (server: Server, ctx: AppContext) {
auth.credentials.type === 'access' ? auth.credentials.did : null
const res = await pipethrough(
ctx.cfg.bskyAppView.url,
'app.bsky.actor.getProfile',
METHOD_NSID,
params,
requester ? await ctx.appviewAuthHeaders(requester) : authPassthru(req),
)
if (!requester) {
return res
}
return handleReadAfterWritePipeThrough(
return handleReadAfterWrite(
ctx,
'app.bsky.actor.getProfile',
METHOD_NSID,
requester,
res,
getProfileMunge,
Expand Down
24 changes: 15 additions & 9 deletions packages/pds/src/api/app/bsky/actor/getProfiles.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
import AppContext from '../../../../context'
import { Server } from '../../../../lexicon'
import { OutputSchema } from '../../../../lexicon/types/app/bsky/actor/getProfiles'
import { pipethrough } from '../../../../pipethrough'
import {
LocalViewer,
handleReadAfterWrite,
LocalRecords,
} from '../../../../read-after-write'

const METHOD_NSID = 'app.bsky.actor.getProfiles'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.actor.getProfiles({
auth: ctx.authVerifier.access,
handler: async ({ auth, params }) => {
const requester = auth.credentials.did
const res = await ctx.appViewAgent.api.app.bsky.actor.getProfiles(

const res = await pipethrough(
ctx.cfg.bskyAppView.url,
METHOD_NSID,
params,
await ctx.appviewAuthHeaders(requester),
)
const hasSelf = res.data.profiles.some((prof) => prof.did === requester)
if (hasSelf) {
return await handleReadAfterWrite(ctx, requester, res, getProfilesMunge)
}
return {
encoding: 'application/json',
body: res.data,
}
return handleReadAfterWrite(
ctx,
METHOD_NSID,
requester,
res,
getProfilesMunge,
)
},
})
}
Expand All @@ -36,6 +41,7 @@ const getProfilesMunge = async (
): Promise<OutputSchema> => {
const localProf = local.profile
if (!localProf) return original

const profiles = original.profiles.map((prof) => {
if (prof.did !== requester) return prof
return localViewer.updateProfileDetailed(prof, localProf.record)
Expand Down
25 changes: 17 additions & 8 deletions packages/pds/src/api/app/bsky/feed/getActorLikes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,34 @@ import {
handleReadAfterWrite,
LocalRecords,
} from '../../../../read-after-write'
import { pipethrough } from '../../../../pipethrough'

const METHOD_NSID = 'app.bsky.feed.getActorLikes'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.feed.getActorLikes({
auth: ctx.authVerifier.accessOrRole,
handler: async ({ req, params, auth }) => {
const requester =
auth.credentials.type === 'access' ? auth.credentials.did : null

const res = await ctx.appViewAgent.api.app.bsky.feed.getActorLikes(
const res = await pipethrough(
ctx.cfg.bskyAppView.url,
METHOD_NSID,
params,
requester ? await ctx.appviewAuthHeaders(requester) : authPassthru(req),
)
if (requester) {
return await handleReadAfterWrite(ctx, requester, res, getAuthorMunge)
}
return {
encoding: 'application/json',
body: res.data,

if (!requester) {
return res
}

return await handleReadAfterWrite(
ctx,
METHOD_NSID,
requester,
res,
getAuthorMunge,
)
},
})
}
Expand Down
22 changes: 15 additions & 7 deletions packages/pds/src/api/app/bsky/feed/getAuthorFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,32 @@ import {
handleReadAfterWrite,
LocalRecords,
} from '../../../../read-after-write'
import { pipethrough } from '../../../../pipethrough'

const METHOD_NSID = 'app.bsky.actor.getProfile'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.feed.getAuthorFeed({
auth: ctx.authVerifier.accessOrRole,
handler: async ({ req, params, auth }) => {
const requester =
auth.credentials.type === 'access' ? auth.credentials.did : null
const res = await ctx.appViewAgent.api.app.bsky.feed.getAuthorFeed(
const res = await pipethrough(
ctx.cfg.bskyAppView.url,
METHOD_NSID,
params,
requester ? await ctx.appviewAuthHeaders(requester) : authPassthru(req),
)
if (requester) {
return await handleReadAfterWrite(ctx, requester, res, getAuthorMunge)
}
return {
encoding: 'application/json',
body: res.data,
if (!requester) {
return res
}
return await handleReadAfterWrite(
ctx,
METHOD_NSID,
requester,
res,
getAuthorMunge,
)
},
})
}
Expand Down
30 changes: 13 additions & 17 deletions packages/pds/src/api/app/bsky/feed/getPostThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ import {
LocalViewer,
getLocalLag,
getRepoRev,
handleReadAfterWrite,
LocalRecords,
RecordDescript,
handleReadAfterWrite,
formatMungedResponse,
} from '../../../../read-after-write'
import { pipethrough } from '../../../../pipethrough'

const METHOD_NSID = 'app.bsky.feed.getPostThread'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.feed.getPostThread({
Expand All @@ -30,25 +34,25 @@ export default function (server: Server, ctx: AppContext) {
auth.credentials.type === 'access' ? auth.credentials.did : null

if (!requester) {
const res = await ctx.appViewAgent.api.app.bsky.feed.getPostThread(
return pipethrough(
ctx.cfg.bskyAppView.url,
METHOD_NSID,
params,
authPassthru(req),
)

return {
encoding: 'application/json',
body: res.data,
}
}

try {
const res = await ctx.appViewAgent.api.app.bsky.feed.getPostThread(
const res = await pipethrough(
ctx.cfg.bskyAppView.url,
METHOD_NSID,
params,
await ctx.appviewAuthHeaders(requester),
)

return await handleReadAfterWrite(
ctx,
METHOD_NSID,
requester,
res,
getPostThreadMunge,
Expand All @@ -70,15 +74,7 @@ export default function (server: Server, ctx: AppContext) {
if (local === null) {
throw err
} else {
return {
encoding: 'application/json',
body: local.data,
headers: local.lag
? {
'Atproto-Upstream-Lag': local.lag.toString(10),
}
: undefined,
}
return formatMungedResponse(local.data, local.lag)
}
} else {
throw err
Expand Down
15 changes: 13 additions & 2 deletions packages/pds/src/api/app/bsky/feed/getTimeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,28 @@ import {
handleReadAfterWrite,
LocalRecords,
} from '../../../../read-after-write'
import { pipethrough } from '../../../../pipethrough'

const METHOD_NSID = 'app.bsky.feed.getTimeline'

export default function (server: Server, ctx: AppContext) {
server.app.bsky.feed.getTimeline({
auth: ctx.authVerifier.access,
handler: async ({ params, auth }) => {
const requester = auth.credentials.did
const res = await ctx.appViewAgent.api.app.bsky.feed.getTimeline(
const res = await pipethrough(
ctx.cfg.bskyAppView.url,
METHOD_NSID,
params,
await ctx.appviewAuthHeaders(requester),
)
return await handleReadAfterWrite(ctx, requester, res, getTimelineMunge)
return await handleReadAfterWrite(
ctx,
METHOD_NSID,
requester,
res,
getTimelineMunge,
)
},
})
}
Expand Down
50 changes: 5 additions & 45 deletions packages/pds/src/read-after-write/util.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Headers } from '@atproto/xrpc'
import { readStickyLogger as log } from '../logger'
import AppContext from '../context'
import { ApiRes, HandlerResponse, LocalRecords, MungeFn } from './types'
import { HandlerResponse, LocalRecords, MungeFn } from './types'
import { getRecordsSinceRev } from './viewer'
import { HandlerPipeThrough } from '@atproto/xrpc-server'
import { parseRes } from '../pipethrough'
Expand All @@ -24,61 +24,21 @@ export const getLocalLag = (local: LocalRecords): number | undefined => {
}

export const handleReadAfterWrite = async <T>(
ctx: AppContext,
requester: string,
res: ApiRes<T>,
munge: MungeFn<T>,
): Promise<HandlerResponse<T>> => {
try {
return await readAfterWriteInternal(ctx, requester, res, munge)
} catch (err) {
log.warn({ err, requester }, 'error in read after write munge')
return formatResponse(res.data)
}
}

export const readAfterWriteInternal = async <T>(
ctx: AppContext,
requester: string,
res: ApiRes<T>,
munge: MungeFn<T>,
): Promise<HandlerResponse<T>> => {
const rev = getRepoRev(res.headers)
if (!rev) return formatResponse(res.data)
return ctx.actorStore.read(requester, async (store) => {
const local = await getRecordsSinceRev(store, rev)
if (local.count === 0) {
return formatResponse(res.data)
}
const keypair = await ctx.actorStore.keypair(requester)
const localViewer = ctx.localViewer(store, keypair)
const data = await munge(localViewer, res.data, local, requester)
return formatResponse(data, getLocalLag(local))
})
}

export const handleReadAfterWritePipeThrough = async <T>(
ctx: AppContext,
nsid: string,
requester: string,
res: HandlerPipeThrough,
munge: MungeFn<T>,
): Promise<HandlerResponse<T> | HandlerPipeThrough> => {
try {
return await readAfterWritePipethroughInternal(
ctx,
nsid,
requester,
res,
munge,
)
return await readAfterWriteInternal(ctx, nsid, requester, res, munge)
} catch (err) {
log.warn({ err, requester }, 'error in read after write munge')
return res
}
}

export const readAfterWritePipethroughInternal = async <T>(
export const readAfterWriteInternal = async <T>(
ctx: AppContext,
nsid: string,
requester: string,
Expand All @@ -96,11 +56,11 @@ export const readAfterWritePipethroughInternal = async <T>(
const localViewer = ctx.localViewer(store, keypair)
const parsedRes = parseRes<T>(nsid, res)
const data = await munge(localViewer, parsedRes, local, requester)
return formatResponse(data, getLocalLag(local))
return formatMungedResponse(data, getLocalLag(local))
})
}

export const formatResponse = <T>(
export const formatMungedResponse = <T>(
body: T,
lag?: number,
): HandlerResponse<T> => ({
Expand Down

0 comments on commit a61718e

Please sign in to comment.