Skip to content

Commit

Permalink
implement listNotifications
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Dec 13, 2023
1 parent d7fdc08 commit 10a10b8
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 162 deletions.
3 changes: 2 additions & 1 deletion packages/bsky/proto/bsky.proto
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,8 @@ message GetNotificationsRequest {
message Notification {
string uri = 1;
string reason = 2;
google.protobuf.Timestamp timestamp = 3;
string reason_subject = 3;
google.protobuf.Timestamp timestamp = 4;
}

message GetNotificationsResponse {
Expand Down
213 changes: 55 additions & 158 deletions packages/bsky/src/api/app/bsky/notification/listNotifications.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { jsonStringToLex } from '@atproto/lexicon'
import { mapDefined } from '@atproto/common'
import { Server } from '../../../../lexicon'
import { QueryParams } from '../../../../lexicon/types/app/bsky/notification/listNotifications'
import AppContext from '../../../../context'
import { Database } from '../../../../db'
import { notSoftDeletedClause } from '../../../../db/util'
import { paginate, TimeCidKeyset } from '../../../../db/pagination'
import { BlockAndMuteState, GraphService } from '../../../../services/graph'
import { ActorInfoMap, ActorService } from '../../../../services/actor'
import { getSelfLabels, Labels, LabelService } from '../../../../services/label'
import { createPipeline } from '../../../../pipeline'
import {
createPipelineNew,
HydrationFnInput,
PresentationFnInput,
RulesFnInput,
SkeletonFnInput,
} from '../../../../pipeline'
import { Hydrator } from '../../../../hydration/hydrator'
import { Views } from '../../../../views'
import { Notification } from '../../../../data-plane/gen/bsky_pb'
import { didFromUri } from '../../../../hydration/util'

export default function (server: Server, ctx: AppContext) {
const listNotifications = createPipeline(
const listNotifications = createPipelineNew(
skeleton,
hydration,
noBlockOrMutes,
Expand All @@ -22,17 +25,8 @@ export default function (server: Server, ctx: AppContext) {
server.app.bsky.notification.listNotifications({
auth: ctx.authVerifier,
handler: async ({ params, auth }) => {
const db = ctx.db.getReplica()
const actorService = ctx.services.actor(db)
const graphService = ctx.services.graph(db)
const labelService = ctx.services.label(db)
const viewer = auth.credentials.did

const result = await listNotifications(
{ ...params, viewer },
{ db, actorService, graphService, labelService },
)

const result = await listNotifications({ ...params, viewer }, ctx)
return {
encoding: 'application/json',
body: result,
Expand All @@ -42,138 +36,65 @@ export default function (server: Server, ctx: AppContext) {
}

const skeleton = async (
params: Params,
ctx: Context,
input: SkeletonFnInput<Context, Params>,
): Promise<SkeletonState> => {
const { db } = ctx
const { limit, cursor, viewer } = params
const { ref } = db.db.dynamic
const { params, ctx } = input
if (params.seenAt) {
throw new InvalidRequestError('The seenAt parameter is unsupported')
}
let notifBuilder = db.db
.selectFrom('notification as notif')
.where('notif.did', '=', viewer)
.where((clause) =>
clause
.where('reasonSubject', 'is', null)
.orWhereExists(
db.db
.selectFrom('record as subject')
.selectAll()
.whereRef('subject.uri', '=', ref('notif.reasonSubject')),
),
)
.select([
'notif.author as authorDid',
'notif.recordUri as uri',
'notif.recordCid as cid',
'notif.reason as reason',
'notif.reasonSubject as reasonSubject',
'notif.sortAt as indexedAt',
])

const keyset = new NotifsKeyset(ref('notif.sortAt'), ref('notif.recordCid'))
notifBuilder = paginate(notifBuilder, {
cursor,
limit,
keyset,
tryIndex: true,
})

const actorStateQuery = db.db
.selectFrom('actor_state')
.selectAll()
.where('did', '=', viewer)

const [notifs, actorState] = await Promise.all([
notifBuilder.execute(),
actorStateQuery.executeTakeFirst(),
const [res, lastSeenRes] = await Promise.all([
ctx.hydrator.dataplane.getNotifications({
actorDid: params.viewer,
cursor: params.cursor,
limit: params.limit,
}),
ctx.hydrator.dataplane.getNotificationSeen({
actorDid: params.viewer,
}),
])

return {
params,
notifs,
cursor: keyset.packFromResult(notifs),
lastSeenNotifs: actorState?.lastSeenNotifs,
notifs: res.notifications,
cursor: res.cursor,
lastSeenNotifs: lastSeenRes.timestamp?.toDate().toISOString(),
}
}

const hydration = async (state: SkeletonState, ctx: Context) => {
const { graphService, actorService, labelService, db } = ctx
const { params, notifs } = state
const { viewer } = params
const dids = notifs.map((notif) => notif.authorDid)
const uris = notifs.map((notif) => notif.uri)
const [actors, records, labels, bam] = await Promise.all([
actorService.views.profiles(dids, viewer),
getRecordMap(db, uris),
labelService.getLabelsForUris(uris),
graphService.getBlockAndMuteState(dids.map((did) => [viewer, did])),
])
return { ...state, actors, records, labels, bam }
}

const noBlockOrMutes = (state: HydrationState) => {
const { viewer } = state.params
state.notifs = state.notifs.filter(
(item) =>
!state.bam.block([viewer, item.authorDid]) &&
!state.bam.mute([viewer, item.authorDid]),
)
return state
const hydration = async (
input: HydrationFnInput<Context, Params, SkeletonState>,
) => {
const { skeleton, params, ctx } = input
return ctx.hydrator.hydrateNotifications(skeleton.notifs, params.viewer)
}

const presentation = (state: HydrationState) => {
const { notifs, cursor, actors, records, labels, lastSeenNotifs } = state
const notifications = mapDefined(notifs, (notif) => {
const author = actors[notif.authorDid]
const record = records[notif.uri]
if (!author || !record) return undefined
const recordLabels = labels[notif.uri] ?? []
const recordSelfLabels = getSelfLabels({
uri: notif.uri,
cid: notif.cid,
record,
})
return {
uri: notif.uri,
cid: notif.cid,
author,
reason: notif.reason,
reasonSubject: notif.reasonSubject || undefined,
record,
isRead: lastSeenNotifs ? notif.indexedAt <= lastSeenNotifs : false,
indexedAt: notif.indexedAt,
labels: [...recordLabels, ...recordSelfLabels],
}
const noBlockOrMutes = (
input: RulesFnInput<Context, Params, SkeletonState>,
) => {
const { skeleton, hydration, ctx } = input
skeleton.notifs = skeleton.notifs.filter((item) => {
const did = didFromUri(item.uri)
return (
!ctx.views.viewerBlockExists(did, hydration) &&
!ctx.views.viewerMuteExists(did, hydration)
)
})
return { notifications, cursor }
return skeleton
}

const getRecordMap = async (
db: Database,
uris: string[],
): Promise<RecordMap> => {
if (!uris.length) return {}
const { ref } = db.db.dynamic
const recordRows = await db.db
.selectFrom('record')
.select(['uri', 'json'])
.where('uri', 'in', uris)
.where(notSoftDeletedClause(ref('record')))
.execute()
return recordRows.reduce((acc, { uri, json }) => {
acc[uri] = jsonStringToLex(json) as Record<string, unknown>
return acc
}, {} as RecordMap)
const presentation = (
input: PresentationFnInput<Context, Params, SkeletonState>,
) => {
const { skeleton, hydration, ctx } = input
const { notifs, lastSeenNotifs, cursor } = skeleton
const notifications = mapDefined(notifs, (notif) =>
ctx.views.notification(notif, lastSeenNotifs, hydration),
)
return { notifications, cursor }
}

type Context = {
db: Database
actorService: ActorService
graphService: GraphService
labelService: LabelService
hydrator: Hydrator
views: Views
}

type Params = QueryParams & {
Expand All @@ -182,31 +103,7 @@ type Params = QueryParams & {

type SkeletonState = {
params: Params
notifs: NotifRow[]
notifs: Notification[]
lastSeenNotifs?: string
cursor?: string
}

type HydrationState = SkeletonState & {
bam: BlockAndMuteState
actors: ActorInfoMap
records: RecordMap
labels: Labels
}

type RecordMap = { [uri: string]: Record<string, unknown> }

type NotifRow = {
authorDid: string
uri: string
cid: string
reason: string
reasonSubject: string | null
indexedAt: string
}

class NotifsKeyset extends TimeCidKeyset<NotifRow> {
labelResult(result: NotifRow) {
return { primary: result.indexedAt, secondary: result.cid }
}
}
10 changes: 8 additions & 2 deletions packages/bsky/src/data-plane/gen/bsky_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3838,7 +3838,12 @@ export class Notification extends Message<Notification> {
reason = "";

/**
* @generated from field: google.protobuf.Timestamp timestamp = 3;
* @generated from field: string reason_subject = 3;
*/
reasonSubject = "";

/**
* @generated from field: google.protobuf.Timestamp timestamp = 4;
*/
timestamp?: Timestamp;

Expand All @@ -3852,7 +3857,8 @@ export class Notification extends Message<Notification> {
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "uri", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "reason", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "timestamp", kind: "message", T: Timestamp },
{ no: 3, name: "reason_subject", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 4, name: "timestamp", kind: "message", T: Timestamp },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): Notification {
Expand Down
1 change: 1 addition & 0 deletions packages/bsky/src/data-plane/server/routes/notifs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
const notifications = notifsRes.map((notif) => ({
uri: notif.uri,
reason: notif.reason,
reasonSubject: notif.reasonSubject ?? undefined,
timestamp: Timestamp.fromDate(new Date(notif.sortAt)),
}))
return {
Expand Down
2 changes: 1 addition & 1 deletion packages/bsky/src/hydration/hydrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ export class Hydrator {
this.feed.getPosts(postUris), // reason: mention, reply, quote
this.feed.getLikes(likeUris), // reason: like
this.feed.getReposts(repostUris), // reason: repost
this.graph.getFollows(followUris),
this.graph.getFollows(followUris), // reason: follow
this.label.getLabelsForSubjects(uris),
this.hydrateProfiles(uris.map(didFromUri), viewer),
])
Expand Down
44 changes: 44 additions & 0 deletions packages/bsky/src/views/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
ImagesEmbed,
ImagesEmbedView,
MaybePostView,
NotificationView,
PostEmbedView,
RecordEmbed,
RecordEmbedView,
Expand All @@ -46,6 +47,8 @@ import {
} from './types'
import { Label } from '../hydration/label'
import { Repost } from '../hydration/feed'
import { RecordInfo } from '../hydration/util'
import { Notification } from '../data-plane/gen/bsky_pb'

export class Views {
constructor(public imgUriBuilder: ImageUriBuilder) {}
Expand Down Expand Up @@ -736,4 +739,45 @@ export class Views {
// @TODO
return undefined
}

notification(
notif: Notification,
lastSeenAt: string | undefined,
state: HydrationState,
): NotificationView | undefined {
if (!notif.timestamp || !notif.reason) return
const uri = new AtUri(notif.uri)
const authorDid = uri.hostname
const author = this.profile(authorDid, state)
if (!author) return
let recordInfo: RecordInfo<Record<string, unknown>> | null | undefined
if (uri.collection === ids.AppBskyFeedPost) {
recordInfo = state.posts?.get(notif.uri)
} else if (uri.collection === ids.AppBskyFeedLike) {
recordInfo = state.likes?.get(notif.uri)
} else if (uri.collection === ids.AppBskyFeedRepost) {
recordInfo = state.reposts?.get(notif.uri)
} else if (uri.collection === ids.AppBskyGraphFollow) {
recordInfo = state.follows?.get(notif.uri)
}
if (!recordInfo) return
const labels = state.labels?.get(notif.uri) ?? []
const selfLabels = this.selfLabels({
uri: notif.uri,
cid: recordInfo.cid.toString(),
record: recordInfo.record,
})
const indexedAt = notif.timestamp.toDate().toISOString()
return {
uri: notif.uri,
cid: recordInfo.cid.toString(),
author,
reason: notif.reason,
reasonSubject: notif.reasonSubject || undefined,
record: recordInfo.record,
isRead: lastSeenAt ? lastSeenAt >= indexedAt : false,
indexedAt: notif.timestamp.toDate().toISOString(),
labels: [...labels, ...selfLabels],
}
}
}
2 changes: 2 additions & 0 deletions packages/bsky/src/views/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export type {
} from '../lexicon/types/app/bsky/feed/defs'
export type { ListView } from '../lexicon/types/app/bsky/graph/defs'

export type { Notification as NotificationView } from '../lexicon/types/app/bsky/notification/listNotifications'

export type Embed = ImagesEmbed | ExternalEmbed | RecordEmbed | RecordWithMedia

export type EmbedView =
Expand Down

0 comments on commit 10a10b8

Please sign in to comment.