Skip to content

Commit

Permalink
First usable release with working Postgres support
Browse files Browse the repository at this point in the history
  • Loading branch information
m-barthelemy committed Apr 7, 2020
1 parent 3b1700e commit bd2623e
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 16 deletions.
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ This package should be compatible with:
- Postgres >= 9.5
- Mysql >= 8.0.1
- MariaDB >= 10.3
- Sqlite

 

:rotating_light: At the moment the Fluent MySQL driver seems to have issues with `Data` (`blob`) fields.
This means that currently (QueuesFluentDriver 0.2.0, fluent-mysql-driver 4.0.0-rc) **ONLY POSTGRES IS SUPPORTED BY THIS DRIVER**

 

Sqlite could be made to work in theory, but it would currently require that there is only one single Queues worker polling for jobs - and that the database has `journal_mode` set to `wal`. In short: it won't work, don't try.


## Usage
Expand All @@ -40,7 +48,7 @@ let package = Package(
],
dependencies: [
...
.package(url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "0.1.11"),
.package(url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "0.2.0"),
...
],
targets: [
Expand Down Expand Up @@ -81,7 +89,7 @@ app.migrations.add(JobModelMigrate())

Finally. load the `QueuesFluentDriver` driver:
```swift
app.queues.use(.fluent(.psql, dbType: .postgres))
app.queues.use(.fluent(.psql))
```


Expand All @@ -94,7 +102,7 @@ In that case you would initialize the Queues configuration like this:
```swift
let queuesDb = DatabaseID(string: "my_queues_db")
app.databases.use(.postgres(configuration: dbConfig), as: queuesDb, isDefault: false)
app.queues.use(.fluent(queuesDb, dbType: .postgres))
app.queues.use(.fluent(queuesDb))
```

### Customizing the jobs table name
Expand Down Expand Up @@ -122,6 +130,6 @@ By default, this driver uses Fluent's "soft delete" feature, meaning that comple
If you want to delete the entry as soon as job is completed, you can set the `useSoftDeletes` option to `false`:

```swift
app.queues.use(.fluent(.psql, dbType: .postgres, useSoftDeletes: false))
app.queues.use(.fluent(.psql, useSoftDeletes: false))
```

8 changes: 5 additions & 3 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ extension FluentQueue: Queue {
guard let uuid = UUID(uuidString: id.string) else {
return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier)
}
let data = jobStorage //try JSONEncoder().encode(jobStorage)
return JobModel(id: uuid, key: key, data: data).save(on: database)
//let data = try! JSONEncoder().encode(jobStorage)
return JobModel(id: uuid, key: key, data: jobStorage).save(on: database)
.map { return }
}

Expand Down Expand Up @@ -109,7 +109,9 @@ extension FluentQueue: Queue {
case .mysql:
popProvider = MySQLPop()
case .sqlite:
popProvider = SqlitePop()
popProvider = SqlitePop()
default:
return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound)
}
return popProvider.pop(db: database, select: selectQuery.query).optionalMap { id in
return JobIdentifier(string: id.uuidString)
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public struct JobModelMigrate: Migration {
return database.schema(JobModel.schema)
.id()
.field(model.$key.key, .string, .required)
.field(model.$data.key, .data, .required)
.field(model.$data.key, .data, .required)
.field(model.$state.key, .string, .required)
.field(model.$createdAt.path.first!, .datetime)
.field(model.$updatedAt.path.first!, .datetime)
Expand Down
5 changes: 2 additions & 3 deletions Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ final class SqlitePop : PopQueryProtocol {
let database = db as! SQLDatabase
//let beginImmediateTrxn = database.raw("BEGIN IMMEDIATE").

return database.raw(SQLQueryString("BEGIN IMMEDIATE")).run().flatMap { void in
//return database.raw(SQLQueryString("BEGIN IMMEDIATE")).run().flatMap { void in
var id: UUID?
return database.execute(sql: select) { (row) -> Void in
print("••• columns: \(row.allColumns)")
id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self)
}
.flatMap {
Expand All @@ -31,6 +30,6 @@ final class SqlitePop : PopQueryProtocol {
}
return database.eventLoop.makeSucceededFuture(nil)
}
}
//}
}
}
4 changes: 0 additions & 4 deletions Sources/QueuesFluentDriver/Queues.Provider+fluent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ import Queues

extension Application.Queues.Provider {
/// `database`: the Fluent `Database` already configured in your application.
/// `dbType`: one of `.psql`, `.mysq;`, `.sqlite`
/// `useSoftDeletes`: if set to `false`, really delete completed jobs insetad of using Fluent's default SoftDelete feature.
/// **WARNING**: if set to `false`, with any database engine other than Sqlite, a given job could be picked by multiple workers, unless you only have one single Queues worker/process.
/// `useSkipLocked` is `true` by default and is supported on Mysql >= 8.0.1, MariaDB >= 10.3, Postgres >= 9.5, Oracle >= 9i(?).
/// Sqlite doesn't have nor need it since it uses full table locking on update. Other dbs are just too weird (SQL Server).
public static func fluent(_ dbId: DatabaseID, useSoftDeletes: Bool = true) -> Self {
.init {
$0.queues.use(custom:
Expand Down

0 comments on commit bd2623e

Please sign in to comment.