Skip to content

Commit

Permalink
Merge pull request #1846 from bluesky-social/import-send-ticks
Browse files Browse the repository at this point in the history
Send ticks during import
  • Loading branch information
dholms authored Nov 10, 2023
2 parents d68779d + 6669103 commit bbae744
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
5 changes: 5 additions & 0 deletions packages/common-web/src/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ export class AsyncBuffer<T> {
return this.buffer.length
}

get isClosed(): boolean {
return this.closed
}

resetPromise() {
this.promise = new Promise<void>((r) => (this.resolve = r))
}
Expand Down Expand Up @@ -128,6 +132,7 @@ export class AsyncBuffer<T> {

throw(err: unknown) {
this.toThrow = err
this.closed = true
this.resolve()
}

Expand Down
12 changes: 11 additions & 1 deletion packages/pds/src/api/com/atproto/temp/importRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import PQueue from 'p-queue'
import axios from 'axios'
import { CID } from 'multiformats/cid'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { AsyncBuffer, TID } from '@atproto/common'
import { AsyncBuffer, TID, wait } from '@atproto/common'
import { AtUri } from '@atproto/syntax'
import { Repo, WriteOpAction, readCarStream, verifyDiff } from '@atproto/repo'
import { BlobRef, LexValue } from '@atproto/lexicon'
Expand All @@ -22,6 +22,9 @@ export default function (server: Server, ctx: AppContext) {
handler: async ({ params, input, req }) => {
const { did } = params
const outBuffer = new AsyncBuffer<string>()
sendTicks(outBuffer).catch((err) => {
req.log.error({ err }, 'failed to send ticks')
})
processImport(ctx, did, input.body, outBuffer).catch(async (err) => {
try {
await ctx.actorStore.destroy(did)
Expand All @@ -39,6 +42,13 @@ export default function (server: Server, ctx: AppContext) {
})
}

const sendTicks = async (outBuffer: AsyncBuffer<string>) => {
while (!outBuffer.isClosed) {
outBuffer.push('tick\n')
await wait(1000)
}
}

const processImport = async (
ctx: AppContext,
did: string,
Expand Down

0 comments on commit bbae744

Please sign in to comment.