Skip to content

Commit

Permalink
Ensure delayed jobs are ignored until it's time to run them
Browse files Browse the repository at this point in the history
  • Loading branch information
m-barthelemy committed Mar 25, 2021
1 parent 88080b5 commit c564a3a
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 3 deletions.
4 changes: 4 additions & 0 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ extension FluentQueue: Queue {
public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
do {
let jobModel = try JobModel(jobId: id.string, queue: queueName.string, data: jobStorage)
// If the job must run at a later time, ensure it won't be picked earlier since
// we sort pending jobs by creation date when querying
jobModel.createdAt = jobStorage.delayUntil ?? Date()
return jobModel.save(on: db)
}
catch {
Expand Down Expand Up @@ -73,6 +76,7 @@ extension FluentQueue: Queue {
.from(JobModel.schema)
.where(SQLColumn("\(FieldKey.state)"), .equal, SQLBind(QueuesFluentJobState.pending))
.where(SQLColumn("\(FieldKey.queue)"), .equal, SQLBind(self.queueName.string))
.where(SQLColumn("\(FieldKey.createdAt)"), .lessThanOrEqual, SQLBind(Date()))
.orderBy("\(FieldKey.createdAt)")
.limit(1)
if self.dbType != .sqlite {
Expand Down
4 changes: 2 additions & 2 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ class JobModel: Model {

/// The Job data
@Field(key: .data)
//var data: JobData?
var data: Data

/// The current state of the Job
@Field(key: .state)
var state: QueuesFluentJobState

@Timestamp(key: .createdAt, on: .create)
/// Creation date by default; `delayUntil` if it's a delayed job
@OptionalField(key: .createdAt)
var createdAt: Date?

@Timestamp(key: .updatedAt, on: .update)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class PostgresPop: PopQueryProtocol {
)
.returning(SQLColumn("\(FieldKey.jobId)"))
.query

var id: String?
return db.execute(sql: query) { (row) -> Void in
id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self)
Expand Down

0 comments on commit c564a3a

Please sign in to comment.