diff --git a/packages/ozone/src/mod-service/index.ts b/packages/ozone/src/mod-service/index.ts index e76969767ce..4c1c84e55dc 100644 --- a/packages/ozone/src/mod-service/index.ts +++ b/packages/ozone/src/mod-service/index.ts @@ -359,7 +359,7 @@ export class ModerationService { subjectDid: subject.did, takedownRef, })) - const repoEvt = await this.db.db + const repoEvts = await this.db.db .insertInto('repo_push_event') .values(values) .onConflict((oc) => @@ -371,19 +371,19 @@ export class ModerationService { }), ) .returning('id') - .executeTakeFirst() + .execute() - if (repoEvt) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await this.eventPusher.attemptRepoEvent(repoEvt.id) - }) + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.all( + repoEvts.map((evt) => this.eventPusher.attemptRepoEvent(evt.id)), + ) }) - } + }) } async reverseTakedownRepo(subject: RepoSubject) { - const repoEvt = await this.db.db + const repoEvts = await this.db.db .updateTable('repo_push_event') .where('eventType', 'in', TAKEDOWNS) .where('subjectDid', '=', subject.did) @@ -394,15 +394,15 @@ export class ModerationService { lastAttempted: null, }) .returning('id') - .executeTakeFirst() + .execute() - if (repoEvt) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await this.eventPusher.attemptRepoEvent(repoEvt.id) - }) + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.all( + repoEvts.map((evt) => this.eventPusher.attemptRepoEvent(evt.id)), + ) }) - } + }) } async takedownRecord(subject: RecordSubject, takedownId: number) { @@ -415,7 +415,7 @@ export class ModerationService { subjectCid: subject.cid, takedownRef, })) - const recordEvt = await this.db.db + const recordEvts = await this.db.db .insertInto('record_push_event') .values(values) .onConflict((oc) => @@ -427,15 +427,15 @@ export class ModerationService { }), ) .returning('id') - .executeTakeFirst() + .execute() - if (recordEvt) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await this.eventPusher.attemptRecordEvent(recordEvt.id) - }) + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.all( + recordEvts.map((evt) => this.eventPusher.attemptRecordEvent(evt.id)), + ) }) - } + }) const blobCids = subject.blobCids if (blobCids && blobCids.length > 0) { @@ -450,7 +450,7 @@ export class ModerationService { }) } } - const blobEvt = await this.db.db + const blobEvts = await this.db.db .insertInto('blob_push_event') .values(blobValues) .onConflict((oc) => @@ -464,21 +464,21 @@ export class ModerationService { }), ) .returning('id') - .executeTakeFirst() + .execute() - if (blobEvt) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await this.eventPusher.attemptBlobEvent(blobEvt.id) - }) + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.all( + blobEvts.map((evt) => this.eventPusher.attemptBlobEvent(evt.id)), + ) }) - } + }) } } async reverseTakedownRecord(subject: RecordSubject) { this.db.assertTransaction() - const recordEvt = await this.db.db + const recordEvts = await this.db.db .updateTable('record_push_event') .where('eventType', 'in', TAKEDOWNS) .where('subjectDid', '=', subject.did) @@ -490,18 +490,18 @@ export class ModerationService { lastAttempted: null, }) .returning('id') - .executeTakeFirst() - if (recordEvt) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await this.eventPusher.attemptRecordEvent(recordEvt.id) - }) + .execute() + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.all( + recordEvts.map((evt) => this.eventPusher.attemptRecordEvent(evt.id)), + ) }) - } + }) const blobCids = subject.blobCids if (blobCids && blobCids.length > 0) { - const blobEvt = await this.db.db + const blobEvts = await this.db.db .updateTable('blob_push_event') .where('eventType', 'in', TAKEDOWNS) .where('subjectDid', '=', subject.did) @@ -517,15 +517,15 @@ export class ModerationService { lastAttempted: null, }) .returning('id') - .executeTakeFirst() + .execute() - if (blobEvt) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await this.eventPusher.attemptBlobEvent(blobEvt.id) - }) + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.all( + blobEvts.map((evt) => this.eventPusher.attemptBlobEvent(evt.id)), + ) }) - } + }) } }