From ac01fe5f505c49f51d85946cd382b83330ed88a2 Mon Sep 17 00:00:00 2001 From: dholms Date: Fri, 10 Nov 2023 13:25:02 -0600 Subject: [PATCH 1/2] send ticks during import --- packages/common-web/src/async.ts | 4 ++++ packages/pds/src/api/com/atproto/temp/importRepo.ts | 12 +++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/packages/common-web/src/async.ts b/packages/common-web/src/async.ts index 361253a6169..05820d74716 100644 --- a/packages/common-web/src/async.ts +++ b/packages/common-web/src/async.ts @@ -90,6 +90,10 @@ export class AsyncBuffer { return this.buffer.length } + get isClosed(): boolean { + return this.closed + } + resetPromise() { this.promise = new Promise((r) => (this.resolve = r)) } diff --git a/packages/pds/src/api/com/atproto/temp/importRepo.ts b/packages/pds/src/api/com/atproto/temp/importRepo.ts index 29d9021e942..fab39ff307d 100644 --- a/packages/pds/src/api/com/atproto/temp/importRepo.ts +++ b/packages/pds/src/api/com/atproto/temp/importRepo.ts @@ -4,7 +4,7 @@ import PQueue from 'p-queue' import axios from 'axios' import { CID } from 'multiformats/cid' import { InvalidRequestError } from '@atproto/xrpc-server' -import { AsyncBuffer, TID } from '@atproto/common' +import { AsyncBuffer, TID, wait } from '@atproto/common' import { AtUri } from '@atproto/syntax' import { Repo, WriteOpAction, readCarStream, verifyDiff } from '@atproto/repo' import { BlobRef, LexValue } from '@atproto/lexicon' @@ -22,6 +22,9 @@ export default function (server: Server, ctx: AppContext) { handler: async ({ params, input, req }) => { const { did } = params const outBuffer = new AsyncBuffer() + sendTicks(outBuffer).catch((err) => { + req.log.error({ err }, 'failed to send ticks') + }) processImport(ctx, did, input.body, outBuffer).catch(async (err) => { try { await ctx.actorStore.destroy(did) @@ -39,6 +42,13 @@ export default function (server: Server, ctx: AppContext) { }) } +const sendTicks = async (outBuffer: AsyncBuffer) => { + while (!outBuffer.isClosed) { + outBuffer.push('tick\n') + await wait(1000) + } +} + const processImport = async ( ctx: AppContext, did: string, From 6669103a2c68dae688fed9584f10a60f9377a0a6 Mon Sep 17 00:00:00 2001 From: dholms Date: Fri, 10 Nov 2023 13:31:48 -0600 Subject: [PATCH 2/2] close on error --- packages/common-web/src/async.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/common-web/src/async.ts b/packages/common-web/src/async.ts index 05820d74716..bed78552969 100644 --- a/packages/common-web/src/async.ts +++ b/packages/common-web/src/async.ts @@ -132,6 +132,7 @@ export class AsyncBuffer { throw(err: unknown) { this.toThrow = err + this.closed = true this.resolve() }