From 2aa24d58aae9cfbc42d6a100d818d5a44e7d84ea Mon Sep 17 00:00:00 2001 From: Devin Ivy Date: Thu, 28 Sep 2023 14:48:11 -0400 Subject: [PATCH 1/4] add support for a bunny.net img invalidator --- packages/aws/package.json | 1 + packages/aws/src/cloudfront.ts | 7 +------ packages/aws/src/index.ts | 2 ++ packages/bsky/src/indexer/subscription.ts | 22 +--------------------- packages/common-web/src/async.ts | 22 ++++++++++++++++++++++ pnpm-lock.yaml | 3 +++ services/bsky/api.js | 23 +++++++++++++++-------- services/bsky/indexer.js | 23 +++++++++++++++-------- 8 files changed, 60 insertions(+), 43 deletions(-) diff --git a/packages/aws/package.json b/packages/aws/package.json index 58bb98e0f80..8bf82637175 100644 --- a/packages/aws/package.json +++ b/packages/aws/package.json @@ -24,6 +24,7 @@ "update-main-to-dist": "node ../../update-main-to-dist.js packages/aws" }, "dependencies": { + "@atproto/common": "workspace:^", "@atproto/crypto": "workspace:^", "@atproto/repo": "workspace:^", "@aws-sdk/client-cloudfront": "^3.261.0", diff --git a/packages/aws/src/cloudfront.ts b/packages/aws/src/cloudfront.ts index 30708caee25..0577e4cc613 100644 --- a/packages/aws/src/cloudfront.ts +++ b/packages/aws/src/cloudfront.ts @@ -1,4 +1,5 @@ import * as aws from '@aws-sdk/client-cloudfront' +import { ImageInvalidator } from './types' export type CloudfrontConfig = { distributionId: string @@ -33,9 +34,3 @@ export class CloudfrontInvalidator implements ImageInvalidator { } export default CloudfrontInvalidator - -// @NOTE keep in sync with same interface in pds/src/image/invalidator.ts -// this is separate to avoid the dependency on @atproto/pds. -interface ImageInvalidator { - invalidate(subject: string, paths: string[]): Promise -} diff --git a/packages/aws/src/index.ts b/packages/aws/src/index.ts index a836847afc2..9cdf4d1c5ae 100644 --- a/packages/aws/src/index.ts +++ b/packages/aws/src/index.ts @@ -1,3 +1,5 @@ export * from './kms' export * from './s3' export * from './cloudfront' +export * from './bunny' +export * from './types' diff --git a/packages/bsky/src/indexer/subscription.ts b/packages/bsky/src/indexer/subscription.ts index 76c9b7297df..abc672db3b0 100644 --- a/packages/bsky/src/indexer/subscription.ts +++ b/packages/bsky/src/indexer/subscription.ts @@ -1,7 +1,7 @@ import assert from 'node:assert' import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/syntax' -import { cborDecode, wait } from '@atproto/common' +import { cborDecode, wait, handleAllSettledErrors } from '@atproto/common' import { DisconnectError } from '@atproto/xrpc-server' import { WriteOpAction, @@ -343,23 +343,3 @@ type PreparedDelete = { } type PreparedWrite = PreparedCreate | PreparedUpdate | PreparedDelete - -function handleAllSettledErrors(results: PromiseSettledResult[]) { - const errors = results.filter(isRejected).map((res) => res.reason) - if (errors.length === 0) { - return - } - if (errors.length === 1) { - throw errors[0] - } - throw new AggregateError( - errors, - 'Multiple errors: ' + errors.map((err) => err?.message).join('\n'), - ) -} - -function isRejected( - result: PromiseSettledResult, -): result is PromiseRejectedResult { - return result.status === 'rejected' -} diff --git a/packages/common-web/src/async.ts b/packages/common-web/src/async.ts index 9e173d835fe..01e381ed337 100644 --- a/packages/common-web/src/async.ts +++ b/packages/common-web/src/async.ts @@ -124,3 +124,25 @@ export class AsyncBufferFullError extends Error { super(`ReachedMaxBufferSize: ${maxSize}`) } } + +export const handleAllSettledErrors = ( + results: PromiseSettledResult[], +) => { + const errors = results.filter(isRejected).map((res) => res.reason) + if (errors.length === 0) { + return + } + if (errors.length === 1) { + throw errors[0] + } + throw new AggregateError( + errors, + 'Multiple errors: ' + errors.map((err) => err?.message).join('\n'), + ) +} + +const isRejected = ( + result: PromiseSettledResult, +): result is PromiseRejectedResult => { + return result.status === 'rejected' +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 80681385373..bffa6c833da 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -123,6 +123,9 @@ importers: packages/aws: dependencies: + '@atproto/common': + specifier: workspace:^ + version: link:../common '@atproto/crypto': specifier: workspace:^ version: link:../crypto diff --git a/services/bsky/api.js b/services/bsky/api.js index ec38c55ae55..4c1844136ac 100644 --- a/services/bsky/api.js +++ b/services/bsky/api.js @@ -13,7 +13,7 @@ require('dd-trace') // Only works with commonjs // Tracer code above must come before anything else const path = require('path') const assert = require('assert') -const { CloudfrontInvalidator } = require('@atproto/aws') +const { BunnyInvalidator, CloudfrontInvalidator } = require('@atproto/aws') const { DatabaseCoordinator, PrimaryDatabase, @@ -59,17 +59,23 @@ const main = async () => { imgUriEndpoint: env.imgUriEndpoint, blobCacheLocation: env.blobCacheLocation, }) - const cfInvalidator = env.cfDistributionId - ? new CloudfrontInvalidator({ - distributionId: env.cfDistributionId, - pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, - }) - : undefined + let imgInvalidator + if (env.bunnyAccessKey) { + imgInvalidator = new BunnyInvalidator({ + accessKey: env.bunnyAccessKey, + urlPrefix: cfg.imgUriEndpoint, + }) + } else if (env.cfDistributionId) { + imgInvalidator = new CloudfrontInvalidator({ + distributionId: env.cfDistributionId, + pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, + }) + } const algos = env.feedPublisherDid ? makeAlgos(env.feedPublisherDid) : {} const bsky = BskyAppView.create({ db, config: cfg, - imgInvalidator: cfInvalidator, + imgInvalidator, algos, }) // separate db needed for more permissions @@ -127,6 +133,7 @@ const getEnv = () => ({ imgUriKey: process.env.IMG_URI_KEY, imgUriEndpoint: process.env.IMG_URI_ENDPOINT, blobCacheLocation: process.env.BLOB_CACHE_LOC, + bunnyAccessKey: process.env.BUNNY_ACCESS_KEY, cfDistributionId: process.env.CF_DISTRIBUTION_ID, feedPublisherDid: process.env.FEED_PUBLISHER_DID, }) diff --git a/services/bsky/indexer.js b/services/bsky/indexer.js index 7ab287133df..d8f7264d411 100644 --- a/services/bsky/indexer.js +++ b/services/bsky/indexer.js @@ -3,7 +3,7 @@ require('dd-trace/init') // Only works with commonjs // Tracer code above must come before anything else -const { CloudfrontInvalidator } = require('@atproto/aws') +const { CloudfrontInvalidator, BunnyInvalidator } = require('@atproto/aws') const { IndexerConfig, BskyIndexer, @@ -25,12 +25,18 @@ const main = async () => { dbPostgresUrl: env.dbPostgresUrl, dbPostgresSchema: env.dbPostgresSchema, }) - const cfInvalidator = env.cfDistributionId - ? new CloudfrontInvalidator({ - distributionId: env.cfDistributionId, - pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, - }) - : undefined + let imgInvalidator + if (env.bunnyAccessKey) { + imgInvalidator = new BunnyInvalidator({ + accessKey: env.bunnyAccessKey, + urlPrefix: cfg.imgUriEndpoint, + }) + } else if (env.cfDistributionId) { + imgInvalidator = new CloudfrontInvalidator({ + distributionId: env.cfDistributionId, + pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, + }) + } const redis = new Redis( cfg.redisSentinelName ? { @@ -47,7 +53,7 @@ const main = async () => { db, redis, cfg, - imgInvalidator: cfInvalidator, + imgInvalidator, }) await indexer.start() process.on('SIGTERM', async () => { @@ -77,6 +83,7 @@ const getEnv = () => ({ dbPoolSize: maybeParseInt(process.env.DB_POOL_SIZE), dbPoolMaxUses: maybeParseInt(process.env.DB_POOL_MAX_USES), dbPoolIdleTimeoutMs: maybeParseInt(process.env.DB_POOL_IDLE_TIMEOUT_MS), + bunnyAccessKey: process.env.BUNNY_ACCESS_KEY, cfDistributionId: process.env.CF_DISTRIBUTION_ID, imgUriEndpoint: process.env.IMG_URI_ENDPOINT, }) From 21417d6628b53e33ab2cddd36e8822e6556e56b8 Mon Sep 17 00:00:00 2001 From: Devin Ivy Date: Thu, 28 Sep 2023 14:51:56 -0400 Subject: [PATCH 2/4] tidy --- packages/aws/src/bunny.ts | 36 ++++++++++++++++++++++++++++++++++++ packages/aws/src/types.ts | 5 +++++ 2 files changed, 41 insertions(+) create mode 100644 packages/aws/src/bunny.ts create mode 100644 packages/aws/src/types.ts diff --git a/packages/aws/src/bunny.ts b/packages/aws/src/bunny.ts new file mode 100644 index 00000000000..62cfeedc891 --- /dev/null +++ b/packages/aws/src/bunny.ts @@ -0,0 +1,36 @@ +import { handleAllSettledErrors } from '@atproto/common' +import { ImageInvalidator } from './types' + +export type BunnyConfig = { + accessKey: string + urlPrefix: string +} + +const API_PURGE_URL = 'https://api.bunny.net/purge' + +export class BunnyInvalidator implements ImageInvalidator { + constructor(public cfg: BunnyConfig) {} + async invalidate(_subject: string, paths: string[]) { + const results = await Promise.allSettled( + paths.map(async (path) => + purgeUrl({ + url: this.cfg.urlPrefix + path, + accessKey: this.cfg.accessKey, + }), + ), + ) + handleAllSettledErrors(results) + } +} + +export default BunnyInvalidator + +async function purgeUrl(opts: { accessKey: string; url: string }) { + const search = new URLSearchParams() + search.set('async', 'true') + search.set('url', opts.url) + await fetch(API_PURGE_URL + '?' + search.toString(), { + method: 'post', + headers: { AccessKey: opts.accessKey }, + }) +} diff --git a/packages/aws/src/types.ts b/packages/aws/src/types.ts new file mode 100644 index 00000000000..9c5a3e2dd85 --- /dev/null +++ b/packages/aws/src/types.ts @@ -0,0 +1,5 @@ +// @NOTE keep in sync with same interface in bsky/src/image/invalidator.ts +// this is separate to avoid the dependency on @atproto/bsky. +export interface ImageInvalidator { + invalidate(subject: string, paths: string[]): Promise +} From 14fda58b39cdaf8c7f87cef1523dfa5774b1671e Mon Sep 17 00:00:00 2001 From: Devin Ivy Date: Thu, 5 Oct 2023 12:03:52 -0400 Subject: [PATCH 3/4] support multiple image invalidators --- packages/aws/src/index.ts | 1 + services/bsky/api.js | 41 ++++++++++++++++++++++++++++----------- services/bsky/indexer.js | 34 ++++++++++++++++++++++---------- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/packages/aws/src/index.ts b/packages/aws/src/index.ts index 9cdf4d1c5ae..aa11f2c7ef0 100644 --- a/packages/aws/src/index.ts +++ b/packages/aws/src/index.ts @@ -2,4 +2,5 @@ export * from './kms' export * from './s3' export * from './cloudfront' export * from './bunny' +export * from './util' export * from './types' diff --git a/services/bsky/api.js b/services/bsky/api.js index 4c1844136ac..5363f7661f0 100644 --- a/services/bsky/api.js +++ b/services/bsky/api.js @@ -13,7 +13,11 @@ require('dd-trace') // Only works with commonjs // Tracer code above must come before anything else const path = require('path') const assert = require('assert') -const { BunnyInvalidator, CloudfrontInvalidator } = require('@atproto/aws') +const { + BunnyInvalidator, + CloudfrontInvalidator, + MultiImageInvalidator, +} = require('@atproto/aws') const { DatabaseCoordinator, PrimaryDatabase, @@ -59,18 +63,33 @@ const main = async () => { imgUriEndpoint: env.imgUriEndpoint, blobCacheLocation: env.blobCacheLocation, }) + + // configure zero, one, or both image invalidators let imgInvalidator - if (env.bunnyAccessKey) { - imgInvalidator = new BunnyInvalidator({ - accessKey: env.bunnyAccessKey, - urlPrefix: cfg.imgUriEndpoint, - }) - } else if (env.cfDistributionId) { - imgInvalidator = new CloudfrontInvalidator({ - distributionId: env.cfDistributionId, - pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, - }) + const bunnyInvalidator = env.bunnyAccessKey + ? new BunnyInvalidator({ + accessKey: env.bunnyAccessKey, + urlPrefix: cfg.imgUriEndpoint, + }) + : undefined + const cfInvalidator = env.cfDistributionId + ? new CloudfrontInvalidator({ + distributionId: env.cfDistributionId, + pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, + }) + : undefined + + if (bunnyInvalidator && imgInvalidator) { + imgInvalidator = new MultiImageInvalidator([ + bunnyInvalidator, + imgInvalidator, + ]) + } else if (bunnyInvalidator) { + imgInvalidator = bunnyInvalidator + } else if (cfInvalidator) { + imgInvalidator = cfInvalidator } + const algos = env.feedPublisherDid ? makeAlgos(env.feedPublisherDid) : {} const bsky = BskyAppView.create({ db, diff --git a/services/bsky/indexer.js b/services/bsky/indexer.js index d8f7264d411..beac2c114d7 100644 --- a/services/bsky/indexer.js +++ b/services/bsky/indexer.js @@ -25,18 +25,32 @@ const main = async () => { dbPostgresUrl: env.dbPostgresUrl, dbPostgresSchema: env.dbPostgresSchema, }) + + // configure zero, one, or both image invalidators let imgInvalidator - if (env.bunnyAccessKey) { - imgInvalidator = new BunnyInvalidator({ - accessKey: env.bunnyAccessKey, - urlPrefix: cfg.imgUriEndpoint, - }) - } else if (env.cfDistributionId) { - imgInvalidator = new CloudfrontInvalidator({ - distributionId: env.cfDistributionId, - pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, - }) + const bunnyInvalidator = env.bunnyAccessKey + ? new BunnyInvalidator({ + accessKey: env.bunnyAccessKey, + urlPrefix: cfg.imgUriEndpoint, + }) + : undefined + const cfInvalidator = env.cfDistributionId + ? new CloudfrontInvalidator({ + distributionId: env.cfDistributionId, + pathPrefix: cfg.imgUriEndpoint && new URL(cfg.imgUriEndpoint).pathname, + }) + : undefined + if (bunnyInvalidator && imgInvalidator) { + imgInvalidator = new MultiImageInvalidator([ + bunnyInvalidator, + imgInvalidator, + ]) + } else if (bunnyInvalidator) { + imgInvalidator = bunnyInvalidator + } else if (cfInvalidator) { + imgInvalidator = cfInvalidator } + const redis = new Redis( cfg.redisSentinelName ? { From 8e5bc52b0e09449026f66ac3849817220f290f6e Mon Sep 17 00:00:00 2001 From: Devin Ivy Date: Thu, 5 Oct 2023 12:04:05 -0400 Subject: [PATCH 4/4] tidy --- packages/aws/src/util.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 packages/aws/src/util.ts diff --git a/packages/aws/src/util.ts b/packages/aws/src/util.ts new file mode 100644 index 00000000000..8f603055bea --- /dev/null +++ b/packages/aws/src/util.ts @@ -0,0 +1,14 @@ +import { handleAllSettledErrors } from '@atproto/common' +import { ImageInvalidator } from './types' + +export class MultiImageInvalidator implements ImageInvalidator { + constructor(public invalidators: ImageInvalidator[]) {} + async invalidate(subject: string, paths: string[]) { + const results = await Promise.allSettled( + this.invalidators.map((invalidator) => + invalidator.invalidate(subject, paths), + ), + ) + handleAllSettledErrors(results) + } +}