Skip to content

Commit e7e1002

Browse files
authored
fix: prevent job on restart when resumeOnRestart is false (#48)
1 parent c269cb6 commit e7e1002

File tree

3 files changed

+28
-6
lines changed

3 files changed

+28
-6
lines changed

src/job/run.ts

+18-5
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ export const run: RunMethod = async function (this: Job) {
1616

1717
return new Promise(async (resolve, reject) => {
1818
this.attrs.lastRunAt = new Date();
19-
this.attrs.runCount = (this.attrs.runCount || 0) + 1;
19+
20+
const previousRunAt = this.attrs.nextRunAt;
2021
debug('[%s:%s] setting lastRunAt to: %s', this.attrs.name, this.attrs._id, this.attrs.lastRunAt.toISOString());
2122
this.computeNextRunAt();
2223
await this.save();
2324

2425
let finished = false;
26+
let resumeOnRestartSkipped = false;
2527
const jobCallback = async (error?: Error, result?: unknown) => {
2628
// We don't want to complete the job multiple times
2729
if (finished) {
@@ -33,11 +35,13 @@ export const run: RunMethod = async function (this: Job) {
3335
if (error) {
3436
this.fail(error);
3537
} else {
36-
this.attrs.lastFinishedAt = new Date();
37-
this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1;
38+
if (!resumeOnRestartSkipped) {
39+
this.attrs.lastFinishedAt = new Date();
40+
this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1;
3841

39-
if (this.attrs.shouldSaveResult && result) {
40-
this.attrs.result = result;
42+
if (this.attrs.shouldSaveResult && result) {
43+
this.attrs.result = result;
44+
}
4145
}
4246
}
4347

@@ -81,6 +85,15 @@ export const run: RunMethod = async function (this: Job) {
8185
throw new JobError('Undefined job');
8286
}
8387

88+
if (!this.pulse._resumeOnRestart && previousRunAt && this.pulse._readyAt >= previousRunAt) {
89+
debug('[%s:%s] job resumeOnRestart skipped', this.attrs.name, this.attrs._id);
90+
resumeOnRestartSkipped = true;
91+
await jobCallback(undefined, 'skipped');
92+
return;
93+
}
94+
95+
this.attrs.runCount = (this.attrs.runCount || 0) + 1;
96+
8497
if (definition.fn.length === 2) {
8598
debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
8699
await definition.fn(this, jobCallback);

src/pulse/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ class Pulse extends EventEmitter {
102102
_collection!: Collection;
103103
_nextScanAt: any;
104104
_processInterval: any;
105+
_readyAt: Date;
105106

106107
/**
107108
* Constructs a new Pulse object.
@@ -143,6 +144,7 @@ class Pulse extends EventEmitter {
143144
this._ready = new Promise((resolve) => {
144145
this.once('ready', resolve);
145146
});
147+
this._readyAt = new Date();
146148

147149
this.init(config, cb);
148150
}

src/pulse/resume-on-restart.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,14 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
2222
.updateMany(
2323
{
2424
$or: [
25-
{ lockedAt: { $exists: true }, lastFinishedAt: { $exists: false } },
25+
{
26+
lockedAt: { $exists: true },
27+
$expr: { $eq: ['$runCount', '$finishedCount'] },
28+
},
29+
{
30+
lockedAt: { $exists: true },
31+
lastFinishedAt: { $exists: false },
32+
},
2633
{
2734
$and: [
2835
{ lockedAt: { $exists: false } },

0 commit comments

Comments
 (0)