From a4e5abf5f4f7f855041c34d7cf945cf066566bd8 Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Fri, 1 Dec 2023 15:07:17 -0600 Subject: [PATCH 1/6] Fix snapshots for list items (#1911) fix snapshots for list items --- .../tests/views/__snapshots__/block-lists.test.ts.snap | 3 ++- .../tests/views/__snapshots__/mute-lists.test.ts.snap | 9 +++++---- .../pds/tests/proxied/__snapshots__/views.test.ts.snap | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap b/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap index 1f4f42f2003..7f0989a5975 100644 --- a/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap +++ b/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap @@ -514,6 +514,7 @@ Object { "muted": false, }, }, + "uri": "record(5)", }, Object { "subject": Object { @@ -542,7 +543,7 @@ Object { "muted": false, }, }, - "uri": "record(5)", + "uri": "record(6)", }, ], "list": Object { diff --git a/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap b/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap index 8b88231fe3b..d4b11f0d235 100644 --- a/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap +++ b/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap @@ -495,6 +495,7 @@ Object { "muted": false, }, }, + "uri": "record(3)", }, Object { "subject": Object { @@ -503,7 +504,7 @@ Object { "labels": Array [], "viewer": Object { "blockedBy": false, - "followedBy": "record(3)", + "followedBy": "record(5)", "muted": true, "mutedByList": Object { "avatar": "https://bsky.public.url/img/avatar/plain/user(0)/cids(1)@jpeg", @@ -518,7 +519,7 @@ Object { }, }, }, - "uri": "record(3)", + "uri": "record(4)", }, Object { "subject": Object { @@ -531,7 +532,7 @@ Object { "labels": Array [], "viewer": Object { "blockedBy": false, - "following": "record(4)", + "following": "record(7)", "muted": true, "mutedByList": Object { "avatar": "https://bsky.public.url/img/avatar/plain/user(0)/cids(1)@jpeg", @@ -546,7 +547,7 @@ Object { }, }, }, - "uri": "record(5)", + "uri": "record(6)", }, ], "list": Object { diff --git a/packages/pds/tests/proxied/__snapshots__/views.test.ts.snap b/packages/pds/tests/proxied/__snapshots__/views.test.ts.snap index 5fcc0be8faf..f856407ccbc 100644 --- a/packages/pds/tests/proxied/__snapshots__/views.test.ts.snap +++ b/packages/pds/tests/proxied/__snapshots__/views.test.ts.snap @@ -3321,8 +3321,8 @@ Object { "labels": Array [], "viewer": Object { "blockedBy": false, - "followedBy": "record(4)", - "following": "record(3)", + "followedBy": "record(5)", + "following": "record(4)", "muted": false, }, }, From 378fc6132f621ca517897c9467ed5bba134b3776 Mon Sep 17 00:00:00 2001 From: devin ivy Date: Fri, 1 Dec 2023 18:10:23 -0500 Subject: [PATCH 2/6] Additional @atproto/api 0.6.24 changeset (#1912) api changeset --- .changeset/spotty-guests-taste.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/spotty-guests-taste.md diff --git a/.changeset/spotty-guests-taste.md b/.changeset/spotty-guests-taste.md new file mode 100644 index 00000000000..3d1d2b3e0ca --- /dev/null +++ b/.changeset/spotty-guests-taste.md @@ -0,0 +1,5 @@ +--- +'@atproto/api': patch +--- + +Contains breaking lexicon changes: removing legacy com.atproto admin endpoints, making uri field required on app.bsky list views. From 1f3fad2829df581221b5837998b97ec4c0147f0e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 1 Dec 2023 18:14:01 -0500 Subject: [PATCH 3/6] Version packages (#1909) Co-authored-by: github-actions[bot] --- .changeset/brave-swans-kiss.md | 5 ----- .changeset/spotty-guests-taste.md | 5 ----- packages/api/CHANGELOG.md | 11 +++++++++++ packages/api/package.json | 2 +- packages/aws/CHANGELOG.md | 7 +++++++ packages/aws/package.json | 2 +- packages/bsky/CHANGELOG.md | 11 +++++++++++ packages/bsky/package.json | 2 +- packages/dev-env/CHANGELOG.md | 12 ++++++++++++ packages/dev-env/package.json | 2 +- packages/lex-cli/CHANGELOG.md | 8 ++++++++ packages/lex-cli/package.json | 2 +- packages/lexicon/CHANGELOG.md | 7 +++++++ packages/lexicon/package.json | 2 +- packages/pds/CHANGELOG.md | 13 +++++++++++++ packages/pds/package.json | 2 +- packages/repo/CHANGELOG.md | 8 ++++++++ packages/repo/package.json | 2 +- packages/syntax/CHANGELOG.md | 6 ++++++ packages/syntax/package.json | 2 +- packages/xrpc-server/CHANGELOG.md | 7 +++++++ packages/xrpc-server/package.json | 2 +- packages/xrpc/CHANGELOG.md | 7 +++++++ packages/xrpc/package.json | 2 +- 24 files changed, 108 insertions(+), 21 deletions(-) delete mode 100644 .changeset/brave-swans-kiss.md delete mode 100644 .changeset/spotty-guests-taste.md diff --git a/.changeset/brave-swans-kiss.md b/.changeset/brave-swans-kiss.md deleted file mode 100644 index e6e42a1bcb2..00000000000 --- a/.changeset/brave-swans-kiss.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@atproto/syntax': patch ---- - -prevent unnecessary throw/catch on uri syntax diff --git a/.changeset/spotty-guests-taste.md b/.changeset/spotty-guests-taste.md deleted file mode 100644 index 3d1d2b3e0ca..00000000000 --- a/.changeset/spotty-guests-taste.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@atproto/api': patch ---- - -Contains breaking lexicon changes: removing legacy com.atproto admin endpoints, making uri field required on app.bsky list views. diff --git a/packages/api/CHANGELOG.md b/packages/api/CHANGELOG.md index ca4ed16617b..a2ecd8e4b5a 100644 --- a/packages/api/CHANGELOG.md +++ b/packages/api/CHANGELOG.md @@ -1,5 +1,16 @@ # @atproto/api +## 0.6.24 + +### Patch Changes + +- [#1912](https://github.com/bluesky-social/atproto/pull/1912) [`378fc613`](https://github.com/bluesky-social/atproto/commit/378fc6132f621ca517897c9467ed5bba134b3776) Thanks [@devinivy](https://github.com/devinivy)! - Contains breaking lexicon changes: removing legacy com.atproto admin endpoints, making uri field required on app.bsky list views. + +- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60)]: + - @atproto/syntax@0.1.5 + - @atproto/lexicon@0.3.1 + - @atproto/xrpc@0.4.1 + ## 0.6.23 ### Patch Changes diff --git a/packages/api/package.json b/packages/api/package.json index 8b7be93f6dc..9bf7c547b19 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/api", - "version": "0.6.24-next.1", + "version": "0.6.24", "license": "MIT", "description": "Client library for atproto and Bluesky", "keywords": [ diff --git a/packages/aws/CHANGELOG.md b/packages/aws/CHANGELOG.md index e49e47b3f5f..b804e0719e4 100644 --- a/packages/aws/CHANGELOG.md +++ b/packages/aws/CHANGELOG.md @@ -1,5 +1,12 @@ # @atproto/aws +## 0.1.6 + +### Patch Changes + +- Updated dependencies []: + - @atproto/repo@0.3.6 + ## 0.1.5 ### Patch Changes diff --git a/packages/aws/package.json b/packages/aws/package.json index e5f0b6c5507..949cfaa845e 100644 --- a/packages/aws/package.json +++ b/packages/aws/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/aws", - "version": "0.1.5", + "version": "0.1.6", "license": "MIT", "description": "Shared AWS cloud API helpers for atproto services", "keywords": [ diff --git a/packages/bsky/CHANGELOG.md b/packages/bsky/CHANGELOG.md index 7cbb585dfeb..0dd20ba567f 100644 --- a/packages/bsky/CHANGELOG.md +++ b/packages/bsky/CHANGELOG.md @@ -1,5 +1,16 @@ # @atproto/bsky +## 0.0.16 + +### Patch Changes + +- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60), [`378fc613`](https://github.com/bluesky-social/atproto/commit/378fc6132f621ca517897c9467ed5bba134b3776)]: + - @atproto/syntax@0.1.5 + - @atproto/api@0.6.24 + - @atproto/lexicon@0.3.1 + - @atproto/repo@0.3.6 + - @atproto/xrpc-server@0.4.2 + ## 0.0.15 ### Patch Changes diff --git a/packages/bsky/package.json b/packages/bsky/package.json index c713cd72227..ad86a2fff21 100644 --- a/packages/bsky/package.json +++ b/packages/bsky/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/bsky", - "version": "0.0.15", + "version": "0.0.16", "license": "MIT", "description": "Reference implementation of app.bsky App View (Bluesky API)", "keywords": [ diff --git a/packages/dev-env/CHANGELOG.md b/packages/dev-env/CHANGELOG.md index 62f533f6b96..c36d7e71574 100644 --- a/packages/dev-env/CHANGELOG.md +++ b/packages/dev-env/CHANGELOG.md @@ -1,5 +1,17 @@ # @atproto/dev-env +## 0.2.16 + +### Patch Changes + +- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60), [`378fc613`](https://github.com/bluesky-social/atproto/commit/378fc6132f621ca517897c9467ed5bba134b3776)]: + - @atproto/syntax@0.1.5 + - @atproto/api@0.6.24 + - @atproto/bsky@0.0.16 + - @atproto/lexicon@0.3.1 + - @atproto/pds@0.3.4 + - @atproto/xrpc-server@0.4.2 + ## 0.2.15 ### Patch Changes diff --git a/packages/dev-env/package.json b/packages/dev-env/package.json index eb6f31d6986..37c5b47630d 100644 --- a/packages/dev-env/package.json +++ b/packages/dev-env/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/dev-env", - "version": "0.2.15", + "version": "0.2.16", "license": "MIT", "description": "Local development environment helper for atproto development", "keywords": [ diff --git a/packages/lex-cli/CHANGELOG.md b/packages/lex-cli/CHANGELOG.md index b9c617f9a9d..29491171a36 100644 --- a/packages/lex-cli/CHANGELOG.md +++ b/packages/lex-cli/CHANGELOG.md @@ -1,5 +1,13 @@ # @atproto/lex-cli +## 0.2.5 + +### Patch Changes + +- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60)]: + - @atproto/syntax@0.1.5 + - @atproto/lexicon@0.3.1 + ## 0.2.4 ### Patch Changes diff --git a/packages/lex-cli/package.json b/packages/lex-cli/package.json index 2f57a55b3cb..19f77cbe0a9 100644 --- a/packages/lex-cli/package.json +++ b/packages/lex-cli/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/lex-cli", - "version": "0.2.4", + "version": "0.2.5", "license": "MIT", "description": "TypeScript codegen tool for atproto Lexicon schemas", "keywords": [ diff --git a/packages/lexicon/CHANGELOG.md b/packages/lexicon/CHANGELOG.md index 7194b0258ba..24e2ea99a7d 100644 --- a/packages/lexicon/CHANGELOG.md +++ b/packages/lexicon/CHANGELOG.md @@ -1,5 +1,12 @@ # @atproto/lexicon +## 0.3.1 + +### Patch Changes + +- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60)]: + - @atproto/syntax@0.1.5 + ## 0.3.0 ### Minor Changes diff --git a/packages/lexicon/package.json b/packages/lexicon/package.json index fc776e7c273..4f0b05d20d8 100644 --- a/packages/lexicon/package.json +++ b/packages/lexicon/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/lexicon", - "version": "0.3.0", + "version": "0.3.1", "license": "MIT", "description": "atproto Lexicon schema language library", "keywords": [ diff --git a/packages/pds/CHANGELOG.md b/packages/pds/CHANGELOG.md index 34cda77d01e..3a1bf615d79 100644 --- a/packages/pds/CHANGELOG.md +++ b/packages/pds/CHANGELOG.md @@ -1,5 +1,18 @@ # @atproto/pds +## 0.3.4 + +### Patch Changes + +- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60), [`378fc613`](https://github.com/bluesky-social/atproto/commit/378fc6132f621ca517897c9467ed5bba134b3776)]: + - @atproto/syntax@0.1.5 + - @atproto/api@0.6.24 + - @atproto/lexicon@0.3.1 + - @atproto/repo@0.3.6 + - @atproto/xrpc@0.4.1 + - @atproto/xrpc-server@0.4.2 + - @atproto/aws@0.1.6 + ## 0.3.3 ### Patch Changes diff --git a/packages/pds/package.json b/packages/pds/package.json index 857fd912611..c1f301c8fda 100644 --- a/packages/pds/package.json +++ b/packages/pds/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/pds", - "version": "0.3.3", + "version": "0.3.4", "license": "MIT", "description": "Reference implementation of atproto Personal Data Server (PDS)", "keywords": [ diff --git a/packages/repo/CHANGELOG.md b/packages/repo/CHANGELOG.md index 35c6c3b9c44..448005d16d2 100644 --- a/packages/repo/CHANGELOG.md +++ b/packages/repo/CHANGELOG.md @@ -1,5 +1,13 @@ # @atproto/repo +## 0.3.6 + +### Patch Changes + +- Updated dependencies [[`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60)]: + - @atproto/syntax@0.1.5 + - @atproto/lexicon@0.3.1 + ## 0.3.5 ### Patch Changes diff --git a/packages/repo/package.json b/packages/repo/package.json index c36cbcf668e..b6a3b87607e 100644 --- a/packages/repo/package.json +++ b/packages/repo/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/repo", - "version": "0.3.5", + "version": "0.3.6", "license": "MIT", "description": "atproto repo and MST implementation", "keywords": [ diff --git a/packages/syntax/CHANGELOG.md b/packages/syntax/CHANGELOG.md index bc243559bbf..ad736e658a0 100644 --- a/packages/syntax/CHANGELOG.md +++ b/packages/syntax/CHANGELOG.md @@ -1,5 +1,11 @@ # @atproto/syntax +## 0.1.5 + +### Patch Changes + +- [#1908](https://github.com/bluesky-social/atproto/pull/1908) [`3c0ef382`](https://github.com/bluesky-social/atproto/commit/3c0ef382c12a413cc971ae47ffb341236c545f60) Thanks [@gaearon](https://github.com/gaearon)! - prevent unnecessary throw/catch on uri syntax + ## 0.1.4 ### Patch Changes diff --git a/packages/syntax/package.json b/packages/syntax/package.json index 645926886f5..d6f0ea11fd6 100644 --- a/packages/syntax/package.json +++ b/packages/syntax/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/syntax", - "version": "0.1.4", + "version": "0.1.5", "license": "MIT", "description": "Validation for atproto identifiers and formats: DID, handle, NSID, AT URI, etc", "keywords": [ diff --git a/packages/xrpc-server/CHANGELOG.md b/packages/xrpc-server/CHANGELOG.md index 29db2da0d61..c96e2de140f 100644 --- a/packages/xrpc-server/CHANGELOG.md +++ b/packages/xrpc-server/CHANGELOG.md @@ -1,5 +1,12 @@ # @atproto/xrpc-server +## 0.4.2 + +### Patch Changes + +- Updated dependencies []: + - @atproto/lexicon@0.3.1 + ## 0.4.1 ### Patch Changes diff --git a/packages/xrpc-server/package.json b/packages/xrpc-server/package.json index d3d2fa63c98..b68ecb03ea4 100644 --- a/packages/xrpc-server/package.json +++ b/packages/xrpc-server/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/xrpc-server", - "version": "0.4.1", + "version": "0.4.2", "license": "MIT", "description": "atproto HTTP API (XRPC) server library", "keywords": [ diff --git a/packages/xrpc/CHANGELOG.md b/packages/xrpc/CHANGELOG.md index 76ffea62682..69977ee06d3 100644 --- a/packages/xrpc/CHANGELOG.md +++ b/packages/xrpc/CHANGELOG.md @@ -1,5 +1,12 @@ # @atproto/xrpc +## 0.4.1 + +### Patch Changes + +- Updated dependencies []: + - @atproto/lexicon@0.3.1 + ## 0.4.0 ### Minor Changes diff --git a/packages/xrpc/package.json b/packages/xrpc/package.json index 49cf7a06aa3..e2accc2750d 100644 --- a/packages/xrpc/package.json +++ b/packages/xrpc/package.json @@ -1,6 +1,6 @@ { "name": "@atproto/xrpc", - "version": "0.4.0", + "version": "0.4.1", "license": "MIT", "description": "atproto HTTP API (XRPC) client library", "keywords": [ From 9cec13ee46aabcda71a6245e28e96d968ed9a289 Mon Sep 17 00:00:00 2001 From: devin ivy Date: Fri, 1 Dec 2023 18:15:27 -0500 Subject: [PATCH 4/6] Do not generate notifs when post violates threadgate (#1901) * do not generate notifs when post violates threadgate * don't count threadgate-violating replies, style --- .../src/services/indexing/plugins/post.ts | 13 ++++++++++ .../bsky/tests/views/threadgating.test.ts | 26 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/packages/bsky/src/services/indexing/plugins/post.ts b/packages/bsky/src/services/indexing/plugins/post.ts index 5f2fca934ce..af581b3bdff 100644 --- a/packages/bsky/src/services/indexing/plugins/post.ts +++ b/packages/bsky/src/services/indexing/plugins/post.ts @@ -112,6 +112,7 @@ const insertFn = async ( obj.reply, ) if (invalidReplyRoot || violatesThreadGate) { + Object.assign(insertedPost, { invalidReplyRoot, violatesThreadGate }) await db .updateTable('post') .where('uri', '=', post.uri) @@ -241,6 +242,13 @@ const notifsForInsert = (obj: IndexedPost) => { } } + if (obj.post.violatesThreadGate) { + // don't generate reply notifications when post violates threadgate + return notifs + } + + // reply notifications + for (const ancestor of obj.ancestors ?? []) { if (ancestor.uri === obj.post.uri) continue // no need to notify for own post if (ancestor.height < REPLY_NOTIF_DEPTH) { @@ -353,6 +361,11 @@ const updateAggregates = async (db: DatabaseSchema, postIdx: IndexedPost) => { replyCount: db .selectFrom('post') .where('post.replyParent', '=', postIdx.post.replyParent) + .where((qb) => + qb + .where('post.violatesThreadGate', 'is', null) + .orWhere('post.violatesThreadGate', '=', false), + ) .select(countAll.as('count')), }) .onConflict((oc) => diff --git a/packages/bsky/tests/views/threadgating.test.ts b/packages/bsky/tests/views/threadgating.test.ts index 53e5961b595..5f530b33536 100644 --- a/packages/bsky/tests/views/threadgating.test.ts +++ b/packages/bsky/tests/views/threadgating.test.ts @@ -64,6 +64,32 @@ describe('views with thread gating', () => { await checkReplyDisabled(post.ref.uriStr, sc.dids.alice, true) }) + it('does not generate notifications when post violates threadgate.', async () => { + const post = await sc.post(sc.dids.carol, 'notifications') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { post: post.ref.uriStr, createdAt: iso(), allow: [] }, + sc.getHeaders(sc.dids.carol), + ) + const reply = await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'notifications reply', + ) + await network.processAll() + const { + data: { notifications }, + } = await agent.api.app.bsky.notification.listNotifications( + {}, + { headers: await network.serviceHeaders(sc.dids.carol) }, + ) + const notificationFromReply = notifications.find( + (notif) => notif.uri === reply.ref.uriStr, + ) + expect(notificationFromReply).toBeUndefined() + }) + it('applies gate for mention rule.', async () => { const post = await sc.post( sc.dids.carol, From 6d21cc1b01b00bb44b0f8e756529fa6dfb73d869 Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Fri, 1 Dec 2023 17:29:26 -0600 Subject: [PATCH 5/6] Add flag for running db migrations on appview (#1913) * add flag for running db migrations on appview * lint * another fix --- services/bsky/api.js | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/services/bsky/api.js b/services/bsky/api.js index cf63c951043..2e78d3bafec 100644 --- a/services/bsky/api.js +++ b/services/bsky/api.js @@ -32,6 +32,18 @@ const { const main = async () => { const env = getEnv() assert(env.dbPrimaryPostgresUrl, 'missing configuration for db') + + if (env.enableMigrations) { + // separate db needed for more permissions + const migrateDb = new PrimaryDatabase({ + url: env.dbMigratePostgresUrl, + schema: env.dbPostgresSchema, + poolSize: 2, + }) + await migrateDb.migrateToLatestOrThrow() + await migrateDb.close() + } + const db = new DatabaseCoordinator({ schema: env.dbPostgresSchema, primary: { @@ -102,12 +114,12 @@ const main = async () => { algos, }) // separate db needed for more permissions - const migrateDb = new PrimaryDatabase({ + const viewMaintainerDb = new PrimaryDatabase({ url: env.dbMigratePostgresUrl, schema: env.dbPostgresSchema, poolSize: 2, }) - const viewMaintainer = new ViewMaintainer(migrateDb, 1800) + const viewMaintainer = new ViewMaintainer(viewMaintainerDb, 1800) const viewMaintainerRunning = viewMaintainer.run() const periodicModerationEventReversal = new PeriodicModerationEventReversal( @@ -125,11 +137,12 @@ const main = async () => { await bsky.destroy() viewMaintainer.destroy() await viewMaintainerRunning - await migrateDb.close() + await viewMaintainerDb.close() }) } const getEnv = () => ({ + enableMigrations: process.env.ENABLE_MIGRATIONS === 'true', port: parseInt(process.env.PORT), version: process.env.BSKY_VERSION, dbMigratePostgresUrl: From cad30a7cc8a2195cfa66ce50c782c385fae97b30 Mon Sep 17 00:00:00 2001 From: devin ivy Date: Fri, 1 Dec 2023 19:01:31 -0500 Subject: [PATCH 6/6] Cleanup outdated notifications in appview, add daemon for similar tasks (#1893) * initial notification tidy logic * helper for maintenance across all appview users * tiny reorg * add bsky daemon to tidy notifications * tidy, add bsky daemon service entrypoint * test notifs tidy daemon, add stats * tidy * crash failed notification daemon * fix notification tidy constants --- packages/bsky/src/daemon/config.ts | 50 +++++ packages/bsky/src/daemon/context.ts | 27 +++ packages/bsky/src/daemon/index.ts | 79 ++++++++ packages/bsky/src/daemon/logger.ts | 6 + packages/bsky/src/daemon/notifications.ts | 50 +++++ packages/bsky/src/daemon/services.ts | 20 ++ packages/bsky/src/index.ts | 1 + packages/bsky/src/services/actor/index.ts | 29 +++ .../bsky/src/services/util/notification.ts | 70 +++++++ packages/bsky/tests/daemon.test.ts | 191 ++++++++++++++++++ packages/bsky/tests/views/profile.test.ts | 1 - services/bsky/daemon.js | 44 ++++ 12 files changed, 567 insertions(+), 1 deletion(-) create mode 100644 packages/bsky/src/daemon/config.ts create mode 100644 packages/bsky/src/daemon/context.ts create mode 100644 packages/bsky/src/daemon/index.ts create mode 100644 packages/bsky/src/daemon/logger.ts create mode 100644 packages/bsky/src/daemon/notifications.ts create mode 100644 packages/bsky/src/daemon/services.ts create mode 100644 packages/bsky/src/services/util/notification.ts create mode 100644 packages/bsky/tests/daemon.test.ts create mode 100644 services/bsky/daemon.js diff --git a/packages/bsky/src/daemon/config.ts b/packages/bsky/src/daemon/config.ts new file mode 100644 index 00000000000..e0e789203e4 --- /dev/null +++ b/packages/bsky/src/daemon/config.ts @@ -0,0 +1,50 @@ +import assert from 'assert' + +export interface DaemonConfigValues { + version: string + dbPostgresUrl: string + dbPostgresSchema?: string +} + +export class DaemonConfig { + constructor(private cfg: DaemonConfigValues) {} + + static readEnv(overrides?: Partial) { + const version = process.env.BSKY_VERSION || '0.0.0' + const dbPostgresUrl = + overrides?.dbPostgresUrl || process.env.DB_PRIMARY_POSTGRES_URL + const dbPostgresSchema = + overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA + assert(dbPostgresUrl) + return new DaemonConfig({ + version, + dbPostgresUrl, + dbPostgresSchema, + ...stripUndefineds(overrides ?? {}), + }) + } + + get version() { + return this.cfg.version + } + + get dbPostgresUrl() { + return this.cfg.dbPostgresUrl + } + + get dbPostgresSchema() { + return this.cfg.dbPostgresSchema + } +} + +function stripUndefineds( + obj: Record, +): Record { + const result = {} + Object.entries(obj).forEach(([key, val]) => { + if (val !== undefined) { + result[key] = val + } + }) + return result +} diff --git a/packages/bsky/src/daemon/context.ts b/packages/bsky/src/daemon/context.ts new file mode 100644 index 00000000000..dd3d5c1114f --- /dev/null +++ b/packages/bsky/src/daemon/context.ts @@ -0,0 +1,27 @@ +import { PrimaryDatabase } from '../db' +import { DaemonConfig } from './config' +import { Services } from './services' + +export class DaemonContext { + constructor( + private opts: { + db: PrimaryDatabase + cfg: DaemonConfig + services: Services + }, + ) {} + + get db(): PrimaryDatabase { + return this.opts.db + } + + get cfg(): DaemonConfig { + return this.opts.cfg + } + + get services(): Services { + return this.opts.services + } +} + +export default DaemonContext diff --git a/packages/bsky/src/daemon/index.ts b/packages/bsky/src/daemon/index.ts new file mode 100644 index 00000000000..61bcd8568f4 --- /dev/null +++ b/packages/bsky/src/daemon/index.ts @@ -0,0 +1,79 @@ +import { PrimaryDatabase } from '../db' +import { dbLogger } from '../logger' +import { DaemonConfig } from './config' +import { DaemonContext } from './context' +import { createServices } from './services' +import { ImageUriBuilder } from '../image/uri' +import { LabelCache } from '../label-cache' +import { NotificationsDaemon } from './notifications' +import logger from './logger' + +export { DaemonConfig } from './config' +export type { DaemonConfigValues } from './config' + +export class BskyDaemon { + public ctx: DaemonContext + public notifications: NotificationsDaemon + private dbStatsInterval: NodeJS.Timer + private notifStatsInterval: NodeJS.Timer + + constructor(opts: { + ctx: DaemonContext + notifications: NotificationsDaemon + }) { + this.ctx = opts.ctx + this.notifications = opts.notifications + } + + static create(opts: { db: PrimaryDatabase; cfg: DaemonConfig }): BskyDaemon { + const { db, cfg } = opts + const imgUriBuilder = new ImageUriBuilder('https://daemon.invalid') // will not be used by daemon + const labelCache = new LabelCache(db) + const services = createServices({ + imgUriBuilder, + labelCache, + }) + const ctx = new DaemonContext({ + db, + cfg, + services, + }) + const notifications = new NotificationsDaemon(ctx) + return new BskyDaemon({ ctx, notifications }) + } + + async start() { + const { db } = this.ctx + const pool = db.pool + this.notifications.run() + this.dbStatsInterval = setInterval(() => { + dbLogger.info( + { + idleCount: pool.idleCount, + totalCount: pool.totalCount, + waitingCount: pool.waitingCount, + }, + 'db pool stats', + ) + }, 10000) + this.notifStatsInterval = setInterval(() => { + logger.info( + { + count: this.notifications.count, + lastDid: this.notifications.lastDid, + }, + 'notifications daemon stats', + ) + }, 10000) + return this + } + + async destroy(): Promise { + await this.notifications.destroy() + await this.ctx.db.close() + clearInterval(this.dbStatsInterval) + clearInterval(this.notifStatsInterval) + } +} + +export default BskyDaemon diff --git a/packages/bsky/src/daemon/logger.ts b/packages/bsky/src/daemon/logger.ts new file mode 100644 index 00000000000..8599acc315e --- /dev/null +++ b/packages/bsky/src/daemon/logger.ts @@ -0,0 +1,6 @@ +import { subsystemLogger } from '@atproto/common' + +const logger: ReturnType = + subsystemLogger('bsky:daemon') + +export default logger diff --git a/packages/bsky/src/daemon/notifications.ts b/packages/bsky/src/daemon/notifications.ts new file mode 100644 index 00000000000..e8e884b37c2 --- /dev/null +++ b/packages/bsky/src/daemon/notifications.ts @@ -0,0 +1,50 @@ +import { tidyNotifications } from '../services/util/notification' +import DaemonContext from './context' +import logger from './logger' + +export class NotificationsDaemon { + ac = new AbortController() + running: Promise | undefined + count = 0 + lastDid: string | null = null + + constructor(private ctx: DaemonContext) {} + + run(opts?: RunOptions) { + if (this.running) return + this.count = 0 + this.lastDid = null + this.ac = new AbortController() + this.running = this.tidyNotifications({ + ...opts, + forever: opts?.forever !== false, // run forever by default + }) + .catch((err) => { + // allow this to cause an unhandled rejection, let deployment handle the crash. + logger.error({ err }, 'notifications daemon crashed') + throw err + }) + .finally(() => (this.running = undefined)) + } + + private async tidyNotifications(opts: RunOptions) { + const actorService = this.ctx.services.actor(this.ctx.db) + for await (const { did } of actorService.all(opts)) { + if (this.ac.signal.aborted) return + try { + await tidyNotifications(this.ctx.db, did) + this.count++ + this.lastDid = did + } catch (err) { + logger.warn({ err, did }, 'failed to tidy notifications for actor') + } + } + } + + async destroy() { + this.ac.abort() + await this.running + } +} + +type RunOptions = { forever?: boolean; batchSize?: number } diff --git a/packages/bsky/src/daemon/services.ts b/packages/bsky/src/daemon/services.ts new file mode 100644 index 00000000000..a4e7935523c --- /dev/null +++ b/packages/bsky/src/daemon/services.ts @@ -0,0 +1,20 @@ +import { PrimaryDatabase } from '../db' +import { ActorService } from '../services/actor' +import { ImageUriBuilder } from '../image/uri' +import { LabelCache } from '../label-cache' + +export function createServices(resources: { + imgUriBuilder: ImageUriBuilder + labelCache: LabelCache +}): Services { + const { imgUriBuilder, labelCache } = resources + return { + actor: ActorService.creator(imgUriBuilder, labelCache), + } +} + +export type Services = { + actor: FromDbPrimary +} + +type FromDbPrimary = (db: PrimaryDatabase) => T diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index 9e0075dce37..7ceba61f990 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -37,6 +37,7 @@ export { Redis } from './redis' export { ViewMaintainer } from './db/views' export { AppContext } from './context' export { makeAlgos } from './feed-gen' +export * from './daemon' export * from './indexer' export * from './ingester' export { MigrateModerationData } from './migrate-moderation-data' diff --git a/packages/bsky/src/services/actor/index.ts b/packages/bsky/src/services/actor/index.ts index a2f980ce71d..51be90892fc 100644 --- a/packages/bsky/src/services/actor/index.ts +++ b/packages/bsky/src/services/actor/index.ts @@ -1,4 +1,5 @@ import { sql } from 'kysely' +import { wait } from '@atproto/common' import { Database } from '../../db' import { notSoftDeletedClause } from '../../db/util' import { ActorViews } from './views' @@ -144,6 +145,34 @@ export class ActorService { .executeTakeFirst() return res?.repoRev ?? null } + + async *all( + opts: { batchSize?: number; forever?: boolean; cooldownMs?: number } = {}, + ) { + const { cooldownMs = 1000, batchSize = 1000, forever = false } = opts + const baseQuery = this.db.db + .selectFrom('actor') + .selectAll() + .orderBy('did') + .limit(batchSize) + while (true) { + let cursor: ActorResult | undefined + do { + const actors = cursor + ? await baseQuery.where('did', '>', cursor.did).execute() + : await baseQuery.execute() + for (const actor of actors) { + yield actor + } + cursor = actors.at(-1) + } while (cursor) + if (forever) { + await wait(cooldownMs) + } else { + return + } + } + } } type ActorResult = Actor diff --git a/packages/bsky/src/services/util/notification.ts b/packages/bsky/src/services/util/notification.ts new file mode 100644 index 00000000000..811e6e41713 --- /dev/null +++ b/packages/bsky/src/services/util/notification.ts @@ -0,0 +1,70 @@ +import { sql } from 'kysely' +import { countAll } from '../../db/util' +import { PrimaryDatabase } from '../../db' + +// i.e. 30 days before the last time the user checked their notifs +export const BEFORE_LAST_SEEN_DAYS = 30 +// i.e. 180 days before the latest unread notification +export const BEFORE_LATEST_UNREAD_DAYS = 180 +// don't consider culling unreads until they hit this threshold, and then enforce beforeLatestUnreadThresholdDays +export const UNREAD_KEPT_COUNT = 500 + +export const tidyNotifications = async (db: PrimaryDatabase, did: string) => { + const stats = await db.db + .selectFrom('notification') + .select([ + sql<0 | 1>`("sortAt" < "lastSeenNotifs")`.as('read'), + countAll.as('count'), + sql`min("sortAt")`.as('earliestAt'), + sql`max("sortAt")`.as('latestAt'), + sql`max("lastSeenNotifs")`.as('lastSeenAt'), + ]) + .leftJoin('actor_state', 'actor_state.did', 'notification.did') + .where('notification.did', '=', did) + .groupBy(sql`1`) // group by read (i.e. 1st column) + .execute() + const readStats = stats.find((stat) => stat.read) + const unreadStats = stats.find((stat) => !stat.read) + let readCutoffAt: Date | undefined + let unreadCutoffAt: Date | undefined + if (readStats) { + readCutoffAt = addDays( + new Date(readStats.lastSeenAt), + -BEFORE_LAST_SEEN_DAYS, + ) + } + if (unreadStats && unreadStats.count > UNREAD_KEPT_COUNT) { + unreadCutoffAt = addDays( + new Date(unreadStats.latestAt), + -BEFORE_LATEST_UNREAD_DAYS, + ) + } + // take most recent of read/unread cutoffs + const cutoffAt = greatest(readCutoffAt, unreadCutoffAt) + if (cutoffAt) { + // skip delete if it wont catch any notifications + const earliestAt = least(readStats?.earliestAt, unreadStats?.earliestAt) + if (earliestAt && earliestAt < cutoffAt.toISOString()) { + await db.db + .deleteFrom('notification') + .where('did', '=', did) + .where('sortAt', '<', cutoffAt.toISOString()) + .execute() + } + } +} + +const addDays = (date: Date, days: number) => { + date.setDate(date.getDate() + days) + return date +} + +const least = (a: T | undefined, b: T | undefined) => { + return a !== undefined && (b === undefined || a < b) ? a : b +} + +const greatest = (a: T | undefined, b: T | undefined) => { + return a !== undefined && (b === undefined || a > b) ? a : b +} + +type Ordered = string | number | Date diff --git a/packages/bsky/tests/daemon.test.ts b/packages/bsky/tests/daemon.test.ts new file mode 100644 index 00000000000..32f0d6617ab --- /dev/null +++ b/packages/bsky/tests/daemon.test.ts @@ -0,0 +1,191 @@ +import assert from 'assert' +import { AtUri } from '@atproto/api' +import { TestNetwork } from '@atproto/dev-env' +import { BskyDaemon, DaemonConfig, PrimaryDatabase } from '../src' +import usersSeed from './seeds/users' +import { countAll, excluded } from '../src/db/util' +import { NotificationsDaemon } from '../src/daemon/notifications' +import { + BEFORE_LAST_SEEN_DAYS, + BEFORE_LATEST_UNREAD_DAYS, + UNREAD_KEPT_COUNT, +} from '../src/services/util/notification' + +describe('daemon', () => { + let network: TestNetwork + let daemon: BskyDaemon + let db: PrimaryDatabase + let actors: { did: string }[] = [] + + beforeAll(async () => { + network = await TestNetwork.create({ + dbPostgresSchema: 'bsky_daemon', + }) + db = network.bsky.ctx.db.getPrimary() + daemon = BskyDaemon.create({ + db, + cfg: new DaemonConfig({ + version: network.bsky.ctx.cfg.version, + dbPostgresUrl: network.bsky.ctx.cfg.dbPrimaryPostgresUrl, + dbPostgresSchema: network.bsky.ctx.cfg.dbPostgresSchema, + }), + }) + const sc = network.getSeedClient() + await usersSeed(sc) + await network.processAll() + actors = await db.db.selectFrom('actor').selectAll().execute() + }) + + afterAll(async () => { + await network.close() + }) + + describe('notifications daemon', () => { + it('processes all dids', async () => { + for (const { did } of actors) { + await Promise.all([ + setLastSeen(daemon.ctx.db, { did }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: 2 * BEFORE_LAST_SEEN_DAYS, + count: 1, + }), + ]) + } + await expect(countNotifications(db)).resolves.toBe(actors.length) + await runNotifsOnce(daemon.notifications) + await expect(countNotifications(db)).resolves.toBe(0) + }) + + it('removes read notifications older than threshold.', async () => { + const { did } = actors[0] + const lastSeenDaysAgo = 10 + await Promise.all([ + setLastSeen(daemon.ctx.db, { did, daysAgo: lastSeenDaysAgo }), + // read, delete + createNotifications(daemon.ctx.db, { + did, + daysAgo: lastSeenDaysAgo + BEFORE_LAST_SEEN_DAYS + 1, + count: 2, + }), + // read, keep + createNotifications(daemon.ctx.db, { + did, + daysAgo: lastSeenDaysAgo + BEFORE_LAST_SEEN_DAYS - 1, + count: 3, + }), + // unread, keep + createNotifications(daemon.ctx.db, { + did, + daysAgo: lastSeenDaysAgo - 1, + count: 4, + }), + ]) + await expect(countNotifications(db)).resolves.toBe(9) + await runNotifsOnce(daemon.notifications) + await expect(countNotifications(db)).resolves.toBe(7) + await clearNotifications(db) + }) + + it('removes unread notifications older than threshold.', async () => { + const { did } = actors[0] + await Promise.all([ + setLastSeen(daemon.ctx.db, { + did, + daysAgo: 2 * BEFORE_LATEST_UNREAD_DAYS, // all are unread + }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: 0, + count: 1, + }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: BEFORE_LATEST_UNREAD_DAYS - 1, + count: 99, + }), + createNotifications(daemon.ctx.db, { + did, + daysAgo: BEFORE_LATEST_UNREAD_DAYS + 1, + count: 400, + }), + ]) + await expect(countNotifications(db)).resolves.toBe(UNREAD_KEPT_COUNT) + await runNotifsOnce(daemon.notifications) + // none removed when within UNREAD_KEPT_COUNT + await expect(countNotifications(db)).resolves.toBe(UNREAD_KEPT_COUNT) + // add one more, tip over UNREAD_KEPT_COUNT + await createNotifications(daemon.ctx.db, { + did, + daysAgo: BEFORE_LATEST_UNREAD_DAYS + 1, + count: 1, + }) + await runNotifsOnce(daemon.notifications) + // removed all older than BEFORE_LATEST_UNREAD_DAYS + await expect(countNotifications(db)).resolves.toBe(100) + await clearNotifications(db) + }) + }) + + const runNotifsOnce = async (notifsDaemon: NotificationsDaemon) => { + assert(!notifsDaemon.running, 'notifications daemon is already running') + notifsDaemon.run({ forever: false, batchSize: 2 }) + await notifsDaemon.running + } + + const setLastSeen = async ( + db: PrimaryDatabase, + opts: { did: string; daysAgo?: number }, + ) => { + const { did, daysAgo = 0 } = opts + const lastSeenAt = new Date() + lastSeenAt.setDate(lastSeenAt.getDate() - daysAgo) + await db.db + .insertInto('actor_state') + .values({ did, lastSeenNotifs: lastSeenAt.toISOString() }) + .onConflict((oc) => + oc.column('did').doUpdateSet({ + lastSeenNotifs: excluded(db.db, 'lastSeenNotifs'), + }), + ) + .execute() + } + + const createNotifications = async ( + db: PrimaryDatabase, + opts: { + did: string + count: number + daysAgo: number + }, + ) => { + const { did, count, daysAgo } = opts + const sortAt = new Date() + sortAt.setDate(sortAt.getDate() - daysAgo) + await db.db + .insertInto('notification') + .values( + [...Array(count)].map(() => ({ + did, + author: did, + reason: 'none', + recordCid: 'bafycid', + recordUri: AtUri.make(did, 'invalid.collection', 'self').toString(), + sortAt: sortAt.toISOString(), + })), + ) + .execute() + } + + const clearNotifications = async (db: PrimaryDatabase) => { + await db.db.deleteFrom('notification').execute() + } + + const countNotifications = async (db: PrimaryDatabase) => { + const { count } = await db.db + .selectFrom('notification') + .select(countAll.as('count')) + .executeTakeFirstOrThrow() + return count + } +}) diff --git a/packages/bsky/tests/views/profile.test.ts b/packages/bsky/tests/views/profile.test.ts index d4e0c718bed..726fb990a0d 100644 --- a/packages/bsky/tests/views/profile.test.ts +++ b/packages/bsky/tests/views/profile.test.ts @@ -25,7 +25,6 @@ describe('pds profile views', () => { sc = network.getSeedClient() await basicSeed(sc) await network.processAll() - await network.bsky.processAll() alice = sc.dids.alice bob = sc.dids.bob dan = sc.dids.dan diff --git a/services/bsky/daemon.js b/services/bsky/daemon.js new file mode 100644 index 00000000000..bd8322ab58f --- /dev/null +++ b/services/bsky/daemon.js @@ -0,0 +1,44 @@ +'use strict' /* eslint-disable */ + +require('dd-trace/init') // Only works with commonjs + +// Tracer code above must come before anything else +const { PrimaryDatabase, DaemonConfig, BskyDaemon } = require('@atproto/bsky') + +const main = async () => { + const env = getEnv() + const db = new PrimaryDatabase({ + url: env.dbPostgresUrl, + schema: env.dbPostgresSchema, + poolSize: env.dbPoolSize, + poolMaxUses: env.dbPoolMaxUses, + poolIdleTimeoutMs: env.dbPoolIdleTimeoutMs, + }) + const cfg = DaemonConfig.readEnv({ + version: env.version, + dbPostgresUrl: env.dbPostgresUrl, + dbPostgresSchema: env.dbPostgresSchema, + }) + const daemon = BskyDaemon.create({ db, cfg }) + await daemon.start() + process.on('SIGTERM', async () => { + await daemon.destroy() + }) +} + +const getEnv = () => ({ + version: process.env.BSKY_VERSION, + dbPostgresUrl: + process.env.DB_PRIMARY_POSTGRES_URL || process.env.DB_POSTGRES_URL, + dbPostgresSchema: process.env.DB_POSTGRES_SCHEMA || undefined, + dbPoolSize: maybeParseInt(process.env.DB_POOL_SIZE), + dbPoolMaxUses: maybeParseInt(process.env.DB_POOL_MAX_USES), + dbPoolIdleTimeoutMs: maybeParseInt(process.env.DB_POOL_IDLE_TIMEOUT_MS), +}) + +const maybeParseInt = (str) => { + const parsed = parseInt(str) + return isNaN(parsed) ? undefined : parsed +} + +main()