Skip to content

Commit

Permalink
Add async/await #19 by @nerzh
Browse files Browse the repository at this point in the history
  • Loading branch information
nerzh authored Jan 29, 2024
1 parent 7ae80c0 commit b8e882e
Show file tree
Hide file tree
Showing 20 changed files with 1,326 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ deps.append("https://github.com/apple/swift-log.git", from: "1.0.0", targets: .p
if localDev {
deps.appendLocal("SwifQL", targets: "SwifQL")
} else {
deps.append("https://github.com/SwifQL/SwifQL.git", from: "2.0.0-beta", targets: .product(name: "SwifQL", package: "SwifQL"))
deps.append("https://github.com/nerzh/SwifQL.git", .branchItem("master"), targets: .product(name: "SwifQL", package: "SwifQL"))
}

// MARK: - Package
Expand Down
4 changes: 4 additions & 0 deletions Sources/Bridges/Bridges.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ extension SwifQLable {
public func execute(on conn: BridgeConnection) -> EventLoopFuture<Void> {
conn.query(raw: prepare(conn.dialect).plain).transform(to: ())
}

public func execute(on conn: BridgeConnection) async throws {
try await conn.query(raw: prepare(conn.dialect).plain)
}
}

public protocol SQLRow {
Expand Down
8 changes: 8 additions & 0 deletions Sources/Bridges/BridgesRow.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ extension EventLoopFuture where Value: BridgesRows {
public func all<R>(decoding type: R.Type) -> EventLoopFuture<[R]> where R: Decodable {
flatMapThrowing { try $0.all(as: type) }
}

public func first<R>(decoding type: R.Type) async throws -> R? where R: Decodable {
try await get().first(as: type)
}

public func all<R>(decoding type: R.Type) async throws -> [R] where R: Decodable {
try await get().all(as: type)
}
}

extension Array: BridgesRows where Element: BridgesRow {
Expand Down
6 changes: 6 additions & 0 deletions Sources/Bridges/Builders/UpdateEnumBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,10 @@ public class UpdateEnumBuilder<Enum: BridgesEnum> where Enum.RawValue == String
{ action.execute(on: conn) }
}.flatten(on: conn.eventLoop)
}

public func execute(on conn: BridgeConnection) async throws {
try await actions.map { action in
{ try await action.execute(on: conn) }
}.flatten()
}
}
5 changes: 5 additions & 0 deletions Sources/Bridges/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public protocol BridgeConnection {
func query(sql: SwifQLable) -> EventLoopFuture<Void>
func query<V: Decodable>(raw: String, decoding type: V.Type) -> EventLoopFuture<[V]>
func query<V: Decodable>(sql: SwifQLable, decoding type: V.Type) -> EventLoopFuture<[V]>

func query(raw: String) async throws
func query(sql: SwifQLable) async throws
func query<V: Decodable>(raw: String, decoding type: V.Type) async throws -> [V]
func query<V: Decodable>(sql: SwifQLable, decoding type: V.Type) async throws -> [V]
}
99 changes: 95 additions & 4 deletions Sources/Bridges/DatabaseMigrations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public protocol Migrator {
func migrate() -> EventLoopFuture<Void>
func revertLast() -> EventLoopFuture<Void>
func revertAll() -> EventLoopFuture<Void>

func migrate() async throws
func revertLast() async throws
func revertAll() async throws
}

public class BridgeDatabaseMigrations<B: Bridgeable>: Migrator {
Expand Down Expand Up @@ -53,9 +57,17 @@ public class BridgeDatabaseMigrations<B: Bridgeable>: Migrator {
createBuilder.checkIfNotExists().execute(on: conn)
}

static func prepare(on conn: BridgeConnection) async throws {
try await createBuilder.checkIfNotExists().execute(on: conn)
}

static func revert(on conn: BridgeConnection) -> EventLoopFuture<Void> {
dropBuilder.execute(on: conn)
}

static func revert(on conn: BridgeConnection) async throws {
try await dropBuilder.execute(on: conn)
}
}
}

Expand Down Expand Up @@ -83,9 +95,22 @@ public class BridgeDatabaseMigrations<B: Bridgeable>: Migrator {
.execute(on: conn)
}

