Skip to content

Commit

Permalink
push out multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Jan 5, 2024
1 parent d5cea8e commit c7a7470
Showing 1 changed file with 48 additions and 48 deletions.
96 changes: 48 additions & 48 deletions packages/ozone/src/mod-service/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export class ModerationService {
subjectDid: subject.did,
takedownRef,
}))
const repoEvt = await this.db.db
const repoEvts = await this.db.db
.insertInto('repo_push_event')
.values(values)
.onConflict((oc) =>
Expand All @@ -371,19 +371,19 @@ export class ModerationService {
}),
)
.returning('id')
.executeTakeFirst()
.execute()

if (repoEvt) {
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await this.eventPusher.attemptRepoEvent(repoEvt.id)
})
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await Promise.all(
repoEvts.map((evt) => this.eventPusher.attemptRepoEvent(evt.id)),
)
})
}
})
}

async reverseTakedownRepo(subject: RepoSubject) {
const repoEvt = await this.db.db
const repoEvts = await this.db.db
.updateTable('repo_push_event')
.where('eventType', 'in', TAKEDOWNS)
.where('subjectDid', '=', subject.did)
Expand All @@ -394,15 +394,15 @@ export class ModerationService {
lastAttempted: null,
})
.returning('id')
.executeTakeFirst()
.execute()

if (repoEvt) {
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await this.eventPusher.attemptRepoEvent(repoEvt.id)
})
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await Promise.all(
repoEvts.map((evt) => this.eventPusher.attemptRepoEvent(evt.id)),
)
})
}
})
}

async takedownRecord(subject: RecordSubject, takedownId: number) {
Expand All @@ -415,7 +415,7 @@ export class ModerationService {
subjectCid: subject.cid,
takedownRef,
}))
const recordEvt = await this.db.db
const recordEvts = await this.db.db
.insertInto('record_push_event')
.values(values)
.onConflict((oc) =>
Expand All @@ -427,15 +427,15 @@ export class ModerationService {
}),
)
.returning('id')
.executeTakeFirst()
.execute()

if (recordEvt) {
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await this.eventPusher.attemptRecordEvent(recordEvt.id)
})
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await Promise.all(
recordEvts.map((evt) => this.eventPusher.attemptRecordEvent(evt.id)),
)
})
}
})

const blobCids = subject.blobCids
if (blobCids && blobCids.length > 0) {
Expand All @@ -450,7 +450,7 @@ export class ModerationService {
})
}
}
const blobEvt = await this.db.db
const blobEvts = await this.db.db
.insertInto('blob_push_event')
.values(blobValues)
.onConflict((oc) =>
Expand All @@ -464,21 +464,21 @@ export class ModerationService {
}),
)
.returning('id')
.executeTakeFirst()
.execute()

if (blobEvt) {
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await this.eventPusher.attemptBlobEvent(blobEvt.id)
})
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await Promise.all(
blobEvts.map((evt) => this.eventPusher.attemptBlobEvent(evt.id)),
)
})
}
})
}
}

async reverseTakedownRecord(subject: RecordSubject) {
this.db.assertTransaction()
const recordEvt = await this.db.db
const recordEvts = await this.db.db
.updateTable('record_push_event')
.where('eventType', 'in', TAKEDOWNS)
.where('subjectDid', '=', subject.did)
Expand All @@ -490,18 +490,18 @@ export class ModerationService {
lastAttempted: null,
})
.returning('id')
.executeTakeFirst()
if (recordEvt) {
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await this.eventPusher.attemptRecordEvent(recordEvt.id)
})
.execute()
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await Promise.all(
recordEvts.map((evt) => this.eventPusher.attemptRecordEvent(evt.id)),
)
})
}
})

const blobCids = subject.blobCids
if (blobCids && blobCids.length > 0) {
const blobEvt = await this.db.db
const blobEvts = await this.db.db
.updateTable('blob_push_event')
.where('eventType', 'in', TAKEDOWNS)
.where('subjectDid', '=', subject.did)
Expand All @@ -517,15 +517,15 @@ export class ModerationService {
lastAttempted: null,
})
.returning('id')
.executeTakeFirst()
.execute()

if (blobEvt) {
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await this.eventPusher.attemptBlobEvent(blobEvt.id)
})
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {
await Promise.all(
blobEvts.map((evt) => this.eventPusher.attemptBlobEvent(evt.id)),
)
})
}
})
}
}

Expand Down

0 comments on commit c7a7470

Please sign in to comment.