diff --git a/.gitignore b/.gitignore index 0b90dbe..fe23d14 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ /*.xcodeproj xcuserdata/ .swiftpm +Package.resolved diff --git a/Package.swift b/Package.swift index 6b7d9fa..8857d3d 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.1 +// swift-tools-version:5.2 // The swift-tools-version declares the minimum version of Swift required to build this package. import PackageDescription @@ -17,16 +17,16 @@ let package = Package( .package(url: "https://github.com/vapor/vapor.git", from: "4.0.1"), .package(url: "https://github.com/vapor/fluent.git", from: "4.0.0-rc.2"), .package(url: "https://github.com/vapor/sql-kit.git", from: "3.0.0-rc"), - .package(url: "https://github.com/vapor/queues.git", from: "1.0.0"), + .package(url: "https://github.com/vapor/queues.git", from: "1.0.0-rc"), ], targets: [ .target( name: "QueuesFluentDriver", dependencies: [ - "Fluent", - "SQLKit", - "Queues", - "Vapor" + .product(name: "Vapor", package: "vapor"), + .product(name: "Fluent", package: "fluent"), + .product(name: "SQLKit", package: "sql-kit"), + .product(name: "Queues", package: "queues") ], path: "Sources" ) diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index 0acd157..a50a1aa 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -8,7 +8,6 @@ struct FluentQueue { let context: QueueContext let dbType: QueuesFluentDbType let useSoftDeletes: Bool - static let model = JobModel(id: UUID.generateRandom(), key: "") } extension FluentQueue: Queue { @@ -54,12 +53,11 @@ extension FluentQueue: Queue { .first() .unwrap(or: QueuesFluentError.missingJob(id)) .flatMap { job in - if(self.useSoftDeletes) { + if self.useSoftDeletes { job.state = .completed job.deletedAt = Date() return job.update(on: database) - } - else { + } else { return job.delete(force: true, on: database) } } @@ -74,9 +72,9 @@ extension FluentQueue: Queue { } let sqlDb = database as! SQLDatabase return sqlDb - .update (JobModel.schema) - .set (SQLColumn("\(FluentQueue.model.$state.key)"), to: SQLBind(QueuesFluentJobState.pending)) - .where (SQLColumn("\(FluentQueue.model.$id.key)"), .equal, SQLBind(uuid)) + .update(JobModel.schema) + .set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending)) + .where (SQLColumn("\(FieldKey.id)"), .equal, SQLBind(uuid)) .run() } @@ -88,12 +86,12 @@ extension FluentQueue: Queue { var selectQuery = db .select () - .column ("\(Self.model.$id.key)") + .column ("\(FieldKey.id)") .from (JobModel.schema) - .where ("\(Self.model.$state.key)", .equal, SQLBind(QueuesFluentJobState.pending)) - .orderBy("\(Self.model.$createdAt.path.first!)") + .where ("\(FieldKey.state)", .equal, SQLBind(QueuesFluentJobState.pending)) + .orderBy("\(FieldKey.createdAt)") .limit (1) - if (self.dbType != .sqlite) { + if self.dbType != .sqlite { selectQuery = selectQuery.lockingClause(SQLSkipLocked.forUpdateSkipLocked) } @@ -105,8 +103,6 @@ extension FluentQueue: Queue { popProvider = MySQLPop() case .sqlite: popProvider = SqlitePop() - @unknown default: - return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) } return popProvider.pop(db: database, select: selectQuery.query).optionalMap { id in return JobIdentifier(string: id.uuidString) @@ -122,17 +118,17 @@ extension FluentQueue: Queue { var query = db .select() .from (JobModel.schema) - .where ("\(Self.model.$state.key)", .equal, SQLBind(state)) - if(queue != nil) { - query = query.where("\(Self.model.$key.key)", .equal, SQLBind(queue!)) + .where ("\(FieldKey.state)", .equal, SQLBind(state)) + if let queue = queue { + query = query.where("\(FieldKey.key)", .equal, SQLBind(queue)) } - if (self.dbType != .sqlite) { + if self.dbType != .sqlite { query = query.lockingClause(SQLSkipLocked.forShareSkipLocked) } var pendingJobs = [JobData]() return db.execute(sql: query.query) { (row) -> Void in do { - let jobData = try row.decode(column: "\(FluentQueue.model.$data.key)", as: JobData.self) + let jobData = try row.decode(column: "\(FieldKey.data)", as: JobData.self) pendingJobs.append(jobData) } catch { diff --git a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift index baec1b6..00242eb 100644 --- a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift +++ b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift @@ -26,6 +26,7 @@ extension FluentQueuesDriver: QueuesDriver { .application .databases .database(databaseId, logger: context.logger, on: context.eventLoop) + return FluentQueue( db: db, context: context, diff --git a/Sources/QueuesFluentDriver/JobModel.swift b/Sources/QueuesFluentDriver/JobModel.swift index c07fac0..792569d 100644 --- a/Sources/QueuesFluentDriver/JobModel.swift +++ b/Sources/QueuesFluentDriver/JobModel.swift @@ -10,6 +10,16 @@ public enum QueuesFluentJobState: String, Codable, CaseIterable { case completed } +extension FieldKey { + static var key: Self { "key" } + static var data: Self { "data" } + static var state: Self { "state" } + + static var createdAt: Self { "created_at" } + static var updatedAt: Self { "updated_at" } + static var deletedAt: Self { "deleted_at" } +} + class JobModel: Model { public required init() {} @@ -21,31 +31,31 @@ class JobModel: Model { var id: UUID? /// The Job key - @Field(key: "key") + @Field(key: .key) var key: String /// The Job data - @Field(key: "data") + @Field(key: .data) //var data: JobData? var data: Data /// The current state of the Job - @Field(key: "state") + @Field(key: .state) var state: QueuesFluentJobState /// The created timestamp - @Timestamp(key: "created_at", on: .create) + @Timestamp(key: .createdAt, on: .create) var createdAt: Date? /// The updated timestamp - @Timestamp(key: "updated_at", on: .update) + @Timestamp(key: .updatedAt, on: .update) var updatedAt: Date? - @Timestamp(key: "deleted_at", on: .delete) + @Timestamp(key: .deletedAt, on: .delete) var deletedAt: Date? - init(id: UUID, key: String, data: JobData? = nil) { + init(id: UUID, key: String, data: JobData) { self.id = id self.key = key self.data = try! JSONEncoder().encode(data) diff --git a/Sources/QueuesFluentDriver/JobModelMigrate.swift b/Sources/QueuesFluentDriver/JobModelMigrate.swift index a7793dc..cd3d07a 100644 --- a/Sources/QueuesFluentDriver/JobModelMigrate.swift +++ b/Sources/QueuesFluentDriver/JobModelMigrate.swift @@ -10,23 +10,21 @@ public struct JobModelMigrate: Migration { } public func prepare(on database: Database) -> EventLoopFuture { - let model = FluentQueue.model return database.schema(JobModel.schema) .id() - .field(model.$key.key, .string, .required) - .field(model.$data.key, .data, .required) - //.field(model.$data.key, .json, .required) - .field(model.$state.key, .string, .required) - .field(model.$createdAt.path.first!, .datetime) - .field(model.$updatedAt.path.first!, .datetime) - .field(model.$deletedAt.path.first!, .datetime) + .field(FieldKey.key, .string, .required) + .field(FieldKey.data, .data, .required) + .field(FieldKey.state, .string, .required) + .field(FieldKey.createdAt, .datetime) + .field(FieldKey.updatedAt, .datetime) + .field(FieldKey.deletedAt, .datetime) .create() .flatMap { // Mysql could lock the entire table if there's no index on the field of the WHERE clause let sqlDb = database as! SQLDatabase - return sqlDb.create(index: "i_\(JobModel.schema)_\(model.$state.key)") + return sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.state)") .on(JobModel.schema) - .column("\(model.$state.key)") + .column("\(FieldKey.state)") .run() } } diff --git a/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift index ed1c902..2ccc630 100644 --- a/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift +++ b/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift @@ -10,15 +10,15 @@ final class MySQLPop : PopQueryProtocol { var id: UUID? return database.execute(sql: select) { (row) -> Void in - id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self) + id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self) } .flatMap { - if (id != nil) { + if let id = id { let updateQuery = database .update(JobModel.schema) - .set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(QueuesFluentJobState.processing)) - .set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date())) - .where(SQLColumn.init("\(FluentQueue.model.$id.key)"), .equal, SQLBind.init(id!)) + .set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing)) + .set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date())) + .where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id)) .query return database.execute(sql: updateQuery) { (row) in } .map { id } diff --git a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift index 98ef522..8c8b417 100644 --- a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift +++ b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift @@ -9,20 +9,19 @@ final class PostgresPop : PopQueryProtocol { let subQueryGroup = SQLGroupExpression.init(select) let query = database .update (JobModel.schema) - .set (SQLColumn("\(FluentQueue.model.$state.key)"), to: SQLBind(QueuesFluentJobState.processing)) - .set (SQLColumn("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind(Date())) + .set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.processing)) + .set (SQLColumn("\(FieldKey.updatedAt)"), to: SQLBind(Date())) .where ( - SQLBinaryExpression(left: SQLColumn("\(FluentQueue.model.$id.key)"), op: SQLBinaryOperator.equal , right: subQueryGroup) + SQLBinaryExpression(left: SQLColumn("\(FieldKey.id)"), op: SQLBinaryOperator.equal , right: subQueryGroup) ) // Gross abuse - .orWhere(SQLReturning.returning(column: FluentQueue.model.$id.key)) + .orWhere(SQLReturning.returning(column: FieldKey.id)) .query var id: UUID? return database.execute(sql: query) { (row) -> Void in - id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self) - } - .map { + id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self) + }.map { return id } } diff --git a/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift index befb48f..df6975f 100644 --- a/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift +++ b/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift @@ -11,15 +11,15 @@ final class SqlitePop : PopQueryProtocol { //return database.raw(SQLQueryString("BEGIN IMMEDIATE")).run().flatMap { void in var id: UUID? return database.execute(sql: select) { (row) -> Void in - id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self) + id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self) } .flatMap { - if (id != nil) { + if let id = id { let updateQuery = database .update(JobModel.schema) - .set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(QueuesFluentJobState.processing)) - .set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date())) - .where(SQLColumn.init("\(FluentQueue.model.$id.key)"), .equal, SQLBind.init(id!)) + .set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing)) + .set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date())) + .where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id)) .query return database.execute(sql: updateQuery) { (row) in } .flatMap {