diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index 57da5f3..0136c12 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -25,6 +25,9 @@ extension FluentQueue: Queue { public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture { do { let jobModel = try JobModel(jobId: id.string, queue: queueName.string, data: jobStorage) + // If the job must run at a later time, ensure it won't be picked earlier since + // we sort pending jobs by creation date when querying + jobModel.createdAt = jobStorage.delayUntil ?? Date() return jobModel.save(on: db) } catch { @@ -73,6 +76,7 @@ extension FluentQueue: Queue { .from(JobModel.schema) .where(SQLColumn("\(FieldKey.state)"), .equal, SQLBind(QueuesFluentJobState.pending)) .where(SQLColumn("\(FieldKey.queue)"), .equal, SQLBind(self.queueName.string)) + .where(SQLColumn("\(FieldKey.createdAt)"), .lessThanOrEqual, SQLBind(Date())) .orderBy("\(FieldKey.createdAt)") .limit(1) if self.dbType != .sqlite { diff --git a/Sources/QueuesFluentDriver/JobModel.swift b/Sources/QueuesFluentDriver/JobModel.swift index 794e0a5..a50720c 100644 --- a/Sources/QueuesFluentDriver/JobModel.swift +++ b/Sources/QueuesFluentDriver/JobModel.swift @@ -39,14 +39,14 @@ class JobModel: Model { /// The Job data @Field(key: .data) - //var data: JobData? var data: Data /// The current state of the Job @Field(key: .state) var state: QueuesFluentJobState - @Timestamp(key: .createdAt, on: .create) + /// Creation date by default; `delayUntil` if it's a delayed job + @OptionalField(key: .createdAt) var createdAt: Date? @Timestamp(key: .updatedAt, on: .update) diff --git a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift index 3dddbef..44e4281 100644 --- a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift +++ b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift @@ -16,7 +16,7 @@ final class PostgresPop: PopQueryProtocol { ) .returning(SQLColumn("\(FieldKey.jobId)")) .query - + var id: String? return db.execute(sql: query) { (row) -> Void in id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self)