Skip to content

Commit

Permalink
Merge pull request #2 from Maxim-Inv/master
Browse files Browse the repository at this point in the history
Use FieldKey
  • Loading branch information
m-barthelemy authored Apr 16, 2020
2 parents b5dda54 + 46dabfc commit a627a33
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 58 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
/*.xcodeproj
xcuserdata/
.swiftpm
Package.resolved
12 changes: 6 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.1
// swift-tools-version:5.2
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription
Expand All @@ -17,16 +17,16 @@ let package = Package(
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.1"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.0.0-rc.2"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.0.0-rc"),
.package(url: "https://github.com/vapor/queues.git", from: "1.0.0"),
.package(url: "https://github.com/vapor/queues.git", from: "1.0.0-rc"),
],
targets: [
.target(
name: "QueuesFluentDriver",
dependencies: [
"Fluent",
"SQLKit",
"Queues",
"Vapor"
.product(name: "Vapor", package: "vapor"),
.product(name: "Fluent", package: "fluent"),
.product(name: "SQLKit", package: "sql-kit"),
.product(name: "Queues", package: "queues")
],
path: "Sources"
)
Expand Down
32 changes: 14 additions & 18 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ struct FluentQueue {
let context: QueueContext
let dbType: QueuesFluentDbType
let useSoftDeletes: Bool
static let model = JobModel(id: UUID.generateRandom(), key: "")
}

extension FluentQueue: Queue {
Expand Down Expand Up @@ -54,12 +53,11 @@ extension FluentQueue: Queue {
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMap { job in
if(self.useSoftDeletes) {
if self.useSoftDeletes {
job.state = .completed
job.deletedAt = Date()
return job.update(on: database)
}
else {
} else {
return job.delete(force: true, on: database)
}
}
Expand All @@ -74,9 +72,9 @@ extension FluentQueue: Queue {
}
let sqlDb = database as! SQLDatabase
return sqlDb
.update (JobModel.schema)
.set (SQLColumn("\(FluentQueue.model.$state.key)"), to: SQLBind(QueuesFluentJobState.pending))
.where (SQLColumn("\(FluentQueue.model.$id.key)"), .equal, SQLBind(uuid))
.update(JobModel.schema)
.set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending))
.where (SQLColumn("\(FieldKey.id)"), .equal, SQLBind(uuid))
.run()
}

Expand All @@ -88,12 +86,12 @@ extension FluentQueue: Queue {

var selectQuery = db
.select ()
.column ("\(Self.model.$id.key)")
.column ("\(FieldKey.id)")
.from (JobModel.schema)
.where ("\(Self.model.$state.key)", .equal, SQLBind(QueuesFluentJobState.pending))
.orderBy("\(Self.model.$createdAt.path.first!)")
.where ("\(FieldKey.state)", .equal, SQLBind(QueuesFluentJobState.pending))
.orderBy("\(FieldKey.createdAt)")
.limit (1)
if (self.dbType != .sqlite) {
if self.dbType != .sqlite {
selectQuery = selectQuery.lockingClause(SQLSkipLocked.forUpdateSkipLocked)
}

Expand All @@ -105,8 +103,6 @@ extension FluentQueue: Queue {
popProvider = MySQLPop()
case .sqlite:
popProvider = SqlitePop()
@unknown default:
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
return popProvider.pop(db: database, select: selectQuery.query).optionalMap { id in
return JobIdentifier(string: id.uuidString)
Expand All @@ -122,17 +118,17 @@ extension FluentQueue: Queue {
var query = db
.select()
.from (JobModel.schema)
.where ("\(Self.model.$state.key)", .equal, SQLBind(state))
if(queue != nil) {
query = query.where("\(Self.model.$key.key)", .equal, SQLBind(queue!))
.where ("\(FieldKey.state)", .equal, SQLBind(state))
if let queue = queue {
query = query.where("\(FieldKey.key)", .equal, SQLBind(queue))
}
if (self.dbType != .sqlite) {
if self.dbType != .sqlite {
query = query.lockingClause(SQLSkipLocked.forShareSkipLocked)
}
var pendingJobs = [JobData]()
return db.execute(sql: query.query) { (row) -> Void in
do {
let jobData = try row.decode(column: "\(FluentQueue.model.$data.key)", as: JobData.self)
let jobData = try row.decode(column: "\(FieldKey.data)", as: JobData.self)
pendingJobs.append(jobData)
}
catch {
Expand Down
1 change: 1 addition & 0 deletions Sources/QueuesFluentDriver/FluentQueuesDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extension FluentQueuesDriver: QueuesDriver {
.application
.databases
.database(databaseId, logger: context.logger, on: context.eventLoop)

return FluentQueue(
db: db,
context: context,
Expand Down
24 changes: 17 additions & 7 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ public enum QueuesFluentJobState: String, Codable, CaseIterable {
case completed
}

extension FieldKey {
static var key: Self { "key" }
static var data: Self { "data" }
static var state: Self { "state" }

static var createdAt: Self { "created_at" }
static var updatedAt: Self { "updated_at" }
static var deletedAt: Self { "deleted_at" }
}

class JobModel: Model {
public required init() {}

Expand All @@ -21,31 +31,31 @@ class JobModel: Model {
var id: UUID?

/// The Job key
@Field(key: "key")
@Field(key: .key)
var key: String

/// The Job data
@Field(key: "data")
@Field(key: .data)
//var data: JobData?
var data: Data

/// The current state of the Job
@Field(key: "state")
@Field(key: .state)
var state: QueuesFluentJobState

/// The created timestamp
@Timestamp(key: "created_at", on: .create)
@Timestamp(key: .createdAt, on: .create)
var createdAt: Date?

/// The updated timestamp
@Timestamp(key: "updated_at", on: .update)
@Timestamp(key: .updatedAt, on: .update)
var updatedAt: Date?

@Timestamp(key: "deleted_at", on: .delete)
@Timestamp(key: .deletedAt, on: .delete)
var deletedAt: Date?


init(id: UUID, key: String, data: JobData? = nil) {
init(id: UUID, key: String, data: JobData) {
self.id = id
self.key = key
self.data = try! JSONEncoder().encode(data)
Expand Down
18 changes: 8 additions & 10 deletions Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,21 @@ public struct JobModelMigrate: Migration {
}

public func prepare(on database: Database) -> EventLoopFuture<Void> {
let model = FluentQueue.model
return database.schema(JobModel.schema)
.id()
.field(model.$key.key, .string, .required)
.field(model.$data.key, .data, .required)
//.field(model.$data.key, .json, .required)
.field(model.$state.key, .string, .required)
.field(model.$createdAt.path.first!, .datetime)
.field(model.$updatedAt.path.first!, .datetime)
.field(model.$deletedAt.path.first!, .datetime)
.field(FieldKey.key, .string, .required)
.field(FieldKey.data, .data, .required)
.field(FieldKey.state, .string, .required)
.field(FieldKey.createdAt, .datetime)
.field(FieldKey.updatedAt, .datetime)
.field(FieldKey.deletedAt, .datetime)
.create()
.flatMap {
// Mysql could lock the entire table if there's no index on the field of the WHERE clause
let sqlDb = database as! SQLDatabase
return sqlDb.create(index: "i_\(JobModel.schema)_\(model.$state.key)")
return sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.state)")
.on(JobModel.schema)
.column("\(model.$state.key)")
.column("\(FieldKey.state)")
.run()
}
}
Expand Down
10 changes: 5 additions & 5 deletions Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ final class MySQLPop : PopQueryProtocol {

var id: UUID?
return database.execute(sql: select) { (row) -> Void in
id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self)
id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self)
}
.flatMap {
if (id != nil) {
if let id = id {
let updateQuery = database
.update(JobModel.schema)
.set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date()))
.where(SQLColumn.init("\(FluentQueue.model.$id.key)"), .equal, SQLBind.init(id!))
.set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date()))
.where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id))
.query
return database.execute(sql: updateQuery) { (row) in }
.map { id }
Expand Down
13 changes: 6 additions & 7 deletions Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,19 @@ final class PostgresPop : PopQueryProtocol {
let subQueryGroup = SQLGroupExpression.init(select)
let query = database
.update (JobModel.schema)
.set (SQLColumn("\(FluentQueue.model.$state.key)"), to: SQLBind(QueuesFluentJobState.processing))
.set (SQLColumn("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind(Date()))
.set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.processing))
.set (SQLColumn("\(FieldKey.updatedAt)"), to: SQLBind(Date()))
.where (
SQLBinaryExpression(left: SQLColumn("\(FluentQueue.model.$id.key)"), op: SQLBinaryOperator.equal , right: subQueryGroup)
SQLBinaryExpression(left: SQLColumn("\(FieldKey.id)"), op: SQLBinaryOperator.equal , right: subQueryGroup)
)
// Gross abuse
.orWhere(SQLReturning.returning(column: FluentQueue.model.$id.key))
.orWhere(SQLReturning.returning(column: FieldKey.id))
.query

var id: UUID?
return database.execute(sql: query) { (row) -> Void in
id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self)
}
.map {
id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self)
}.map {
return id
}
}
Expand Down
10 changes: 5 additions & 5 deletions Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ final class SqlitePop : PopQueryProtocol {
//return database.raw(SQLQueryString("BEGIN IMMEDIATE")).run().flatMap { void in
var id: UUID?
return database.execute(sql: select) { (row) -> Void in
id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self)
id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self)
}
.flatMap {
if (id != nil) {
if let id = id {
let updateQuery = database
.update(JobModel.schema)
.set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date()))
.where(SQLColumn.init("\(FluentQueue.model.$id.key)"), .equal, SQLBind.init(id!))
.set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing))
.set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date()))
.where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id))
.query
return database.execute(sql: updateQuery) { (row) in }
.flatMap {
Expand Down

0 comments on commit a627a33

Please sign in to comment.