static func prepare(on conn: BridgeConnection) async throws {
try await createBuilder
.checkIfNotExists()
.column(\.$id, .bigserial, .primaryKey)
.column(\.$name, .text, .unique)
.column(\.$batch, .int)
.execute(on: conn)
}

static func revert(on conn: BridgeConnection) -> EventLoopFuture<Void> {
dropBuilder.checkIfExists().execute(on: conn)
}

static func revert(on conn: BridgeConnection) async throws {
try await dropBuilder.checkIfExists().execute(on: conn)
}
}
}

Expand All @@ -96,8 +121,8 @@ public class BridgeDatabaseMigrations<B: Bridgeable>: Migrator {
return bridge.transaction(to: db, on: bridge.eventLoopGroup.next()) { conn in
conn.eventLoop.future().flatMap {
self.dedicatedSchema
? BridgesSchema.Create.prepare(on: conn)
: conn.eventLoop.future()
? BridgesSchema.Create.prepare(on: conn)
: conn.eventLoop.future()
}.flatMap {
CreateTableBuilder<Migrations>(schema: self.schemaName)
.checkIfNotExists()
Expand Down Expand Up @@ -126,7 +151,7 @@ public class BridgeDatabaseMigrations<B: Bridgeable>: Migrator {
}
}
}

public func revertLast() -> EventLoopFuture<Void> {
bridge.transaction(to: db, on: bridge.eventLoopGroup.next()) {
self._revertLast(on: $0).transform(to: ())
Expand All @@ -137,7 +162,7 @@ public class BridgeDatabaseMigrations<B: Bridgeable>: Migrator {
let query = SwifQL.select(self.m.table.*).from(self.m.table).prepare(conn.dialect).plain
return conn.query(raw: query, decoding: Migrations.self).flatMap { completedMigrations in
guard let lastBatch = completedMigrations.map({ $0.batch }).max()
else { return conn.eventLoop.future(false) }
else { return conn.eventLoop.future(false) }
let migrationsToRevert = completedMigrations.filter { $0.batch == lastBatch }
var migrations = self.migrations
migrations.removeAll { m in migrationsToRevert.contains { $0.name != m.migrationName } }
Expand Down Expand Up @@ -175,6 +200,72 @@ public class BridgeDatabaseMigrations<B: Bridgeable>: Migrator {
return promise.futureResult
}
}

///ASYNC
public func migrate() async throws {
return try await bridge.transaction(to: db, on: bridge.eventLoopGroup.next()) { conn in

if self.dedicatedSchema {
try await BridgesSchema.Create.prepare(on: conn)
}
try await CreateTableBuilder<Migrations>(schema: self.schemaName)
.checkIfNotExists()
.column(\.$id, .bigserial, .primaryKey)
.column(\.$name, .text, .unique)
.column(\.$batch, .int)
.execute(on: conn)

let query = SwifQL.select(self.m.table.*).from(self.m.table).prepare(conn.dialect).plain
let completedMigrations = try await conn.query(raw: query, decoding: Migrations.self)
let batch = completedMigrations.map { $0.batch }.max() ?? 0
var migrations = self.migrations
migrations.removeAll { m in completedMigrations.contains { $0.name == m.migrationName } }
for migration in migrations {
try await migration.prepare(on: conn)
try await SwifQL
.insertInto(self.m.table, fields: self.m.$name, self.m.$batch)
.values
.values(migration.migrationName, batch + 1)
.execute(on: conn)
}
}
}

public func revertLast() async throws {
_ = try await bridge.transaction(to: db, on: bridge.eventLoopGroup.next()) {
try await self._revertLast(on: $0)
}
}

private func _revertLast(on conn: BridgeConnection) async throws -> Bool {
let query = SwifQL.select(self.m.table.*).from(self.m.table).prepare(conn.dialect).plain
let completedMigrations = try await conn.query(raw: query, decoding: Migrations.self)

guard let lastBatch = completedMigrations.map({ $0.batch }).max() else { return false }
let migrationsToRevert = completedMigrations.filter { $0.batch == lastBatch }
var migrations = self.migrations
migrations.removeAll { m in migrationsToRevert.contains { $0.name != m.migrationName } }
for migration in migrations {
try await migration.revert(on: conn)
try await SwifQL
.delete(from: self.m.table)
.where(self.m.$name == migration.migrationName)
.execute(on: conn)
}
return true
}

public func revertAll() async throws {
try await bridge.transaction(to: db, on: bridge.eventLoopGroup.next()) { conn in
func revert() async throws {
let reverted = try await self._revertLast(on: conn)
if reverted {
try await revert()
}
}
try await revert()
}
}
}

// TODO: implement migration lock
Expand Down
21 changes: 21 additions & 0 deletions Sources/Bridges/Extensions/Array+Extensions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//
// File.swift
//
//
// Created by Oleh Hudeichuk on 02.04.2023.
//

import Foundation

public extension Array {

func map<T>(_ handler: @Sendable @escaping (Element) async throws -> T) async throws -> [T] where Element == BridgesRow {
try await Task {
var result: [T] = .init()
for item in self {
result.append(try await handler(item))
}
return result
}.value
}
}
21 changes: 19 additions & 2 deletions Sources/Bridges/Extensions/Bridgeable+Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import SwifQL

extension Bridgeable {
public func transaction<T>(to db: DatabaseIdentifier,
on eventLoop: EventLoop,
_ closure: @escaping (Connection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
on eventLoop: EventLoop,
_ closure: @escaping (Connection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
connection(to: db, on: eventLoop) { conn in
conn.query(raw: SwifQL.begin.semicolon.prepare().plain).transform(to: conn).flatMap { conn in
closure(conn).flatMapError { error in
Expand All @@ -27,6 +27,23 @@ extension Bridgeable {
}
}

public func transaction<T>(to db: DatabaseIdentifier,
on eventLoop: EventLoop,
_ closure: @escaping (Connection) async throws -> T
) async throws -> T {
try await connection(to: db, on: eventLoop) { conn in
try await conn.query(raw: SwifQL.begin.semicolon.prepare().plain)
do {
let result = try await closure(conn)
try await conn.query(raw: SwifQL.commit.semicolon.prepare().plain)
return result
} catch {
try await conn.query(raw: SwifQL.rollback.semicolon.prepare().plain)
throw error
}
}
}

public func shutdown() {
pools.values.forEach { $0.shutdown() }
}
Expand Down
12 changes: 11 additions & 1 deletion Sources/Bridges/Extensions/EventLoopFuture+SyncFlatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Foundation

extension Array where Element == (() -> EventLoopFuture<Void>) {
public extension Array where Element == (() -> EventLoopFuture<Void>) {
func flatten(on eventLoop: EventLoop) -> EventLoopFuture<Void> {
let promise = eventLoop.makePromise(of: Void.self)

Expand Down Expand Up @@ -36,3 +36,13 @@ extension Array where Element == (() -> EventLoopFuture<Void>) {
return promise.futureResult
}
}

public extension Array where Element == (() async throws -> Void) {

func flatten() async throws {
var iterator = self.makeIterator()
while let item = iterator.next() {
try await item()
}
}
}
30 changes: 30 additions & 0 deletions Sources/Bridges/Extensions/SwifQLable+Execute.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,27 @@ extension EventLoopFuture where Value == BridgesExecutedResult {
public func count() -> EventLoopFuture<Int64> {
map { .init($0.rows.count) }
}

public func all<T>() async throws -> [T] where T: Decodable {
try await all(decoding: T.self)
}


public func all<T>(decoding: T.Type) async throws -> [T] where T: Decodable {
try await get().rows.map { try $0.decode(model: T.self) }
}

public func first<T>() async throws -> T? where T: Table {
try await first(decoding: T.self)
}

public func first<T>(decoding: T.Type) async throws -> T? where T: Decodable {
try await get().rows.first?.decode(model: T.self)
}

public func count() async throws -> Int64 {
try await Int64(get().rows.count)
}
}

extension SwifQLable {
Expand All @@ -57,6 +78,15 @@ extension SwifQLable {
}
return db.query(self, on: container).map { .init(db: db, container: container, rows: $0) }
}

public func execute(on db: DatabaseIdentifier, on container: AnyBridgesObject) async throws -> BridgesExecutedResult {
guard let db = db as? AnyDatabaseIdentifiable else {
error(container.logger)
throw BridgesError.nonGenericDatabaseIdentifier
}
let result = try await db.query(self, on: container)
return .init(db: db, container: container, rows: result)
}
}

fileprivate func error(_ logger: Logger) {
Expand Down
Loading

0 comments on commit b8e882e

Please sign in to comment.